diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
index 5f8f48b9420a763edc5e18bc5ac5bdf78ec56fc0..55907551ac0ce4a6ba20c7969c1e971dfdadef7c 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
@@ -134,7 +134,7 @@ public class DataIoAdapter {
 
     private Map columnNameMap = new HashMap<>();
 
-    private Set partitionColumnName = new HashSet<>();
+    private Map partitionColumnValues = new HashMap<>();
 
     private List listAtt = new ArrayList<>();
 
@@ -170,7 +170,6 @@ public class DataIoAdapter {
      *
      * @param pageCandidate file split info
      * @param sparkOutPut data schema
-     * @param partitionColumn partition column
      * @param filterOutPut filter schema
      * @param pushDownOperators push down expressions
      * @param domains domain map
@@ -181,17 +180,17 @@
     public Iterator getPageIterator(
             PageCandidate pageCandidate,
             Seq sparkOutPut,
-            Seq partitionColumn,
             Seq filterOutPut,
             PushDownInfo pushDownOperators,
             ImmutableMap domains,
             Boolean isColumnVector,
-            String omniGroupId) throws TaskExecutionException, UnknownHostException {
+            String omniGroupId,
+            Map partitionValues) throws TaskExecutionException, UnknownHostException {
         // initCandidates
         initCandidates(pageCandidate, filterOutPut);
 
         // add partition column
-        JavaConverters.seqAsJavaList(partitionColumn).forEach(a -> partitionColumnName.add(a.name()));
+        partitionColumnValues = partitionValues;
 
         // init column info
         if (pushDownOperators.aggExecutions().size() == 0) {
@@ -376,8 +375,8 @@
            aggProjectionId = fieldMap.size();
            fieldMap.put(aggColumnName, aggProjectionId);
            omnidataTypes.add(prestoType);
-           boolean isPartitionKey = partitionColumnName.contains(aggColumnName);
-           String partitionValue = NdpUtils.getPartitionValue(filePath, aggColumnName);
+           boolean isPartitionKey = partitionColumnValues.containsKey(aggColumnName);
+           String partitionValue = NdpUtils.getPartitionValue(aggColumnName, isPartitionKey, partitionColumnValues);
            omnidataColumns.add(new Column(columnId, aggColumnName, prestoType,
                isPartitionKey, partitionValue));
            columnNameSet.add(aggColumnName);
@@ -489,9 +488,9 @@
                columnNameMap.put(aggColumnName, field);
                int columnId = NdpUtils
                    .getColumnId(expression.toString()) - columnOffset;
-               boolean isPartitionKey = partitionColumnName.contains(aggColumnName);
+               boolean isPartitionKey = partitionColumnValues.containsKey(aggColumnName);
                String partitionValue = NdpUtils
-                   .getPartitionValue(filePath, aggColumnName);
+                   .getPartitionValue(aggColumnName, isPartitionKey, partitionColumnValues);
                omnidataColumns.add(
                    new Column(columnId, aggColumnName, prestoType,
                        isPartitionKey, partitionValue));
@@ -776,11 +775,11 @@
        if (null != fieldMap.get(filterColumnName)) {
            return fieldMap.get(filterColumnName);
        }
-       boolean isPartitionKey = partitionColumnName.contains(filterColumnName);
+       boolean isPartitionKey = partitionColumnValues.containsKey(filterColumnName);
        int filterProjectionId = fieldMap.size();
        fieldMap.put(filterColumnName, filterProjectionId);
-       String partitionValue = NdpUtils.getPartitionValue(filePath, filterColumnName);
+       String partitionValue = NdpUtils.getPartitionValue(filterColumnName, isPartitionKey, partitionColumnValues);
        columnNameSet.add(filterColumnName);
        omnidataColumns.add(new Column(columnId, filterColumnName, prestoType,
            isPartitionKey, partitionValue));
 
@@ -815,9 +814,9 @@
        Long fileStartPos = pageCandidate.getStartPos();
        Long fileLen = pageCandidate.getSplitLen();
        if ("ORC".equalsIgnoreCase(fileFormat)) {
-           dataSource = new HdfsOrcDataSource(filePath, fileStartPos, fileLen, false);
+           dataSource = new HdfsOrcDataSource(filePath, fileStartPos, fileLen, true);
        } else if ("PARQUET".equalsIgnoreCase(fileFormat)) {
-           dataSource = new HdfsParquetDataSource(filePath, fileStartPos, fileLen, false);
+           dataSource = new HdfsParquetDataSource(filePath, fileStartPos, fileLen, true);
        } else {
            throw new UnsupportedOperationException("unsupported data format : " + fileFormat);
        }
@@ -861,8 +860,8 @@
            String columnName = resAttribute.name().toLowerCase(Locale.ENGLISH);
            Type type = NdpUtils.transOlkDataType(resAttribute.dataType(), resAttribute, false);
            int columnId = NdpUtils.getColumnId(resAttribute.toString()) - columnOffset;
-           isPartitionKey = partitionColumnName.contains(columnName);
-           String partitionValue = NdpUtils.getPartitionValue(filePath, columnName);
+           isPartitionKey = partitionColumnValues.containsKey(columnName);
+           String partitionValue = NdpUtils.getPartitionValue(columnName, isPartitionKey, partitionColumnValues);
            omnidataColumns.add(new Column(columnId, columnName, type,
                isPartitionKey, partitionValue));
            omnidataTypes.add(type);
@@ -890,7 +889,7 @@
        filterOrdersList.clear();
        columnNameMap.clear();
        columnOrder = 0;
-       partitionColumnName.clear();
+       partitionColumnValues.clear();
        listAtt = JavaConverters.seqAsJavaList(filterOutPut);
        isPushDownAgg = true;
    }
@@ -905,7 +904,8 @@
        initCandidatesBeforeDomain(filterOutPut);
 
        // add partition column
-       JavaConverters.seqAsJavaList(partitionColumn).forEach(a -> partitionColumnName.add(a.name()));
+       JavaConverters.seqAsJavaList(partitionColumn).forEach(a ->
+           partitionColumnValues.put(a.name().toLowerCase(Locale.ENGLISH), ""));
 
        // init column info
        if (pushDownOperators.aggExecutions().size() == 0) {
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java
index 1e787d7c0567f8fe5fe0267b10ab23fa236d4331..3cdc3c5bcc73182899325470bf1de35a39eb3c4f 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java
@@ -450,19 +450,17 @@ public class NdpUtils {
            : OptionalLong.of(limitExeInfo.get().limit());
    }
 
-   public static String getPartitionValue(String filePath, String columnName) {
-       String[] filePathStrArray = filePath.split("\\/");
+   public static String getPartitionValue(
+           String columnName,
+           boolean isPartitionKey,
+           Map partitionColumnValues) {
        String partitionValue = "";
-       Pattern pn = Pattern.compile(columnName + "\\=");
-       for (String strColumn : filePathStrArray) {
-           Matcher matcher = pn.matcher(strColumn);
-           if (matcher.find()) {
-               partitionValue = strColumn.split("\\=")[1];
-               if (partitionValue.contains("__HIVE_DEFAULT_PARTITION__")) {
-                   partitionValue = null;
-               }
-               break;
-           }
+       if (!isPartitionKey) {
+           return partitionValue;
+       }
+       partitionValue = partitionColumnValues.get(columnName);
+       if (partitionValue.contains("__HIVE_DEFAULT_PARTITION__")) {
+           partitionValue = null;
        }
        return partitionValue;
    }
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala
index 1d63ab4dac56a72fcc0aab81a5291998afe496c5..ebaf2d4c325a2c77f48afc045c49a5c022f89b4b 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala
+++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala
@@ -1,10 +1,12 @@
 package org.apache.spark.sql.execution.command
 
-import scala.collection.mutable
+import org.apache.hadoop.fs.Path
 
+import scala.collection.mutable
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -14,6 +16,8 @@ import org.apache.spark.sql.functions.countDistinct
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
+import java.util.Locale
+
 object NdpCommandUtils extends Logging {
 
   private[sql] def computeColumnStats(
@@ -200,4 +204,29 @@
       cs.copy(histogram = Some(histogram))
     }
   }
+
+  private def partitionPathExpression(partitionColumns: Seq[Attribute], timeZoneId: String): Expression = {
+    Concat(
+      partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+        val partitionName = ScalaUDF(
+          ExternalCatalogUtils.getPartitionPathString _,
+          StringType,
+          Seq(Literal(c.name), Cast(c, StringType, Option(timeZoneId))))
+        if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR_CHAR), partitionName)
+      })
+  }
+
+  def getPartitionPath(partitionColumns: Seq[Attribute],
+                       timeZoneId: String,
+                       row: InternalRow): java.util.Map[String, String] = {
+    val proj = UnsafeProjection.create(Seq(partitionPathExpression(partitionColumns, timeZoneId)), partitionColumns)
+    val partitionValues: java.util.Map[String, String] = new java.util.HashMap[String, String]()
+    proj(row).getString(0).split(Path.SEPARATOR_CHAR).foreach { line =>
+      val values = line.split("=")
+      if (values.length == 2) {
+        partitionValues.put(values(0).toLowerCase(Locale.ENGLISH), ExternalCatalogUtils.unescapePathName(values(1)))
+      }
+    }
+    partitionValues
+  }
 }
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
index fadb8eae150bf6da38421be1a19a12b9aeea6cc4..fcb07b74e856a2a6a5a4873f3a48ee885ea740e9 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
+++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources
 
 import com.google.common.collect.ImmutableMap
 import com.huawei.boostkit.omnidata.exception.OmniDataException
-
 import java.util
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import org.apache.parquet.io.ParquetDecodingException
@@ -32,16 +32,18 @@
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.{DataIoAdapter, NdpUtils, PageCandidate, PageToColumnar, PushDownManager, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, BasePredicate, Expression, Predicate, UnsafeProjection}
+import org.apache.spark.sql.execution.command.NdpCommandUtils
 import org.apache.spark.sql.execution.ndp.NdpSupport.filterStripEnd
 import org.apache.spark.sql.execution.{QueryExecutionException, RowToColumnConverter}
 import org.apache.spark.sql.execution.ndp.{FilterExeInfo, NdpConf, PushDownInfo}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.NextIterator
-
 import java.io.{FileNotFoundException, IOException}
+
 import scala.util.Random
 
@@ -516,8 +518,9 @@
         val pageCandidate = new PageCandidate(currentFile.filePath, currentFile.start,
           currentFile.length, columnOffset, sdiHosts, fileFormat.toString,
           maxFailedTimes, taskTimeout,operatorCombineEnabled)
-        val dataIoPage = dataIoClass.getPageIterator(pageCandidate, output,
-          partitionColumns, filterOutput, pushDownOperators, domains, isColumnVector, omniGroupId)
+        val dataIoPage = dataIoClass.getPageIterator(pageCandidate, output, filterOutput, pushDownOperators, domains,
+          isColumnVector, omniGroupId, NdpCommandUtils.getPartitionPath(
+            partitionColumns, SQLConf.get.sessionLocalTimeZone, currentFile.partitionValues))
         currentIterator = pageToColumnarClass.transPageToColumnar(dataIoPage, isColumnVector,
           dataIoClass.isOperatorCombineEnabled, output, orcImpl).asScala.iterator
         iteHasNext()
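
For reference, a minimal standalone sketch of the lookup contract that the reworked NdpUtils.getPartitionValue follows after this patch: partition values come from a pre-computed column-to-value map (built per split by NdpCommandUtils.getPartitionPath) instead of being re-parsed from the file path; a non-partition column yields an empty string, and Hive's default-partition marker maps to null. The class name PartitionValueDemo and the sample columns (dt, country, price) are hypothetical and exist only for illustration; callers in the connector pass isPartitionKey = map.containsKey(column), so a true flag implies the key is present.

import java.util.HashMap;
import java.util.Map;

public class PartitionValueDemo {
    // Mirrors the reworked lookup; the caller guarantees that
    // isPartitionKey == partitionColumnValues.containsKey(columnName).
    static String getPartitionValue(String columnName, boolean isPartitionKey,
                                    Map<String, String> partitionColumnValues) {
        if (!isPartitionKey) {
            return "";                                   // non-partition column: no value
        }
        String value = partitionColumnValues.get(columnName);
        if (value.contains("__HIVE_DEFAULT_PARTITION__")) {
            return null;                                 // Hive default partition maps to null
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("dt", "2024-01-01");
        values.put("country", "__HIVE_DEFAULT_PARTITION__");

        System.out.println(getPartitionValue("dt", values.containsKey("dt"), values));           // 2024-01-01
        System.out.println(getPartitionValue("country", values.containsKey("country"), values)); // null
        System.out.println(getPartitionValue("price", values.containsKey("price"), values));     // empty string
    }
}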