diff --git a/omniadvisor/pom.xml b/omniadvisor/pom.xml
index 74ad839147c027537db0dee19ef162f0df6743e7..d72c33d17aa732bcbca24f960e15d14c3c1ad05c 100644
--- a/omniadvisor/pom.xml
+++ b/omniadvisor/pom.xml
@@ -305,6 +305,10 @@
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
diff --git a/omniadvisor/src/main/scala/org/apache/spark/SparkApplicationDataExtractor.scala b/omniadvisor/src/main/scala/org/apache/spark/SparkApplicationDataExtractor.scala
index 65368433a2bf733b6c5bdb63784980528765a712..b15800ef498fa27c58c80a1d0133432066e6486a 100644
--- a/omniadvisor/src/main/scala/org/apache/spark/SparkApplicationDataExtractor.scala
+++ b/omniadvisor/src/main/scala/org/apache/spark/SparkApplicationDataExtractor.scala
@@ -17,11 +17,10 @@ package org.apache.spark
import com.huawei.boostkit.omniadvisor.fetcher.FetcherType
import com.huawei.boostkit.omniadvisor.models.AppResult
-import com.huawei.boostkit.omniadvisor.spark.utils.ScalaUtils.parseMapToJsonString
-import com.huawei.boostkit.omniadvisor.spark.utils.ScalaUtils.checkSuccess
+import com.huawei.boostkit.omniadvisor.spark.utils.ScalaUtils.{checkSuccess, parseMapToJsonString}
import com.huawei.boostkit.omniadvisor.spark.utils.SparkUtils
import com.nimbusds.jose.util.StandardCharset
-import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.sql.execution.ui.{SQLExecutionUIData, SparkPlanGraph}
import org.apache.spark.status.api.v1._
import org.slf4j.{Logger, LoggerFactory}
@@ -41,7 +40,8 @@ object SparkApplicationDataExtractor {
workload: String,
environmentInfo: ApplicationEnvironmentInfo,
jobsList: Seq[JobData],
- sqlExecutionsList: Seq[SQLExecutionUIData]): AppResult = {
+ sqlExecutionsList: Seq[SQLExecutionUIData],
+ sqlGraphMap: mutable.Map[Long, SparkPlanGraph]): AppResult = {
val appResult = new AppResult
appResult.applicationId = appInfo.id
appResult.applicationName = appInfo.name
@@ -57,7 +57,7 @@ object SparkApplicationDataExtractor {
val attempt: ApplicationAttemptInfo = lastAttempt(appInfo)
if (attempt.completed && jobsList.nonEmpty && checkSuccess(jobsList)) {
- saveSuccessfulStatus(appResult, jobsList, sqlExecutionsList)
+ saveSuccessfulStatus(appResult, jobsList, sqlExecutionsList, sqlGraphMap)
} else {
saveFailedStatus(appResult, attempt)
}
@@ -112,7 +112,7 @@ object SparkApplicationDataExtractor {
appResult.query = ""
}
- private def saveSuccessfulStatus(appResult: AppResult, jobsList: Seq[JobData], sqlExecutionsList: Seq[SQLExecutionUIData]): Unit = {
+ private def saveSuccessfulStatus(appResult: AppResult, jobsList: Seq[JobData], sqlExecutionsList: Seq[SQLExecutionUIData], sqlGraphMap: mutable.Map[Long, SparkPlanGraph]): Unit = {
appResult.executionStatus = AppResult.SUCCEEDED_STATUS
val (startTime, finishTime) = extractJobsTime(jobsList)
@@ -122,7 +122,7 @@ object SparkApplicationDataExtractor {
finishTime - startTime else AppResult.FAILED_JOB_DURATION
if (appResult.submit_method.equals(AppResult.SPARK_SQL)) {
- appResult.query = extractQuerySQL(sqlExecutionsList)
+ appResult.query = extractQuerySQL(sqlExecutionsList, sqlGraphMap)
} else {
appResult.query = ""
}
@@ -153,16 +153,32 @@ object SparkApplicationDataExtractor {
(startTime, finishTime)
}
- private def extractQuerySQL(sqlExecutionsList: Seq[SQLExecutionUIData]): String = {
+ private def extractQuerySQL(sqlExecutionsList: Seq[SQLExecutionUIData], sqlGraphMap: mutable.Map[Long, SparkPlanGraph]): String = {
require(sqlExecutionsList.nonEmpty)
- val nonEmptyDescriptions = sqlExecutionsList.flatMap { execution =>
- Option(execution.description).filter(_.nonEmpty)
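+    // Collect query texts in execution order, skipping consecutive re-submissions of the
+    // same statement (detected from the plan graph; see isDuplicatedQuery below).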
+    val descriptions = mutable.ArrayBuffer.empty[String]
+    var previousExecutionDesc: Option[String] = None
+    for ((execution, index) <- sqlExecutionsList.zipWithIndex) {
+      // The plan graph may be absent (planGraph() can fail during replay) and the map
+      // itself may be null when no SQL data was collected, so look the graph up safely.
+      val sqlGraph = if (sqlGraphMap != null) sqlGraphMap.get(execution.executionId) else None
+      val duplicated = index > 0 && isDuplicatedQuery(execution, previousExecutionDesc, sqlGraph)
+      val description = Option(execution.description).getOrElse("")
+      if (description.nonEmpty && !duplicated) {
+        descriptions += description.trim
+      }
+      previousExecutionDesc = Some(description)
}
- nonEmptyDescriptions.mkString(";")
+ descriptions.mkString(";\n")
}
private def lastAttempt(applicationInfo: ApplicationInfo): ApplicationAttemptInfo = {
require(applicationInfo.attempts.nonEmpty)
applicationInfo.attempts.last
}
+
+  private def isDuplicatedQuery(execution: SQLExecutionUIData, previousExecutionDesc: Option[String], sqlGraph: Option[SparkPlanGraph]): Boolean = {
+    // A re-submitted query shows up as a consecutive execution with the same description
+    // whose physical plan collapses to a single LocalTableScan node.
+    Option(execution.description).getOrElse("").equals(previousExecutionDesc.getOrElse("")) &&
+      sqlGraph.exists(graph => graph.allNodes.size == 1 && graph.allNodes.head.name.equals("LocalTableScan"))
+  }
}
diff --git a/omniadvisor/src/main/scala/org/apache/spark/SparkDataCollection.scala b/omniadvisor/src/main/scala/org/apache/spark/SparkDataCollection.scala
index 236469cce9da0828965d2a694a426e214069e1fb..aa5a4bf7d8b1c68c249575acd90e94518a62376e 100644
--- a/omniadvisor/src/main/scala/org/apache/spark/SparkDataCollection.scala
+++ b/omniadvisor/src/main/scala/org/apache/spark/SparkDataCollection.scala
@@ -16,16 +16,17 @@
package org.apache.spark
import com.huawei.boostkit.omniadvisor.models.AppResult
-import org.apache.spark.status.api.v1
-import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
import org.apache.spark.internal.config.Status.ASYNC_TRACKING_ENABLED
import org.apache.spark.scheduler.ReplayListenerBus
+import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLExecutionUIData, SparkPlanGraph}
+import org.apache.spark.status.api.v1
import org.apache.spark.status.{AppStatusListener, AppStatusStore, ElementTrackingStore}
-import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLExecutionUIData}
import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
import org.slf4j.{Logger, LoggerFactory}
import java.io.InputStream
+import scala.collection.mutable
class SparkDataCollection {
val LOG: Logger = LoggerFactory.getLogger(classOf[SparkDataCollection])
@@ -36,6 +37,7 @@ class SparkDataCollection {
var jobsList: Seq[v1.JobData] = _
var appInfo: v1.ApplicationInfo = _
var sqlExecutionsList: Seq[SQLExecutionUIData] = _
+ var sqlGraphMap: mutable.Map[Long, SparkPlanGraph] = _
def replayEventLogs(in: InputStream, sourceName: String): Unit = {
@@ -66,12 +68,22 @@ class SparkDataCollection {
val sqlAppStatusStore: SQLAppStatusStore = new SQLAppStatusStore(store)
sqlExecutionsList = sqlAppStatusStore.executionsList()
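+    // Keep each execution's physical plan graph so that the extractor can recognize
+    // re-submitted queries whose plan collapses to a single LocalTableScan.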
+ sqlGraphMap = mutable.HashMap.empty[Long, SparkPlanGraph]
+ sqlExecutionsList.foreach { sqlExecution =>
+ try {
+ val planGraph = sqlAppStatusStore.planGraph(sqlExecution.executionId)
+ sqlGraphMap.put(sqlExecution.executionId, planGraph)
+      } catch {
+        case e: Exception =>
+          LOG.warn(s"Failed to get PlanGraph for SQLExecution [${sqlExecution.executionId}] in ${appInfo.id}", e)
+      }
+ }
appStatusStore.close()
}
def getAppResult(workload: String): AppResult = {
- SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(appInfo, workload, environmentInfo, jobsList, sqlExecutionsList)
+ SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(appInfo, workload, environmentInfo, jobsList, sqlExecutionsList, sqlGraphMap)
}
private def createInMemoryStore(): KVStore = {
diff --git a/omniadvisor/src/test/java/org/apache/spark/TestSparkApplicationDataExtractor.java b/omniadvisor/src/test/java/org/apache/spark/TestSparkApplicationDataExtractor.java
index 7eed130902a290b108988aadbedd3db301053c24..02953cb7cc46400f5de9343995b8e07dfbf47d28 100644
--- a/omniadvisor/src/test/java/org/apache/spark/TestSparkApplicationDataExtractor.java
+++ b/omniadvisor/src/test/java/org/apache/spark/TestSparkApplicationDataExtractor.java
@@ -113,7 +113,7 @@ public class TestSparkApplicationDataExtractor {
sqlExecutionList.add(new SQLExecutionUIData(1, sqlQ10, "", "", asScalaBuffer(ImmutableList.of()), 0, Option.apply(new Date()), new HashMap<>(), new HashSet<>(), new HashMap<>()));
sqlExecutionList.add(new SQLExecutionUIData(2, sqlQ11, "", "", asScalaBuffer(ImmutableList.of()), 0, Option.apply(new Date()), new HashMap<>(), new HashSet<>(), new HashMap<>()));
- AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(jobsList), asScalaBuffer(sqlExecutionList));
+ AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(jobsList), asScalaBuffer(sqlExecutionList), null);
assertEquals(result.applicationId, "id");
assertEquals(result.durationTime, 15 * 60 * 1000L);
assertEquals(result.submit_method, "spark-sql");
@@ -134,7 +134,7 @@ public class TestSparkApplicationDataExtractor {
when(environmentInfo.sparkProperties()).thenReturn(asScalaBuffer(clientSparkProperties));
when(environmentInfo.systemProperties()).thenReturn(asScalaBuffer(systemProperties));
- AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(jobsList), asScalaBuffer(ImmutableList.of()));
+ AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(jobsList), asScalaBuffer(ImmutableList.of()), null);
assertEquals(result.applicationId, "id");
assertEquals(result.durationTime, 15 * 60 * 1000L);
assertEquals(result.submit_method, "spark-submit");
@@ -154,7 +154,7 @@ public class TestSparkApplicationDataExtractor {
when(environmentInfo.sparkProperties()).thenReturn(asScalaBuffer(clusterSparkProperties));
when(environmentInfo.systemProperties()).thenReturn(asScalaBuffer(systemProperties));
- AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(jobsList), asScalaBuffer(ImmutableList.of()));
+ AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(jobsList), asScalaBuffer(ImmutableList.of()), null);
assertEquals(result.applicationId, "id");
assertEquals(result.durationTime, 15 * 60 * 1000L);
assertEquals(result.submit_method, "spark-submit");
@@ -169,7 +169,7 @@ public class TestSparkApplicationDataExtractor {
ApplicationEnvironmentInfo environmentInfo = Mockito.mock(ApplicationEnvironmentInfo.class);
when(environmentInfo.sparkProperties()).thenReturn(asScalaBuffer(ImmutableList.of()));
when(environmentInfo.systemProperties()).thenReturn(asScalaBuffer(ImmutableList.of()));
- AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(ImmutableList.of(runningData)), asScalaBuffer(ImmutableList.of()));
+ AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(ImmutableList.of(runningData)), asScalaBuffer(ImmutableList.of()), null);
assertEquals(result.applicationId, "id");
assertEquals(result.executionStatus, AppResult.FAILED_STATUS);
assertEquals(result.durationTime, AppResult.FAILED_JOB_DURATION);
@@ -181,7 +181,7 @@ public class TestSparkApplicationDataExtractor {
ApplicationEnvironmentInfo environmentInfo = Mockito.mock(ApplicationEnvironmentInfo.class);
when(environmentInfo.sparkProperties()).thenReturn(asScalaBuffer(ImmutableList.of()));
when(environmentInfo.systemProperties()).thenReturn(asScalaBuffer(ImmutableList.of()));
- AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(ImmutableList.of(failedData)), asScalaBuffer(ImmutableList.of()));
+ AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(ImmutableList.of(failedData)), asScalaBuffer(ImmutableList.of()), null);
assertEquals(result.applicationId, "id");
assertEquals(result.executionStatus, AppResult.FAILED_STATUS);
assertEquals(result.durationTime, AppResult.FAILED_JOB_DURATION);
@@ -193,7 +193,7 @@ public class TestSparkApplicationDataExtractor {
ApplicationEnvironmentInfo environmentInfo = Mockito.mock(ApplicationEnvironmentInfo.class);
when(environmentInfo.sparkProperties()).thenReturn(asScalaBuffer(ImmutableList.of()));
when(environmentInfo.systemProperties()).thenReturn(asScalaBuffer(ImmutableList.of()));
- AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(ImmutableList.of()), asScalaBuffer(ImmutableList.of()));
+ AppResult result = SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(ImmutableList.of()), asScalaBuffer(ImmutableList.of()), null);
assertEquals(result.applicationId, "id");
assertEquals(result.executionStatus, AppResult.FAILED_STATUS);
assertEquals(result.durationTime, AppResult.FAILED_JOB_DURATION);
@@ -205,6 +205,6 @@ public class TestSparkApplicationDataExtractor {
ApplicationEnvironmentInfo environmentInfo = Mockito.mock(ApplicationEnvironmentInfo.class);
when(environmentInfo.sparkProperties()).thenReturn(asScalaBuffer(ImmutableList.of()));
when(environmentInfo.systemProperties()).thenReturn(asScalaBuffer(ImmutableList.of()));
- SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(ImmutableList.of()), asScalaBuffer(ImmutableList.of()));
+ SparkApplicationDataExtractor.extractAppResultFromAppStatusStore(applicationInfo, TEST_WORK_LOAD, environmentInfo, asScalaBuffer(ImmutableList.of()), asScalaBuffer(ImmutableList.of()), null);
}
}
diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java
index 21e717a0d58d54bc52967f3416095426a0719c2b..6d83c383bced9885552cabd804a8afd34ee20d42 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java
@@ -18,10 +18,12 @@
package com.huawei.boostkit.hive;
import static com.huawei.boostkit.hive.expression.TypeUtils.checkOmniJsonWhiteList;
import static com.huawei.boostkit.hive.expression.TypeUtils.checkUnsupportedArithmetic;
import static com.huawei.boostkit.hive.expression.TypeUtils.checkUnsupportedCast;
+import static com.huawei.boostkit.hive.expression.TypeUtils.checkUnsupportedTimestamp;
import static com.huawei.boostkit.hive.expression.TypeUtils.convertHiveTypeToOmniType;
+import static com.huawei.boostkit.hive.expression.TypeUtils.isValidConversion;
import static nova.hetu.omniruntime.constants.FunctionType.OMNI_AGGREGATION_TYPE_AVG;
import static nova.hetu.omniruntime.constants.FunctionType.OMNI_AGGREGATION_TYPE_SUM;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
@@ -36,6 +38,7 @@ import static org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspe
import static org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory.STRING;
import static org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP;
import static org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory.VARCHAR;
+import static org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory.VOID;
import com.huawei.boostkit.hive.expression.BaseExpression;
import com.huawei.boostkit.hive.expression.CastFunctionExpression;
@@ -50,6 +53,7 @@ import nova.hetu.omniruntime.constants.FunctionType;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -93,6 +97,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc;
@@ -131,7 +136,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext {
    public static final Set<Integer> SUPPORTED_JOIN = new HashSet<>(Arrays.asList(JoinDesc.INNER_JOIN,
JoinDesc.LEFT_OUTER_JOIN, JoinDesc.FULL_OUTER_JOIN, JoinDesc.LEFT_SEMI_JOIN));
    private static final Set<PrimitiveCategory> SUPPORTED_TYPE = new HashSet<>(Arrays.asList(BOOLEAN,
- SHORT, INT, LONG, DOUBLE, STRING, DATE, DECIMAL, VARCHAR, CHAR));
+ SHORT, INT, LONG, DOUBLE, STRING, DATE, DECIMAL, VARCHAR, CHAR, VOID));
private static final int DECIMAL64_MAX_PRECISION = 19;
@@ -530,6 +535,21 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext {
if (tableMetadata != null && (!tableMetadata.getInputFormatClass().equals(OrcInputFormat.class) || tableMetadata.getParameters().getOrDefault("transactional", "").equals("true"))) {
return false;
}
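+        // Do not replace the scan when any needed column is a timestamp, on either the
+        // vectorized or the metadata-based path.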
+        if (tableScanDesc.isVectorized()) {
+            TypeInfo[] columnTypeInfos = ((VectorTableScanDesc) tableScanDesc.getVectorDesc()).getProjectedColumnTypeInfos();
+            for (int id : tableScanDesc.getNeededColumnIDs()) {
+                if ("timestamp".equals(columnTypeInfos[id].getTypeName())) {
+                    return false;
+                }
+            }
+        } else if (tableMetadata != null && tableMetadata.getCols() != null) {
+            List<FieldSchema> colList = tableMetadata.getCols();
+            for (int id : tableScanDesc.getNeededColumnIDs()) {
+                if ("timestamp".equals(colList.get(id).getType())) {
+                    return false;
+                }
+            }
+        }
        List<Operator<? extends OperatorDesc>> childOperators = op.getChildOperators();
        for (Operator<? extends OperatorDesc> childOperator : childOperators) {
if (childOperator.getType().equals(OperatorType.REDUCESINK) && reduceSinkDescUnReplaceable((ReduceSinkDesc) childOperator.getConf())) {
@@ -619,7 +639,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext {
return replaceable;
case FILTER:
                List<ExprNodeDesc> colList = Collections.singletonList(((FilterDesc) operator.getConf()).getPredicate());
- if (!isUDFSupport(colList) || !isLegalDeciConstant(colList)) {
+ if ((!isUDFSupport(colList) && !isLegalTimestamp(colList)) || !isLegalDeci(colList)) {
return false;
}
boolean result = true;
@@ -629,7 +649,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext {
                for (Operator<? extends OperatorDesc> child : operator.getChildOperators()) {
if (child.getType() != null && child.getType().equals(OperatorType.SELECT)) {
SelectDesc conf = (SelectDesc) child.getConf();
- result = result && isUDFSupport(conf.getColList()) && isLegalDeciConstant(conf.getColList());
+ result = result && isUDFSupport(conf.getColList()) && isLegalDeci(conf.getColList());
}
}
return result;
@@ -698,6 +718,16 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext {
return false;
}
        List<WindowFunctionDef> windowFunctionDefs = ((WindowTableFunctionDef) conf.getFuncDef()).getWindowFunctions();
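+        // Do not replace the PTF operator when any window function argument is a timestamp.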
+ for (WindowFunctionDef functionDef : windowFunctionDefs) {
+ if (functionDef.getArgs() == null) {
+ continue;
+ }
+ for (PTFExpressionDef expressionDef : functionDef.getArgs()) {
+                if (expressionDef.getExprNode() != null && "timestamp".equals(expressionDef.getExprNode().getTypeInfo().getTypeName())) {
+ return false;
+ }
+ }
+ }
if (!PTFSupportedAgg(windowFunctionDefs)) {
return false;
}
@@ -724,6 +754,12 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext {
if (!isUDFSupport(exprNodeDescList)) {
return false;
}
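+        // A LEFT OUTER JOIN immediately followed by a LEFT SEMI JOIN is not replaceable.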
+ JoinCondDesc[] joinCondDescs = mapJoinDesc.getConds();
+ if (joinCondDescs.length >= 2) {
+ if (joinCondDescs[0].getType() == JoinDesc.LEFT_OUTER_JOIN && joinCondDescs[1].getType() == JoinDesc.LEFT_SEMI_JOIN) {
+ return false;
+ }
+ }
return joinReplaceable(operator);
}
@@ -783,12 +819,14 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext {
    private boolean joinReplaceable(Operator<? extends OperatorDesc> operator) {
JoinDesc joinDesc = (JoinDesc) operator.getConf();
JoinCondDesc[] joinCondDescs = joinDesc.getConds();
- if (joinDesc.getConds()[0].getType() == JoinDesc.FULL_OUTER_JOIN && joinDesc.getKeysString().get(0) == null || !SUPPORTED_JOIN.contains(joinCondDescs[0].getType())) {
- return false;
+        for (JoinCondDesc joinCondDesc : joinCondDescs) {
+ if (joinCondDesc.getType() == JoinDesc.FULL_OUTER_JOIN && joinDesc.getKeysString().get("0") == null || !SUPPORTED_JOIN.contains(joinCondDesc.getType())) {
+ return false;
+ }
}
if (joinCondDescs.length >= 2) {
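+            // Multi-way joins are replaceable only when no join input carries a filter expression.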
- for (int i = 0; i < joinCondDescs.length; i++) {
- if (joinCondDescs[i].getType() != JoinDesc.INNER_JOIN) {
+            for (List<ExprNodeDesc> filters : joinDesc.getFilters().values()) {
+ if (!filters.isEmpty()) {
return false;
}
}
@@ -859,16 +897,16 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext {
return checkOmniJsonWhiteList("", expressions.toArray(new String[0]));
}
-    private boolean isLegalDeciConstant(List<ExprNodeDesc> colList) {
+    private boolean isLegalDeci(List<ExprNodeDesc> colList) {
for (ExprNodeDesc desc : colList) {
- if (!checkDecimalConstant(desc)) {
+ if (!isValidConversion(desc)) {
return false;
}
}
if (colList.size() > 0 && colList.get(0).getChildren() != null) {
            List<ExprNodeDesc> childList = colList.get(0).getChildren();
for (ExprNodeDesc desc : childList) {
- if (!checkDecimalConstant(desc)) {
+ if (!isValidConversion(desc)) {
return false;
}
}
@@ -876,23 +914,16 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext {
return true;
}
- private boolean checkDecimalConstant(ExprNodeDesc desc) {
- if (desc instanceof ExprNodeGenericFuncDesc && desc.getChildren() != null && desc.getChildren().size() == 2) {
-            List<ExprNodeDesc> child = desc.getChildren();
- if (child.get(0) instanceof ExprNodeConstantDesc && child.get(1) instanceof ExprNodeColumnDesc) {
- Collections.swap(child, 0, 1);
+    private boolean isLegalTimestamp(List<ExprNodeDesc> colList) {
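+        // Reject the list when it, or the first expression's children, contains an
+        // unsupported timestamp usage (mirrors the structure of isLegalDeci above).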
+ for (ExprNodeDesc desc : colList) {
+ if (checkUnsupportedTimestamp(desc)) {
+ return false;
}
- if (child.get(0) instanceof ExprNodeColumnDesc && child.get(1) instanceof ExprNodeConstantDesc) {
- TypeInfo deciInfo = child.get(0).getTypeInfo();
- TypeInfo constInfo = child.get(1).getTypeInfo();
- if (!(deciInfo instanceof DecimalTypeInfo && constInfo instanceof DecimalTypeInfo)) {
- return true;
- }
- int deciPrecision = ((DecimalTypeInfo) deciInfo).getPrecision();
- int deciScale = ((DecimalTypeInfo) deciInfo).getScale();
- int constPrecision = ((DecimalTypeInfo) constInfo).getPrecision();
- int constScale = ((DecimalTypeInfo) constInfo).getScale();
- if (constPrecision - constScale > deciPrecision - deciScale || constScale > deciScale) {
+ }
+ if (colList.size() > 0 && colList.get(0).getChildren() != null) {
+ List childList = colList.get(0).getChildren();
+ for (ExprNodeDesc desc : childList) {
+ if (checkUnsupportedTimestamp(desc)) {
return false;
}
}
diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterOperator.java
index 8a8b2f550a196cb1e381fc9d3f5ab0f9f936018b..134b02c72b4c8e66d115d2c7e47b4b6533b045af 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterOperator.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterOperator.java
@@ -23,6 +23,9 @@ import com.huawei.boostkit.hive.expression.ExpressionUtils;
import com.huawei.boostkit.hive.expression.ReferenceFactor;
import com.huawei.boostkit.hive.expression.TypeUtils;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
import nova.hetu.omniruntime.operator.OmniOperator;
import nova.hetu.omniruntime.operator.config.OperatorConfig;
import nova.hetu.omniruntime.operator.config.OverflowConfig;
@@ -32,7 +35,9 @@ import nova.hetu.omniruntime.type.DataType;
import nova.hetu.omniruntime.vector.VecBatch;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -57,6 +62,13 @@ public class OmniFilterOperator extends OmniHiveOperator<FilterDesc> impleme
private transient OmniOperator omniOperator;
private transient Iterator output;
+ private static Cache