diff --git a/omnicache/omnicache-spark-extension/build.sh b/omnicache/omnicache-spark-extension/build.sh index ad436cddd524c22781e0c934c675085ff0537516..6c2bb474492f1649f4370ddd4f74fd3e7548aae0 100644 --- a/omnicache/omnicache-spark-extension/build.sh +++ b/omnicache/omnicache-spark-extension/build.sh @@ -1,2 +1,3 @@ #!/bin/bash -mvn clean package \ No newline at end of file +cpu_name="-"$(lscpu | grep Architecture | awk '{print $2}') +mvn clean package -Ddep.os.arch="${cpu_name}" \ No newline at end of file diff --git a/omnicache/omnicache-spark-extension/log-parser/pom.xml b/omnicache/omnicache-spark-extension/log-parser/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..75f93ca239dde0b92a2ea43e91d56e56444b9560 --- /dev/null +++ b/omnicache/omnicache-spark-extension/log-parser/pom.xml @@ -0,0 +1,240 @@ + + + + + com.huawei.kunpeng + boostkit-omnicache-spark-parent + 3.1.1-1.0.0 + + 4.0.0 + + boostkit-omnicache-logparser-spark + jar + 3.1.1-1.0.0 + + log-parser + + + 14.0.1 + + + + + com.huawei.kunpeng + boostkit-omnicache-spark + 3.1.1-1.0.0 + + + com.google.guava + guava + ${guava.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + + + org.apache.spark + spark-catalyst_${scala.binary.version} + + + org.apache.spark + spark-hive_${scala.binary.version} + + + + + org.apache.spark + spark-core_${scala.binary.version} + test-jar + test + + + org.apache.hadoop + hadoop-client + + + org.apache.curator + curator-recipes + + + + + junit + junit + test + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scala-lang + scala-library + + + + + ${artifactId}-${version}${dep.os.arch} + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.antlr + antlr4-maven-plugin + + + + antlr4 + + + + + true + src/main/antlr4 + true + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${java.version} + ${java.version} + + + + compile + + compile + + + + + + net.alchim31.maven + scala-maven-plugin + + ${scala.recompile.mode} + + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.scalastyle + scalastyle-maven-plugin + + false + true + true + false + ${project.basedir}/src/main/scala + ${project.basedir}/src/test/scala + ${user.dir}/scalastyle-config.xml + ${project.basedir}/target/scalastyle-output.xml + ${project.build.sourceEncoding} + ${project.reporting.outputEncoding} + + + + + check + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + false + true + + ${project.basedir}/src/main/java + ${project.basedir}/src/main/scala + + + ${project.basedir}/src/test/java + ${project.basedir}/src/test/scala + + dev/checkstyle.xml + ${project.basedir}/target/checkstyle-output.xml + ${project.build.sourceEncoding} + ${project.reporting.outputEncoding} + + + + com.puppycrawl.tools + checkstyle + ${puppycrawl.checkstyle.version} + + + + + + check + + + + + + + + org.scalatest + scalatest-maven-plugin + + false + + ${project.build.directory}/surefire-reports + . 
+ TestSuite.txt + + + + test + + test + + + + + + org.scoverage + scoverage-maven-plugin + + + test + test + + report + + + + + true + true + ${project.build.sourceEncoding} + true + + + + + \ No newline at end of file diff --git a/omnicache/omnicache-spark-extension/log-parser/src/main/scala/org/apache/spark/deploy/history/LogsParser.scala b/omnicache/omnicache-spark-extension/log-parser/src/main/scala/org/apache/spark/deploy/history/LogsParser.scala new file mode 100644 index 0000000000000000000000000000000000000000..fa44697206d909229320a6075bd37d2129848785 --- /dev/null +++ b/omnicache/omnicache-spark-extension/log-parser/src/main/scala/org/apache/spark/deploy/history/LogsParser.scala @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.ServiceLoader +import java.util.regex.Pattern + +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.DefaultFormats +import org.json4s.jackson.Json +import scala.collection.JavaConverters.iterableAsScalaIterableConverter +import scala.collection.mutable +import scala.util.control.Breaks + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Status.ASYNC_TRACKING_ENABLED +import org.apache.spark.scheduler.ReplayListenerBus +import org.apache.spark.scheduler.ReplayListenerBus.{ReplayEventsFilter, SELECT_ALL_FILTER} +import org.apache.spark.sql.catalyst.optimizer.rules._ +import org.apache.spark.sql.execution.ui._ +import org.apache.spark.status._ +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} + +class LogsParser(conf: SparkConf, eventLogDir: String, outPutDir: String) extends Logging { + + private val LINE_SEPARATOR = "\n" + private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + // Visible for testing + private[history] val fs: FileSystem = new Path(eventLogDir).getFileSystem(hadoopConf) + + /** + * parseAppHistoryLog + * + * @param appId appId + * @param fileName fileName + */ + def parseAppHistoryLog(appId: String, fileName: String): Unit = { + val inMemoryStore = new InMemoryStore() + val reader = EventLogFileReader(fs, new Path(eventLogDir, appId)) + rebuildAppStore(inMemoryStore, reader.get) + + val sqlStatusStore = new SQLAppStatusStore(inMemoryStore) + val mvRewriteSuccessInfo = getMVRewriteSuccessInfo(inMemoryStore) + + // create OutputDir + if (!fs.exists(new Path(outPutDir))) { + fs.mkdirs(new Path(outPutDir)) + } + + // continue for curLoop + val curLoop = new Breaks + + var jsons = Seq.empty[Map[String, String]] + sqlStatusStore.executionsList().foreach { execution => + curLoop.breakable { + // 
skip unNormal execution + val isRunning = execution.completionTime.isEmpty || + execution.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING } + val isFailed = execution + .jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED } + if (isRunning || isFailed) { + curLoop.break() + } + + val uiData = execution + val executionId = uiData.executionId + val planDesc = uiData.physicalPlanDescription + val query = uiData.description + var mvs = mvRewriteSuccessInfo.getOrElse(query, "") + if (mvs.nonEmpty) { + mvs.split(";").foreach { mv => + if (!planDesc.contains(mv)) { + mvs = "" + } + } + } + + // write dot + val graph: SparkPlanGraph = sqlStatusStore.planGraph(executionId) + sqlStatusStore.planGraph(executionId) + val metrics = sqlStatusStore.executionMetrics(executionId) + val node = getNodeInfo(graph) + + val jsonMap = Map( + "executionId" -> executionId.toString, + "original query" -> query, + "materialized views" -> mvs, + "physical plan" -> planDesc, + "dot metrics" -> graph.makeDotFile(metrics), + "node metrics" -> node) + jsons :+= jsonMap + } + } + // write json file into hdfs + val jsonFile: String = Json(DefaultFormats).write(jsons) + writeFile(fs, outPutDir + "/" + fileName + ".json", jsonFile) + } + + /** + * getMVRewriteSuccessInfo + * + * @param store KVStore + * @return {sql:mvs} + */ + def getMVRewriteSuccessInfo(store: KVStore): mutable.Map[String, String] = { + val infos = mutable.Map.empty[String, String] + try { + // The ApplicationInfo may not be available when Spark is starting up. + Utils.tryWithResource( + store.view(classOf[SparkListenerMVRewriteSuccess]) + .closeableIterator() + ) { it => + while (it.hasNext) { + val info = it.next() + infos += (info.sql -> info.usingMvs) + } + } + } catch { + case e: NoSuchElementException => + logWarning("getMVRewriteSuccessInfo is failed for ", e) + } + infos + } + + /** + * getNodeInfo from graph + * + * @param graph SparkPlanGraph + * @return NodeInfo + */ + def getNodeInfo(graph: SparkPlanGraph): String = { + // write node + val tmpContext = new StringBuilder + tmpContext.append("[PlanMetric]") + nextLine(tmpContext) + graph.allNodes.foreach { node => + tmpContext.append(s"id:${node.id} name:${node.name} desc:${node.desc}") + nextLine(tmpContext) + node.metrics.foreach { metric => + metric.toString + tmpContext.append("SQLPlanMetric(") + tmpContext.append(metric.name) + tmpContext.append(",") + if (metric.metricType == "timing") { + tmpContext.append(s"${metric.accumulatorId * 1000000} ns, ") + } else if (metric.metricType == "nsTiming") { + tmpContext.append(s"${metric.accumulatorId} ns, ") + } else { + tmpContext.append(s"${metric.accumulatorId}, ") + } + tmpContext.append(metric.metricType) + tmpContext.append(")") + nextLine(tmpContext) + } + nextLine(tmpContext) + nextLine(tmpContext) + nextLine(tmpContext) + } + + graph.edges.foreach { edges => + tmpContext.append(edges.makeDotEdge) + nextLine(tmpContext) + } + + tmpContext.append("[SubGraph]") + nextLine(tmpContext) + graph.allNodes.foreach { + case cluster: SparkPlanGraphCluster => + tmpContext.append(s"cluster ${cluster.id} : ") + for (i <- cluster.nodes.indices) { + tmpContext.append(s"${cluster.nodes(i).id} ") + } + nextLine(tmpContext) + case node => + } + nextLine(tmpContext) + tmpContext.toString() + } + + def nextLine(context: StringBuilder): Unit = { + context.append(LINE_SEPARATOR) + } + + /** + * rebuildAppStore + * + * @param store KVStore + * @param reader EventLogFileReader + */ + private[spark] def rebuildAppStore(store: 
KVStore, reader: EventLogFileReader): Unit = { + // Disable async updates, since they cause higher memory usage, and it's ok to take longer + // to parse the event logs in the SHS. + val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false) + val trackingStore = new ElementTrackingStore(store, replayConf) + val replayBus = new ReplayListenerBus() + val listener = new AppStatusListener(trackingStore, replayConf, false) + replayBus.addListener(listener) + replayBus.addListener(new MVRewriteSuccessListener(trackingStore)) + + for { + plugin <- loadPlugins() + listener <- plugin.createListeners(conf, trackingStore) + } replayBus.addListener(listener) + + try { + val eventLogFiles = reader.listEventLogFiles + + // parse event log + parseAppEventLogs(eventLogFiles, replayBus, !reader.completed) + trackingStore.close(false) + } catch { + case e: Exception => + Utils.tryLogNonFatalError { + trackingStore.close() + } + throw e + } + } + + /** + * loadPlugins + * + * @return Plugins + */ + private def loadPlugins(): Iterable[AppHistoryServerPlugin] = { + val plugins = ServiceLoader.load(classOf[AppHistoryServerPlugin], + Utils.getContextOrSparkClassLoader).asScala + plugins + } + + /** + * parseAppEventLogs + * + * @param logFiles Seq[FileStatus] + * @param replayBus ReplayListenerBus + * @param maybeTruncated Boolean + * @param eventsFilter ReplayEventsFilter + */ + private def parseAppEventLogs(logFiles: Seq[FileStatus], + replayBus: ReplayListenerBus, + maybeTruncated: Boolean, + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { + // stop replaying next log files if ReplayListenerBus indicates some error or halt + var continueReplay = true + logFiles.foreach { file => + if (continueReplay) { + Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath, fs)) { in => + continueReplay = replayBus.replay(in, file.getPath.toString, + maybeTruncated = maybeTruncated, eventsFilter = eventsFilter) + } + } + } + } + + /** + * write parsed logInfo to logPath + * + * @param fs FileSystem + * @param logPath logPath + * @param context logInfo + */ + private def writeFile(fs: FileSystem, logPath: String, context: String): Unit = { + val os = fs.create(new Path(logPath)) + os.write(context.getBytes()) + os.close() + } +} + +/* +arg0: spark.eventLog.dir, eg. hdfs://server1:9000/spark2-history +arg1: output dir in hdfs, eg. hdfs://server1:9000/logParser +arg2: log file to be parsed, eg. application_1646816941391_0115.lz4 + */ +object ParseLog extends Logging { + def main(args: Array[String]): Unit = { + if (args == null || args.length != 3) { + throw new RuntimeException("input params is invalid,such as below\n" + + "arg0: spark.eventLog.dir, eg. hdfs://server1:9000/spark2-history\n" + + "arg1: output dir in hdfs, eg. hdfs://server1:9000/logParser\n" + + "arg2: log file to be parsed, eg. 
application_1646816941391_0115.lz4\n") + } + val sparkEventLogDir = args(0) + val outputDir = args(1) + val logName = args(2) + + val conf = new SparkConf + // spark.eventLog.dir + conf.set("spark.eventLog.dir", sparkEventLogDir) + // spark.eventLog.compress + conf.set("spark.eventLog.compress", "true") + // fs.hdfs.impl + conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem") + val logParser = new LogsParser(conf, sparkEventLogDir, outputDir) + + // file pattern + val regex = "application_[0-9]+._[0-9]+.lz4" + val pattern = Pattern.compile(regex) + val matcher = pattern.matcher(logName) + if (matcher.find) { + val appId = matcher.group + logParser.parseAppHistoryLog(appId, logName) + } else { + throw new RuntimeException(logName + " is illegal") + } + } +} diff --git a/omnicache/omnicache-spark-extension/log-parser/src/test/resources/application_1663257594501_0003.lz4 b/omnicache/omnicache-spark-extension/log-parser/src/test/resources/application_1663257594501_0003.lz4 new file mode 100644 index 0000000000000000000000000000000000000000..f2b568905c4a786b9eec43dc8c5f05d1eee68e18 Binary files /dev/null and b/omnicache/omnicache-spark-extension/log-parser/src/test/resources/application_1663257594501_0003.lz4 differ diff --git a/omnicache/omnicache-spark-extension/log-parser/src/test/scala/org/apache/spark/deploy/history/LogsParserSuite.scala b/omnicache/omnicache-spark-extension/log-parser/src/test/scala/org/apache/spark/deploy/history/LogsParserSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..d686b0e379726bda722f48b681dfb90e2cd828f2 --- /dev/null +++ b/omnicache/omnicache-spark-extension/log-parser/src/test/scala/org/apache/spark/deploy/history/LogsParserSuite.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{FileInputStream, FileNotFoundException} + +import org.apache.commons.io.IOUtils +import org.json4s.DefaultFormats +import org.json4s.jackson.Json + +import org.apache.spark.SparkFunSuite + +class LogsParserSuite extends SparkFunSuite { + + test("parse") { + val path = this.getClass.getResource("/").getPath + val args: Array[String] = Array(path, "eventlog/parse" + , "application_1663257594501_0003.lz4") + ParseLog.main(args) + val fis = new FileInputStream("eventlog/parse/application_1663257594501_0003.lz4.json") + val lines = IOUtils.readLines(fis, "UTF-8") + IOUtils.closeQuietly(fis) + val json: Seq[Map[String, String]] = Json(DefaultFormats) + .read[Seq[Map[String, String]]](lines.get(0)) + assert(json.exists(map => + map.contains("materialized views") && + map("physical plan").contains(map("materialized views")) + )) + } + + test("error_invalid_param") { + assertThrows[RuntimeException] { + val path = this.getClass.getResource("/").getPath + val args: Array[String] = Array(path, "eventlog/parse") + ParseLog.main(args) + } + assertThrows[RuntimeException] { + val path = this.getClass.getResource("/").getPath + val args: Array[String] = Array(path, "eventlog/parse" + , "application_1663257594501_0003.lz4", "1") + ParseLog.main(args) + } + } + + test("error_invalid_logname") { + assertThrows[RuntimeException] { + val path = this.getClass.getResource("/").getPath + val args: Array[String] = Array(path, "eventlog/parse" + , "xxx.lz4") + ParseLog.main(args) + } + } + + test("error_log_not_exist") { + assertThrows[FileNotFoundException] { + val path = this.getClass.getResource("/").getPath + val args: Array[String] = Array(path, "eventlog/parse" + , "application_1663257594501_00031.lz4") + ParseLog.main(args) + } + } +} diff --git a/omnicache/omnicache-spark-extension/plugin/pom.xml b/omnicache/omnicache-spark-extension/plugin/pom.xml index 3879b778d28705f732a288b6877f0acea813e340..721879bddbf536cc7e37ca8d63dbd4cec3f93b0e 100644 --- a/omnicache/omnicache-spark-extension/plugin/pom.xml +++ b/omnicache/omnicache-spark-extension/plugin/pom.xml @@ -73,6 +73,7 @@ + ${artifactId}-${version}${dep.os.arch} target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/conf/OmniCachePluginConfig.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/conf/OmniCachePluginConfig.scala index d9fc9b31781400d7069b114ad52e4e955e3cad2b..77cacf0670244d3b0293796cb7b963cd266882b1 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/conf/OmniCachePluginConfig.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/conf/OmniCachePluginConfig.scala @@ -17,9 +17,13 @@ package com.huawei.boostkit.spark.conf +import java.util.Locale + import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.internal.SQLConf class OmniCachePluginConfig(conf: SQLConf) { @@ -48,6 +52,10 @@ class OmniCachePluginConfig(conf: SQLConf) { .getConfString("spark.sql.omnicache.default.datasource", "orc") val dataSourceSet: Set[String] = Set("orc", "parquet") + + def logLevel: String = 
conf + .getConfString("spark.sql.omnicache.logLevel", "DEBUG") + .toUpperCase(Locale.ROOT) } object OmniCachePluginConfig { @@ -86,4 +94,10 @@ object OmniCachePluginConfig { val catalogTable = spark.sessionState.catalog.getTableMetadata(mv) !catalogTable.properties.getOrElse(MV_UPDATE_REWRITE_ENABLED, "true").toBoolean } + + def isMVInUpdate(viewTablePlan: LogicalPlan): Boolean = { + val logicalRelation = viewTablePlan.asInstanceOf[LogicalRelation] + !logicalRelation.catalogTable.get + .properties.getOrElse(MV_UPDATE_REWRITE_ENABLED, "true").toBoolean + } } diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ExprSimplifier.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ExprSimplifier.scala index 13b1a607d34f6fda8d8bfb8002be9d58175e0602..5cb7d19258cfc56f95ea0f7bebfe4847e566ef40 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ExprSimplifier.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ExprSimplifier.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.optimizer._ import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, OneRowRelation} import org.apache.spark.sql.types.{BooleanType, DataType, NullType} - case class ExprSimplifier(unknownAsFalse: Boolean, pulledUpPredicates: Set[Expression]) { @@ -476,14 +475,14 @@ case class ExprSimplifier(unknownAsFalse: Boolean, for (term <- terms) { // Excluding self-simplification if (!term.eq(orOp)) { - // Simplification between a orExpression and a orExpression. + // Simplification between a OrExpression and a OrExpression. if (term.isInstanceOf[Or]) { if (containsAllSql(ors, decomposeDisjunctions(term).toSet)) { terms.-=(orOp) breaks3.break() } } else if (containsSql(ors, term)) { - // Simplification between a otherExpression and a orExpression. + // Simplification between a otherExpression and a OrExpression. 
terms.-=(orOp) breaks3.break() } @@ -615,7 +614,7 @@ case class ExprSimplifier(unknownAsFalse: Boolean, case c@Not(c1@EqualNullSafe(_, Literal.TrueLiteral)) => Not(c1.copy(left = child2)) case c => - throw new RuntimeException("unSupport type is predict simplify :%s".format(c)) + throw new RuntimeException("unSupport type is predicate simplify :%s".format(c)) } return condition2 } diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteHelper.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteHelper.scala index 38b7d40a32402136e4cad14a369e724e3f6539b8..2ef67e5361bb8f6920bce59c8519e53eedfd1c1b 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteHelper.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteHelper.scala @@ -28,14 +28,14 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.internal.SQLConf -class RewriteHelper extends PredicateHelper { +trait RewriteHelper extends PredicateHelper with RewriteLogger { val SESSION_CATALOG_NAME: String = "spark_catalog" - val EMPTY_BITMAP: HashBiMap[String, String] = HashBiMap.create[String, String]() + val EMPTY_BIMAP: HashBiMap[String, String] = HashBiMap.create[String, String]() val EMPTY_MAP: Map[ExpressionEqual, mutable.Set[ExpressionEqual]] = Map[ExpressionEqual, mutable.Set[ExpressionEqual]]() - val EMPTY_MULTIMAP: Multimap[Int, Int] = ArrayListMultimap.create[Int, Int] + val EMPTY_MULTIMAP: Multimap[Int, Int] = ArrayListMultimap.create[Int, Int]() def mergeConjunctiveExpressions(e: Seq[Expression]): Expression = { if (e.isEmpty) { @@ -81,8 +81,9 @@ class RewriteHelper extends PredicateHelper { for ((project, column) <- topProjectList.zip(viewTablePlan.output)) { project match { // only map attr - case _@Alias(attr@AttributeReference(_, _, _, _), _) => - exprIdToQualifier += (column.exprId -> attr) + case a@Alias(attr@AttributeReference(_, _, _, _), _) => + exprIdToQualifier += (column.exprId -> + attr.copy(name = a.name)(exprId = attr.exprId, qualifier = attr.qualifier)) case a@AttributeReference(_, _, _, _) => exprIdToQualifier += (column.exprId -> a) // skip function @@ -204,11 +205,11 @@ class RewriteHelper extends PredicateHelper { (mappedQuery, mappedTables) } - def swapTableColumnReferences[T <: Iterable[Expression]](expression: T, + def swapTableColumnReferences[T <: Iterable[Expression]](expressions: T, tableMapping: BiMap[String, String], columnMapping: Map[ExpressionEqual, mutable.Set[ExpressionEqual]]): T = { - var result: T = expression + var result: T = expressions if (!tableMapping.isEmpty) { result = result.map { expr => expr.transform { @@ -243,28 +244,28 @@ class RewriteHelper extends PredicateHelper { result } - def swapColumnTableReferences[T <: Iterable[Expression]](expression: T, + def swapColumnTableReferences[T <: Iterable[Expression]](expressions: T, tableMapping: BiMap[String, String], columnMapping: Map[ExpressionEqual, mutable.Set[ExpressionEqual]]): T = { - var result = swapTableColumnReferences(expression, EMPTY_BITMAP, columnMapping) + var result = swapTableColumnReferences(expressions, EMPTY_BIMAP, columnMapping) result = swapTableColumnReferences(result, tableMapping, EMPTY_MAP) result } - def swapTableReferences[T <: Iterable[Expression]](expression: T, + def swapTableReferences[T <: 
Iterable[Expression]](expressions: T, tableMapping: BiMap[String, String]): T = { - swapTableColumnReferences(expression, tableMapping, EMPTY_MAP) + swapTableColumnReferences(expressions, tableMapping, EMPTY_MAP) } - def swapColumnReferences[T <: Iterable[Expression]](expression: T, + def swapColumnReferences[T <: Iterable[Expression]](expressions: T, columnMapping: Map[ExpressionEqual, mutable.Set[ExpressionEqual]]): T = { - swapTableColumnReferences(expression, EMPTY_BITMAP, columnMapping) + swapTableColumnReferences(expressions, EMPTY_BIMAP, columnMapping) } } -object RewriteHelper extends PredicateHelper { +object RewriteHelper extends PredicateHelper with RewriteLogger { /** * Rewrite [[EqualTo]] and [[EqualNullSafe]] operator to keep order. The following cases will be * equivalent: @@ -431,6 +432,36 @@ object RewriteHelper extends PredicateHelper { def disableCachePlugin(): Unit = { SQLConf.get.setConfString("spark.sql.omnicache.enable", "false") } + + def checkAttrsValid(logicalPlan: LogicalPlan): Boolean = { + logicalPlan.foreachUp { + case _: LeafNode => + case plan => + val attributeSets = plan.expressions.map { expression => + AttributeSet.fromAttributeSets( + expression.collect { + case s: SubqueryExpression => + var res = s.references + s.plan.transformAllExpressions { + case e@OuterReference(ar) => + res ++= AttributeSet(ar.references) + e + case e => e + } + res + case e => e.references + }) + } + val request = AttributeSet.fromAttributeSets(attributeSets) + val input = plan.inputSet + val missing = request -- input + if (missing.nonEmpty) { + logBasedOnLevel("checkAttrsValid failed for missing:%s".format(missing)) + return false + } + } + true + } } case class ExpressionEqual(expression: Expression) { diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteLogger.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteLogger.scala new file mode 100644 index 0000000000000000000000000000000000000000..f2b63f5ca0591b2dd807c20148fd4e4ceb0c0f33 --- /dev/null +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/RewriteLogger.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.huawei.boostkit.spark.util + +import com.huawei.boostkit.spark.conf.OmniCachePluginConfig + +import org.apache.spark.internal.Logging + +trait RewriteLogger extends Logging { + + private def logLevel: String = OmniCachePluginConfig.getConf.logLevel + + def logBasedOnLevel(f: => String): Unit = { + logLevel match { + case "TRACE" => logTrace(f) + case "DEBUG" => logDebug(f) + case "INFO" => logInfo(f) + case "WARN" => logWarning(f) + case "ERROR" => logError(f) + case _ => logTrace(f) + } + } +} diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ViewMetadata.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ViewMetadata.scala index 8074184001d55c8f99ee686ae0499bc35d7f0f47..a3ab16e76d12e1d5a6903a238ae029be8b523486 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ViewMetadata.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/com/huawei/boostkit/spark/util/ViewMetadata.scala @@ -17,15 +17,15 @@ package com.huawei.boostkit.spark.util -import com.google.common.collect.Lists import com.huawei.boostkit.spark.conf.OmniCachePluginConfig._ import java.util.concurrent.ConcurrentHashMap -import org.apache.calcite.util.graph._ +import scala.collection.mutable import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.optimizer.rules.RewriteTime +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression, SubqueryAlias} object ViewMetadata extends RewriteHelper { @@ -35,9 +35,7 @@ object ViewMetadata extends RewriteHelper { val viewToContainsTables = new ConcurrentHashMap[String, Set[TableEqual]]() - val usesGraph: DirectedGraph[String, DefaultEdge] = DefaultDirectedGraph.create() - - var frozenGraph: Graphs.FrozenGraph[String, DefaultEdge] = Graphs.makeImmutable(usesGraph) + val tableToViews = new ConcurrentHashMap[String, mutable.Set[String]]() var spark: SparkSession = _ @@ -52,10 +50,6 @@ object ViewMetadata extends RewriteHelper { status = STATUS_LOADING } - def usesGraphTopologicalOrderIterator: java.lang.Iterable[String] = { - TopologicalOrderIterator.of[String, DefaultEdge](usesGraph) - } - def saveViewMetadataToMap(catalogTable: CatalogTable): Unit = this.synchronized { // if QUERY_REWRITE_ENABLED is false, doesn't load ViewMetadata if (!catalogTable.properties.getOrElse(MV_REWRITE_ENABLED, "false").toBoolean) { @@ -80,40 +74,58 @@ object ViewMetadata extends RewriteHelper { // db.table val tableName = catalogTable.identifier.quotedString - val viewTablePlan = spark.table(tableName).queryExecution.analyzed match { - case SubqueryAlias(_, child) => child - case a@_ => a + val viewTablePlan = RewriteTime + .withTimeStat("viewTablePlan") { + spark.table(tableName).queryExecution.analyzed match { + case SubqueryAlias(_, child) => child + case a@_ => a + } + } + var viewQueryPlan = RewriteTime + .withTimeStat("viewQueryPlan") { + spark.sql(viewQuerySql).queryExecution.analyzed + } + viewQueryPlan = viewQueryPlan match { + case RepartitionByExpression(_, child, _) => + child + case _ => + viewQueryPlan } - val viewQueryPlan = spark.sql(viewQuerySql).queryExecution.analyzed // reset preDatabase 
spark.sessionState.catalogManager.setCurrentNamespace(Array(preDatabase)) // spark_catalog.db.table val viewName = catalogTable.identifier.toString() - // mappedViewQueryPlan and mappedViewContainsTable - val (mappedViewQueryPlan, mappedViewContainsTables) = extractTables(viewQueryPlan) + // mappedViewQueryPlan and mappedViewContainsTables + val (mappedViewQueryPlan, mappedViewContainsTables) = RewriteTime + .withTimeStat("extractTables") { + extractTables(viewQueryPlan) + } - usesGraph.addVertex(viewName) mappedViewContainsTables .foreach { mappedViewContainsTable => val name = mappedViewContainsTable.tableName - usesGraph.addVertex(name) - usesGraph.addEdge(name, viewName) + val views = tableToViews.getOrDefault(name, mutable.Set.empty) + views += viewName + tableToViews.put(name, views) } // extract view query project's Attr and replace view table's Attr by query project's Attr // match function is attributeReferenceEqualSimple, by name and data type // Attr of table cannot used, because same Attr in view query and view table, // it's table is different. - val mappedViewTablePlan = mapTablePlanAttrToQuery(viewTablePlan, mappedViewQueryPlan) + val mappedViewTablePlan = RewriteTime + .withTimeStat("mapTablePlanAttrToQuery") { + mapTablePlanAttrToQuery(viewTablePlan, mappedViewQueryPlan) + } viewToContainsTables.put(viewName, mappedViewContainsTables) viewToViewQueryPlan.putIfAbsent(viewName, mappedViewQueryPlan) viewToTablePlan.putIfAbsent(viewName, mappedViewTablePlan) } catch { case e: Throwable => - logDebug(s"Failed to saveViewMetadataToMap. errmsg: ${e.getMessage}") + logDebug(s"Failed to saveViewMetadataToMap,errmsg: ${e.getMessage}") // reset preDatabase spark.sessionState.catalogManager.setCurrentNamespace(Array(preDatabase)) } @@ -129,21 +141,19 @@ object ViewMetadata extends RewriteHelper { def addCatalogTableToCache(table: CatalogTable): Unit = this.synchronized { saveViewMetadataToMap(table) - rebuildGraph() - } - - def rebuildGraph(): Unit = { - frozenGraph = Graphs.makeImmutable(usesGraph) } def removeMVCache(tableName: TableIdentifier): Unit = this.synchronized { val viewName = tableName.toString() - usesGraph.removeAllVertices(Lists.newArrayList(viewName)) viewToContainsTables.remove(viewName) viewToViewQueryPlan.remove(viewName) viewToTablePlan.remove(viewName) - viewToContainsTables.remove(viewName) - rebuildGraph() + tableToViews.forEach { (key, value) => + if (value.contains(viewName)) { + value -= viewName + tableToViews.put(key, value) + } + } } def init(sparkSession: SparkSession): Unit = { @@ -158,14 +168,16 @@ object ViewMetadata extends RewriteHelper { def forceLoad(): Unit = this.synchronized { val catalog = spark.sessionState.catalog - // val db = OmniCachePluginConfig.getConf.OmniCacheDB // load from all db for (db <- catalog.listDatabases()) { - val tables = omniCacheFilter(catalog, db) - tables.foreach(tableData => saveViewMetadataToMap(tableData)) + val tables = RewriteTime.withTimeStat("loadTable") { + omniCacheFilter(catalog, db) + } + RewriteTime.withTimeStat("saveViewMetadataToMap") { + tables.foreach(tableData => saveViewMetadataToMap(tableData)) + } } - rebuildGraph() } def omniCacheFilter(catalog: SessionCatalog, @@ -176,7 +188,7 @@ object ViewMetadata extends RewriteHelper { tableData.properties.contains(MV_QUERY_ORIGINAL_SQL) } } catch { - // if db exists a table hive materialized view, will throw annalysis exception + // if db exists a table hive materialized view, will throw analysis exception case e: Throwable => logDebug(s"Failed to listTables 
in $mvDataBase, errmsg: ${e.getMessage}") Seq.empty[CatalogTable] diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/AbstractMaterializedViewRule.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/AbstractMaterializedViewRule.scala index df26d192fb00e1597723f3b978afb4754bb45ed7..7d398c2cf07a952fb9bea27a9792a6e394b044e2 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/AbstractMaterializedViewRule.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/AbstractMaterializedViewRule.scala @@ -49,7 +49,9 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) if (ViewMetadata.status == ViewMetadata.STATUS_LOADING) { return finalPlan } - ViewMetadata.init(sparkSession) + RewriteTime.withTimeStat("viewMetadata") { + ViewMetadata.init(sparkSession) + } // 1.check query sql is match current rule if (ViewMetadata.isEmpty || !isValidPlan(plan)) { return finalPlan @@ -60,8 +62,10 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) val (queryExpr, queryTables) = extractTables(finalPlan) // 3.use all tables to fetch views(may match) from ViewMetaData - val candidateViewPlans = getApplicableMaterializations(queryTables.map(t => t.tableName)) - .filter(x => !OmniCachePluginConfig.isMVInUpdate(sparkSession, x._1)) + val candidateViewPlans = RewriteTime.withTimeStat("getApplicableMaterializations") { + getApplicableMaterializations(queryTables.map(t => t.tableName)) + .filter(x => !OmniCachePluginConfig.isMVInUpdate(x._2)) + } if (candidateViewPlans.isEmpty) { return finalPlan } @@ -112,16 +116,21 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) val (newViewTablePlan, newViewQueryPlan, newTopViewProject) = newViewPlans.get viewTablePlan = newViewTablePlan viewQueryPlan = newViewQueryPlan + if (newTopViewProject.isEmpty) { + viewQueryExpr = newViewQueryPlan + } topViewProject = newTopViewProject } // 4.5.extractPredictExpressions from viewQueryPlan and mappedQueryPlan - val queryPredictExpression = extractPredictExpressions(queryExpr, EMPTY_BITMAP) + val queryPredictExpression = RewriteTime.withTimeStat("extractPredictExpressions") { + extractPredictExpressions(queryExpr, EMPTY_BIMAP) + } val viewProjectList = extractTopProjectList(viewQueryExpr) val viewTableAttrs = viewTablePlan.output - // 4.6.if a table emps used >=2 times in s sql (query and view) + // 4.6.if a table emps used >=2 times in a sql (query and view) // we should try the combination,switch the seq // view:SELECT V1.locationid,V2.empname FROM emps V1 JOIN emps V2 // ON V1.deptno='1' AND V2.deptno='2' AND V1.empname = V2.empname; @@ -132,25 +141,31 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) flatListMappings.foreach { queryToViewTableMapping => mappingLoop.breakable { val inverseTableMapping = queryToViewTableMapping.inverse() - val viewPredictExpression = extractPredictExpressions(viewQueryExpr, - inverseTableMapping) + val viewPredictExpression = RewriteTime.withTimeStat("extractPredictExpressions") { + extractPredictExpressions(viewQueryExpr, + inverseTableMapping) + } // 4.7.compute compensationPredicates between viewQueryPlan and queryPlan - var newViewTablePlan = computeCompensationPredicates(viewTablePlan, - queryPredictExpression, viewPredictExpression, inverseTableMapping, - 
viewPredictExpression._1.getEquivalenceClassesMap, - viewProjectList, viewTableAttrs) - // 4.8.compensationPredicates isEmpty, because view's row data cannot satify query + var newViewTablePlan = RewriteTime.withTimeStat("computeCompensationPredicates") { + computeCompensationPredicates(viewTablePlan, + queryPredictExpression, viewPredictExpression, inverseTableMapping, + viewPredictExpression._1.getEquivalenceClassesMap, + viewProjectList, viewTableAttrs) + } + // 4.8.compensationPredicates isEmpty, because view's row data cannot satisfy query if (newViewTablePlan.isEmpty) { mappingLoop.break() } // 4.9.use viewTablePlan(join compensated), query project, // compensationPredicts to rewrite final plan - newViewTablePlan = rewriteView(newViewTablePlan.get, viewQueryExpr, - queryExpr, inverseTableMapping, - queryPredictExpression._1.getEquivalenceClassesMap, - viewProjectList, viewTableAttrs) + newViewTablePlan = RewriteTime.withTimeStat("rewriteView") { + rewriteView(newViewTablePlan.get, viewQueryExpr, + queryExpr, inverseTableMapping, + queryPredictExpression._1.getEquivalenceClassesMap, + viewProjectList, viewTableAttrs) + } if (newViewTablePlan.isEmpty) { mappingLoop.break() } @@ -165,13 +180,19 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) } /** - * cehck plan if match current rule + * check plan if match current rule * * @param logicalPlan LogicalPlan * @return true:matched ; false:unMatched */ def isValidPlan(logicalPlan: LogicalPlan): Boolean + /** + * basic check for all rule + * + * @param logicalPlan LogicalPlan + * @return true:matched ; false:unMatched + */ def isValidLogicalPlan(logicalPlan: LogicalPlan): Boolean = { logicalPlan.foreach { case _: LogicalRelation => @@ -199,19 +220,19 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) LogicalPlan, LogicalPlan)] = { // viewName, viewTablePlan, viewQueryPlan var viewPlans = Seq.empty[(String, LogicalPlan, LogicalPlan)] + val viewNames = mutable.Set.empty[String] // 1.topological iterate graph - ViewMetadata.usesGraphTopologicalOrderIterator.forEach { viewName => - // 2.check this node is mv - if (ViewMetadata.viewToTablePlan.containsKey(viewName) - // 3.iterate mv used tables and check edge of (table,mv) in graph - && usesTable(viewName, ViewMetadata - .viewToContainsTables.get(viewName), ViewMetadata.frozenGraph)) { - // 4.add plan info - val viewQueryPlan = ViewMetadata.viewToViewQueryPlan.get(viewName) - val viewTablePlan = ViewMetadata.viewToTablePlan.get(viewName) - viewPlans +:= (viewName, viewTablePlan, viewQueryPlan) + tableNames.foreach { tableName => + if (ViewMetadata.tableToViews.containsKey(tableName)) { + viewNames ++= ViewMetadata.tableToViews.get(tableName) } } + viewNames.foreach { viewName => + // 4.add plan info + val viewQueryPlan = ViewMetadata.viewToViewQueryPlan.get(viewName) + val viewTablePlan = ViewMetadata.viewToTablePlan.get(viewName) + viewPlans +:= (viewName, viewTablePlan, viewQueryPlan) + } viewPlans } @@ -239,7 +260,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) * @param viewTablePlan viewTablePlan * @param viewQueryPlan viewQueryPlan * @param topViewProject topViewProject - * @param needTables needTables + * @param needTables need join compensate tables * @return join compensated viewTablePlan */ def compensateViewPartial(viewTablePlan: LogicalPlan, @@ -248,6 +269,14 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) needTables: Set[TableEqual]): Option[(LogicalPlan, LogicalPlan, 
Option[Project])] = None + /** + * We map every table in the query to a table with the same qualified + * name (all query tables are contained in the view, thus this is equivalent + * to mapping every table in the query to a view table). + * + * @param queryTables queryTables + * @return + */ def generateTableMappings(queryTables: Set[TableEqual]): Seq[BiMap[String, String]] = { val multiMapTables: Multimap[String, String] = ArrayListMultimap.create() for (t1 <- queryTables) { @@ -269,7 +298,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) } // continue } else { - // Multiple reference: flatten + // Multiple references: flatten val newResult: ImmutableList.Builder[BiMap[String, String]] = ImmutableList.builder() t2s.forEach { target => result.forEach { m => @@ -284,6 +313,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) } } JavaConverters.asScalaIteratorConverter(result.iterator()).asScala.toSeq + .sortWith((map1, map2) => map1.toString < map2.toString) } /** @@ -346,10 +376,17 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) Some(compensationPredicate) } + /** + * extractPossibleMapping {queryEquivalenceClasses:[contained viewEquivalenceClasses]} + * + * @param queryEquivalenceClasses queryEquivalenceClasses + * @param viewEquivalenceClasses viewEquivalenceClasses + * @return {queryEquivalenceClasses:[contained viewEquivalenceClasses]} + */ def extractPossibleMapping(queryEquivalenceClasses: List[mutable.Set[ExpressionEqual]], viewEquivalenceClasses: List[mutable.Set[ExpressionEqual]]): Option[Multimap[Int, Int]] = { // extractPossibleMapping {queryEquivalenceClasses:[contained viewEquivalenceClasses]} - // query:c1=c2=c3=c4 view:c1=c2 ,c3=c4 + // query:c1=c2=c3=c4 view:c1=c2 , c3=c4 val mapping = ArrayListMultimap.create[Int, Int]() val breakLoop = new Breaks @@ -381,6 +418,12 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) Some(mapping) } + /** + * + * @param queryExpression queryExpression + * @param viewExpression viewExpression + * @return compensate Expression + */ def splitFilter(queryExpression: Expression, viewExpression: Expression): Option[Expression] = { // 1.canonicalize expression,main for reorder val queryExpression2 = RewriteHelper.canonicalize(ExprSimplifier.simplify(queryExpression)) @@ -422,7 +465,6 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) None } - /** * split expression by or,then compute compensation * @@ -434,7 +476,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) val queries = ExprOptUtil.disjunctions(queryExpression) val views = ExprOptUtil.disjunctions(viewExpression) - // 1.compare difference which queries residue + // 1.compute difference which queries residue val difference = queries.map(ExpressionEqual) -- views.map(ExpressionEqual) // 2.1.queries equal to views,just return true @@ -533,9 +575,8 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) true } - /** - * compute compensationPredicts between viewQueryPlan and mappedQueryPlan + * compute compensationPredicates between viewQueryPlan and mappedQueryPlan * * @param viewTablePlan viewTablePlan * @param queryPredict queryPredict @@ -594,18 +635,6 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) if (columnsEquiPredictsResult.isEmpty) { return None } - val viewTableAttrSet = viewTableAttrs.map(ExpressionEqual).toSet - columnsEquiPredictsResult.get.foreach { expr => - expr.foreach { - case attr: 
AttributeReference => - if (!viewTableAttrSet.contains(ExpressionEqual(attr))) { - logDebug(s"attr:%s cannot found in view:%s" - .format(attr, OmniCachePluginConfig.getConf.curMatchMV)) - return None - } - case _ => - } - } // 5.rewrite rangeCompensation,residualCompensation by viewTableAttrs val otherPredictsResult = rewriteExpressions(Seq(compensationRangePredicts.get, @@ -627,10 +656,10 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) * @param swapTableColumn true:swapTableColumn;false:swapColumnTable * @param tableMapping tableMapping * @param columnMapping columnMapping - * @param viewProjectList viewProjectList + * @param viewProjectList viewProjectList/viewAggExpression * @param viewTableAttrs viewTableAttrs * @tparam T T <: Iterable[Expression] - * @return rewriteExprs + * @return rewritedExprs */ def rewriteExpressions[T <: Iterable[Expression]]( exprsToRewrite: T, swapTableColumn: Boolean, @@ -642,7 +671,7 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) val swapProjectList = if (swapTableColumn) { swapTableColumnReferences(viewProjectList, tableMapping, columnMapping) } else { - swapColumnTableReferences(viewProjectList, tableMapping, columnMapping) + swapTableColumnReferences(viewProjectList, tableMapping, columnMapping) } val swapTableAttrs = swapTableReferences(viewTableAttrs, tableMapping) @@ -665,12 +694,12 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) }.asInstanceOf[T] // 4.iterate result and dfs check every AttributeReference in ViewTableAttributeReference - val viewTableAttrSet = swapTableAttrs.map(ExpressionEqual).toSet + val viewTableAttrsSet = swapTableAttrs.map(_.exprId).toSet result.foreach { expr => expr.foreach { case attr: AttributeReference => - if (!viewTableAttrSet.contains(ExpressionEqual(attr))) { - logDebug(s"attr:%s cannot found in view:%s" + if (!viewTableAttrsSet.contains(attr.exprId)) { + logBasedOnLevel(s"attr:%s cannot found in view:%s" .format(attr, OmniCachePluginConfig.getConf.curMatchMV)) return None } @@ -680,7 +709,6 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) Some(result) } - /** * if the rewrite expression exprId != origin expression exprId, * replace by Alias(rewrite expression,origin.name)(exprId=origin.exprId) @@ -706,6 +734,16 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) /** * replace and alias expression or attr by viewTableAttr + * + * @param exprsToRewrite exprsToRewrite + * @param swapTableColumn true:swapTableColumn;false:swapColumnTable + * @param tableMapping tableMapping + * @param columnMapping columnMapping + * @param viewProjectList viewProjectList/viewAggExpression + * @param viewTableAttrs viewTableAttrs + * @param originExpressions originExpressions + * @tparam T T <: Iterable[Expression] + * @return rewrited and alias expression */ def rewriteAndAliasExpressions[T <: Iterable[Expression]]( exprsToRewrite: T, swapTableColumn: Boolean, @@ -742,5 +780,4 @@ abstract class AbstractMaterializedViewRule(sparkSession: SparkSession) columnMapping: Map[ExpressionEqual, mutable.Set[ExpressionEqual]], viewProjectList: Seq[Expression], viewTableAttrs: Seq[Attribute]): Option[LogicalPlan] - } diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MVRewriteRule.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MVRewriteRule.scala index 
98989c460b240765365b9870078a6679beb4e010..ba0c17f227485169f817532807f85ee1e16ef764 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MVRewriteRule.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MVRewriteRule.scala @@ -17,19 +17,23 @@ package org.apache.spark.sql.catalyst.optimizer.rules +import com.fasterxml.jackson.annotation.JsonIgnore import com.huawei.boostkit.spark.conf.OmniCachePluginConfig -import com.huawei.boostkit.spark.util.RewriteHelper +import com.huawei.boostkit.spark.util.{RewriteHelper, RewriteLogger} import scala.collection.mutable import org.apache.spark.SparkContext +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.SparkListenerEvent +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.OmniCacheCreateMvCommand +import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.util.kvstore.KVIndex -class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] with Logging { +class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] with RewriteLogger { val omniCacheConf: OmniCachePluginConfig = OmniCachePluginConfig.getConf val joinRule = new MaterializedViewJoinRule(session) @@ -55,6 +59,8 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] with Loggin def tryRewritePlan(plan: LogicalPlan): LogicalPlan = { val usingMvs = mutable.Set.empty[String] + RewriteTime.clear() + val rewriteStartSecond = System.currentTimeMillis() val res = plan.transformDown { case p: Project => joinRule.perform(Some(p), p.child, usingMvs) @@ -66,23 +72,71 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] with Loggin RewriteHelper.extractAllAttrsFromExpression(a.aggregateExpressions).toSeq, a.child) val rewritedChild = joinRule.perform(Some(child), child.child, usingMvs) if (rewritedChild != child) { - rewritedPlan = a.copy(child = rewritedChild) + val projectChild = rewritedChild.asInstanceOf[Project] + rewritedPlan = a.copy(child = Project( + projectChild.projectList ++ projectChild.child.output, projectChild.child)) } } rewritedPlan case p => p } if (usingMvs.nonEmpty) { + RewriteTime.withTimeStat("checkAttrsValid") { + if (!RewriteHelper.checkAttrsValid(res)) { + return plan + } + } val sql = session.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) val mvs = usingMvs.mkString(";").replaceAll("`", "") - val log = "logicalPlan MVRewrite success,using materialized view:[%s],original sql:%s" - .format(mvs, sql) - logDebug(log) + val costSecond = (System.currentTimeMillis() - rewriteStartSecond).toString + val log = ("logicalPlan MVRewrite success," + + "using materialized view:[%s],cost %s milliseconds,original sql:%s") + .format(mvs, costSecond, sql) + logBasedOnLevel(log) session.sparkContext.listenerBus.post(SparkListenerMVRewriteSuccess(sql, mvs)) } + RewriteTime.statFromStartTime("total", rewriteStartSecond) + logBasedOnLevel(RewriteTime.timeStat.toString()) res } } +@DeveloperApi case class SparkListenerMVRewriteSuccess(sql: String, usingMvs: String) extends SparkListenerEvent { + @JsonIgnore + @KVIndex + def 
id: String = (System.currentTimeMillis() + "%s%s".format(sql, usingMvs).hashCode).toString +} + +class MVRewriteSuccessListener( + kvStore: ElementTrackingStore) extends SparkListener with Logging { + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case _: SparkListenerMVRewriteSuccess => + kvStore.write(event) + case _ => + } + } +} + +object RewriteTime { + val timeStat: mutable.Map[String, Long] = mutable.HashMap[String, Long]() + + def statFromStartTime(key: String, startTime: Long): Unit = { + timeStat += (key -> (timeStat.getOrElse(key, 0L) + System.currentTimeMillis() - startTime)) + } + + def clear(): Unit = { + timeStat.clear() + } + + def withTimeStat[T](key: String)(f: => T): T = { + val startTime = System.currentTimeMillis() + try { + f + } finally { + statFromStartTime(key, startTime) + } + } } diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRule.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRule.scala index fe82c4744b05fb70e0f1f03d4356221b930e7dee..6dda8689106d62c13603ea2d40ff544d9be9b079 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRule.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRule.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ class MaterializedViewAggregateRule(sparkSession: SparkSession) extends AbstractMaterializedViewRule(sparkSession: SparkSession) { /** - * cehck plan if match current rule + * check plan if match current rule * * @param logicalPlan LogicalPlan * @return true:matched ; false:unMatched @@ -43,7 +43,6 @@ class MaterializedViewAggregateRule(sparkSession: SparkSession) logicalPlan.children.forall(isValidLogicalPlan) } - /** * queryTableInfo!=viewTableInfo , need do join compensate * @@ -208,7 +207,6 @@ class MaterializedViewAggregateRule(sparkSession: SparkSession) // 4.rewrite and alias queryAggExpressions // if the rewrite expression exprId != origin expression exprId, // replace by Alias(rewrite expression,origin.name)(exprId=origin.exprId) - val rewritedQueryAggExpressions = rewriteAndAliasExpressions(newQueryAggExpressions, swapTableColumn = true, tableMapping, columnMapping, viewProjectList, viewTableAttrs, queryAgg.aggregateExpressions) @@ -240,5 +238,4 @@ class MaterializedViewAggregateRule(sparkSession: SparkSession) qualifier = qualifier, explicitMetadata = alias.explicitMetadata, nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys) } - } diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRule.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRule.scala index 15717fa168107be193a0e3a020b9646f9593df0a..5c7c477dda7bf2f03dca11d212222b7fcf75c829 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRule.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRule.scala @@ -22,14 +22,14 @@ import com.huawei.boostkit.spark.util.{ExpressionEqual, TableEqual} import 
scala.collection.mutable import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical._ class MaterializedViewJoinRule(sparkSession: SparkSession) extends AbstractMaterializedViewRule(sparkSession: SparkSession) { /** - * cehck plan if match current rule + * check plan if match current rule * * @param logicalPlan LogicalPlan * @return true:matched ; false:unMatched @@ -38,7 +38,6 @@ class MaterializedViewJoinRule(sparkSession: SparkSession) isValidLogicalPlan(logicalPlan) } - /** * queryTableInfo!=viewTableInfo , need do join compensate * @@ -66,11 +65,20 @@ class MaterializedViewJoinRule(sparkSession: SparkSession) topViewProject.get } + var projectList: Seq[NamedExpression] = newViewQueryPlan match { + case p: Project => + p.projectList + case _ => + newViewQueryPlan.output + } + needTables.foreach { needTable => newViewQueryPlan = Join(newViewQueryPlan, needTable.logicalPlan, Inner, None, JoinHint.NONE) + projectList ++= needTable.logicalPlan.output } - Some(newViewTablePlan, viewQueryPlan, None) + newViewQueryPlan = Project(projectList, newViewQueryPlan) + Some(newViewTablePlan, newViewQueryPlan, None) } /** diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionAstBuilder.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionAstBuilder.scala index a4168024488e088be045989b25fc8ffed11a21d1..414afa602b5e5a4244ba9ca5d3fa4002e39574ad 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionAstBuilder.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionAstBuilder.scala @@ -35,6 +35,11 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ +/** + * Parse extended AST to LogicalPlan + * + * @param delegate Spark default ParserInterface + */ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterface) extends OmniCacheSqlExtensionsBaseVisitor[AnyRef] with SQLConfHelper with Logging { @@ -61,8 +66,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac case Seq(mv) => (None, mv) case Seq(database, mv) => (Some(database), mv) case _ => throw new AnalysisException( - "The mv name is not valid: %s".format(identifier.mkString(".")) - ) + "The mv name is not valid: %s".format(identifier.mkString("."))) } try { @@ -102,8 +106,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac case Seq(mv) => (None, mv) case Seq(database, mv) => (Some(database), mv) case _ => throw new AnalysisException( - "The mv name is not valid: %s".format(tableIdent.mkString(".")) - ) + "The mv name is not valid: %s".format(tableIdent.mkString("."))) } val tableIdentifier = TableIdentifier(name, databaseName) @@ -119,7 +122,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac throw new RuntimeException("cannot refresh a table with refresh mv") } - // preserver preDatabase and set curDatabase + // preserve preDatabase and set curDatabase val 
preDatabase = spark.catalog.currentDatabase val curDatabase = catalogTable.properties.getOrElse(MV_QUERY_ORIGINAL_SQL_CUR_DB, "") if (curDatabase.isEmpty) { @@ -179,6 +182,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac RewriteHelper.enableCachePlugin() throw e } finally { + // reset preDatabase spark.sessionState.catalogManager.setCurrentNamespace(Array(preDatabase)) } } @@ -221,8 +225,7 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac case Seq(database, mv) => DropMaterializedViewCommand( TableIdentifier(mv, Some(database)), ifExists.isDefined, - purge = true - ) + purge = true) case _ => throw new AnalysisException( "The mv name is not valid: %s".format(identifier.mkString("."))) } @@ -297,6 +300,9 @@ class OmniCacheExtensionAstBuilder(spark: SparkSession, delegate: ParserInterfac visit(ctx.statement).asInstanceOf[LogicalPlan] } + /** + * alias tuple2 + */ type OmniCacheHeader = (Seq[String], Boolean) /** diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionSqlParser.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionSqlParser.scala index 65077216a9a48bdedfa159593165ade7b46101a5..aeab8423244438dbcca481edbbdd8196f992ba30 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionSqlParser.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/catalyst/parser/OmniCacheExtensionSqlParser.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.parser import java.util.Locale -import org.antlr.v4.runtime.{CharStreams, CommonTokenStream} +import org.antlr.v4.runtime._ import org.antlr.v4.runtime.atn.PredictionMode import org.antlr.v4.runtime.misc.ParseCancellationException diff --git a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/execution/command/OmniCacheCommand.scala b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/execution/command/OmniCacheCommand.scala index 801fffbd23c887242cc197bcc147a77d8dd486ba..c053b625236f6a731d175c5ebc8de7abd5c10ef9 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/execution/command/OmniCacheCommand.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/main/scala/org/apache/spark/sql/execution/command/OmniCacheCommand.scala @@ -54,6 +54,7 @@ case class OmniCacheCreateMvCommand( partitioning: Seq[String], query: LogicalPlan, outputColumnNames: Seq[String]) extends DataWritingCommand { + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { try { ViewMetadata.init(sparkSession) @@ -84,7 +85,7 @@ case class OmniCacheCreateMvCommand( throw new AnalysisException( s"Materialized View $tableIdentWithDB already exists. You need to drop it first") } else { - // Since the table already exists and the save mode is Ignore,we will just return. + // Since the table already exists and the save mode is Ignore, we will just return. return Seq.empty } } else { @@ -102,10 +103,11 @@ case class OmniCacheCreateMvCommand( storage = table.storage.copy(locationUri = tableLocation), // We will use the schema of resolved.relation as the schema of the table (instead of // the schema of df). It is important since the nullability may be changed by the relation - // provider (for example,see org.apache.spark.sql.parquet.DefaultSource). 
+ // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). schema = tableSchema) // Table location is already validated. No need to check it again during table creation. sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) + result match { case _: HadoopFsRelation if table.partitionColumnNames.nonEmpty && sparkSession.sqlContext.conf.manageFilesourcePartitions => @@ -153,10 +155,19 @@ case class OmniCacheCreateMvCommand( } } +/** + * Drops a materialized view from the metastore and removes it if it is cached. + * + * The syntax of this command is: + * {{{ + * DROP MATERIALIZED VIEW [IF EXISTS] view_name; + * }}} + */ case class DropMaterializedViewCommand( tableName: TableIdentifier, ifExists: Boolean, purge: Boolean) extends RunnableCommand { + override def run(sparkSession: SparkSession): Seq[Row] = { ViewMetadata.init(sparkSession) val catalog = sparkSession.sessionState.catalog @@ -201,13 +212,18 @@ case class DropMaterializedViewCommand( } } +/** + * ShowMaterializedViewCommand RunnableCommand + * + */ case class ShowMaterializedViewCommand( databaseName: Option[String], tableIdentifierPattern: Option[String]) extends RunnableCommand { + // The result of SHOW MaterializedView has three basic columns: + // database, tableName and originalSql. override val output: Seq[Attribute] = { val tableExtendedInfo = Nil - AttributeReference("database", StringType, nullable = false)() :: AttributeReference("mvName", StringType, nullable = false)() :: AttributeReference("rewriteEnable", StringType, nullable = false)() :: @@ -254,6 +270,7 @@ case class ShowMaterializedViewCommand( case class AlterRewriteMaterializedViewCommand( tableName: TableIdentifier, enableRewrite: Boolean) extends RunnableCommand { + override def run(sparkSession: SparkSession): Seq[Row] = { ViewMetadata.init(sparkSession) val catalog = sparkSession.sessionState.catalog @@ -379,7 +396,6 @@ case class RefreshMaterializedViewCommand( } if (doInsertion) { - def refreshUpdatedPartitions(updatedPartitionPaths: Set[String]): Unit = { val updatedPartitions = updatedPartitionPaths.map(PartitioningUtils.parsePathFragment) if (partitionsTrackedByCatalog) { @@ -446,7 +462,6 @@ case class RefreshMaterializedViewCommand( if (catalogTable.nonEmpty) { CommandUtils.updateTableStats(sparkSession, catalogTable.get) } - } else { logInfo("Skipping insertion into a relation that already exists.") } diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/resources/tpcds_ddl.sql b/omnicache/omnicache-spark-extension/plugin/src/test/resources/tpcds_ddl.sql new file mode 100644 index 0000000000000000000000000000000000000000..ae18bd4b0fc19c6b88821d3dbc53dd31240e91e9 --- /dev/null +++ b/omnicache/omnicache-spark-extension/plugin/src/test/resources/tpcds_ddl.sql @@ -0,0 +1,479 @@ +create table if not exists store_sales( + ss_sold_date_sk bigint +, ss_sold_time_sk bigint +, ss_item_sk bigint +, ss_customer_sk bigint +, ss_cdemo_sk bigint +, ss_hdemo_sk bigint +, ss_addr_sk bigint +, ss_store_sk bigint +, ss_promo_sk bigint +, ss_ticket_number bigint +, ss_quantity int +, ss_wholesale_cost decimal(7,2) +, ss_list_price decimal(7,2) +, ss_sales_price decimal(7,2) +, ss_ext_discount_amt decimal(7,2) +, ss_ext_sales_price decimal(7,2) +, ss_ext_wholesale_cost decimal(7,2) +, ss_ext_list_price decimal(7,2) +, ss_ext_tax decimal(7,2) +, ss_coupon_amt decimal(7,2) +, ss_net_paid decimal(7,2) +, ss_net_paid_inc_tax decimal(7,2) +, ss_net_profit decimal(7,2) +); + +create table if not exists 
store_returns( + sr_returned_date_sk bigint +, sr_return_time_sk bigint +, sr_item_sk bigint +, sr_customer_sk bigint +, sr_cdemo_sk bigint +, sr_hdemo_sk bigint +, sr_addr_sk bigint +, sr_store_sk bigint +, sr_reason_sk bigint +, sr_ticket_number bigint +, sr_return_quantity int +, sr_return_amt decimal(7,2) +, sr_return_tax decimal(7,2) +, sr_return_amt_inc_tax decimal(7,2) +, sr_fee decimal(7,2) +, sr_return_ship_cost decimal(7,2) +, sr_refunded_cash decimal(7,2) +, sr_reversed_charge decimal(7,2) +, sr_store_credit decimal(7,2) +, sr_net_loss decimal(7,2) +); + +create table if not exists catalog_sales( + cs_sold_date_sk bigint +, cs_sold_time_sk bigint +, cs_ship_date_sk bigint +, cs_bill_customer_sk bigint +, cs_bill_cdemo_sk bigint +, cs_bill_hdemo_sk bigint +, cs_bill_addr_sk bigint +, cs_ship_customer_sk bigint +, cs_ship_cdemo_sk bigint +, cs_ship_hdemo_sk bigint +, cs_ship_addr_sk bigint +, cs_call_center_sk bigint +, cs_catalog_page_sk bigint +, cs_ship_mode_sk bigint +, cs_warehouse_sk bigint +, cs_item_sk bigint +, cs_promo_sk bigint +, cs_order_number bigint +, cs_quantity int +, cs_wholesale_cost decimal(7,2) +, cs_list_price decimal(7,2) +, cs_sales_price decimal(7,2) +, cs_ext_discount_amt decimal(7,2) +, cs_ext_sales_price decimal(7,2) +, cs_ext_wholesale_cost decimal(7,2) +, cs_ext_list_price decimal(7,2) +, cs_ext_tax decimal(7,2) +, cs_coupon_amt decimal(7,2) +, cs_ext_ship_cost decimal(7,2) +, cs_net_paid decimal(7,2) +, cs_net_paid_inc_tax decimal(7,2) +, cs_net_paid_inc_ship decimal(7,2) +, cs_net_paid_inc_ship_tax decimal(7,2) +, cs_net_profit decimal(7,2) +); + +create table if not exists catalog_returns( + cr_returned_date_sk bigint +, cr_returned_time_sk bigint +, cr_item_sk bigint +, cr_refunded_customer_sk bigint +, cr_refunded_cdemo_sk bigint +, cr_refunded_hdemo_sk bigint +, cr_refunded_addr_sk bigint +, cr_returning_customer_sk bigint +, cr_returning_cdemo_sk bigint +, cr_returning_hdemo_sk bigint +, cr_returning_addr_sk bigint +, cr_call_center_sk bigint +, cr_catalog_page_sk bigint +, cr_ship_mode_sk bigint +, cr_warehouse_sk bigint +, cr_reason_sk bigint +, cr_order_number bigint +, cr_return_quantity int +, cr_return_amount decimal(7,2) +, cr_return_tax decimal(7,2) +, cr_return_amt_inc_tax decimal(7,2) +, cr_fee decimal(7,2) +, cr_return_ship_cost decimal(7,2) +, cr_refunded_cash decimal(7,2) +, cr_reversed_charge decimal(7,2) +, cr_store_credit decimal(7,2) +, cr_net_loss decimal(7,2) +); + +create table if not exists web_sales( + ws_sold_date_sk bigint +, ws_sold_time_sk bigint +, ws_ship_date_sk bigint +, ws_item_sk bigint +, ws_bill_customer_sk bigint +, ws_bill_cdemo_sk bigint +, ws_bill_hdemo_sk bigint +, ws_bill_addr_sk bigint +, ws_ship_customer_sk bigint +, ws_ship_cdemo_sk bigint +, ws_ship_hdemo_sk bigint +, ws_ship_addr_sk bigint +, ws_web_page_sk bigint +, ws_web_site_sk bigint +, ws_ship_mode_sk bigint +, ws_warehouse_sk bigint +, ws_promo_sk bigint +, ws_order_number bigint +, ws_quantity int +, ws_wholesale_cost decimal(7,2) +, ws_list_price decimal(7,2) +, ws_sales_price decimal(7,2) +, ws_ext_discount_amt decimal(7,2) +, ws_ext_sales_price decimal(7,2) +, ws_ext_wholesale_cost decimal(7,2) +, ws_ext_list_price decimal(7,2) +, ws_ext_tax decimal(7,2) +, ws_coupon_amt decimal(7,2) +, ws_ext_ship_cost decimal(7,2) +, ws_net_paid decimal(7,2) +, ws_net_paid_inc_tax decimal(7,2) +, ws_net_paid_inc_ship decimal(7,2) +, ws_net_paid_inc_ship_tax decimal(7,2) +, ws_net_profit decimal(7,2) +); + +create table if not exists web_returns( + 
wr_returned_date_sk bigint +, wr_returned_time_sk bigint +, wr_item_sk bigint +, wr_refunded_customer_sk bigint +, wr_refunded_cdemo_sk bigint +, wr_refunded_hdemo_sk bigint +, wr_refunded_addr_sk bigint +, wr_returning_customer_sk bigint +, wr_returning_cdemo_sk bigint +, wr_returning_hdemo_sk bigint +, wr_returning_addr_sk bigint +, wr_web_page_sk bigint +, wr_reason_sk bigint +, wr_order_number bigint +, wr_return_quantity int +, wr_return_amt decimal(7,2) +, wr_return_tax decimal(7,2) +, wr_return_amt_inc_tax decimal(7,2) +, wr_fee decimal(7,2) +, wr_return_ship_cost decimal(7,2) +, wr_refunded_cash decimal(7,2) +, wr_reversed_charge decimal(7,2) +, wr_account_credit decimal(7,2) +, wr_net_loss decimal(7,2) +); + +create table if not exists inventory( + inv_date_sk bigint +, inv_item_sk bigint +, inv_warehouse_sk bigint +, inv_quantity_on_hand int +); +create table if not exists store( + s_store_sk bigint +, s_store_id char(16) +, s_rec_start_date date +, s_rec_end_date date +, s_closed_date_sk bigint +, s_store_name varchar(50) +, s_number_employees int +, s_floor_space int +, s_hours char(20) +, S_manager varchar(40) +, S_market_id int +, S_geography_class varchar(100) +, S_market_desc varchar(100) +, s_market_manager varchar(40) +, s_division_id int +, s_division_name varchar(50) +, s_company_id int +, s_company_name varchar(50) +, s_street_number varchar(10) +, s_street_name varchar(60) +, s_street_type char(15) +, s_suite_number char(10) +, s_city varchar(60) +, s_county varchar(30) +, s_state char(2) +, s_zip char(10) +, s_country varchar(20) +, s_gmt_offset decimal(5,2) +, s_tax_percentage decimal(5,2) +); +create table if not exists call_center( + cc_call_center_sk bigint +, cc_call_center_id char(16) +, cc_rec_start_date date +, cc_rec_end_date date +, cc_closed_date_sk bigint +, cc_open_date_sk bigint +, cc_name varchar(50) +, cc_class varchar(50) +, cc_employees int +, cc_sq_ft int +, cc_hours char(20) +, cc_manager varchar(40) +, cc_mkt_id int +, cc_mkt_class char(50) +, cc_mkt_desc varchar(100) +, cc_market_manager varchar(40) +, cc_division int +, cc_division_name varchar(50) +, cc_company int +, cc_company_name char(50) +, cc_street_number char(10) +, cc_street_name varchar(60) +, cc_street_type char(15) +, cc_suite_number char(10) +, cc_city varchar(60) +, cc_county varchar(30) +, cc_state char(2) +, cc_zip char(10) +, cc_country varchar(20) +, cc_gmt_offset decimal(5,2) +, cc_tax_percentage decimal(5,2) +); +create table if not exists catalog_page( + cp_catalog_page_sk bigint +, cp_catalog_page_id char(16) +, cp_start_date_sk bigint +, cp_end_date_sk bigint +, cp_department varchar(50) +, cp_catalog_number int +, cp_catalog_page_number int +, cp_description varchar(100) +, cp_type varchar(100) +); +create table if not exists web_site( + web_site_sk bigint +, web_site_id char(16) +, web_rec_start_date date +, web_rec_end_date date +, web_name varchar(50) +, web_open_date_sk bigint +, web_close_date_sk bigint +, web_class varchar(50) +, web_manager varchar(40) +, web_mkt_id int +, web_mkt_class varchar(50) +, web_mkt_desc varchar(100) +, web_market_manager varchar(40) +, web_company_id int +, web_company_name char(50) +, web_street_number char(10) +, web_street_name varchar(60) +, web_street_type char(15) +, web_suite_number char(10) +, web_city varchar(60) +, web_county varchar(30) +, web_state char(2) +, web_zip char(10) +, web_country varchar(20) +, web_gmt_offset decimal(5,2) +, web_tax_percentage decimal(5,2) +); +create table if not exists web_page( + 
wp_web_page_sk bigint +, wp_web_page_id char(16) +, wp_rec_start_date date +, wp_rec_end_date date +, wp_creation_date_sk bigint +, wp_access_date_sk bigint +, wp_autogen_flag char(1) +, wp_customer_sk bigint +, wp_url varchar(100) +, wp_type char(50) +, wp_char_count int +, wp_link_count int +, wp_image_count int +, wp_max_ad_count int +); +create table if not exists warehouse( + w_warehouse_sk bigint +, w_warehouse_id char(16) +, w_warehouse_name varchar(20) +, w_warehouse_sq_ft int +, w_street_number char(10) +, w_street_name varchar(60) +, w_street_type char(15) +, w_suite_number char(10) +, w_city varchar(60) +, w_county varchar(30) +, w_state char(2) +, w_zip char(10) +, w_country varchar(20) +, w_gmt_offset decimal(5,2) +); +create table if not exists customer( + c_customer_sk bigint +, c_customer_id char(16) +, c_current_cdemo_sk bigint +, c_current_hdemo_sk bigint +, c_current_addr_sk bigint +, c_first_shipto_date_sk bigint +, c_first_sales_date_sk bigint +, c_salutation char(10) +, c_first_name char(20) +, c_last_name char(30) +, c_preferred_cust_flag char(1) +, c_birth_day int +, c_birth_month int +, c_birth_year int +, c_birth_country varchar(20) +, c_login char(13) +, c_email_address char(50) +, c_last_review_date_sk bigint +); +create table if not exists customer_address( + ca_address_sk bigint +, ca_address_id char(16) +, ca_street_number char(10) +, ca_street_name varchar(60) +, ca_street_type char(15) +, ca_suite_number char(10) +, ca_city varchar(60) +, ca_county varchar(30) +, ca_state char(2) +, ca_zip char(10) +, ca_country varchar(20) +, ca_gmt_offset decimal(5,2) +, ca_location_type char(20) +); +create table if not exists customer_demographics( + cd_demo_sk bigint +, cd_gender char(1) +, cd_marital_status char(1) +, cd_education_status char(20) +, cd_purchase_estimate int +, cd_credit_rating char(10) +, cd_dep_count int +, cd_dep_employed_count int +, cd_dep_college_count int +); +create table if not exists date_dim( + d_date_sk bigint +, d_date_id char(16) +, d_date date +, d_month_seq int +, d_week_seq int +, d_quarter_seq int +, d_year int +, d_dow int +, d_moy int +, d_dom int +, d_qoy int +, d_fy_year int +, d_fy_quarter_seq int +, d_fy_week_seq int +, d_day_name char(9) +, d_quarter_name char(6) +, d_holiday char(1) +, d_weekend char(1) +, d_following_holiday char(1) +, d_first_dom int +, d_last_dom int +, d_same_day_ly int +, d_same_day_lq int +, d_current_day char(1) +, d_current_week char(1) +, d_current_month char(1) +, d_current_quarter char(1) +, d_current_year char(1) +); +create table if not exists household_demographics( + hd_demo_sk bigint +, hd_income_band_sk bigint +, hd_buy_potential char(15) +, hd_dep_count int +, hd_vehicle_count int +); +create table if not exists item( + i_item_sk bigint +, i_item_id char(16) +, i_rec_start_date date +, i_rec_end_date date +, i_item_desc varchar(200) +, i_current_price decimal(7,2) +, i_wholesale_cost decimal(7,2) +, i_brand_id int +, i_brand char(50) +, i_class_id int +, i_class char(50) +, i_category_id int +, i_category char(50) +, i_manufact_id int +, i_manufact char(50) +, i_size char(20) +, i_formulation char(20) +, i_color char(20) +, i_units char(10) +, i_container char(10) +, i_manager_id int +, i_product_name char(50) +); +create table if not exists income_band( + ib_income_band_sk bigint +, ib_lower_bound int +, ib_upper_bound int +); +create table if not exists promotion( + p_promo_sk bigint +, p_promo_id char(16) +, p_start_date_sk bigint +, p_end_date_sk bigint +, p_item_sk bigint +, p_cost 
decimal(15,2) +, p_response_target int +, p_promo_name char(50) +, p_channel_dmail char(1) +, p_channel_email char(1) +, p_channel_catalog char(1) +, p_channel_tv char(1) +, p_channel_radio char(1) +, p_channel_press char(1) +, p_channel_event char(1) +, p_channel_demo char(1) +, p_channel_details varchar(100) +, p_purpose char(15) +, p_discount_active char(1) +); +create table if not exists reason( + r_reason_sk bigint +, r_reason_id char(16) +, r_reason_desc char(100) +); +create table if not exists ship_mode( + sm_ship_mode_sk bigint +, sm_ship_mode_id char(16) +, sm_type char(30) +, sm_code char(10) +, sm_carrier char(20) +, sm_contract char(20) +); +create table if not exists time_dim( + t_time_sk bigint +, t_time_id char(16) +, t_time int +, t_hour int +, t_minute int +, t_second int +, t_am_pm char(2) +, t_shift char(20) +, t_sub_shift char(20) +, t_meal_time char(20) +); \ No newline at end of file diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRuleSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRuleSuite.scala index 23a7831287f13f3651fa51c453c5305cb4b63939..a173eba2f0bd11f56a6dccdd6297813279e28c52 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRuleSuite.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewAggregateRuleSuite.scala @@ -162,7 +162,7 @@ class MaterializedViewAggregateRuleSuite extends RewriteSuite { } test("mv_agg3_3") { - // group by column(is empty) is subset of view,,additional agg on view column + // group by column(is empty) is subset of view,additional agg on view column val sql = """ |SELECT sum(c.integertype) as _sum, @@ -232,8 +232,112 @@ class MaterializedViewAggregateRuleSuite extends RewriteSuite { comparePlansAndRows(sql, "default", "mv_agg4", noData = false) } + test("mv_agg4_3") { + // group by column,agg is same to view, join more + val sql = + """ + |SELECT c.empid,c.deptno,c.locationid,sum(c.integertype) as _sum, + |max(c.longtype) as _max,min(c.floattype) as _min, + |count(c.doubletype) as _count,count(distinct c.datetype) as _count_distinct, + |avg(c.decimaltype) as _avg + |FROM column_type c JOIN emps e JOIN locations l + |ON c.empid=e.empid + |AND c.locationid=l.locationid + |AND c.empid=1 + |GROUP BY c.empid,c.deptno,c.locationid; + |""".stripMargin + comparePlansAndRows(sql, "default", "mv_agg4", noData = false) + } + test("mv_agg4_disable") { val sql = "ALTER MATERIALIZED VIEW mv_agg4 DISABLE REWRITE;" spark.sql(sql).show() } + + test("mv_agg5") { + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_agg5; + |""".stripMargin + ) + spark.sql( + """ + |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_agg5 + |AS + |SELECT c.empid,c.deptno,c.locationid,sum(c.integertype) as _sum, + |max(c.longtype) as _max,min(c.floattype) as _min, + |count(c.doubletype) as _count,count(distinct c.datetype) as _count_distinct, + |avg(c.decimaltype) as _avg + |FROM column_type c JOIN emps e + |ON c.empid=e.empid + |AND c.empid=1 + |GROUP BY c.empid,c.deptno,c.locationid; + |""".stripMargin + ) + } + + test("mv_agg5_1") { + val sql = + """ + |SELECT c.empid,c.deptno,sum(c.integertype) as _sum, + |max(c.longtype) as _max,min(c.floattype) as _min, + |count(c.doubletype) as _count + |FROM column_type c JOIN emps e 
+ |ON c.empid=e.empid + |AND c.empid=1 + |GROUP BY c.empid,c.deptno; + |""".stripMargin + comparePlansAndRows(sql, "default", "mv_agg5", noData = false) + } + + test("mv_agg5_2") { + val sql = + """ + |SELECT c.empid,c.deptno,count(distinct c.datetype) as _count_distinct + |FROM column_type c JOIN emps e + |ON c.empid=e.empid + |AND c.empid=1 + |GROUP BY c.empid,c.deptno; + |""".stripMargin + comparePlansAndRows(sql, "default", "mv_agg5", noData = false) + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_agg5; + |""".stripMargin + ) + } + + test("mv_agg6") { + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_agg6; + |""".stripMargin + ) + spark.sql( + """ + |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_agg6 + |AS + |SELECT c.empid,c.deptno,c.locationid,sum(distinct c.integertype) as _sum, + |max(distinct c.longtype) as _max,min(distinct c.floattype) as _min, + |count(distinct c.doubletype) as _count + |FROM column_type c + |GROUP BY c.empid,c.deptno,c.locationid; + |""".stripMargin + ) + + val sql = + """ + |SELECT c.empid,c.deptno,sum(distinct c.integertype) as _sum, + |max(distinct c.longtype) as _max,min(distinct c.floattype) as _min, + |count(distinct c.doubletype) as _count + |FROM column_type c + |GROUP BY c.empid,c.deptno; + |""".stripMargin + comparePlansAndRows(sql, "default", "mv_agg6", noData = false) + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_agg6; + |""".stripMargin + ) + } } diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewFilterRuleSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewFilterRuleSuite.scala index 473d3fde12dce5e5e3a8710f727a53c2b28ee001..f34e2b74e2bbdadeca072d6a0166cb030e3433b0 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewFilterRuleSuite.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewFilterRuleSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.optimizer.rules +import com.huawei.boostkit.spark.util.RewriteHelper + class MaterializedViewFilterRuleSuite extends RewriteSuite { test("mv_filter1") { @@ -273,7 +275,100 @@ class MaterializedViewFilterRuleSuite extends RewriteSuite { } test("mv_filter2_disable") { - val sql = "ALTER MATERIALIZED VIEW mv_filter1 DISABLE REWRITE;" + val sql = "ALTER MATERIALIZED VIEW mv_filter2 DISABLE REWRITE;" spark.sql(sql).show() } + + test("mv_filter_rand") { + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_filter_rand; + |""".stripMargin + ) + spark.sql( + """ + |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_filter_rand + |AS + |SELECT * FROM COLUMN_TYPE WHERE doubletype=rand(1) + |""".stripMargin + ) + val sql = + """ + |SELECT * FROM COLUMN_TYPE WHERE doubletype=rand() + |""".stripMargin + compareNotRewriteAndRows(sql, noData = true) + + RewriteHelper.enableCachePlugin() + val sql2 = + """ + |SELECT * FROM COLUMN_TYPE WHERE doubletype=rand(1) + |""".stripMargin + comparePlansAndRows(sql2, "default", "mv_filter_rand", noData = true) + + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_filter_rand; + |""".stripMargin + ) + } + + test("mv_filter_if") { + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_filter_if; + |""".stripMargin + ) + spark.sql( + """ + |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_filter_if + |AS + |SELECT 
e.empid,e.deptno,if(e.deptno<2,e.empid,e.deptno) as _if FROM emps e; + |""".stripMargin + ) + val sql = "SELECT if(e.deptno<3,e.empid,e.deptno) as _if FROM emps e" + comparePlansAndRows(sql, "default", "mv_filter_if", noData = false) + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_filter_if; + |""".stripMargin + ) + } + + test("mv_filter3") { + // or + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_filter3; + |""".stripMargin + ) + + spark.sql( + """ + |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_filter3 + |AS + |SELECT * FROM COLUMN_TYPE WHERE empid=3 + |OR bytetype>1 OR shorttype<5 OR integertype>=1 OR longtype<=5 + |OR floattype in(3.0) OR doubletype not in(2.0) + |OR datetype between DATE '2021-01-01' and DATE '2023-01-01' + |OR stringtype='stringtype3' + |OR timestamptype is not null OR decimaltype is null; + |""".stripMargin + ) + + val sql = + """ + |SELECT * FROM COLUMN_TYPE WHERE empid=3 + |OR bytetype>1 OR shorttype<5 OR integertype>=1 OR longtype<=5 + |OR floattype in(3.0) OR doubletype not in(2.0) + |OR datetype between DATE '2021-01-01' and DATE '2023-01-01' + |OR stringtype='stringtype3' + |OR timestamptype is not null OR decimaltype is null; + |""".stripMargin + comparePlansAndRows(sql, "default", "mv_filter3", noData = false) + spark.sql( + """ + |DROP MATERIALIZED VIEW IF EXISTS mv_filter3; + |""".stripMargin + ) + } } diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRuleSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRuleSuite.scala index df7a1f2559908636068ee5fe6d0a1f1b8f1e3e32..da9e4faf2961fd039181e9e449d044efcded889b 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRuleSuite.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/MaterializedViewJoinRuleSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.optimizer.rules +import com.huawei.boostkit.spark.util.RewriteHelper + class MaterializedViewJoinRuleSuite extends RewriteSuite { test("mv_join1") { @@ -150,13 +152,15 @@ class MaterializedViewJoinRuleSuite extends RewriteSuite { // view tables is same to query, equal tables val sql = """ - |SELECT e.*,c1.stringtype + |SELECT e.*,c2.stringtype |FROM emps e JOIN column_type c1 JOIN column_type c2 |ON e.deptno=c1.deptno AND e.deptno=c2.deptno |AND c1.deptno!=2 |AND c2.deptno=1; |""".stripMargin comparePlansAndRows(sql, "default", "mv_join2", noData = false) + RewriteHelper.enableCachePlugin() + comparePlansAndRows(sql, "default", "mv_join2", noData = false) } test("mv_join2_disable") { diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/RewriteSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/RewriteSuite.scala index 8ddc9dac1602892fa37eaef57d06ac529c93e272..e913b59ac4329cbd4a9765a20264cc4869f8d5dc 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/RewriteSuite.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/RewriteSuite.scala @@ -15,7 +15,6 @@ * limitations under the License. 
*/ - package org.apache.spark.sql.catalyst.optimizer.rules import com.huawei.boostkit.spark.conf.OmniCachePluginConfig @@ -44,6 +43,7 @@ class RewriteSuite extends SparkFunSuite with PredicateHelper { .config("hive.exec.dynamic.partition.mode", "nonstrict") .config("spark.ui.port", "4050") // .config("spark.sql.planChangeLog.level","WARN") + .config("spark.sql.omnicache.logLevel", "WARN") .enableHiveSupport() .getOrCreate() spark.sparkContext.setLogLevel("WARN") diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/TpcdsSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/TpcdsSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..42adf96cce46b18647958a63928638673a00fa05 --- /dev/null +++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/optimizer/rules/TpcdsSuite.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer.rules + +import org.apache.commons.io.IOUtils +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.TableIdentifier + +class TpcdsSuite extends RewriteSuite { + + def createTable(): Unit = { + if (catalog.tableExists(TableIdentifier("store_sales"))) { + return + } + val fis = this.getClass.getResourceAsStream("/tpcds_ddl.sql") + val lines = IOUtils.readLines(fis, "UTF-8") + IOUtils.closeQuietly(fis) + + var sqls = Seq.empty[String] + val sql = mutable.StringBuilder.newBuilder + lines.forEach { line => + sql.append(line) + sql.append(" ") + if (line.contains(';')) { + sqls +:= sql.toString() + sql.clear() + } + } + sqls.foreach { sql => + spark.sql(sql) + } + } + + createTable() + + test("subQuery outReference") { + spark.sql("DROP MATERIALIZED VIEW IF EXISTS mv536") + spark.sql( + """ + |CREATE MATERIALIZED VIEW IF NOT EXISTS mv536 PARTITIONED BY (ws_sold_date_sk) AS + | SELECT + | web_sales.ws_ext_discount_amt, + | item.i_item_sk, + | web_sales.ws_sold_date_sk, + | web_sales.ws_item_sk, + | item.i_manufact_id + |FROM + | web_sales, + | item + |WHERE + | item.i_manufact_id = 350 + | AND web_sales.ws_item_sk = item.i_item_sk + |distribute by ws_sold_date_sk; + |""".stripMargin + ) + val sql = + """ + |SELECT sum(ws_ext_discount_amt) AS `Excess Discount Amount ` + |FROM web_sales, item, date_dim + |WHERE i_manufact_id = 350 + | AND i_item_sk = ws_item_sk + | AND d_date BETWEEN '2000-01-27' AND (cast('2000-01-27' AS DATE) + INTERVAL 90 days) + | AND d_date_sk = ws_sold_date_sk + | AND ws_ext_discount_amt > + | ( + | SELECT 1.3 * avg(ws_ext_discount_amt) + | FROM web_sales, date_dim + | WHERE ws_item_sk = i_item_sk + | AND d_date BETWEEN '2000-01-27' AND (cast('2000-01-27' AS DATE) + INTERVAL 90 days) + | AND d_date_sk = ws_sold_date_sk + | ) + |ORDER BY sum(ws_ext_discount_amt) + |LIMIT 100 + | + |""".stripMargin + compareNotRewriteAndRows(sql, noData = true) + spark.sql("DROP MATERIALIZED VIEW IF EXISTS mv536") + } +} diff --git a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/parser/SqlParserSuite.scala b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/parser/SqlParserSuite.scala index 7b40f3111d1b50cf43c3c92287c0d4930977ecf9..273197c91f3a572c246f645bfc7411a6caa51c68 100644 --- a/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/parser/SqlParserSuite.scala +++ b/omnicache/omnicache-spark-extension/plugin/src/test/scala/org/apache/spark/sql/catalyst/parser/SqlParserSuite.scala @@ -560,6 +560,7 @@ class SqlParserSuite extends RewriteSuite { } test("mv_is_cached") { + spark.sql("DROP MATERIALIZED VIEW IF EXISTS mv_create2;") spark.sql( """ |ALTER MATERIALIZED VIEW default.mv_create1 ENABLE REWRITE @@ -568,10 +569,10 @@ class SqlParserSuite extends RewriteSuite { ).show() val sql1 = "SELECT * FROM column_type" comparePlansAndRows(sql1, "default", "mv_create1", noData = false) - + RewriteHelper.enableCachePlugin() spark.sql( """ - |ALTER MATERIALIZED VIEW default.mv_create1 ENABLE REWRITE + |ALTER MATERIALIZED VIEW default.mv_create1 DISABLE REWRITE |; |""".stripMargin ).show() diff --git a/omnicache/omnicache-spark-extension/pom.xml b/omnicache/omnicache-spark-extension/pom.xml index 1e068b0af35e0d623bc2023af4742be9658548fc..28712374061c30dc3b86ffa62244d1c851fd807e 100644 --- a/omnicache/omnicache-spark-extension/pom.xml +++ b/omnicache/omnicache-spark-extension/pom.xml @@ -12,6 +12,7 @@ plugin + 
log-parser BoostKit Spark MaterializedView Sql Engine Extension Parent Pom @@ -34,6 +35,7 @@ 3.1.2 1.4.11 8.29 +
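For reference, the materialized-view DDL exercised by the test suites in this patch (CREATE / ALTER ... ENABLE|DISABLE REWRITE / DROP MATERIALIZED VIEW) can be driven end to end from a SparkSession. The sketch below is illustrative only and is not part of the patch: it assumes a session with the OmniCache SQL extension registered (for example via spark.sql.extensions) and Hive support enabled, and the base table `sales`, its columns, and the view name `mv_sales_by_day` are hypothetical placeholders.

import org.apache.spark.sql.SparkSession

object MvUsageSketch {
  def main(args: Array[String]): Unit = {
    // Assumes the OmniCache extension is already configured for this session.
    val spark = SparkSession.builder()
      .appName("omnicache-mv-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Create a materialized view over a hypothetical base table.
    spark.sql(
      """
        |CREATE MATERIALIZED VIEW IF NOT EXISTS mv_sales_by_day
        |AS
        |SELECT sold_date, sum(amount) AS total
        |FROM sales
        |GROUP BY sold_date;
        |""".stripMargin)

    // Toggle whether the optimizer may rewrite matching queries onto this view.
    spark.sql("ALTER MATERIALIZED VIEW mv_sales_by_day DISABLE REWRITE;")
    spark.sql("ALTER MATERIALIZED VIEW mv_sales_by_day ENABLE REWRITE;")

    // Drop the view when it is no longer needed.
    spark.sql("DROP MATERIALIZED VIEW IF EXISTS mv_sales_by_day;")

    spark.stop()
  }
}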