diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala
index 0db345e92a34c99e7540061162a6078ca4bbc427..bea8e38655505204ba596fcece8a21ec7b190316 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc
 import com.huawei.boostkit.spark.ColumnarPluginConfig.ENABLE_VEC_PREDICATE_FILTER
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -83,7 +84,7 @@ class OmniOrcFileFormat extends FileFormat with DataSourceRegister with Serializ
 
       // ORC predicate pushdown
       val pushed = filters.reduceOption(And(_, _))
-      val taskConf = new Configuration(conf)
+      val taskConf = new JobConf(conf)
       val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 4fcc163ccf4339c8f0048ff2f6b34fb4e4188df9..5cd8273649f069d4bb23a51e19a6db906fcb622c 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -18,8 +18,7 @@
 
 package org.apache.spark.util
 
-import java.io.FileNotFoundException
-
+import java.io.{FileNotFoundException, IOException}
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
@@ -31,10 +30,42 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
 /**
  * Utility functions to simplify and speed-up file listing.
  */
 private[spark] object HadoopFSUtils extends Logging {
+
+  private lazy val hiveConfKeys = {
+    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+    if (configFile != null) {
+      val conf = new Configuration(false)
+      conf.addResource(configFile)
+      conf.iterator().asScala.toSeq
+    } else {
+      Nil
+    }
+  }
+
+  private def appendHiveConfigs(hadoopConf: Configuration): Unit = {
+    hiveConfKeys.foreach { kv =>
+      hadoopConf.set(kv.getKey, kv.getValue)
+    }
+  }
+
+  {
+    try {
+      val hadoopConf = new Configuration()
+      appendHiveConfigs(hadoopConf)
+      FileSystem.get(hadoopConf)
+      logInfo("Init filesystem by default hive config.")
+    } catch {
+      case e: IOException =>
+        logInfo("Failed to init filesystem by default hive config.", e)
+    }
+  }
+
   /**
    * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
    * on the number of paths to list.
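
For illustration, a minimal standalone sketch of the hive-site.xml pre-loading that the HadoopFSUtils change performs. The HiveConfWarmup object is hypothetical and not part of the patch; it uses the thread context ClassLoader instead of Spark's Utils.getContextOrSparkClassLoader and assumes a hive-site.xml is available on the classpath.

import java.io.IOException

import scala.collection.JavaConverters.asScalaIteratorConverter

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Hypothetical illustration object; mirrors hiveConfKeys/appendHiveConfigs from the patch.
object HiveConfWarmup {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()

    // Load hive-site.xml (if present on the classpath) into a Configuration that
    // skips Hadoop's default resources, then copy every entry onto hadoopConf.
    val configFile = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml")
    if (configFile != null) {
      val hiveConf = new Configuration(false)
      hiveConf.addResource(configFile)
      hiveConf.iterator().asScala.foreach { kv =>
        hadoopConf.set(kv.getKey, kv.getValue)
      }
    }

    // Best-effort warm-up of the default FileSystem with the merged settings;
    // a failure is reported but not fatal, mirroring the try/catch in the patch.
    try {
      val fs = FileSystem.get(hadoopConf)
      println(s"Initialized default filesystem: ${fs.getUri}")
    } catch {
      case e: IOException =>
        println(s"Failed to initialize default filesystem: ${e.getMessage}")
    }
  }
}

Reading hive-site.xml into a separate Configuration(false) keeps Hadoop's default resources out of the merge, so only the keys explicitly set in hive-site.xml are copied onto the target configuration, which is the same effect appendHiveConfigs has in the patch.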