diff --git a/omnicache/omnicache-spark-extension/build.sh b/omnicache/omnicache-spark-extension/build.sh
index ad436cddd524c22781e0c934c675085ff0537516..6c2bb474492f1649f4370ddd4f74fd3e7548aae0 100644
--- a/omnicache/omnicache-spark-extension/build.sh
+++ b/omnicache/omnicache-spark-extension/build.sh
@@ -1,2 +1,3 @@
#!/bin/bash
-mvn clean package
\ No newline at end of file
+cpu_name="-"$(lscpu | grep Architecture | awk '{print $2}')
+mvn clean package -Ddep.os.arch="${cpu_name}"
\ No newline at end of file
diff --git a/omnicache/omnicache-spark-extension/log-parser/pom.xml b/omnicache/omnicache-spark-extension/log-parser/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..75f93ca239dde0b92a2ea43e91d56e56444b9560
--- /dev/null
+++ b/omnicache/omnicache-spark-extension/log-parser/pom.xml
@@ -0,0 +1,237 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.huawei.kunpeng</groupId>
+        <artifactId>boostkit-omnicache-spark-parent</artifactId>
+        <version>3.1.1-1.0.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>boostkit-omnicache-logparser-spark</artifactId>
+    <packaging>jar</packaging>
+    <version>3.1.1-1.0.0</version>
+
+    <name>log-parser</name>
+
+    <properties>
+        <guava.version>14.0.1</guava.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.huawei.kunpeng</groupId>
+            <artifactId>boostkit-omnicache-spark</artifactId>
+            <version>3.1.1-1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-recipes</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>${artifactId}-${version}${dep.os.arch}</finalName>
+        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.antlr</groupId>
+                <artifactId>antlr4-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>antlr4</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <visitor>true</visitor>
+                    <sourceDirectory>src/main/antlr4</sourceDirectory>
+                    <treatWarningsAsErrors>true</treatWarningsAsErrors>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>compile</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <configuration>
+                    <recompileMode>${scala.recompile.mode}</recompileMode>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.scalastyle</groupId>
+                <artifactId>scalastyle-maven-plugin</artifactId>
+                <configuration>
+                    <verbose>false</verbose>
+                    <failOnViolation>true</failOnViolation>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                    <failOnWarning>false</failOnWarning>
+                    <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
+                    <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
+                    <configLocation>${user.dir}/scalastyle-config.xml</configLocation>
+                    <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
+                    <inputEncoding>${project.build.sourceEncoding}</inputEncoding>
+                    <outputEncoding>${project.reporting.outputEncoding}</outputEncoding>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    <verbose>false</verbose>
+                    <failOnViolation>true</failOnViolation>
+                    <sourceDirectories>
+                        <sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
+                        <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
+                    </sourceDirectories>
+                    <testSourceDirectories>
+                        <testSourceDirectory>${project.basedir}/src/test/java</testSourceDirectory>
+                        <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
+                    </testSourceDirectories>
+                    <configLocation>dev/checkstyle.xml</configLocation>
+                    <outputFile>${project.basedir}/target/checkstyle-output.xml</outputFile>
+                    <inputEncoding>${project.build.sourceEncoding}</inputEncoding>
+                    <outputEncoding>${project.reporting.outputEncoding}</outputEncoding>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>com.puppycrawl.tools</groupId>
+                        <artifactId>checkstyle</artifactId>
+                        <version>${puppycrawl.checkstyle.version}</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <configuration>
+                    <skipTests>false</skipTests>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>TestSuite.txt</filereports>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.scoverage</groupId>
+                <artifactId>scoverage-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>report</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <aggregate>true</aggregate>
+                    <highlighting>true</highlighting>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                    <cleanOnRerunWithReports>true</cleanOnRerunWithReports>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/omnicache/omnicache-spark-extension/log-parser/src/main/scala/org/apache/spark/deploy/history/LogsParser.scala b/omnicache/omnicache-spark-extension/log-parser/src/main/scala/org/apache/spark/deploy/history/LogsParser.scala
new file mode 100644
index 0000000000000000000000000000000000000000..fa44697206d909229320a6075bd37d2129848785
--- /dev/null
+++ b/omnicache/omnicache-spark-extension/log-parser/src/main/scala/org/apache/spark/deploy/history/LogsParser.scala
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.ServiceLoader
+import java.util.regex.Pattern
+
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Json
+import scala.collection.JavaConverters.iterableAsScalaIterableConverter
+import scala.collection.mutable
+import scala.util.control.Breaks
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Status.ASYNC_TRACKING_ENABLED
+import org.apache.spark.scheduler.ReplayListenerBus
+import org.apache.spark.scheduler.ReplayListenerBus.{ReplayEventsFilter, SELECT_ALL_FILTER}
+import org.apache.spark.sql.catalyst.optimizer.rules._
+import org.apache.spark.sql.execution.ui._
+import org.apache.spark.status._
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
+
+class LogsParser(conf: SparkConf, eventLogDir: String, outPutDir: String) extends Logging {
+
+ private val LINE_SEPARATOR = "\n"
+ private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ // Visible for testing
+ private[history] val fs: FileSystem = new Path(eventLogDir).getFileSystem(hadoopConf)
+
+ /**
+ * parseAppHistoryLog
+ *
+ * @param appId appId
+ * @param fileName fileName
+ */
+ def parseAppHistoryLog(appId: String, fileName: String): Unit = {
+ val inMemoryStore = new InMemoryStore()
+ val reader = EventLogFileReader(fs, new Path(eventLogDir, appId))
+ rebuildAppStore(inMemoryStore, reader.get)
+
+ val sqlStatusStore = new SQLAppStatusStore(inMemoryStore)
+ val mvRewriteSuccessInfo = getMVRewriteSuccessInfo(inMemoryStore)
+
+ // create OutputDir
+ if (!fs.exists(new Path(outPutDir))) {
+ fs.mkdirs(new Path(outPutDir))
+ }
+
+ // Breaks instance used as a "continue" for the executions loop below
+ val curLoop = new Breaks
+
+ var jsons = Seq.empty[Map[String, String]]
+ sqlStatusStore.executionsList().foreach { execution =>
+ curLoop.breakable {
+ // skip abnormal executions (still running or failed)
+ val isRunning = execution.completionTime.isEmpty ||
+ execution.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
+ val isFailed = execution
+ .jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+ if (isRunning || isFailed) {
+ curLoop.break()
+ }
+
+ val uiData = execution
+ val executionId = uiData.executionId
+ val planDesc = uiData.physicalPlanDescription
+ val query = uiData.description
+ var mvs = mvRewriteSuccessInfo.getOrElse(query, "")
+ if (mvs.nonEmpty) {
+ mvs.split(";").foreach { mv =>
+ if (!planDesc.contains(mv)) {
+ mvs = ""
+ }
+ }
+ }
+
+ // write dot
+ val graph: SparkPlanGraph = sqlStatusStore.planGraph(executionId)
+ val metrics = sqlStatusStore.executionMetrics(executionId)
+ val node = getNodeInfo(graph)
+
+ val jsonMap = Map(
+ "executionId" -> executionId.toString,
+ "original query" -> query,
+ "materialized views" -> mvs,
+ "physical plan" -> planDesc,
+ "dot metrics" -> graph.makeDotFile(metrics),
+ "node metrics" -> node)
+ jsons :+= jsonMap
+ }
+ }
+ // write json file into hdfs
+ val jsonFile: String = Json(DefaultFormats).write(jsons)
+ writeFile(fs, outPutDir + "/" + fileName + ".json", jsonFile)
+ }
+
+ /**
+ * getMVRewriteSuccessInfo
+ *
+ * @param store KVStore
+ * @return {sql:mvs}
+ */
+ def getMVRewriteSuccessInfo(store: KVStore): mutable.Map[String, String] = {
+ val infos = mutable.Map.empty[String, String]
+ try {
+ // The ApplicationInfo may not be available when Spark is starting up.
+ Utils.tryWithResource(
+ store.view(classOf[SparkListenerMVRewriteSuccess])
+ .closeableIterator()
+ ) { it =>
+ while (it.hasNext) {
+ val info = it.next()
+ infos += (info.sql -> info.usingMvs)
+ }
+ }
+ } catch {
+ case e: NoSuchElementException =>
+ logWarning("getMVRewriteSuccessInfo is failed for ", e)
+ }
+ infos
+ }
+
+ /**
+ * getNodeInfo from graph
+ *
+ * @param graph SparkPlanGraph
+ * @return NodeInfo
+ */
+ def getNodeInfo(graph: SparkPlanGraph): String = {
+ // write node
+ val tmpContext = new StringBuilder
+ tmpContext.append("[PlanMetric]")
+ nextLine(tmpContext)
+ graph.allNodes.foreach { node =>
+ tmpContext.append(s"id:${node.id} name:${node.name} desc:${node.desc}")
+ nextLine(tmpContext)
+ node.metrics.foreach { metric =>
+ tmpContext.append("SQLPlanMetric(")
+ tmpContext.append(metric.name)
+ tmpContext.append(",")
+ if (metric.metricType == "timing") {
+ tmpContext.append(s"${metric.accumulatorId * 1000000} ns, ")
+ } else if (metric.metricType == "nsTiming") {
+ tmpContext.append(s"${metric.accumulatorId} ns, ")
+ } else {
+ tmpContext.append(s"${metric.accumulatorId}, ")
+ }
+ tmpContext.append(metric.metricType)
+ tmpContext.append(")")
+ nextLine(tmpContext)
+ }
+ nextLine(tmpContext)
+ nextLine(tmpContext)
+ nextLine(tmpContext)
+ }
+
+ graph.edges.foreach { edges =>
+ tmpContext.append(edges.makeDotEdge)
+ nextLine(tmpContext)
+ }
+
+ tmpContext.append("[SubGraph]")
+ nextLine(tmpContext)
+ graph.allNodes.foreach {
+ case cluster: SparkPlanGraphCluster =>
+ tmpContext.append(s"cluster ${cluster.id} : ")
+ for (i <- cluster.nodes.indices) {
+ tmpContext.append(s"${cluster.nodes(i).id} ")
+ }
+ nextLine(tmpContext)
+ case node =>
+ }
+ nextLine(tmpContext)
+ tmpContext.toString()
+ }
+
+ def nextLine(context: StringBuilder): Unit = {
+ context.append(LINE_SEPARATOR)
+ }
+
+ /**
+ * rebuildAppStore
+ *
+ * @param store KVStore
+ * @param reader EventLogFileReader
+ */
+ private[spark] def rebuildAppStore(store: KVStore, reader: EventLogFileReader): Unit = {
+ // Disable async updates, since they cause higher memory usage, and it's ok to take longer
+ // to parse the event logs in the SHS.
+ val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false)
+ val trackingStore = new ElementTrackingStore(store, replayConf)
+ val replayBus = new ReplayListenerBus()
+ val listener = new AppStatusListener(trackingStore, replayConf, false)
+ replayBus.addListener(listener)
+ replayBus.addListener(new MVRewriteSuccessListener(trackingStore))
+
+ for {
+ plugin <- loadPlugins()
+ listener <- plugin.createListeners(conf, trackingStore)
+ } replayBus.addListener(listener)
+
+ try {
+ val eventLogFiles = reader.listEventLogFiles
+
+ // parse event log
+ parseAppEventLogs(eventLogFiles, replayBus, !reader.completed)
+ trackingStore.close(false)
+ } catch {
+ case e: Exception =>
+ Utils.tryLogNonFatalError {
+ trackingStore.close()
+ }
+ throw e
+ }
+ }
+
+ /**
+ * loadPlugins
+ *
+ * @return Plugins
+ */
+ private def loadPlugins(): Iterable[AppHistoryServerPlugin] = {
+ val plugins = ServiceLoader.load(classOf[AppHistoryServerPlugin],
+ Utils.getContextOrSparkClassLoader).asScala
+ plugins
+ }
+
+ /**
+ * parseAppEventLogs
+ *
+ * @param logFiles Seq[FileStatus]
+ * @param replayBus ReplayListenerBus
+ * @param maybeTruncated Boolean
+ * @param eventsFilter ReplayEventsFilter
+ */
+ private def parseAppEventLogs(logFiles: Seq[FileStatus],
+ replayBus: ReplayListenerBus,
+ maybeTruncated: Boolean,
+ eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+ // stop replaying next log files if ReplayListenerBus indicates some error or halt
+ var continueReplay = true
+ logFiles.foreach { file =>
+ if (continueReplay) {
+ Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath, fs)) { in =>
+ continueReplay = replayBus.replay(in, file.getPath.toString,
+ maybeTruncated = maybeTruncated, eventsFilter = eventsFilter)
+ }
+ }
+ }
+ }
+
+ /**
+ * write parsed logInfo to logPath
+ *
+ * @param fs FileSystem
+ * @param logPath logPath
+ * @param context logInfo
+ */
+ private def writeFile(fs: FileSystem, logPath: String, context: String): Unit = {
+ val os = fs.create(new Path(logPath))
+ os.write(context.getBytes())
+ os.close()
+ }
+}
+
+/*
+arg0: spark.eventLog.dir, eg. hdfs://server1:9000/spark2-history
+arg1: output dir in hdfs, eg. hdfs://server1:9000/logParser
+arg2: log file to be parsed, eg. application_1646816941391_0115.lz4
+ */
+object ParseLog extends Logging {
+ def main(args: Array[String]): Unit = {
+ if (args == null || args.length != 3) {
+ throw new RuntimeException("input params is invalid,such as below\n" +
+ "arg0: spark.eventLog.dir, eg. hdfs://server1:9000/spark2-history\n" +
+ "arg1: output dir in hdfs, eg. hdfs://server1:9000/logParser\n" +
+ "arg2: log file to be parsed, eg. application_1646816941391_0115.lz4\n")
+ }
+ val sparkEventLogDir = args(0)
+ val outputDir = args(1)
+ val logName = args(2)
+
+ val conf = new SparkConf
+ // spark.eventLog.dir
+ conf.set("spark.eventLog.dir", sparkEventLogDir)
+ // spark.eventLog.compress
+ conf.set("spark.eventLog.compress", "true")
+ // fs.hdfs.impl
+ conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
+ val logParser = new LogsParser(conf, sparkEventLogDir, outputDir)
+
+ // file pattern
+ val regex = "application_[0-9]+._[0-9]+.lz4"
+ val pattern = Pattern.compile(regex)
+ val matcher = pattern.matcher(logName)
+ if (matcher.find) {
+ val appId = matcher.group
+ logParser.parseAppHistoryLog(appId, logName)
+ } else {
+ throw new RuntimeException(logName + " is illegal")
+ }
+ }
+}
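Note: a minimal sketch of driving the parser programmatically instead of through ParseLog.main; the HDFS paths and the object name below are placeholders, not part of this patch.

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.history.LogsParser

    object ParseLogExample {
      def main(args: Array[String]): Unit = {
        // placeholder locations; point these at a real event-log dir and output dir
        val eventLogDir = "hdfs://server1:9000/spark2-history"
        val outputDir = "hdfs://server1:9000/logParser"
        val conf = new SparkConf()
          .set("spark.eventLog.dir", eventLogDir)
          .set("spark.eventLog.compress", "true")
          .set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
        // the event-log file name doubles as the appId and as the output json name
        val logName = "application_1646816941391_0115.lz4"
        new LogsParser(conf, eventLogDir, outputDir).parseAppHistoryLog(logName, logName)
      }
    }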
diff --git a/omnicache/omnicache-spark-extension/log-parser/src/test/resources/application_1663257594501_0003.lz4 b/omnicache/omnicache-spark-extension/log-parser/src/test/resources/application_1663257594501_0003.lz4
new file mode 100644
index 0000000000000000000000000000000000000000..f2b568905c4a786b9eec43dc8c5f05d1eee68e18
Binary files /dev/null and b/omnicache/omnicache-spark-extension/log-parser/src/test/resources/application_1663257594501_0003.lz4 differ
diff --git a/omnicache/omnicache-spark-extension/log-parser/src/test/scala/org/apache/spark/deploy/history/LogsParserSuite.scala b/omnicache/omnicache-spark-extension/log-parser/src/test/scala/org/apache/spark/deploy/history/LogsParserSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d686b0e379726bda722f48b681dfb90e2cd828f2
--- /dev/null
+++ b/omnicache/omnicache-spark-extension/log-parser/src/test/scala/org/apache/spark/deploy/history/LogsParserSuite.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{FileInputStream, FileNotFoundException}
+
+import org.apache.commons.io.IOUtils
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Json
+
+import org.apache.spark.SparkFunSuite
+
+class LogsParserSuite extends SparkFunSuite {
+
+ test("parse") {
+ val path = this.getClass.getResource("/").getPath
+ val args: Array[String] = Array(path, "eventlog/parse"
+ , "application_1663257594501_0003.lz4")
+ ParseLog.main(args)
+ val fis = new FileInputStream("eventlog/parse/application_1663257594501_0003.lz4.json")
+ val lines = IOUtils.readLines(fis, "UTF-8")
+ IOUtils.closeQuietly(fis)
+ val json: Seq[Map[String, String]] = Json(DefaultFormats)
+ .read[Seq[Map[String, String]]](lines.get(0))
+ assert(json.exists(map =>
+ map.contains("materialized views") &&
+ map("physical plan").contains(map("materialized views"))
+ ))
+ }
+
+ test("error_invalid_param") {
+ assertThrows[RuntimeException] {
+ val path = this.getClass.getResource("/").getPath
+ val args: Array[String] = Array(path, "eventlog/parse")
+ ParseLog.main(args)
+ }
+ assertThrows[RuntimeException] {
+ val path = this.getClass.getResource("/").getPath
+ val args: Array[String] = Array(path, "eventlog/parse"
+ , "application_1663257594501_0003.lz4", "1")
+ ParseLog.main(args)
+ }
+ }
+
+ test("error_invalid_logname") {
+ assertThrows[RuntimeException] {
+ val path = this.getClass.getResource("/").getPath
+ val args: Array[String] = Array(path, "eventlog/parse"
+ , "xxx.lz4")
+ ParseLog.main(args)
+ }
+ }
+
+ test("error_log_not_exist") {
+ assertThrows[FileNotFoundException] {
+ val path = this.getClass.getResource("/").getPath
+ val args: Array[String] = Array(path, "eventlog/parse"
+ , "application_1663257594501_00031.lz4")
+ ParseLog.main(args)
+ }
+ }
+}
diff --git a/omnicache/omnicache-spark-extension/plugin/pom.xml b/omnicache/omnicache-spark-extension/plugin/pom.xml
index 3879b778d28705f732a288b6877f0acea813e340..721879bddbf536cc7e37ca8d63dbd4cec3f93b0e 100644
--- a/omnicache/omnicache-spark-extension/plugin/pom.xml
+++ b/omnicache/omnicache-spark-extension/plugin/pom.xml
@@ -73,6 +73,7 @@
+ ${artifactId}-${version}${dep.os.arch}
target/scala-${scala.binary.version}/classes
target/scala-${scala.binary.version}/test-classes
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/conf/OmniCachePluginConfig.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/conf/OmniCachePluginConfig.scala
index d9fc9b31781400d7069b114ad52e4e955e3cad2b..77cacf0670244d3b0293796cb7b963cd266882b1 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/conf/OmniCachePluginConfig.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/conf/OmniCachePluginConfig.scala
@@ -17,9 +17,13 @@
package com.huawei.boostkit.spark.conf
+import java.util.Locale
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
class OmniCachePluginConfig(conf: SQLConf) {
@@ -48,6 +52,10 @@ class OmniCachePluginConfig(conf: SQLConf) {
.getConfString("spark.sql.omnicache.default.datasource", "orc")
val dataSourceSet: Set[String] = Set("orc", "parquet")
+
+ def logLevel: String = conf
+ .getConfString("spark.sql.omnicache.logLevel", "DEBUG")
+ .toUpperCase(Locale.ROOT)
}
object OmniCachePluginConfig {
@@ -86,4 +94,10 @@ object OmniCachePluginConfig {
val catalogTable = spark.sessionState.catalog.getTableMetadata(mv)
!catalogTable.properties.getOrElse(MV_UPDATE_REWRITE_ENABLED, "true").toBoolean
}
+
+ def isMVInUpdate(viewTablePlan: LogicalPlan): Boolean = {
+ val logicalRelation = viewTablePlan.asInstanceOf[LogicalRelation]
+ !logicalRelation.catalogTable.get
+ .properties.getOrElse(MV_UPDATE_REWRITE_ENABLED, "true").toBoolean
+ }
}
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ExprSimplifier.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ExprSimplifier.scala
index 13b1a607d34f6fda8d8bfb8002be9d58175e0602..5cb7d19258cfc56f95ea0f7bebfe4847e566ef40 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ExprSimplifier.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ExprSimplifier.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, OneRowRelation}
import org.apache.spark.sql.types.{BooleanType, DataType, NullType}
-
case class ExprSimplifier(unknownAsFalse: Boolean,
pulledUpPredicates: Set[Expression]) {
@@ -476,14 +475,14 @@ case class ExprSimplifier(unknownAsFalse: Boolean,
for (term <- terms) {
// Excluding self-simplification
if (!term.eq(orOp)) {
- // Simplification between a orExpression and a orExpression.
+ // Simplification between an OrExpression and an OrExpression.
if (term.isInstanceOf[Or]) {
if (containsAllSql(ors, decomposeDisjunctions(term).toSet)) {
terms.-=(orOp)
breaks3.break()
}
} else if (containsSql(ors, term)) {
- // Simplification between a otherExpression and a orExpression.
+ // Simplification between another expression and an OrExpression.
terms.-=(orOp)
breaks3.break()
}
@@ -615,7 +614,7 @@ case class ExprSimplifier(unknownAsFalse: Boolean,
case c@Not(c1@EqualNullSafe(_, Literal.TrueLiteral)) =>
Not(c1.copy(left = child2))
case c =>
- throw new RuntimeException("unSupport type is predict simplify :%s".format(c))
+ throw new RuntimeException("unSupport type is predicate simplify :%s".format(c))
}
return condition2
}
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteHelper.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteHelper.scala
index 38b7d40a32402136e4cad14a369e724e3f6539b8..2ef67e5361bb8f6920bce59c8519e53eedfd1c1b 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteHelper.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteHelper.scala
@@ -28,14 +28,14 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
-class RewriteHelper extends PredicateHelper {
+trait RewriteHelper extends PredicateHelper with RewriteLogger {
val SESSION_CATALOG_NAME: String = "spark_catalog"
- val EMPTY_BITMAP: HashBiMap[String, String] = HashBiMap.create[String, String]()
+ val EMPTY_BIMAP: HashBiMap[String, String] = HashBiMap.create[String, String]()
val EMPTY_MAP: Map[ExpressionEqual,
mutable.Set[ExpressionEqual]] = Map[ExpressionEqual, mutable.Set[ExpressionEqual]]()
- val EMPTY_MULTIMAP: Multimap[Int, Int] = ArrayListMultimap.create[Int, Int]
+ val EMPTY_MULTIMAP: Multimap[Int, Int] = ArrayListMultimap.create[Int, Int]()
def mergeConjunctiveExpressions(e: Seq[Expression]): Expression = {
if (e.isEmpty) {
@@ -81,8 +81,9 @@ class RewriteHelper extends PredicateHelper {
for ((project, column) <- topProjectList.zip(viewTablePlan.output)) {
project match {
// only map attr
- case _@Alias(attr@AttributeReference(_, _, _, _), _) =>
- exprIdToQualifier += (column.exprId -> attr)
+ case a@Alias(attr@AttributeReference(_, _, _, _), _) =>
+ exprIdToQualifier += (column.exprId ->
+ attr.copy(name = a.name)(exprId = attr.exprId, qualifier = attr.qualifier))
case a@AttributeReference(_, _, _, _) =>
exprIdToQualifier += (column.exprId -> a)
// skip function
@@ -204,11 +205,11 @@ class RewriteHelper extends PredicateHelper {
(mappedQuery, mappedTables)
}
- def swapTableColumnReferences[T <: Iterable[Expression]](expression: T,
+ def swapTableColumnReferences[T <: Iterable[Expression]](expressions: T,
tableMapping: BiMap[String, String],
columnMapping: Map[ExpressionEqual,
mutable.Set[ExpressionEqual]]): T = {
- var result: T = expression
+ var result: T = expressions
if (!tableMapping.isEmpty) {
result = result.map { expr =>
expr.transform {
@@ -243,28 +244,28 @@ class RewriteHelper extends PredicateHelper {
result
}
- def swapColumnTableReferences[T <: Iterable[Expression]](expression: T,
+ def swapColumnTableReferences[T <: Iterable[Expression]](expressions: T,
tableMapping: BiMap[String, String],
columnMapping: Map[ExpressionEqual,
mutable.Set[ExpressionEqual]]): T = {
- var result = swapTableColumnReferences(expression, EMPTY_BITMAP, columnMapping)
+ var result = swapTableColumnReferences(expressions, EMPTY_BIMAP, columnMapping)
result = swapTableColumnReferences(result, tableMapping, EMPTY_MAP)
result
}
- def swapTableReferences[T <: Iterable[Expression]](expression: T,
+ def swapTableReferences[T <: Iterable[Expression]](expressions: T,
tableMapping: BiMap[String, String]): T = {
- swapTableColumnReferences(expression, tableMapping, EMPTY_MAP)
+ swapTableColumnReferences(expressions, tableMapping, EMPTY_MAP)
}
- def swapColumnReferences[T <: Iterable[Expression]](expression: T,
+ def swapColumnReferences[T <: Iterable[Expression]](expressions: T,
columnMapping: Map[ExpressionEqual,
mutable.Set[ExpressionEqual]]): T = {
- swapTableColumnReferences(expression, EMPTY_BITMAP, columnMapping)
+ swapTableColumnReferences(expressions, EMPTY_BIMAP, columnMapping)
}
}
-object RewriteHelper extends PredicateHelper {
+object RewriteHelper extends PredicateHelper with RewriteLogger {
/**
* Rewrite [[EqualTo]] and [[EqualNullSafe]] operator to keep order. The following cases will be
* equivalent:
@@ -431,6 +432,36 @@ object RewriteHelper extends PredicateHelper {
def disableCachePlugin(): Unit = {
SQLConf.get.setConfString("spark.sql.omnicache.enable", "false")
}
+
+ def checkAttrsValid(logicalPlan: LogicalPlan): Boolean = {
+ logicalPlan.foreachUp {
+ case _: LeafNode =>
+ case plan =>
+ val attributeSets = plan.expressions.map { expression =>
+ AttributeSet.fromAttributeSets(
+ expression.collect {
+ case s: SubqueryExpression =>
+ var res = s.references
+ s.plan.transformAllExpressions {
+ case e@OuterReference(ar) =>
+ res ++= AttributeSet(ar.references)
+ e
+ case e => e
+ }
+ res
+ case e => e.references
+ })
+ }
+ val request = AttributeSet.fromAttributeSets(attributeSets)
+ val input = plan.inputSet
+ val missing = request -- input
+ if (missing.nonEmpty) {
+ logBasedOnLevel("checkAttrsValid failed for missing:%s".format(missing))
+ return false
+ }
+ }
+ true
+ }
}
case class ExpressionEqual(expression: Expression) {
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteLogger.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteLogger.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f2b63f5ca0591b2dd807c20148fd4e4ceb0c0f33
--- /dev/null
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteLogger.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.huawei.boostkit.spark.util
+
+import com.huawei.boostkit.spark.conf.OmniCachePluginConfig
+
+import org.apache.spark.internal.Logging
+
+trait RewriteLogger extends Logging {
+
+ private def logLevel: String = OmniCachePluginConfig.getConf.logLevel
+
+ def logBasedOnLevel(f: => String): Unit = {
+ logLevel match {
+ case "TRACE" => logTrace(f)
+ case "DEBUG" => logDebug(f)
+ case "INFO" => logInfo(f)
+ case "WARN" => logWarning(f)
+ case "ERROR" => logError(f)
+ case _ => logTrace(f)
+ }
+ }
+}
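Note: a minimal usage sketch for the trait above; the class name is illustrative only. The level comes from the new spark.sql.omnicache.logLevel conf (default DEBUG).

    import com.huawei.boostkit.spark.util.RewriteLogger

    // illustrative component: mixing in RewriteLogger gives level-aware logging
    class RewriteStep extends RewriteLogger {
      def run(): Unit = {
        // routed to logTrace/logDebug/logInfo/logWarning/logError per the configured level
        logBasedOnLevel("rewrite step finished")
      }
    }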
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ViewMetadata.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ViewMetadata.scala
index 8074184001d55c8f99ee686ae0499bc35d7f0f47..a3ab16e76d12e1d5a6903a238ae029be8b523486 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ViewMetadata.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ViewMetadata.scala
@@ -17,15 +17,15 @@
package com.huawei.boostkit.spark.util
-import com.google.common.collect.Lists
import com.huawei.boostkit.spark.conf.OmniCachePluginConfig._
import java.util.concurrent.ConcurrentHashMap
-import org.apache.calcite.util.graph._
+import scala.collection.mutable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.optimizer.rules.RewriteTime
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression, SubqueryAlias}
object ViewMetadata extends RewriteHelper {
@@ -35,9 +35,7 @@ object ViewMetadata extends RewriteHelper {
val viewToContainsTables = new ConcurrentHashMap[String, Set[TableEqual]]()
- val usesGraph: DirectedGraph[String, DefaultEdge] = DefaultDirectedGraph.create()
-
- var frozenGraph: Graphs.FrozenGraph[String, DefaultEdge] = Graphs.makeImmutable(usesGraph)
+ val tableToViews = new ConcurrentHashMap[String, mutable.Set[String]]()
var spark: SparkSession = _
@@ -52,10 +50,6 @@ object ViewMetadata extends RewriteHelper {
status = STATUS_LOADING
}
- def usesGraphTopologicalOrderIterator: java.lang.Iterable[String] = {
- TopologicalOrderIterator.of[String, DefaultEdge](usesGraph)
- }
-
def saveViewMetadataToMap(catalogTable: CatalogTable): Unit = this.synchronized {
// if QUERY_REWRITE_ENABLED is false, doesn't load ViewMetadata
if (!catalogTable.properties.getOrElse(MV_REWRITE_ENABLED, "false").toBoolean) {
@@ -80,40 +74,58 @@ object ViewMetadata extends RewriteHelper {
// db.table
val tableName = catalogTable.identifier.quotedString
- val viewTablePlan = spark.table(tableName).queryExecution.analyzed match {
- case SubqueryAlias(_, child) => child
- case a@_ => a
+ val viewTablePlan = RewriteTime
+ .withTimeStat("viewTablePlan") {
+ spark.table(tableName).queryExecution.analyzed match {
+ case SubqueryAlias(_, child) => child
+ case a@_ => a
+ }
+ }
+ var viewQueryPlan = RewriteTime
+ .withTimeStat("viewQueryPlan") {
+ spark.sql(viewQuerySql).queryExecution.analyzed
+ }
+ viewQueryPlan = viewQueryPlan match {
+ case RepartitionByExpression(_, child, _) =>
+ child
+ case _ =>
+ viewQueryPlan
}
- val viewQueryPlan = spark.sql(viewQuerySql).queryExecution.analyzed
// reset preDatabase
spark.sessionState.catalogManager.setCurrentNamespace(Array(preDatabase))
// spark_catalog.db.table
val viewName = catalogTable.identifier.toString()
- // mappedViewQueryPlan and mappedViewContainsTable
- val (mappedViewQueryPlan, mappedViewContainsTables) = extractTables(viewQueryPlan)
+ // mappedViewQueryPlan and mappedViewContainsTables
+ val (mappedViewQueryPlan, mappedViewContainsTables) = RewriteTime
+ .withTimeStat("extractTables") {
+ extractTables(viewQueryPlan)
+ }
- usesGraph.addVertex(viewName)
mappedViewContainsTables
.foreach { mappedViewContainsTable =>
val name = mappedViewContainsTable.tableName
- usesGraph.addVertex(name)
- usesGraph.addEdge(name, viewName)
+ val views = tableToViews.getOrDefault(name, mutable.Set.empty)
+ views += viewName
+ tableToViews.put(name, views)
}
// extract view query project's Attr and replace view table's Attr by query project's Attr
// match function is attributeReferenceEqualSimple, by name and data type
// Attr of table cannot used, because same Attr in view query and view table,
// it's table is different.
- val mappedViewTablePlan = mapTablePlanAttrToQuery(viewTablePlan, mappedViewQueryPlan)
+ val mappedViewTablePlan = RewriteTime
+ .withTimeStat("mapTablePlanAttrToQuery") {
+ mapTablePlanAttrToQuery(viewTablePlan, mappedViewQueryPlan)
+ }
viewToContainsTables.put(viewName, mappedViewContainsTables)
viewToViewQueryPlan.putIfAbsent(viewName, mappedViewQueryPlan)
viewToTablePlan.putIfAbsent(viewName, mappedViewTablePlan)
} catch {
case e: Throwable =>
- logDebug(s"Failed to saveViewMetadataToMap. errmsg: ${e.getMessage}")
+ logDebug(s"Failed to saveViewMetadataToMap,errmsg: ${e.getMessage}")
// reset preDatabase
spark.sessionState.catalogManager.setCurrentNamespace(Array(preDatabase))
}
@@ -129,21 +141,19 @@ object ViewMetadata extends RewriteHelper {
def addCatalogTableToCache(table: CatalogTable): Unit = this.synchronized {
saveViewMetadataToMap(table)
- rebuildGraph()
- }
-
- def rebuildGraph(): Unit = {
- frozenGraph = Graphs.makeImmutable(usesGraph)
}
def removeMVCache(tableName: TableIdentifier): Unit = this.synchronized {
val viewName = tableName.toString()
- usesGraph.removeAllVertices(Lists.newArrayList(viewName))
viewToContainsTables.remove(viewName)
viewToViewQueryPlan.remove(viewName)
viewToTablePlan.remove(viewName)
- viewToContainsTables.remove(viewName)
- rebuildGraph()
+ tableToViews.forEach { (key, value) =>
+ if (value.contains(viewName)) {
+ value -= viewName
+ tableToViews.put(key, value)
+ }
+ }
}
def init(sparkSession: SparkSession): Unit = {
@@ -158,14 +168,16 @@ object ViewMetadata extends RewriteHelper {
def forceLoad(): Unit = this.synchronized {
val catalog = spark.sessionState.catalog
- // val db = OmniCachePluginConfig.getConf.OmniCacheDB
// load from all db
for (db <- catalog.listDatabases()) {
- val tables = omniCacheFilter(catalog, db)
- tables.foreach(tableData => saveViewMetadataToMap(tableData))
+ val tables = RewriteTime.withTimeStat("loadTable") {
+ omniCacheFilter(catalog, db)
+ }
+ RewriteTime.withTimeStat("saveViewMetadataToMap") {
+ tables.foreach(tableData => saveViewMetadataToMap(tableData))
+ }
}
- rebuildGraph()
}
def omniCacheFilter(catalog: SessionCatalog,
@@ -176,7 +188,7 @@ object ViewMetadata extends RewriteHelper {
tableData.properties.contains(MV_QUERY_ORIGINAL_SQL)
}
} catch {
- // if db exists a table hive materialized view, will throw annalysis exception
+ // if a db contains a hive materialized view table, listTables will throw an AnalysisException
case e: Throwable =>
logDebug(s"Failed to listTables in $mvDataBase, errmsg: ${e.getMessage}")
Seq.empty[CatalogTable]
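Note: the dependency graph is replaced by a plain table-to-views index; a small standalone sketch of its semantics (names are illustrative, the real map is ViewMetadata.tableToViews).

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.mutable

    object TableToViewsSketch {
      val tableToViews = new ConcurrentHashMap[String, mutable.Set[String]]()

      // register a view under every table it reads
      def register(table: String, view: String): Unit = {
        val views = tableToViews.getOrDefault(table, mutable.Set.empty)
        views += view
        tableToViews.put(table, views)
      }

      // candidate views for a query are the union over its tables
      def candidates(tables: Set[String]): Set[String] =
        tables.flatMap(t => tableToViews.getOrDefault(t, mutable.Set.empty))
    }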
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/AbstractMaterializedViewRule.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/AbstractMaterializedViewRule.scala
index df26d192fb00e1597723f3b978afb4754bb45ed7..7d398c2cf07a952fb9bea27a9792a6e394b044e2 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/AbstractMaterializedViewRule.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/AbstractMaterializedViewRule.scala
@@ -49,7 +49,9 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
if (ViewMetadata.status == ViewMetadata.STATUS_LOADING) {
return finalPlan
}
- ViewMetadata.init(sparkSession)
+ RewriteTime.withTimeStat("viewMetadata") {
+ ViewMetadata.init(sparkSession)
+ }
// 1.check query sql is match current rule
if (ViewMetadata.isEmpty || !isValidPlan(plan)) {
return finalPlan
@@ -60,8 +62,10 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
val (queryExpr, queryTables) = extractTables(finalPlan)
// 3.use all tables to fetch views(may match) from ViewMetaData
- val candidateViewPlans = getApplicableMaterializations(queryTables.map(t => t.tableName))
- .filter(x => !OmniCachePluginConfig.isMVInUpdate(sparkSession, x._1))
+ val candidateViewPlans = RewriteTime.withTimeStat("getApplicableMaterializations") {
+ getApplicableMaterializations(queryTables.map(t => t.tableName))
+ .filter(x => !OmniCachePluginConfig.isMVInUpdate(x._2))
+ }
if (candidateViewPlans.isEmpty) {
return finalPlan
}
@@ -112,16 +116,21 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
val (newViewTablePlan, newViewQueryPlan, newTopViewProject) = newViewPlans.get
viewTablePlan = newViewTablePlan
viewQueryPlan = newViewQueryPlan
+ if (newTopViewProject.isEmpty) {
+ viewQueryExpr = newViewQueryPlan
+ }
topViewProject = newTopViewProject
}
// 4.5.extractPredictExpressions from viewQueryPlan and mappedQueryPlan
- val queryPredictExpression = extractPredictExpressions(queryExpr, EMPTY_BITMAP)
+ val queryPredictExpression = RewriteTime.withTimeStat("extractPredictExpressions") {
+ extractPredictExpressions(queryExpr, EMPTY_BIMAP)
+ }
val viewProjectList = extractTopProjectList(viewQueryExpr)
val viewTableAttrs = viewTablePlan.output
- // 4.6.if a table emps used >=2 times in s sql (query and view)
+ // 4.6.if a table (e.g. emps) is used >=2 times in a sql (query and view)
// we should try the combination,switch the seq
// view:SELECT V1.locationid,V2.empname FROM emps V1 JOIN emps V2
// ON V1.deptno='1' AND V2.deptno='2' AND V1.empname = V2.empname;
@@ -132,25 +141,31 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
flatListMappings.foreach { queryToViewTableMapping =>
mappingLoop.breakable {
val inverseTableMapping = queryToViewTableMapping.inverse()
- val viewPredictExpression = extractPredictExpressions(viewQueryExpr,
- inverseTableMapping)
+ val viewPredictExpression = RewriteTime.withTimeStat("extractPredictExpressions") {
+ extractPredictExpressions(viewQueryExpr,
+ inverseTableMapping)
+ }
// 4.7.compute compensationPredicates between viewQueryPlan and queryPlan
- var newViewTablePlan = computeCompensationPredicates(viewTablePlan,
- queryPredictExpression, viewPredictExpression, inverseTableMapping,
- viewPredictExpression._1.getEquivalenceClassesMap,
- viewProjectList, viewTableAttrs)
- // 4.8.compensationPredicates isEmpty, because view's row data cannot satify query
+ var newViewTablePlan = RewriteTime.withTimeStat("computeCompensationPredicates") {
+ computeCompensationPredicates(viewTablePlan,
+ queryPredictExpression, viewPredictExpression, inverseTableMapping,
+ viewPredictExpression._1.getEquivalenceClassesMap,
+ viewProjectList, viewTableAttrs)
+ }
+ // 4.8.compensationPredicates isEmpty, because view's row data cannot satisfy query
if (newViewTablePlan.isEmpty) {
mappingLoop.break()
}
// 4.9.use viewTablePlan(join compensated), query project,
// compensationPredicts to rewrite final plan
- newViewTablePlan = rewriteView(newViewTablePlan.get, viewQueryExpr,
- queryExpr, inverseTableMapping,
- queryPredictExpression._1.getEquivalenceClassesMap,
- viewProjectList, viewTableAttrs)
+ newViewTablePlan = RewriteTime.withTimeStat("rewriteView") {
+ rewriteView(newViewTablePlan.get, viewQueryExpr,
+ queryExpr, inverseTableMapping,
+ queryPredictExpression._1.getEquivalenceClassesMap,
+ viewProjectList, viewTableAttrs)
+ }
if (newViewTablePlan.isEmpty) {
mappingLoop.break()
}
@@ -165,13 +180,19 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
}
/**
- * cehck plan if match current rule
+ * check plan if match current rule
*
* @param logicalPlan LogicalPlan
* @return true:matched ; false:unMatched
*/
def isValidPlan(logicalPlan: LogicalPlan): Boolean
+ /**
+ * basic check for all rule
+ *
+ * @param logicalPlan LogicalPlan
+ * @return true:matched ; false:unMatched
+ */
def isValidLogicalPlan(logicalPlan: LogicalPlan): Boolean = {
logicalPlan.foreach {
case _: LogicalRelation =>
@@ -199,19 +220,19 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
LogicalPlan, LogicalPlan)] = {
// viewName, viewTablePlan, viewQueryPlan
var viewPlans = Seq.empty[(String, LogicalPlan, LogicalPlan)]
+ val viewNames = mutable.Set.empty[String]
// 1.topological iterate graph
- ViewMetadata.usesGraphTopologicalOrderIterator.forEach { viewName =>
- // 2.check this node is mv
- if (ViewMetadata.viewToTablePlan.containsKey(viewName)
- // 3.iterate mv used tables and check edge of (table,mv) in graph
- && usesTable(viewName, ViewMetadata
- .viewToContainsTables.get(viewName), ViewMetadata.frozenGraph)) {
- // 4.add plan info
- val viewQueryPlan = ViewMetadata.viewToViewQueryPlan.get(viewName)
- val viewTablePlan = ViewMetadata.viewToTablePlan.get(viewName)
- viewPlans +:= (viewName, viewTablePlan, viewQueryPlan)
+ tableNames.foreach { tableName =>
+ if (ViewMetadata.tableToViews.containsKey(tableName)) {
+ viewNames ++= ViewMetadata.tableToViews.get(tableName)
}
}
+ viewNames.foreach { viewName =>
+ // 4.add plan info
+ val viewQueryPlan = ViewMetadata.viewToViewQueryPlan.get(viewName)
+ val viewTablePlan = ViewMetadata.viewToTablePlan.get(viewName)
+ viewPlans +:= (viewName, viewTablePlan, viewQueryPlan)
+ }
viewPlans
}
@@ -239,7 +260,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
* @param viewTablePlan viewTablePlan
* @param viewQueryPlan viewQueryPlan
* @param topViewProject topViewProject
- * @param needTables needTables
+ * @param needTables need join compensate tables
* @return join compensated viewTablePlan
*/
def compensateViewPartial(viewTablePlan: LogicalPlan,
@@ -248,6 +269,14 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
needTables: Set[TableEqual]):
Option[(LogicalPlan, LogicalPlan, Option[Project])] = None
+ /**
+ * We map every table in the query to a table with the same qualified
+ * name (all query tables are contained in the view, thus this is equivalent
+ * to mapping every table in the query to a view table).
+ *
+ * @param queryTables queryTables
+ * @return query-to-view table mappings
+ */
def generateTableMappings(queryTables: Set[TableEqual]): Seq[BiMap[String, String]] = {
val multiMapTables: Multimap[String, String] = ArrayListMultimap.create()
for (t1 <- queryTables) {
@@ -269,7 +298,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
}
// continue
} else {
- // Multiple reference: flatten
+ // Multiple references: flatten
val newResult: ImmutableList.Builder[BiMap[String, String]] = ImmutableList.builder()
t2s.forEach { target =>
result.forEach { m =>
@@ -284,6 +313,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
}
}
JavaConverters.asScalaIteratorConverter(result.iterator()).asScala.toSeq
+ .sortWith((map1, map2) => map1.toString < map2.toString)
}
/**
@@ -346,10 +376,17 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
Some(compensationPredicate)
}
+ /**
+ * extractPossibleMapping {queryEquivalenceClasses:[contained viewEquivalenceClasses]}
+ *
+ * @param queryEquivalenceClasses queryEquivalenceClasses
+ * @param viewEquivalenceClasses viewEquivalenceClasses
+ * @return {queryEquivalenceClasses:[contained viewEquivalenceClasses]}
+ */
def extractPossibleMapping(queryEquivalenceClasses: List[mutable.Set[ExpressionEqual]],
viewEquivalenceClasses: List[mutable.Set[ExpressionEqual]]): Option[Multimap[Int, Int]] = {
// extractPossibleMapping {queryEquivalenceClasses:[contained viewEquivalenceClasses]}
- // query:c1=c2=c3=c4 view:c1=c2 ,c3=c4
+ // query:c1=c2=c3=c4 view:c1=c2 , c3=c4
val mapping = ArrayListMultimap.create[Int, Int]()
val breakLoop = new Breaks
@@ -381,6 +418,12 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
Some(mapping)
}
+ /**
+ *
+ * @param queryExpression queryExpression
+ * @param viewExpression viewExpression
+ * @return compensation expression
+ */
def splitFilter(queryExpression: Expression, viewExpression: Expression): Option[Expression] = {
// 1.canonicalize expression,main for reorder
val queryExpression2 = RewriteHelper.canonicalize(ExprSimplifier.simplify(queryExpression))
@@ -422,7 +465,6 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
None
}
-
/**
* split expression by or,then compute compensation
*
@@ -434,7 +476,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
val queries = ExprOptUtil.disjunctions(queryExpression)
val views = ExprOptUtil.disjunctions(viewExpression)
- // 1.compare difference which queries residue
+ // 1.compute the difference: query disjunctions not covered by the view
val difference = queries.map(ExpressionEqual) -- views.map(ExpressionEqual)
// 2.1.queries equal to views,just return true
@@ -533,9 +575,8 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
true
}
-
/**
- * compute compensationPredicts between viewQueryPlan and mappedQueryPlan
+ * compute compensationPredicates between viewQueryPlan and mappedQueryPlan
*
* @param viewTablePlan viewTablePlan
* @param queryPredict queryPredict
@@ -594,18 +635,6 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
if (columnsEquiPredictsResult.isEmpty) {
return None
}
- val viewTableAttrSet = viewTableAttrs.map(ExpressionEqual).toSet
- columnsEquiPredictsResult.get.foreach { expr =>
- expr.foreach {
- case attr: AttributeReference =>
- if (!viewTableAttrSet.contains(ExpressionEqual(attr))) {
- logDebug(s"attr:%s cannot found in view:%s"
- .format(attr, OmniCachePluginConfig.getConf.curMatchMV))
- return None
- }
- case _ =>
- }
- }
// 5.rewrite rangeCompensation,residualCompensation by viewTableAttrs
val otherPredictsResult = rewriteExpressions(Seq(compensationRangePredicts.get,
@@ -627,10 +656,10 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
* @param swapTableColumn true:swapTableColumn;false:swapColumnTable
* @param tableMapping tableMapping
* @param columnMapping columnMapping
- * @param viewProjectList viewProjectList
+ * @param viewProjectList viewProjectList/viewAggExpression
* @param viewTableAttrs viewTableAttrs
* @tparam T T <: Iterable[Expression]
- * @return rewriteExprs
+ * @return rewritten exprs
*/
def rewriteExpressions[T <: Iterable[Expression]](
exprsToRewrite: T, swapTableColumn: Boolean,
@@ -642,7 +671,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
val swapProjectList = if (swapTableColumn) {
swapTableColumnReferences(viewProjectList, tableMapping, columnMapping)
} else {
- swapColumnTableReferences(viewProjectList, tableMapping, columnMapping)
+ swapTableColumnReferences(viewProjectList, tableMapping, columnMapping)
}
val swapTableAttrs = swapTableReferences(viewTableAttrs, tableMapping)
@@ -665,12 +694,12 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
}.asInstanceOf[T]
// 4.iterate result and dfs check every AttributeReference in ViewTableAttributeReference
- val viewTableAttrSet = swapTableAttrs.map(ExpressionEqual).toSet
+ val viewTableAttrsSet = swapTableAttrs.map(_.exprId).toSet
result.foreach { expr =>
expr.foreach {
case attr: AttributeReference =>
- if (!viewTableAttrSet.contains(ExpressionEqual(attr))) {
- logDebug(s"attr:%s cannot found in view:%s"
+ if (!viewTableAttrsSet.contains(attr.exprId)) {
+ logBasedOnLevel(s"attr:%s cannot found in view:%s"
.format(attr, OmniCachePluginConfig.getConf.curMatchMV))
return None
}
@@ -680,7 +709,6 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
Some(result)
}
-
/**
* if the rewrite expression exprId != origin expression exprId,
* replace by Alias(rewrite expression,origin.name)(exprId=origin.exprId)
@@ -706,6 +734,16 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
/**
* replace and alias expression or attr by viewTableAttr
+ *
+ * @param exprsToRewrite exprsToRewrite
+ * @param swapTableColumn true:swapTableColumn;false:swapColumnTable
+ * @param tableMapping tableMapping
+ * @param columnMapping columnMapping
+ * @param viewProjectList viewProjectList/viewAggExpression
+ * @param viewTableAttrs viewTableAttrs
+ * @param originExpressions originExpressions
+ * @tparam T T <: Iterable[Expression]
+ * @return rewritten and aliased expressions
*/
def rewriteAndAliasExpressions[T <: Iterable[Expression]](
exprsToRewrite: T, swapTableColumn: Boolean,
@@ -742,5 +780,4 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession)
columnMapping: Map[ExpressionEqual, mutable.Set[ExpressionEqual]],
viewProjectList: Seq[Expression], viewTableAttrs: Seq[Attribute]):
Option[LogicalPlan]
-
}
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MVRewriteRule.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MVRewriteRule.scala
index 98989c460b240765365b9870078a6679beb4e010..ba0c17f227485169f817532807f85ee1e16ef764 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MVRewriteRule.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MVRewriteRule.scala
@@ -17,19 +17,23 @@
package org.apache.spark.sql.catalyst.optimizer.rules
+import com.fasterxml.jackson.annotation.JsonIgnore
import com.huawei.boostkit.spark.conf.OmniCachePluginConfig
-import com.huawei.boostkit.spark.util.RewriteHelper
+import com.huawei.boostkit.spark.util.{RewriteHelper, RewriteLogger}
import scala.collection.mutable
import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.OmniCacheCreateMvCommand
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.kvstore.KVIndex
-class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] with Logging {
+class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] with RewriteLogger {
val omniCacheConf: OmniCachePluginConfig = OmniCachePluginConfig.getConf
val joinRule = new MaterializedViewJoinRule(session)
@@ -55,6 +59,8 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] with Loggin
def tryRewritePlan(plan: LogicalPlan): LogicalPlan = {
val usingMvs = mutable.Set.empty[String]
+ RewriteTime.clear()
+ val rewriteStartTime = System.currentTimeMillis()
val res = plan.transformDown {
case p: Project =>
joinRule.perform(Some(p), p.child, usingMvs)
@@ -66,23 +72,71 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] with Loggin
RewriteHelper.extractAllAttrsFromExpression(a.aggregateExpressions).toSeq, a.child)
val rewritedChild = joinRule.perform(Some(child), child.child, usingMvs)
if (rewritedChild != child) {
- rewritedPlan = a.copy(child = rewritedChild)
+ val projectChild = rewritedChild.asInstanceOf[Project]
+ rewritedPlan = a.copy(child = Project(
+ projectChild.projectList ++ projectChild.child.output, projectChild.child))
}
}
rewritedPlan
case p => p
}
if (usingMvs.nonEmpty) {
+ RewriteTime.withTimeStat("checkAttrsValid") {
+ if (!RewriteHelper.checkAttrsValid(res)) {
+ return plan
+ }
+ }
val sql = session.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
val mvs = usingMvs.mkString(";").replaceAll("`", "")
- val log = "logicalPlan MVRewrite success,using materialized view:[%s],original sql:%s"
- .format(mvs, sql)
- logDebug(log)
+ val costMillis = (System.currentTimeMillis() - rewriteStartTime).toString
+ val log = ("logicalPlan MVRewrite success," +
+ "using materialized view:[%s],cost %s milliseconds,original sql:%s")
+ .format(mvs, costMillis, sql)
+ logBasedOnLevel(log)
session.sparkContext.listenerBus.post(SparkListenerMVRewriteSuccess(sql, mvs))
}
+ RewriteTime.statFromStartTime("total", rewriteStartTime)
+ logBasedOnLevel(RewriteTime.timeStat.toString())
res
}
}
+@DeveloperApi
case class SparkListenerMVRewriteSuccess(sql: String, usingMvs: String) extends SparkListenerEvent {
+ @JsonIgnore
+ @KVIndex
+ def id: String = (System.currentTimeMillis() + "%s%s".format(sql, usingMvs).hashCode).toString
+}
+
+class MVRewriteSuccessListener(
+ kvStore: ElementTrackingStore) extends SparkListener with Logging {
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case _: SparkListenerMVRewriteSuccess =>
+ kvStore.write(event)
+ case _ =>
+ }
+ }
+}
+
+object RewriteTime {
+ val timeStat: mutable.Map[String, Long] = mutable.HashMap[String, Long]()
+
+ def statFromStartTime(key: String, startTime: Long): Unit = {
+ timeStat += (key -> (timeStat.getOrElse(key, 0L) + System.currentTimeMillis() - startTime))
+ }
+
+ def clear(): Unit = {
+ timeStat.clear()
+ }
+
+ def withTimeStat[T](key: String)(f: => T): T = {
+ val startTime = System.currentTimeMillis()
+ try {
+ f
+ } finally {
+ statFromStartTime(key, startTime)
+ }
+ }
}
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRule.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRule.scala
index fe82c4744b05fb70e0f1f03d4356221b930e7dee..6dda8689106d62c13603ea2d40ff544d9be9b079 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRule.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRule.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
class MaterializedViewAggregateRule(sparkSession: SparkSession)
extends AbstractMaterializedViewRule(sparkSession: SparkSession) {
/**
- * cehck plan if match current rule
+ * check plan if match current rule
*
* @param logicalPlan LogicalPlan
* @return true:matched ; false:unMatched
@@ -43,7 +43,6 @@ class MaterializedViewAggregateRule(sparkSession: SparkSession)
logicalPlan.children.forall(isValidLogicalPlan)
}
-
/**
* queryTableInfo!=viewTableInfo , need do join compensate
*
@@ -208,7 +207,6 @@ class MaterializedViewAggregateRule(sparkSession: SparkSession)
// 4.rewrite and alias queryAggExpressions
// if the rewrite expression exprId != origin expression exprId,
// replace by Alias(rewrite expression,origin.name)(exprId=origin.exprId)
-
val rewritedQueryAggExpressions = rewriteAndAliasExpressions(newQueryAggExpressions,
swapTableColumn = true, tableMapping, columnMapping,
viewProjectList, viewTableAttrs, queryAgg.aggregateExpressions)
@@ -240,5 +238,4 @@ class MaterializedViewAggregateRule(sparkSession: SparkSession)
qualifier = qualifier, explicitMetadata = alias.explicitMetadata,
nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys)
}
-
}
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRule.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRule.scala
index 15717fa168107be193a0e3a020b9646f9593df0a..5c7c477dda7bf2f03dca11d212222b7fcf75c829 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRule.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRule.scala
@@ -22,14 +22,14 @@ import com.huawei.boostkit.spark.util.{ExpressionEqual, TableEqual}
import scala.collection.mutable
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical._
class MaterializedViewJoinRule(sparkSession: SparkSession)
extends AbstractMaterializedViewRule(sparkSession: SparkSession) {
/**
- * cehck plan if match current rule
+ * check plan if match current rule
*
* @param logicalPlan LogicalPlan
* @return true:matched ; false:unMatched
@@ -38,7 +38,6 @@ class MaterializedViewJoinRule(sparkSession: SparkSession)
isValidLogicalPlan(logicalPlan)
}
-
/**
* queryTableInfo!=viewTableInfo , need do join compensate
*
@@ -66,11 +65,20 @@ class MaterializedViewJoinRule(sparkSession: SparkSession)
topViewProject.get
}
+ var projectList: Seq[NamedExpression] = newViewQueryPlan match {
+ case p: Project =>
+ p.projectList
+ case _ =>
+ newViewQueryPlan.output
+ }
+
needTables.foreach { needTable =>
newViewQueryPlan = Join(newViewQueryPlan, needTable.logicalPlan,
Inner, None, JoinHint.NONE)
+ projectList ++= needTable.logicalPlan.output
}
- Some(newViewTablePlan, viewQueryPlan, None)
+ newViewQueryPlan = Project(projectList, newViewQueryPlan)
+ Some(newViewTablePlan, newViewQueryPlan, None)
}
/**
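The hunk above changes the join-compensation path: when the query references tables the view does not cover, each missing table is inner-joined onto the view query plan and its output columns are appended to a new top-level Project, so the rewritten plan still exposes every needed attribute. A minimal sketch of that pattern, where compensateJoins and its parameter names are illustrative:

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, LogicalPlan, Project}

def compensateJoins(viewQueryPlan: LogicalPlan, needTables: Seq[LogicalPlan]): LogicalPlan = {
  // Start from the view's own project list, or its raw output when the plan is not a Project.
  var projectList: Seq[NamedExpression] = viewQueryPlan match {
    case p: Project => p.projectList
    case _ => viewQueryPlan.output
  }
  var newPlan: LogicalPlan = viewQueryPlan
  needTables.foreach { tablePlan =>
    // Inner-join the missing table and expose its columns as well.
    newPlan = Join(newPlan, tablePlan, Inner, None, JoinHint.NONE)
    projectList ++= tablePlan.output
  }
  Project(projectList, newPlan)
}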
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionAstBuilder.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionAstBuilder.scala
index a4168024488e088be045989b25fc8ffed11a21d1..414afa602b5e5a4244ba9ca5d3fa4002e39574ad 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionAstBuilder.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionAstBuilder.scala
@@ -35,6 +35,11 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
+/**
+ * Parse extended AST to LogicalPlan
+ *
+ * @param delegate Spark default ParserInterface
+ */
class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterface)
extends OmniCacheSqlExtensionsBaseVisitor[AnyRef] with SQLConfHelper with Logging {
@@ -61,8 +66,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac
case Seq(mv) => (None, mv)
case Seq(database, mv) => (Some(database), mv)
case _ => throw new AnalysisException(
- "The mv name is not valid: %s".format(identifier.mkString("."))
- )
+ "The mv name is not valid: %s".format(identifier.mkString(".")))
}
try {
@@ -102,8 +106,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac
case Seq(mv) => (None, mv)
case Seq(database, mv) => (Some(database), mv)
case _ => throw new AnalysisException(
- "The mv name is not valid: %s".format(tableIdent.mkString("."))
- )
+ "The mv name is not valid: %s".format(tableIdent.mkString(".")))
}
val tableIdentifier = TableIdentifier(name, databaseName)
@@ -119,7 +122,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac
throw new RuntimeException("cannot refresh a table with refresh mv")
}
- // preserver preDatabase and set curDatabase
+ // preserve preDatabase and set curDatabase
val preDatabase = spark.catalog.currentDatabase
val curDatabase = catalogTable.properties.getOrElse(MV_QUERY_ORIGINAL_SQL_CUR_DB, "")
if (curDatabase.isEmpty) {
@@ -179,6 +182,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac
RewriteHelper.enableCachePlugin()
throw e
} finally {
+ // reset preDatabase
spark.sessionState.catalogManager.setCurrentNamespace(Array(preDatabase))
}
}
@@ -221,8 +225,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac
case Seq(database, mv) => DropMaterializedViewCommand(
TableIdentifier(mv, Some(database)),
ifExists.isDefined,
- purge = true
- )
+ purge = true)
case _ => throw new AnalysisException(
"The mv name is not valid: %s".format(identifier.mkString(".")))
}
@@ -297,6 +300,9 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac
visit(ctx.statement).asInstanceOf[LogicalPlan]
}
+ /**
+ * Type alias for a Tuple2 of (Seq[String], Boolean)
+ */
type OmniCacheHeader = (Seq[String], Boolean)
/**
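The AST builder resolves a one- or two-part multipart identifier into an optional database plus the materialized view name, and rejects anything else; the OmniCacheHeader alias above is the (Seq[String], Boolean) pair the visitor threads through. A small sketch of the identifier handling (resolveMvName is an illustrative name):

import org.apache.spark.sql.AnalysisException

def resolveMvName(identifier: Seq[String]): (Option[String], String) = identifier match {
  case Seq(mv) => (None, mv)
  case Seq(database, mv) => (Some(database), mv)
  case _ => throw new AnalysisException(
    "The mv name is not valid: %s".format(identifier.mkString(".")))
}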
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionSqlParser.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionSqlParser.scala
index 65077216a9a48bdedfa159593165ade7b46101a5..aeab8423244438dbcca481edbbdd8196f992ba30 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionSqlParser.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionSqlParser.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
-import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
+import org.antlr.v4.runtime._
import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.ParseCancellationException
diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/execution/command/OmniCacheCommand.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/execution/command/OmniCacheCommand.scala
index 801fffbd23c887242cc197bcc147a77d8dd486ba..c053b625236f6a731d175c5ebc8de7abd5c10ef9 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/execution/command/OmniCacheCommand.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/execution/command/OmniCacheCommand.scala
@@ -54,6 +54,7 @@ case class OmniCacheCreateMvCommand(
partitioning: Seq[String],
query: LogicalPlan,
outputColumnNames: Seq[String]) extends DataWritingCommand {
+
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
try {
ViewMetadata.init(sparkSession)
@@ -84,7 +85,7 @@ case class OmniCacheCreateMvCommand(
throw new AnalysisException(
s"Materialized View $tableIdentWithDB already exists. You need to drop it first")
} else {
- // Since the table already exists and the save mode is Ignore,we will just return.
+ // Since the table already exists and the save mode is Ignore, we will just return.
return Seq.empty
}
} else {
@@ -102,10 +103,11 @@ case class OmniCacheCreateMvCommand(
storage = table.storage.copy(locationUri = tableLocation),
// We will use the schema of resolved.relation as the schema of the table (instead of
// the schema of df). It is important since the nullability may be changed by the relation
- // provider (for example,see org.apache.spark.sql.parquet.DefaultSource).
+ // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
schema = tableSchema)
// Table location is already validated. No need to check it again during table creation.
sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
+
result match {
case _: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
sparkSession.sqlContext.conf.manageFilesourcePartitions =>
@@ -153,10 +155,19 @@ case class OmniCacheCreateMvCommand(
}
}
+/**
+ * Drops a materialized view from the metastore and removes it if it is cached.
+ *
+ * The syntax of this command is:
+ * {{{
+ * DROP MATERIALIZED VIEW [IF EXISTS] view_name;
+ * }}}
+ */
case class DropMaterializedViewCommand(
tableName: TableIdentifier,
ifExists: Boolean,
purge: Boolean) extends RunnableCommand {
+
override def run(sparkSession: SparkSession): Seq[Row] = {
ViewMetadata.init(sparkSession)
val catalog = sparkSession.sessionState.catalog
@@ -201,13 +212,18 @@ case class DropMaterializedViewCommand(
}
}
+/**
+ * Shows materialized views, optionally filtered by database name
+ * and a table identifier pattern.
+ */
case class ShowMaterializedViewCommand(
databaseName: Option[String],
tableIdentifierPattern: Option[String]) extends RunnableCommand {
+ // The result of SHOW MATERIALIZED VIEW has these basic columns:
+ // database, mvName, rewriteEnable and originalSql.
override val output: Seq[Attribute] = {
val tableExtendedInfo = Nil
-
AttributeReference("database", StringType, nullable = false)() ::
AttributeReference("mvName", StringType, nullable = false)() ::
AttributeReference("rewriteEnable", StringType, nullable = false)() ::
@@ -254,6 +270,7 @@ case class ShowMaterializedViewCommand(
case class AlterRewriteMaterializedViewCommand(
tableName: TableIdentifier,
enableRewrite: Boolean) extends RunnableCommand {
+
override def run(sparkSession: SparkSession): Seq[Row] = {
ViewMetadata.init(sparkSession)
val catalog = sparkSession.sessionState.catalog
@@ -379,7 +396,6 @@ case class RefreshMaterializedViewCommand(
}
if (doInsertion) {
-
def refreshUpdatedPartitions(updatedPartitionPaths: Set[String]): Unit = {
val updatedPartitions = updatedPartitionPaths.map(PartitioningUtils.parsePathFragment)
if (partitionsTrackedByCatalog) {
@@ -446,7 +462,6 @@ case class RefreshMaterializedViewCommand(
if (catalogTable.nonEmpty) {
CommandUtils.updateTableStats(sparkSession, catalogTable.get)
}
-
} else {
logInfo("Skipping insertion into a relation that already exists.")
}
diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/resources/tpcds_ddl.sql b/omnicache/omnicache-spark-extension/plugin/src/test/resources/tpcds_ddl.sql
new file mode 100644
index 0000000000000000000000000000000000000000..ae18bd4b0fc19c6b88821d3dbc53dd31240e91e9
--- /dev/null
+++ b/omnicache/omnicache-spark-extension/plugin/src/test/resources/tpcds_ddl.sql
@@ -0,0 +1,479 @@
+create table if not exists store_sales(
+ ss_sold_date_sk bigint
+, ss_sold_time_sk bigint
+, ss_item_sk bigint
+, ss_customer_sk bigint
+, ss_cdemo_sk bigint
+, ss_hdemo_sk bigint
+, ss_addr_sk bigint
+, ss_store_sk bigint
+, ss_promo_sk bigint
+, ss_ticket_number bigint
+, ss_quantity int
+, ss_wholesale_cost decimal(7,2)
+, ss_list_price decimal(7,2)
+, ss_sales_price decimal(7,2)
+, ss_ext_discount_amt decimal(7,2)
+, ss_ext_sales_price decimal(7,2)
+, ss_ext_wholesale_cost decimal(7,2)
+, ss_ext_list_price decimal(7,2)
+, ss_ext_tax decimal(7,2)
+, ss_coupon_amt decimal(7,2)
+, ss_net_paid decimal(7,2)
+, ss_net_paid_inc_tax decimal(7,2)
+, ss_net_profit decimal(7,2)
+);
+
+create table if not exists store_returns(
+ sr_returned_date_sk bigint
+, sr_return_time_sk bigint
+, sr_item_sk bigint
+, sr_customer_sk bigint
+, sr_cdemo_sk bigint
+, sr_hdemo_sk bigint
+, sr_addr_sk bigint
+, sr_store_sk bigint
+, sr_reason_sk bigint
+, sr_ticket_number bigint
+, sr_return_quantity int
+, sr_return_amt decimal(7,2)
+, sr_return_tax decimal(7,2)
+, sr_return_amt_inc_tax decimal(7,2)
+, sr_fee decimal(7,2)
+, sr_return_ship_cost decimal(7,2)
+, sr_refunded_cash decimal(7,2)
+, sr_reversed_charge decimal(7,2)
+, sr_store_credit decimal(7,2)
+, sr_net_loss decimal(7,2)
+);
+
+create table if not exists catalog_sales(
+ cs_sold_date_sk bigint
+, cs_sold_time_sk bigint
+, cs_ship_date_sk bigint
+, cs_bill_customer_sk bigint
+, cs_bill_cdemo_sk bigint
+, cs_bill_hdemo_sk bigint
+, cs_bill_addr_sk bigint
+, cs_ship_customer_sk bigint
+, cs_ship_cdemo_sk bigint
+, cs_ship_hdemo_sk bigint
+, cs_ship_addr_sk bigint
+, cs_call_center_sk bigint
+, cs_catalog_page_sk bigint
+, cs_ship_mode_sk bigint
+, cs_warehouse_sk bigint
+, cs_item_sk bigint
+, cs_promo_sk bigint
+, cs_order_number bigint
+, cs_quantity int
+, cs_wholesale_cost decimal(7,2)
+, cs_list_price decimal(7,2)
+, cs_sales_price decimal(7,2)
+, cs_ext_discount_amt decimal(7,2)
+, cs_ext_sales_price decimal(7,2)
+, cs_ext_wholesale_cost decimal(7,2)
+, cs_ext_list_price decimal(7,2)
+, cs_ext_tax decimal(7,2)
+, cs_coupon_amt decimal(7,2)
+, cs_ext_ship_cost decimal(7,2)
+, cs_net_paid decimal(7,2)
+, cs_net_paid_inc_tax decimal(7,2)
+, cs_net_paid_inc_ship decimal(7,2)
+, cs_net_paid_inc_ship_tax decimal(7,2)
+, cs_net_profit decimal(7,2)
+);
+
+create table if not exists catalog_returns(
+ cr_returned_date_sk bigint
+, cr_returned_time_sk bigint
+, cr_item_sk bigint
+, cr_refunded_customer_sk bigint
+, cr_refunded_cdemo_sk bigint
+, cr_refunded_hdemo_sk bigint
+, cr_refunded_addr_sk bigint
+, cr_returning_customer_sk bigint
+, cr_returning_cdemo_sk bigint
+, cr_returning_hdemo_sk bigint
+, cr_returning_addr_sk bigint
+, cr_call_center_sk bigint
+, cr_catalog_page_sk bigint
+, cr_ship_mode_sk bigint
+, cr_warehouse_sk bigint
+, cr_reason_sk bigint
+, cr_order_number bigint
+, cr_return_quantity int
+, cr_return_amount decimal(7,2)
+, cr_return_tax decimal(7,2)
+, cr_return_amt_inc_tax decimal(7,2)
+, cr_fee decimal(7,2)
+, cr_return_ship_cost decimal(7,2)
+, cr_refunded_cash decimal(7,2)
+, cr_reversed_charge decimal(7,2)
+, cr_store_credit decimal(7,2)
+, cr_net_loss decimal(7,2)
+);
+
+create table if not exists web_sales(
+ ws_sold_date_sk bigint
+, ws_sold_time_sk bigint
+, ws_ship_date_sk bigint
+, ws_item_sk bigint
+, ws_bill_customer_sk bigint
+, ws_bill_cdemo_sk bigint
+, ws_bill_hdemo_sk bigint
+, ws_bill_addr_sk bigint
+, ws_ship_customer_sk bigint
+, ws_ship_cdemo_sk bigint
+, ws_ship_hdemo_sk bigint
+, ws_ship_addr_sk bigint
+, ws_web_page_sk bigint
+, ws_web_site_sk bigint
+, ws_ship_mode_sk bigint
+, ws_warehouse_sk bigint
+, ws_promo_sk bigint
+, ws_order_number bigint
+, ws_quantity int
+, ws_wholesale_cost decimal(7,2)
+, ws_list_price decimal(7,2)
+, ws_sales_price decimal(7,2)
+, ws_ext_discount_amt decimal(7,2)
+, ws_ext_sales_price decimal(7,2)
+, ws_ext_wholesale_cost decimal(7,2)
+, ws_ext_list_price decimal(7,2)
+, ws_ext_tax decimal(7,2)
+, ws_coupon_amt decimal(7,2)
+, ws_ext_ship_cost decimal(7,2)
+, ws_net_paid decimal(7,2)
+, ws_net_paid_inc_tax decimal(7,2)
+, ws_net_paid_inc_ship decimal(7,2)
+, ws_net_paid_inc_ship_tax decimal(7,2)
+, ws_net_profit decimal(7,2)
+);
+
+create table if not exists web_returns(
+ wr_returned_date_sk bigint
+, wr_returned_time_sk bigint
+, wr_item_sk bigint
+, wr_refunded_customer_sk bigint
+, wr_refunded_cdemo_sk bigint
+, wr_refunded_hdemo_sk bigint
+, wr_refunded_addr_sk bigint
+, wr_returning_customer_sk bigint
+, wr_returning_cdemo_sk bigint
+, wr_returning_hdemo_sk bigint
+, wr_returning_addr_sk bigint
+, wr_web_page_sk bigint
+, wr_reason_sk bigint
+, wr_order_number bigint
+, wr_return_quantity int
+, wr_return_amt decimal(7,2)
+, wr_return_tax decimal(7,2)
+, wr_return_amt_inc_tax decimal(7,2)
+, wr_fee decimal(7,2)
+, wr_return_ship_cost decimal(7,2)
+, wr_refunded_cash decimal(7,2)
+, wr_reversed_charge decimal(7,2)
+, wr_account_credit decimal(7,2)
+, wr_net_loss decimal(7,2)
+);
+
+create table if not exists inventory(
+ inv_date_sk bigint
+, inv_item_sk bigint
+, inv_warehouse_sk bigint
+, inv_quantity_on_hand int
+);
+create table if not exists store(
+ s_store_sk bigint
+, s_store_id char(16)
+, s_rec_start_date date
+, s_rec_end_date date
+, s_closed_date_sk bigint
+, s_store_name varchar(50)
+, s_number_employees int
+, s_floor_space int
+, s_hours char(20)
+, S_manager varchar(40)
+, S_market_id int
+, S_geography_class varchar(100)
+, S_market_desc varchar(100)
+, s_market_manager varchar(40)
+, s_division_id int
+, s_division_name varchar(50)
+, s_company_id int
+, s_company_name varchar(50)
+, s_street_number varchar(10)
+, s_street_name varchar(60)
+, s_street_type char(15)
+, s_suite_number char(10)
+, s_city varchar(60)
+, s_county varchar(30)
+, s_state char(2)
+, s_zip char(10)
+, s_country varchar(20)
+, s_gmt_offset decimal(5,2)
+, s_tax_percentage decimal(5,2)
+);
+create table if not exists call_center(
+ cc_call_center_sk bigint
+, cc_call_center_id char(16)
+, cc_rec_start_date date
+, cc_rec_end_date date
+, cc_closed_date_sk bigint
+, cc_open_date_sk bigint
+, cc_name varchar(50)
+, cc_class varchar(50)
+, cc_employees int
+, cc_sq_ft int
+, cc_hours char(20)
+, cc_manager varchar(40)
+, cc_mkt_id int
+, cc_mkt_class char(50)
+, cc_mkt_desc varchar(100)
+, cc_market_manager varchar(40)
+, cc_division int
+, cc_division_name varchar(50)
+, cc_company int
+, cc_company_name char(50)
+, cc_street_number char(10)
+, cc_street_name varchar(60)
+, cc_street_type char(15)
+, cc_suite_number char(10)
+, cc_city varchar(60)
+, cc_county varchar(30)
+, cc_state char(2)
+, cc_zip char(10)
+, cc_country varchar(20)
+, cc_gmt_offset decimal(5,2)
+, cc_tax_percentage decimal(5,2)
+);
+create table if not exists catalog_page(
+ cp_catalog_page_sk bigint
+, cp_catalog_page_id char(16)
+, cp_start_date_sk bigint
+, cp_end_date_sk bigint
+, cp_department varchar(50)
+, cp_catalog_number int
+, cp_catalog_page_number int
+, cp_description varchar(100)
+, cp_type varchar(100)
+);
+create table if not exists web_site(
+ web_site_sk bigint
+, web_site_id char(16)
+, web_rec_start_date date
+, web_rec_end_date date
+, web_name varchar(50)
+, web_open_date_sk bigint
+, web_close_date_sk bigint
+, web_class varchar(50)
+, web_manager varchar(40)
+, web_mkt_id int
+, web_mkt_class varchar(50)
+, web_mkt_desc varchar(100)
+, web_market_manager varchar(40)
+, web_company_id int
+, web_company_name char(50)
+, web_street_number char(10)
+, web_street_name varchar(60)
+, web_street_type char(15)
+, web_suite_number char(10)
+, web_city varchar(60)
+, web_county varchar(30)
+, web_state char(2)
+, web_zip char(10)
+, web_country varchar(20)
+, web_gmt_offset decimal(5,2)
+, web_tax_percentage decimal(5,2)
+);
+create table if not exists web_page(
+ wp_web_page_sk bigint
+, wp_web_page_id char(16)
+, wp_rec_start_date date
+, wp_rec_end_date date
+, wp_creation_date_sk bigint
+, wp_access_date_sk bigint
+, wp_autogen_flag char(1)
+, wp_customer_sk bigint
+, wp_url varchar(100)
+, wp_type char(50)
+, wp_char_count int
+, wp_link_count int
+, wp_image_count int
+, wp_max_ad_count int
+);
+create table if not exists warehouse(
+ w_warehouse_sk bigint
+, w_warehouse_id char(16)
+, w_warehouse_name varchar(20)
+, w_warehouse_sq_ft int
+, w_street_number char(10)
+, w_street_name varchar(60)
+, w_street_type char(15)
+, w_suite_number char(10)
+, w_city varchar(60)
+, w_county varchar(30)
+, w_state char(2)
+, w_zip char(10)
+, w_country varchar(20)
+, w_gmt_offset decimal(5,2)
+);
+create table if not exists customer(
+ c_customer_sk bigint
+, c_customer_id char(16)
+, c_current_cdemo_sk bigint
+, c_current_hdemo_sk bigint
+, c_current_addr_sk bigint
+, c_first_shipto_date_sk bigint
+, c_first_sales_date_sk bigint
+, c_salutation char(10)
+, c_first_name char(20)
+, c_last_name char(30)
+, c_preferred_cust_flag char(1)
+, c_birth_day int
+, c_birth_month int
+, c_birth_year int
+, c_birth_country varchar(20)
+, c_login char(13)
+, c_email_address char(50)
+, c_last_review_date_sk bigint
+);
+create table if not exists customer_address(
+ ca_address_sk bigint
+, ca_address_id char(16)
+, ca_street_number char(10)
+, ca_street_name varchar(60)
+, ca_street_type char(15)
+, ca_suite_number char(10)
+, ca_city varchar(60)
+, ca_county varchar(30)
+, ca_state char(2)
+, ca_zip char(10)
+, ca_country varchar(20)
+, ca_gmt_offset decimal(5,2)
+, ca_location_type char(20)
+);
+create table if not exists customer_demographics(
+ cd_demo_sk bigint
+, cd_gender char(1)
+, cd_marital_status char(1)
+, cd_education_status char(20)
+, cd_purchase_estimate int
+, cd_credit_rating char(10)
+, cd_dep_count int
+, cd_dep_employed_count int
+, cd_dep_college_count int
+);
+create table if not exists date_dim(
+ d_date_sk bigint
+, d_date_id char(16)
+, d_date date
+, d_month_seq int
+, d_week_seq int
+, d_quarter_seq int
+, d_year int
+, d_dow int
+, d_moy int
+, d_dom int
+, d_qoy int
+, d_fy_year int
+, d_fy_quarter_seq int
+, d_fy_week_seq int
+, d_day_name char(9)
+, d_quarter_name char(6)
+, d_holiday char(1)
+, d_weekend char(1)
+, d_following_holiday char(1)
+, d_first_dom int
+, d_last_dom int
+, d_same_day_ly int
+, d_same_day_lq int
+, d_current_day char(1)
+, d_current_week char(1)
+, d_current_month char(1)
+, d_current_quarter char(1)
+, d_current_year char(1)
+);
+create table if not exists household_demographics(
+ hd_demo_sk bigint
+, hd_income_band_sk bigint
+, hd_buy_potential char(15)
+, hd_dep_count int
+, hd_vehicle_count int
+);
+create table if not exists item(
+ i_item_sk bigint
+, i_item_id char(16)
+, i_rec_start_date date
+, i_rec_end_date date
+, i_item_desc varchar(200)
+, i_current_price decimal(7,2)
+, i_wholesale_cost decimal(7,2)
+, i_brand_id int
+, i_brand char(50)
+, i_class_id int
+, i_class char(50)
+, i_category_id int
+, i_category char(50)
+, i_manufact_id int
+, i_manufact char(50)
+, i_size char(20)
+, i_formulation char(20)
+, i_color char(20)
+, i_units char(10)
+, i_container char(10)
+, i_manager_id int
+, i_product_name char(50)
+);
+create table if not exists income_band(
+ ib_income_band_sk bigint
+, ib_lower_bound int
+, ib_upper_bound int
+);
+create table if not exists promotion(
+ p_promo_sk bigint
+, p_promo_id char(16)
+, p_start_date_sk bigint
+, p_end_date_sk bigint
+, p_item_sk bigint
+, p_cost decimal(15,2)
+, p_response_target int
+, p_promo_name char(50)
+, p_channel_dmail char(1)
+, p_channel_email char(1)
+, p_channel_catalog char(1)
+, p_channel_tv char(1)
+, p_channel_radio char(1)
+, p_channel_press char(1)
+, p_channel_event char(1)
+, p_channel_demo char(1)
+, p_channel_details varchar(100)
+, p_purpose char(15)
+, p_discount_active char(1)
+);
+create table if not exists reason(
+ r_reason_sk bigint
+, r_reason_id char(16)
+, r_reason_desc char(100)
+);
+create table if not exists ship_mode(
+ sm_ship_mode_sk bigint
+, sm_ship_mode_id char(16)
+, sm_type char(30)
+, sm_code char(10)
+, sm_carrier char(20)
+, sm_contract char(20)
+);
+create table if not exists time_dim(
+ t_time_sk bigint
+, t_time_id char(16)
+, t_time int
+, t_hour int
+, t_minute int
+, t_second int
+, t_am_pm char(2)
+, t_shift char(20)
+, t_sub_shift char(20)
+, t_meal_time char(20)
+);
\ No newline at end of file
diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRuleSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRuleSuite.scala
index 23a7831287f13f3651fa51c453c5305cb4b63939..a173eba2f0bd11f56a6dccdd6297813279e28c52 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRuleSuite.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRuleSuite.scala
@@ -162,7 +162,7 @@ class MaterializedViewAggregateRuleSuite extends RewriteSuite {
}
test("mv_agg3_3") {
- // group by column(is empty) is subset of view,,additional agg on view column
+ // the query's group by columns (empty here) are a subset of the view's; additional agg on view columns
val sql =
"""
|SELECT sum(c.integertype) as _sum,
@@ -232,8 +232,112 @@ class MaterializedViewAggregateRuleSuite extends RewriteSuite {
comparePlansAndRows(sql, "default", "mv_agg4", noData = false)
}
+ test("mv_agg4_3") {
+ // group by columns and aggs are the same as the view's, with an additional join
+ val sql =
+ """
+ |SELECT c.empid,c.deptno,c.locationid,sum(c.integertype) as _sum,
+ |max(c.longtype) as _max,min(c.floattype) as _min,
+ |count(c.doubletype) as _count,count(distinct c.datetype) as _count_distinct,
+ |avg(c.decimaltype) as _avg
+ |FROM column_type c JOIN emps e JOIN locations l
+ |ON c.empid=e.empid
+ |AND c.locationid=l.locationid
+ |AND c.empid=1
+ |GROUP BY c.empid,c.deptno,c.locationid;
+ |""".stripMargin
+ comparePlansAndRows(sql, "default", "mv_agg4", noData = false)
+ }
+
test("mv_agg4_disable") {
val sql = "ALTER MATERIALIZED VIEW mv_agg4 DISABLE REWRITE;"
spark.sql(sql).show()
}
+
+ test("mv_agg5") {
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_agg5;
+ |""".stripMargin
+ )
+ spark.sql(
+ """
+ |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_agg5
+ |AS
+ |SELECT c.empid,c.deptno,c.locationid,sum(c.integertype) as _sum,
+ |max(c.longtype) as _max,min(c.floattype) as _min,
+ |count(c.doubletype) as _count,count(distinct c.datetype) as _count_distinct,
+ |avg(c.decimaltype) as _avg
+ |FROM column_type c JOIN emps e
+ |ON c.empid=e.empid
+ |AND c.empid=1
+ |GROUP BY c.empid,c.deptno,c.locationid;
+ |""".stripMargin
+ )
+ }
+
+ test("mv_agg5_1") {
+ val sql =
+ """
+ |SELECT c.empid,c.deptno,sum(c.integertype) as _sum,
+ |max(c.longtype) as _max,min(c.floattype) as _min,
+ |count(c.doubletype) as _count
+ |FROM column_type c JOIN emps e
+ |ON c.empid=e.empid
+ |AND c.empid=1
+ |GROUP BY c.empid,c.deptno;
+ |""".stripMargin
+ comparePlansAndRows(sql, "default", "mv_agg5", noData = false)
+ }
+
+ test("mv_agg5_2") {
+ val sql =
+ """
+ |SELECT c.empid,c.deptno,count(distinct c.datetype) as _count_distinct
+ |FROM column_type c JOIN emps e
+ |ON c.empid=e.empid
+ |AND c.empid=1
+ |GROUP BY c.empid,c.deptno;
+ |""".stripMargin
+ comparePlansAndRows(sql, "default", "mv_agg5", noData = false)
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_agg5;
+ |""".stripMargin
+ )
+ }
+
+ test("mv_agg6") {
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_agg6;
+ |""".stripMargin
+ )
+ spark.sql(
+ """
+ |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_agg6
+ |AS
+ |SELECT c.empid,c.deptno,c.locationid,sum(distinct c.integertype) as _sum,
+ |max(distinct c.longtype) as _max,min(distinct c.floattype) as _min,
+ |count(distinct c.doubletype) as _count
+ |FROM column_type c
+ |GROUP BY c.empid,c.deptno,c.locationid;
+ |""".stripMargin
+ )
+
+ val sql =
+ """
+ |SELECT c.empid,c.deptno,sum(distinct c.integertype) as _sum,
+ |max(distinct c.longtype) as _max,min(distinct c.floattype) as _min,
+ |count(distinct c.doubletype) as _count
+ |FROM column_type c
+ |GROUP BY c.empid,c.deptno;
+ |""".stripMargin
+ comparePlansAndRows(sql, "default", "mv_agg6", noData = false)
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_agg6;
+ |""".stripMargin
+ )
+ }
}
diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewFilterRuleSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewFilterRuleSuite.scala
index 473d3fde12dce5e5e3a8710f727a53c2b28ee001..f34e2b74e2bbdadeca072d6a0166cb030e3433b0 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewFilterRuleSuite.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewFilterRuleSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.optimizer.rules
+import com.huawei.boostkit.spark.util.RewriteHelper
+
class MaterializedViewFilterRuleSuite extends RewriteSuite {
test("mv_filter1") {
@@ -273,7 +275,100 @@ class MaterializedViewFilterRuleSuite extends RewriteSuite {
}
test("mv_filter2_disable") {
- val sql = "ALTER MATERIALIZED VIEW mv_filter1 DISABLE REWRITE;"
+ val sql = "ALTER MATERIALIZED VIEW mv_filter2 DISABLE REWRITE;"
spark.sql(sql).show()
}
+
+ test("mv_filter_rand") {
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_filter_rand;
+ |""".stripMargin
+ )
+ spark.sql(
+ """
+ |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_filter_rand
+ |AS
+ |SELECT * FROM COLUMN_TYPE WHERE doubletype=rand(1)
+ |""".stripMargin
+ )
+ val sql =
+ """
+ |SELECT * FROM COLUMN_TYPE WHERE doubletype=rand()
+ |""".stripMargin
+ compareNotRewriteAndRows(sql, noData = true)
+
+ RewriteHelper.enableCachePlugin()
+ val sql2 =
+ """
+ |SELECT * FROM COLUMN_TYPE WHERE doubletype=rand(1)
+ |""".stripMargin
+ comparePlansAndRows(sql2, "default", "mv_filter_rand", noData = true)
+
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_filter_rand;
+ |""".stripMargin
+ )
+ }
+
+ test("mv_filter_if") {
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_filter_if;
+ |""".stripMargin
+ )
+ spark.sql(
+ """
+ |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_filter_if
+ |AS
+ |SELECT e.empid,e.deptno,if(e.deptno<2,e.empid,e.deptno) as _if FROM emps e;
+ |""".stripMargin
+ )
+ val sql = "SELECT if(e.deptno<3,e.empid,e.deptno) as _if FROM emps e"
+ comparePlansAndRows(sql, "default", "mv_filter_if", noData = false)
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_filter_if;
+ |""".stripMargin
+ )
+ }
+
+ test("mv_filter3") {
+ // OR conditions in the filter
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_filter3;
+ |""".stripMargin
+ )
+
+ spark.sql(
+ """
+ |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_filter3
+ |AS
+ |SELECT * FROM COLUMN_TYPE WHERE empid=3
+ |OR bytetype>1 OR shorttype<5 OR integertype>=1 OR longtype<=5
+ |OR floattype in(3.0) OR doubletype not in(2.0)
+ |OR datetype between DATE '2021-01-01' and DATE '2023-01-01'
+ |OR stringtype='stringtype3'
+ |OR timestamptype is not null OR decimaltype is null;
+ |""".stripMargin
+ )
+
+ val sql =
+ """
+ |SELECT * FROM COLUMN_TYPE WHERE empid=3
+ |OR bytetype>1 OR shorttype<5 OR integertype>=1 OR longtype<=5
+ |OR floattype in(3.0) OR doubletype not in(2.0)
+ |OR datetype between DATE '2021-01-01' and DATE '2023-01-01'
+ |OR stringtype='stringtype3'
+ |OR timestamptype is not null OR decimaltype is null;
+ |""".stripMargin
+ comparePlansAndRows(sql, "default", "mv_filter3", noData = false)
+ spark.sql(
+ """
+ |DROP MATERIALIZED VIEW IF EXISTS mv_filter3;
+ |""".stripMargin
+ )
+ }
}
diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRuleSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRuleSuite.scala
index df7a1f2559908636068ee5fe6d0a1f1b8f1e3e32..da9e4faf2961fd039181e9e449d044efcded889b 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRuleSuite.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRuleSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.optimizer.rules
+import com.huawei.boostkit.spark.util.RewriteHelper
+
class MaterializedViewJoinRuleSuite extends RewriteSuite {
test("mv_join1") {
@@ -150,13 +152,15 @@ class MaterializedViewJoinRuleSuite extends RewriteSuite {
// view tables is same to query, equal tables
val sql =
"""
- |SELECT e.*,c1.stringtype
+ |SELECT e.*,c2.stringtype
|FROM emps e JOIN column_type c1 JOIN column_type c2
|ON e.deptno=c1.deptno AND e.deptno=c2.deptno
|AND c1.deptno!=2
|AND c2.deptno=1;
|""".stripMargin
comparePlansAndRows(sql, "default", "mv_join2", noData = false)
+ RewriteHelper.enableCachePlugin()
+ comparePlansAndRows(sql, "default", "mv_join2", noData = false)
}
test("mv_join2_disable") {
diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/RewriteSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/RewriteSuite.scala
index 8ddc9dac1602892fa37eaef57d06ac529c93e272..e913b59ac4329cbd4a9765a20264cc4869f8d5dc 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/RewriteSuite.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/RewriteSuite.scala
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-
package org.apache.spark.sql.catalyst.optimizer.rules
import com.huawei.boostkit.spark.conf.OmniCachePluginConfig
@@ -44,6 +43,7 @@ class RewriteSuite extends SparkFunSuite with PredicateHelper {
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.ui.port", "4050")
// .config("spark.sql.planChangeLog.level","WARN")
+ .config("spark.sql.omnicache.logLevel", "WARN")
.enableHiveSupport()
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/TpcdsSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/TpcdsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..42adf96cce46b18647958a63928638673a00fa05
--- /dev/null
+++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/TpcdsSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer.rules
+
+import org.apache.commons.io.IOUtils
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+
+class TpcdsSuite extends RewriteSuite {
+
+ def createTable(): Unit = {
+ if (catalog.tableExists(TableIdentifier("store_sales"))) {
+ return
+ }
+ val fis = this.getClass.getResourceAsStream("/tpcds_ddl.sql")
+ val lines = IOUtils.readLines(fis, "UTF-8")
+ IOUtils.closeQuietly(fis)
+
+ var sqls = Seq.empty[String]
+ val sql = mutable.StringBuilder.newBuilder
+ lines.forEach { line =>
+ sql.append(line)
+ sql.append(" ")
+ if (line.contains(';')) {
+ sqls +:= sql.toString()
+ sql.clear()
+ }
+ }
+ sqls.foreach { sql =>
+ spark.sql(sql)
+ }
+ }
+
+ createTable()
+
+ test("subQuery outReference") {
+ spark.sql("DROP MATERIALIZED VIEW IF EXISTS mv536")
+ spark.sql(
+ """
+ |CREATE MATERIALIZED VIEW IF NOT EXISTS mv536 PARTITIONED BY (ws_sold_date_sk) AS
+ | SELECT
+ | web_sales.ws_ext_discount_amt,
+ | item.i_item_sk,
+ | web_sales.ws_sold_date_sk,
+ | web_sales.ws_item_sk,
+ | item.i_manufact_id
+ |FROM
+ | web_sales,
+ | item
+ |WHERE
+ | item.i_manufact_id = 350
+ | AND web_sales.ws_item_sk = item.i_item_sk
+ |distribute by ws_sold_date_sk;
+ |""".stripMargin
+ )
+ val sql =
+ """
+ |SELECT sum(ws_ext_discount_amt) AS `Excess Discount Amount `
+ |FROM web_sales, item, date_dim
+ |WHERE i_manufact_id = 350
+ | AND i_item_sk = ws_item_sk
+ | AND d_date BETWEEN '2000-01-27' AND (cast('2000-01-27' AS DATE) + INTERVAL 90 days)
+ | AND d_date_sk = ws_sold_date_sk
+ | AND ws_ext_discount_amt >
+ | (
+ | SELECT 1.3 * avg(ws_ext_discount_amt)
+ | FROM web_sales, date_dim
+ | WHERE ws_item_sk = i_item_sk
+ | AND d_date BETWEEN '2000-01-27' AND (cast('2000-01-27' AS DATE) + INTERVAL 90 days)
+ | AND d_date_sk = ws_sold_date_sk
+ | )
+ |ORDER BY sum(ws_ext_discount_amt)
+ |LIMIT 100
+ |
+ |""".stripMargin
+ compareNotRewriteAndRows(sql, noData = true)
+ spark.sql("DROP MATERIALIZED VIEW IF EXISTS mv536")
+ }
+}
diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/parser/SqlParserSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/parser/SqlParserSuite.scala
index 7b40f3111d1b50cf43c3c92287c0d4930977ecf9..273197c91f3a572c246f645bfc7411a6caa51c68 100644
--- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/parser/SqlParserSuite.scala
+++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/parser/SqlParserSuite.scala
@@ -560,6 +560,7 @@ class SqlParserSuite extends RewriteSuite {
}
test("mv_is_cached") {
+ spark.sql("DROP MATERIALIZED VIEW IF EXISTS mv_create2;")
spark.sql(
"""
|ALTER MATERIALIZED VIEW default.mv_create1 ENABLE REWRITE
@@ -568,10 +569,10 @@ class SqlParserSuite extends RewriteSuite {
).show()
val sql1 = "SELECT * FROM column_type"
comparePlansAndRows(sql1, "default", "mv_create1", noData = false)
-
+ RewriteHelper.enableCachePlugin()
spark.sql(
"""
- |ALTER MATERIALIZED VIEW default.mv_create1 ENABLE REWRITE
+ |ALTER MATERIALIZED VIEW default.mv_create1 DISABLE REWRITE
|;
|""".stripMargin
).show()
diff --git a/omnicache/omnicache-spark-extension/pom.xml b/omnicache/omnicache-spark-extension/pom.xml
index 1e068b0af35e0d623bc2023af4742be9658548fc..28712374061c30dc3b86ffa62244d1c851fd807e 100644
--- a/omnicache/omnicache-spark-extension/pom.xml
+++ b/omnicache/omnicache-spark-extension/pom.xml
@@ -12,6 +12,7 @@
plugin
+ log-parser
BoostKit Spark MaterializedView Sql Engine Extension Parent Pom
@@ -34,6 +35,7 @@
3.1.2
1.4.11
8.29
+