diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
index 2a0a67b40c20b58ca23e24c2f0c144ff8ac0f956..911048c42297d137b4a17551b829f56232d93bae 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
@@ -166,6 +166,7 @@ public class DataIoAdapter {
      * @param partitionColumn partition column
      * @param filterOutPut filter schema
      * @param pushDownOperators push down expressions
+     * @param domains domain map
      * @return WritableColumnVector data result info
      * @throws TaskExecutionException connect to omni-data-server failed exception
      * @notice 3rd parties api throws Exception, function has to catch basic Exception
@@ -175,7 +176,8 @@ public class DataIoAdapter {
             Seq sparkOutPut,
             Seq partitionColumn,
             Seq filterOutPut,
-            PushDownInfo pushDownOperators) throws TaskExecutionException, UnknownHostException {
+            PushDownInfo pushDownOperators,
+            ImmutableMap domains) throws TaskExecutionException, UnknownHostException {
         // initCandidates
         initCandidates(pageCandidate, filterOutPut);
 
@@ -202,7 +204,7 @@ public class DataIoAdapter {
 
         Predicate predicate = new Predicate(
                 omnidataTypes, omnidataColumns, filterRowExpression, omnidataProjections,
-                buildDomains(filterRowExpression), ImmutableMap.of(), aggregations, limitLong);
+                domains, ImmutableMap.of(), aggregations, limitLong);
         TaskSource taskSource = new TaskSource(dataSource, predicate, MAX_PAGE_SIZE_IN_BYTES);
 
         // create deserializer
@@ -211,11 +213,20 @@ public class DataIoAdapter {
         PageDeserializer deserializer = initPageDeserializer();
 
         // get available host
-        String[] pushDownHostArray = pageCandidate.getpushDownHosts().split(",");
-        List pushDownHostList = new ArrayList<>(Arrays.asList(pushDownHostArray));
-        Optional availablePushDownHost = getRandomAvailablePushDownHost(pushDownHostArray,
-                JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts()));
-        availablePushDownHost.ifPresent(pushDownHostList::add);
+        List pushDownHostList = new ArrayList<>();
+        String[] pushDownHostArray;
+        if (pageCandidate.getpushDownHosts().length() == 0) {
+            Optional availablePushDownHost = getRandomAvailablePushDownHost(new String[]{},
+                    JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts()));
+            availablePushDownHost.ifPresent(pushDownHostList::add);
+            pushDownHostArray = pushDownHostList.toArray(new String[]{});
+        } else {
+            pushDownHostArray = pageCandidate.getpushDownHosts().split(",");
+            pushDownHostList = new ArrayList<>(Arrays.asList(pushDownHostArray));
+            Optional availablePushDownHost = getRandomAvailablePushDownHost(pushDownHostArray,
+                    JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts()));
+            availablePushDownHost.ifPresent(pushDownHostList::add);
+        }
         return getIterator(pushDownHostList.iterator(), taskSource, pushDownHostArray, deserializer,
                 pushDownHostList.size());
     }
@@ -275,11 +286,12 @@ public class DataIoAdapter {
     private Optional getRandomAvailablePushDownHost(String[] pushDownHostArray,
                                                     Map fpuHosts) {
         List existingHosts = Arrays.asList(pushDownHostArray);
-        List allHosts = new ArrayList<>(fpuHosts.values());
+        List allHosts = new ArrayList<>(fpuHosts.keySet());
         allHosts.removeAll(existingHosts);
         if (allHosts.size() > 0) {
-            LOG.info("Add another available host: " + allHosts.get(0));
-            return Optional.of(allHosts.get(0));
+            int randomIndex = (int) (Math.random() * allHosts.size());
+            LOG.info("Add another available host: " + allHosts.get(randomIndex));
+            return Optional.of(allHosts.get(randomIndex));
         } else {
             return Optional.empty();
         }
@@ -304,24 +316,11 @@ public class DataIoAdapter {
     }
 
     private void initCandidates(PageCandidate pageCandidate, Seq filterOutPut) {
-        omnidataTypes.clear();
-        omnidataColumns.clear();
-        omnidataProjections.clear();
-        fieldMap.clear();
-        columnNameSet.clear();
-        columnTypesList.clear();
-        columnOrdersList.clear();
-        filterTypesList.clear();
-        filterOrdersList.clear();
-        partitionColumnName.clear();
-        columnNameMap.clear();
-        columnOrder = 0;
+        initCandidatesBeforeDomain(filterOutPut);
         filePath = pageCandidate.getFilePath();
         columnOffset = pageCandidate.getColumnOffset();
-        listAtt = JavaConverters.seqAsJavaList(filterOutPut);
         TASK_FAILED_TIMES = pageCandidate.getMaxFailedTimes();
         taskTimeout = pageCandidate.getTaskTimeout();
-        isPushDownAgg = true;
     }
 
     private RowExpression extractNamedExpression(NamedExpression namedExpression) {
@@ -904,7 +903,44 @@ public class DataIoAdapter {
         return isOperatorCombineEnabled;
     }
 
-    public ImmutableMap buildDomains(Optional filterRowExpression) {
+    private void initCandidatesBeforeDomain(Seq filterOutPut) {
+        omnidataTypes.clear();
+        omnidataColumns.clear();
+        omnidataProjections.clear();
+        columnNameSet.clear();
+        columnTypesList.clear();
+        columnOrdersList.clear();
+        fieldMap.clear();
+        filterTypesList.clear();
+        filterOrdersList.clear();
+        columnNameMap.clear();
+        columnOrder = 0;
+        partitionColumnName.clear();
+        listAtt = JavaConverters.seqAsJavaList(filterOutPut);
+        isPushDownAgg = true;
+    }
+
+    public ImmutableMap buildDomains(
+            Seq sparkOutPut,
+            Seq partitionColumn,
+            Seq filterOutPut,
+            PushDownInfo pushDownOperators) {
+
+        // initCandidates
+        initCandidatesBeforeDomain(filterOutPut);
+
+        // add partition column
+        JavaConverters.seqAsJavaList(partitionColumn).forEach(a -> partitionColumnName.add(a.name()));
+
+        // init column info
+        if (pushDownOperators.aggExecutions().size() == 0) {
+            isPushDownAgg = false;
+            initColumnInfo(sparkOutPut);
+        }
+
+        // create filter
+        Optional filterRowExpression = initFilter(pushDownOperators.filterExecutions());
+
         long startTime = System.currentTimeMillis();
         ImmutableMap.Builder domains = ImmutableMap.builder();
         if (filterRowExpression.isPresent() && NdpConf.getNdpDomainGenerateEnable(TaskContext.get())) {
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
index 21fd6a29c2ca274818354ef05786645d08f331f5..4471ff6f16c7dd08505484acd0581350b996b88d 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
+++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+
 import com.google.common.collect.ImmutableMap
-import io.prestosql.spi.relation.RowExpression
 import java.util
 
 import scala.collection.JavaConverters._
@@ -31,15 +31,16 @@
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.{DataIoAdapter, NdpUtils, PageCandidate, PageToColumnar, PushDownManager, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, BasePredicate, Expression, Predicate, UnsafeProjection}
+import org.apache.spark.sql.execution.ndp.NdpSupport.filterStripEnd
 import org.apache.spark.sql.execution.{QueryExecutionException, RowToColumnConverter}
 import org.apache.spark.sql.execution.ndp.{FilterExeInfo, NdpConf, PushDownInfo}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
 
-import java.io.FileNotFoundException
-import java.util.Optional
+import java.io.{FileNotFoundException, IOException}
 
 import scala.util.Random
@@ -60,7 +61,8 @@ class FileScanRDDPushDown(
     partialCondition: Boolean,
     partialPdRate: Double,
     zkPdRate: Double,
-    partialChildOutput: Seq[Attribute])
+    partialChildOutput: Seq[Attribute],
+    isFakePushDown: Boolean = false)
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
   var columnOffset = -1
@@ -77,7 +79,7 @@ class FileScanRDDPushDown(
     columnOffset = NdpUtils.getColumnOffset(dataSchema, output)
     filterOutput = output
   }
-  var fpuMap = pushDownOperators.fpuHosts
+  var fpuMap = pushDownOperators.fpuHosts.map(term => (term._2, term._1))
   var fpuList : Seq[String] = Seq()
   for (key <- fpuMap.keys) {
     fpuList = fpuList :+ key
@@ -103,21 +105,219 @@ class FileScanRDDPushDown(
   private val zkAddress = NdpConf.getNdpZookeeperAddress(sparkSession)
   private val taskTimeout = NdpConf.getTaskTimeout(sparkSession)
   private val operatorCombineEnabled = NdpConf.getNdpOperatorCombineEnabled(sparkSession)
-  val orcImpl = sparkSession.sessionState.conf.getConf(ORC_IMPLEMENTATION)
+  val orcImpl: String = sparkSession.sessionState.conf.getConf(ORC_IMPLEMENTATION)
+
+  private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+  private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
+
+  var pushDownIterator : PushDownIterator = null
+  var forceOmniDataPushDown : Boolean = false
 
   override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
+    if(isFakePushDown){
+      log.info("Fake push down\n")
+      computeSparkRDDAndFakePushDown(split, context)
+    } else {
+      log.info("Really push down\n")
+      computePushDownRDD(split, context)
+    }
+  }
+
+  def computePushDownRDD(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
     val pageToColumnarClass = new PageToColumnar(requiredSchema, output)
-    var iterator : PushDownIterator = null
-    if (isPartialPushDown(partialCondition, partialPdRate, zkPdRate)) {
+    if (!forceOmniDataPushDown && isPartialPushDown(partialCondition, partialPdRate, zkPdRate)) {
       logDebug("partial push down task on spark")
       val partialFilterCondition = pushDownOperators.filterExecutions.reduce((a, b) =>
         FilterExeInfo(And(a.filter, b.filter), partialChildOutput))
-      val predicate = Predicate.create(partialFilterCondition.filter, partialChildOutput)
+      var partialFilter : Expression = null
+      if (orcImpl.equals("hive")) {
+        partialFilter = partialFilterCondition.filter
+      } else {
+        partialFilter = filterStripEnd(partialFilterCondition.filter)
+      }
+      val predicate = Predicate.create(partialFilter, partialChildOutput)
       predicate.initialize(0)
-      iterator = new PartialPushDownIterator(split, context, pageToColumnarClass, predicate)
+      pushDownIterator = new PartialPushDownIterator(split, context, pageToColumnarClass, predicate)
     } else {
       logDebug("partial push down task on omnidata")
-      iterator = new PushDownIterator(split, context, pageToColumnarClass)
+      pushDownIterator = new PushDownIterator(split, context, pageToColumnarClass)
+    }
+    // Register an on-task-completion callback to close the input stream.
+    context.addTaskCompletionListener[Unit](_ => pushDownIterator.close())
+
+    pushDownIterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
+  }
+
+  class FakePushDownThread(sparkThread: Thread,
+                           split: RDDPartition,
+                           context: TaskContext,
+                           scan : FileScanRDDPushDown,
+                           sparkLog : org.slf4j.Logger) extends Thread {
+    var times = 0
+    scan.forceOmniDataPushDown = true
+    val iter: Iterator[Any] = scan.computePushDownRDD(split, context)
+    override def run(): Unit = {
+      while (!context.isCompleted() && sparkThread.isAlive && times <= 5 && iter.hasNext) {
+        sparkLog.info(">>>>>>Fake push down Thread [running]>>>>>")
+        Thread.sleep(200)
+        times = times + 1
+        val currentValue = iter.next()
+        currentValue match {
+          case batch: ColumnarBatch => batch.close()
+          case _ =>
+        }
+      }
+      sparkLog.info(">>>>>>Fake push down Thread [end]>>>>>")
+      pushDownIterator.close()
+      sparkLog.info("pushDownIterator close")
+      this.interrupt()
+    }
+  }
+
+  def doFakePush(split: RDDPartition, context: TaskContext, scan : FileScanRDDPushDown): Unit ={
+    val fakePushDownThread = new FakePushDownThread(Thread.currentThread(), split, context, scan, log)
+    fakePushDownThread.start()
+  }
+
+  def computeSparkRDDAndFakePushDown(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
+    //this code (computeSparkRDDAndFakePushDown) from spark FileScanRDD
+    doFakePush(split, context, this)
+    val iterator = new Iterator[Object] with AutoCloseable {
+      private val inputMetrics = context.taskMetrics().inputMetrics
+      private val existingBytesRead = inputMetrics.bytesRead
+
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // apply readFunction, because it might read some bytes.
+      private val getBytesReadCallback =
+        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+
+      // We get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid override values written by
+      // previous partitions (SPARK-13071).
+      private def incTaskInputMetricsBytesRead(): Unit = {
+        inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
+      }
+
+      private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+      private[this] var currentFile: PartitionedFile = null
+      private[this] var currentIterator: Iterator[Object] = null
+
+      def hasNext: Boolean = {
+        // Kill the task in case it has been marked as killed. This logic is from
+        // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
+        // to avoid performance overhead.
+        context.killTaskIfInterrupted()
+        (currentIterator != null && currentIterator.hasNext) || nextIterator()
+      }
+      def next(): Object = {
+        val nextElement = currentIterator.next()
+        // TODO: we should have a better separation of row based and batch based scan, so that we
+        // don't need to run this `if` for every record.
+        val preNumRecordsRead = inputMetrics.recordsRead
+        if (nextElement.isInstanceOf[ColumnarBatch]) {
+          incTaskInputMetricsBytesRead()
+          inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
+        } else {
+          // too costly to update every record
+          if (inputMetrics.recordsRead %
+            SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+            incTaskInputMetricsBytesRead()
+          }
+          inputMetrics.incRecordsRead(1)
+        }
+        nextElement
+      }
+
+      private def readCurrentFile(): Iterator[InternalRow] = {
+        try {
+          readFunction(currentFile)
+        } catch {
+          case e: FileNotFoundException =>
+            throw new FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved.")
+        }
+      }
+
+      /** Advances to the next file. Returns true if a new non-empty iterator is available. */
+      private def nextIterator(): Boolean = {
+        if (files.hasNext) {
+          currentFile = files.next()
+          logInfo(s"Reading File $currentFile")
+          // Sets InputFileBlockHolder for the file block's information
+          InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
+
+          if (ignoreMissingFiles || ignoreCorruptFiles) {
+            currentIterator = new NextIterator[Object] {
+              // The readFunction may read some bytes before consuming the iterator, e.g.,
+              // vectorized Parquet reader. Here we use lazy val to delay the creation of
+              // iterator so that we will throw exception in `getNext`.
+              private lazy val internalIter = readCurrentFile()
+
+              override def getNext(): AnyRef = {
+                try {
+                  if (internalIter.hasNext) {
+                    internalIter.next()
+                  } else {
+                    finished = true
+                    null
+                  }
+                } catch {
+                  case e: FileNotFoundException if ignoreMissingFiles =>
+                    logWarning(s"Skipped missing file: $currentFile", e)
+                    finished = true
+                    null
+                  // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+                  case e: FileNotFoundException if !ignoreMissingFiles => throw e
+                  case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+                    logWarning(
+                      s"Skipped the rest of the content in the corrupted file: $currentFile", e)
+                    finished = true
+                    null
+                }
+              }
+
+              override def close(): Unit = {}
+            }
+          } else {
+            currentIterator = readCurrentFile()
+          }
+
+          try {
+            hasNext
+          } catch {
+            case e: SchemaColumnConvertNotSupportedException =>
+              val message = "Parquet column cannot be converted in " +
+                s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
+                s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
+              throw new QueryExecutionException(message, e)
+            case e: ParquetDecodingException =>
+              if (e.getCause.isInstanceOf[SparkUpgradeException]) {
+                throw e.getCause
+              } else if (e.getMessage.contains("Can not read value at")) {
+                val message = "Encounter error while reading parquet files. " +
+                  "One possible cause: Parquet column cannot be converted in the " +
+                  "corresponding files. Details: "
+                throw new QueryExecutionException(message, e)
+              }
+              throw e
+          }
+        } else {
+          currentFile = null
+          InputFileBlockHolder.unset()
+          false
+        }
+      }
+
+      override def close(): Unit = {
+        incTaskInputMetricsBytesRead()
+        InputFileBlockHolder.unset()
+      }
     }
+
     // Register an on-task-completion callback to close the input stream.
     context.addTaskCompletionListener[Unit](_ => iterator.close())
@@ -134,6 +334,14 @@ class FileScanRDDPushDown(
   }
 
   override protected def getPartitions: Array[RDDPartition] = {
+    if(isFakePushDown) {
+      getSparkPartitions
+    } else {
+      getPushDownPartitions
+    }
+  }
+
+  def getPushDownPartitions: Array[RDDPartition] = {
     filePartitions.map { partitionFile => {
       val retHost = mutable.HashMap.empty[String, Long]
       partitionFile.files.foreach { partitionMap => {
@@ -178,6 +386,8 @@ class FileScanRDDPushDown(
     filePartitions.toArray
   }
 
+  def getSparkPartitions: Array[RDDPartition] = filePartitions.toArray
+
   override protected def getPreferredLocations(split: RDDPartition): Seq[String] = {
     split.asInstanceOf[FilePartition].preferredLocations()
   }
@@ -200,6 +410,7 @@ class FileScanRDDPushDown(
     var currentIterator: Iterator[Object] = null
     val sdiHosts: String = split.asInstanceOf[FilePartition].sdi
     val dataIoClass = new DataIoAdapter()
+    val domains: ImmutableMap[_, _] = dataIoClass.buildDomains(output,partitionColumns, filterOutput, pushDownOperators)
 
     def hasNext: Boolean = {
       // Kill the task in case it has been marked as killed. This logic is from
@@ -255,7 +466,7 @@ class FileScanRDDPushDown(
           currentFile.length, columnOffset, sdiHosts, fileFormat.toString, maxFailedTimes, taskTimeout,operatorCombineEnabled)
         val dataIoPage = dataIoClass.getPageIterator(pageCandidate, output,
-          partitionColumns, filterOutput, pushDownOperators)
+          partitionColumns, filterOutput, pushDownOperators, domains)
         currentIterator = pageToColumnarClass.transPageToColumnar(dataIoPage, isColumnVector,
           dataIoClass.isOperatorCombineEnabled, output, orcImpl).asScala.iterator
         iteHasNext()