From 6add85c851eafdd9c8da8a2b0066768e239cb961 Mon Sep 17 00:00:00 2001
From: zhousipei
Date: Fri, 21 Mar 2025 15:06:48 +0800
Subject: [PATCH 1/2] Initialize filesystem with default hive config

---
 .../org/apache/spark/util/HadoopFSUtils.scala | 35 +++++++++++++++++--
 1 file changed, 33 insertions(+), 2 deletions(-)

diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 4fcc163cc..5cd827364 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -18,8 +18,7 @@
 
 package org.apache.spark.util
 
-import java.io.FileNotFoundException
-
+import java.io.{FileNotFoundException, IOException}
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
@@ -31,10 +30,42 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
 /**
  * Utility functions to simplify and speed-up file listing.
  */
 private[spark] object HadoopFSUtils extends Logging {
+
+  private lazy val hiveConfKeys = {
+    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+    if (configFile != null) {
+      val conf = new Configuration(false)
+      conf.addResource(configFile)
+      conf.iterator().asScala.toSeq
+    } else {
+      Nil
+    }
+  }
+
+  private def appendHiveConfigs(hadoopConf: Configuration): Unit = {
+    hiveConfKeys.foreach { kv =>
+      hadoopConf.set(kv.getKey, kv.getValue)
+    }
+  }
+
+  {
+    try {
+      val hadoopConf = new Configuration()
+      appendHiveConfigs(hadoopConf)
+      FileSystem.get(hadoopConf)
+      logInfo("Initialized filesystem with default hive config.")
+    } catch {
+      case e: IOException =>
+        logWarning("Failed to initialize filesystem with default hive config.", e)
+    }
+  }
+
   /**
    * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
    * on the number of paths to list.
-- 
Gitee
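Note: Hadoop caches FileSystem instances process-wide, keyed by URI scheme/authority and user, on the first FileSystem.get call. The initializer block above merges the classpath hive-site.xml into a fresh Configuration and touches FileSystem.get once while the object loads, so the cached default filesystem is created from the Hive settings (for example fs.defaultFS) rather than from bare Hadoop defaults. A minimal standalone sketch of that warm-up, assuming hadoop-common on the classpath (the object name WarmUp and the println are illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Hypothetical standalone sketch of the warm-up this patch performs.
object WarmUp {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Merge the default Hive settings, if hive-site.xml is on the classpath.
    val hiveSite = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml")
    if (hiveSite != null) {
      conf.addResource(hiveSite)
    }
    // The first get() populates Hadoop's process-wide FileSystem cache;
    // later callers with an equivalent conf reuse this cached instance.
    val fs = FileSystem.get(conf)
    println(s"Cached default filesystem: ${fs.getUri}")
  }
}

Loading hive-site.xml into new Configuration(false) before copying, as the patch does, keeps Hadoop's core-default/core-site values out of the copied key set, so only the Hive-provided keys are appended to the target conf.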
From 432b92d967d36df49a5e2645f019fcd85a708933 Mon Sep 17 00:00:00 2001
From: zhousipei
Date: Fri, 21 Mar 2025 15:11:49 +0800
Subject: [PATCH 2/2] Avoid redundant conf init

---
 .../sql/execution/datasources/orc/OmniOrcFileFormat.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala
index 0db345e92..bea8e3865 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc
 import com.huawei.boostkit.spark.ColumnarPluginConfig.ENABLE_VEC_PREDICATE_FILTER
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -83,7 +84,7 @@ class OmniOrcFileFormat extends FileFormat with DataSourceRegister with Serializ
     // ORC predicate pushdown
     val pushed = filters.reduceOption(And(_, _))
 
-    val taskConf = new Configuration(conf)
+    val taskConf = new JobConf(conf)
     val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
-- 
Gitee
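Note: TaskAttemptContextImpl extends JobContextImpl, whose constructor keeps a JobConf argument as-is but wraps any other Configuration in an extra new JobConf(conf) copy. With new Configuration(conf) the reader therefore paid for two full copies of the conf per file: the explicit one and the hidden one inside the context. Passing a JobConf up front makes the single explicit copy the one the context uses. A small sketch of the difference, relying on that JobContextImpl behavior (the object name ConfCopyCheck and the printlns are illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Hypothetical sketch: count how many conf copies each variant makes.
object ConfCopyCheck {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)

    // Before: copy #1 here, copy #2 inside JobContextImpl's constructor.
    val plainCopy = new Configuration(conf)
    val ctxBefore = new TaskAttemptContextImpl(plainCopy, attemptId)
    println(ctxBefore.getConfiguration eq plainCopy) // false: wrapped in a new JobConf

    // After: the context keeps the JobConf it was given, so one copy total.
    val jobCopy = new JobConf(conf)
    val ctxAfter = new TaskAttemptContextImpl(jobCopy, attemptId)
    println(ctxAfter.getConfiguration eq jobCopy) // true: no second copy
  }
}

Since buildReaderWithPartitionValues runs this code once per file in the scan, dropping the hidden per-file copy of a broadcast Hadoop conf (often hundreds of keys) is a cheap win on file-heavy tables.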