exprNodeDescList) {
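+ // return false if any child expression is a dynamic value or dynamic list (ExprNodeDynamicValueDesc / ExprNodeDynamicListDesc)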
+ for (ExprNodeDesc desc : exprNodeDescList) {
+ for (ExprNodeDesc child : desc.getChildren()) {
+ if (child instanceof ExprNodeDynamicValueDesc || child instanceof ExprNodeDynamicListDesc) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
/**
-     * Todo: need to optimization
+     * TODO: needs optimization
*
@@ -156,7 +174,7 @@ public class NdpFilter {
return newChildren.get(0);
} else {
return new ExprNodeGenericFuncDesc(oldFuncDesc.getTypeInfo(), oldFuncDesc.getGenericUDF(),
- oldFuncDesc.getFuncText(), newChildren);
+ oldFuncDesc.getFuncText(), newChildren);
}
}
@@ -176,8 +194,8 @@ public class NdpFilter {
break;
case UNSUPPORTED:
// operator unsupported
- LOG.info("OmniData Hive Part Filter failed to push down, since unsupported this Operator class: [{}]",
- operator);
+ LOG.info("OmniData Hive Filter do not to all push down, since unsupported this Operator class: [{}]",
+ funcDesc.getGenericUDF().getClass().getSimpleName());
unsupportedFilterDescList.add(funcDesc);
break;
default:
@@ -259,6 +277,10 @@ public class NdpFilter {
private boolean checkUdf(ExprNodeGenericFuncDesc funcDesc, NdpHiveOperatorEnum operator) {
int argumentIndex = 0;
if (operator.equals(NdpHiveOperatorEnum.BETWEEN)) {
+ // BETWEEN does not support ExprNodeDynamicValueDesc; both range bounds must be constants
+ if (!checkBetween(funcDesc)) {
+ return false;
+ }
// first argument for BETWEEN should be boolean type
argumentIndex = 1;
}
@@ -269,8 +291,8 @@ public class NdpFilter {
if (udfFuncDesc instanceof ExprNodeGenericFuncDesc) {
// check whether the UDF supports push down
if (!NdpUdfEnum.checkUdfSupported(((ExprNodeGenericFuncDesc) udfFuncDesc).getGenericUDF())) {
- LOG.info("OmniData Hive Part Filter failed to push down, since unsupported this UDF class: [{}]",
- udfFuncDesc.getClass());
+ LOG.info("OmniData Hive Filter failed to all push down, since unsupported this UDF class: [{}]",
+ ((ExprNodeGenericFuncDesc) udfFuncDesc).getGenericUDF());
return false;
}
}
@@ -278,6 +300,11 @@ public class NdpFilter {
return true;
}
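+ /**
+  * BETWEEN can be pushed down only when both range bounds (children 2 and 3) are constant expressions.
+  *
+  * @param funcDesc BETWEEN function descriptor
+  * @return true if both bounds are ExprNodeConstantDesc
+  */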
+ private boolean checkBetween(ExprNodeGenericFuncDesc funcDesc) {
+ return funcDesc.getChildren().get(2) instanceof ExprNodeConstantDesc && funcDesc.getChildren()
+ .get(3) instanceof ExprNodeConstantDesc;
+ }
+
/**
* Converting Hive Operators to Omnidata Operators
*
@@ -318,7 +345,7 @@ public class NdpFilter {
}
private NdpFilterBinaryTree(NdpOperator ndpOperator, List exprNodes,
- List ndpFilterLeafList) {
+ List ndpFilterLeafList) {
if (exprNodes.size() >= 2) {
this.ndpOperator = ndpOperator;
leftChild = new NdpFilterBinaryTree(exprNodes.get(0), ndpFilterLeafList);
@@ -375,8 +402,8 @@ public class NdpFilter {
restChildren.add((ExprNodeGenericFuncDesc) expr);
} else {
LOG.info(
- "OmniData Hive Filter failed to push down, since Method parseAndOrOperator() unsupported this [{}]",
- expr.getClass());
+ "OmniData Hive Filter failed to push down, since Method parseAndOrOperator() unsupported this [{}]",
+ expr.getClass());
setNdpLeafOperatorUnsupported();
}
}
@@ -388,8 +415,8 @@ public class NdpFilter {
}
} else {
LOG.info(
- "OmniData Hive Filter failed to push down, since Method parseAndOrOperator() unsupported this [{}]",
- leftExpr.getClass());
+ "OmniData Hive Filter failed to push down, since Method parseAndOrOperator() unsupported this [{}]",
+ leftExpr.getClass());
setNdpLeafOperatorUnsupported();
}
}
@@ -402,8 +429,8 @@ public class NdpFilter {
leftChild = new NdpFilterBinaryTree((ExprNodeGenericFuncDesc) expr, ndpFilterLeafList);
} else {
LOG.info(
- "OmniData Hive Filter failed to push down, since Method parseNotOperator() unsupported this [{}]",
- expr.getClass());
+ "OmniData Hive Filter failed to push down, since Method parseNotOperator() unsupported this [{}]",
+ expr.getClass());
setNdpLeafOperatorUnsupported();
}
} else {
@@ -428,8 +455,8 @@ public class NdpFilter {
public void parseFilterOperator(ExprNodeGenericFuncDesc exprNode) {
Class operator = (exprNode.getGenericUDF() instanceof GenericUDFBridge)
- ? ((GenericUDFBridge) exprNode.getGenericUDF()).getUdfClass()
- : exprNode.getGenericUDF().getClass();
+ ? ((GenericUDFBridge) exprNode.getGenericUDF()).getUdfClass()
+ : exprNode.getGenericUDF().getClass();
if (operator == GenericUDFOPAnd.class) {
ndpOperator = NdpOperator.AND;
} else if (operator == GenericUDFOPOr.class) {
@@ -462,8 +489,8 @@ public class NdpFilter {
ndpLeafOperator = NdpLeafOperator.LESS_THAN_OR_EQUAL;
} else if (operator == GenericUDFBetween.class) {
ndpOperator = (Boolean) ((ExprNodeConstantDesc) exprNode.getChildren().get(0)).getValue()
- ? NdpOperator.NOT
- : NdpOperator.LEAF;
+ ? NdpOperator.NOT
+ : NdpOperator.LEAF;
ndpLeafOperator = NdpLeafOperator.BETWEEN;
} else if (operator == GenericUDFIn.class) {
ndpOperator = NdpOperator.LEAF;
@@ -474,8 +501,8 @@ public class NdpFilter {
} else {
ndpOperator = NdpOperator.LEAF;
LOG.info(
- "OmniData Hive Filter failed to push down, since Method parseFilterOperator() unsupported this [{}]",
- operator);
+ "OmniData Hive Filter failed to push down, since Method parseFilterOperator() unsupported this [{}]",
+ operator);
setNdpLeafOperatorUnsupported();
}
}
@@ -516,4 +543,4 @@ public class NdpFilter {
UNSUPPORTED
}
-}
+}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/operator/predicate/NdpPredicateInfo.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/operator/predicate/NdpPredicateInfo.java
index 4c42daa06716b31000fa076c87edb99f6579d4a3..825c122d3f39bfc2a2a93314f44431b1dcc2cea3 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/operator/predicate/NdpPredicateInfo.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/operator/predicate/NdpPredicateInfo.java
@@ -24,10 +24,10 @@ import com.huawei.boostkit.omnidata.model.Predicate;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.hive.ql.omnidata.physical.NdpVectorizedRowBatchCtx;
+
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
/**
* Ndp Predicate Info
@@ -54,6 +54,10 @@ public class NdpPredicateInfo implements Serializable {
private List decodeTypesWithAgg;
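+ /**
+  * Vectorized row batch context carried with the push-down predicate
+  */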
+ private NdpVectorizedRowBatchCtx ndpVectorizedRowBatchCtx;
+
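+ /**
+  * Table data format, 'orc' or 'parquet'
+  */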
+ private String dataFormat;
+
public NdpPredicateInfo() {
}
@@ -63,12 +67,14 @@ public class NdpPredicateInfo implements Serializable {
@JsonCreator
public NdpPredicateInfo(@JsonProperty("isPushDown") boolean isPushDown,
- @JsonProperty("isPushDownAgg") boolean isPushDownAgg,
- @JsonProperty("isPushDownFilter") boolean isPushDownFilter,
- @JsonProperty("hasPartitionColumn") boolean hasPartitionColumn, @JsonProperty("predicate") Predicate predicate,
- @JsonProperty("outputColumns") List outputColumns,
- @JsonProperty("decodeTypes") List decodeTypes,
- @JsonProperty("decodeTypesWithAgg") List decodeTypesWithAgg) {
+ @JsonProperty("isPushDownAgg") boolean isPushDownAgg,
+ @JsonProperty("isPushDownFilter") boolean isPushDownFilter,
+ @JsonProperty("hasPartitionColumn") boolean hasPartitionColumn, @JsonProperty("predicate") Predicate predicate,
+ @JsonProperty("outputColumns") List outputColumns,
+ @JsonProperty("decodeTypes") List decodeTypes,
+ @JsonProperty("decodeTypesWithAgg") List decodeTypesWithAgg,
+ @JsonProperty("ndpVectorizedRowBatchCtx") NdpVectorizedRowBatchCtx ndpVectorizedRowBatchCtx,
+ @JsonProperty("dataFormat") String dataFormat) {
this.isPushDown = isPushDown;
this.isPushDownAgg = isPushDownAgg;
this.isPushDownFilter = isPushDownFilter;
@@ -77,6 +83,8 @@ public class NdpPredicateInfo implements Serializable {
this.outputColumns = outputColumns;
this.decodeTypes = decodeTypes;
this.decodeTypesWithAgg = decodeTypesWithAgg;
+ this.ndpVectorizedRowBatchCtx = ndpVectorizedRowBatchCtx;
+ this.dataFormat = dataFormat;
}
@JsonProperty
@@ -119,24 +127,37 @@ public class NdpPredicateInfo implements Serializable {
return decodeTypesWithAgg;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- NdpPredicateInfo that = (NdpPredicateInfo) o;
- return isPushDown == that.isPushDown && isPushDownAgg == that.isPushDownAgg
- && isPushDownFilter == that.isPushDownFilter && hasPartitionColumn == that.hasPartitionColumn
- && Objects.equals(predicate, that.predicate) && Objects.equals(outputColumns, that.outputColumns) && Objects
- .equals(decodeTypes, that.decodeTypes) && Objects.equals(decodeTypesWithAgg, that.decodeTypesWithAgg);
+ @JsonProperty
+ public NdpVectorizedRowBatchCtx getNdpVectorizedRowBatchCtx() {
+ return ndpVectorizedRowBatchCtx;
+ }
+
+ @JsonProperty
+ public String getDataFormat() {
+ return dataFormat;
+ }
+
+ public void setPredicate(Predicate predicate) {
+ this.predicate = predicate;
+ }
+
+ public void setOutputColumns(List outputColumns) {
+ this.outputColumns = outputColumns;
+ }
+
+ public void setDecodeTypes(List decodeTypes) {
+ this.decodeTypes = decodeTypes;
+ }
+
+ public void setDecodeTypesWithAgg(List decodeTypesWithAgg) {
+ this.decodeTypesWithAgg = decodeTypesWithAgg;
+ }
+
+ public void setNdpVectorizedRowBatchCtx(NdpVectorizedRowBatchCtx ndpVectorizedRowBatchCtx) {
+ this.ndpVectorizedRowBatchCtx = ndpVectorizedRowBatchCtx;
}
- @Override
- public int hashCode() {
- return Objects.hash(isPushDown, isPushDownAgg, isPushDownFilter, hasPartitionColumn, predicate, outputColumns,
- decodeTypes, decodeTypesWithAgg);
+ public void setDataFormat(String dataFormat) {
+ this.dataFormat = dataFormat;
}
-}
+}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanChecker.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanChecker.java
index 56fcecc36de9041737b6c20f73aeb71a3e8a8d6c..c2c1c23cb011649855a0e10b3e6e2e7f8422610a 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanChecker.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanChecker.java
@@ -31,18 +31,18 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
-import org.apache.hadoop.hive.ql.omnidata.config.NdpConf;
+import org.apache.hadoop.hive.ql.omnidata.config.OmniDataConf;
import org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpHiveOperatorEnum;
import org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpUdfEnum;
import org.apache.hadoop.hive.ql.omnidata.operator.filter.NdpFilter.*;
import org.apache.hadoop.hive.ql.omnidata.status.NdpStatusInfo;
-import org.apache.hadoop.hive.ql.omnidata.status.NdpStatusManager;
import org.apache.hadoop.hive.ql.plan.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
/**
* Used to check the validity of the operation during the Ndp planning phase.
@@ -112,57 +112,32 @@ public class NdpPlanChecker {
* Currently, two data formats are supported: Orc and Parquet.
*
* @param tableScanOp TableScanOperator
- * @return true or false
+ * @param work BaseWork (expected to be a MapWork) that contains the TableScanOperator
+ * @return the table data format, 'orc' or 'parquet'; empty if the format is unsupported
*/
- public static boolean checkDataFormat(TableScanOperator tableScanOp, BaseWork work) {
+ public static Optional getDataFormat(TableScanOperator tableScanOp, BaseWork work) {
String tableName = tableScanOp.getConf().getAlias();
+ String format = "";
// TableMetadata may be 'null'
if (tableScanOp.getConf().getTableMetadata() == null) {
if (work instanceof MapWork) {
PartitionDesc desc = ((MapWork) work).getAliasToPartnInfo().get(tableScanOp.getConf().getAlias());
if (desc != null) {
- String inputFormat = desc.getInputFileFormatClass().getSimpleName();
- String outputFormat = desc.getOutputFileFormatClass().getSimpleName();
- return checkDataFormat(inputFormat, outputFormat, tableName);
+ format = desc.getInputFileFormatClass().getSimpleName();
} else {
- LOG.info("Table [{}] failed to push down, since PartitionDesc is null",
- tableName);
- return false;
+ LOG.info("Table [{}] failed to push down, since PartitionDesc is null", tableName);
}
- } else {
- LOG.info("Table [{}] failed to push down, since unsupported this work: [{}]",
- tableName, work.getClass().getSimpleName());
- return false;
}
} else {
- String inputFormat = tableScanOp.getConf().getTableMetadata().getInputFormatClass().getSimpleName();
- String outputFormat = tableScanOp.getConf().getTableMetadata().getOutputFormatClass().getSimpleName();
- return checkDataFormat(inputFormat, outputFormat, tableName);
- }
- }
-
- /**
- * Currently, two data formats are supported: Orc and Parquet.
- *
- * @param inputFormat hive input data format
- * @param outputFormat hive output data format
- * @param tableName hive table name
- * @return true or false
- */
- public static boolean checkDataFormat(String inputFormat, String outputFormat, String tableName) {
- if (!(inputFormat.toLowerCase(Locale.ENGLISH).contains("orc") || inputFormat.toLowerCase(Locale.ENGLISH)
- .contains("parquet"))) {
- LOG.info("Table [{}] failed to push down, since unsupported this input data format: [{}]", tableName,
- inputFormat);
- return false;
+ format = tableScanOp.getConf().getTableMetadata().getInputFormatClass().getSimpleName();
}
- if (!(outputFormat.toLowerCase(Locale.ENGLISH).contains("orc") || outputFormat.toLowerCase(Locale.ENGLISH)
- .contains("parquet"))) {
- LOG.info("Table [{}] failed to push down, since unsupported this output data format: [{}]", tableName,
- inputFormat);
- return false;
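+ // map the detected InputFormat class name to a supported data format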
+ if (format.toLowerCase(Locale.ENGLISH).contains("orc")) {
+ return Optional.of("orc");
+ } else if (format.toLowerCase(Locale.ENGLISH).contains("parquet")) {
+ return Optional.of("parquet");
+ } else {
+ return Optional.empty();
}
- return true;
}
/**
@@ -194,11 +169,6 @@ public class NdpPlanChecker {
return true;
}
- public static boolean checkPushDown(Configuration conf, boolean isPushDown) {
- return isPushDown && conf.get(NdpStatusManager.NDP_DATANODE_HOSTNAMES) != null
- && conf.get(NdpStatusManager.NDP_DATANODE_HOSTNAMES).length() > 0;
- }
-
/**
* Check whether the Udf in white list
*
@@ -262,17 +232,17 @@ public class NdpPlanChecker {
* @param vectorFilterOperator VectorFilterOperator
-     * @return true or false
+     * @return Optional of the filter predicate as ExprNodeGenericFuncDesc, empty if it cannot be pushed down
*/
- public static ExprNodeGenericFuncDesc checkFilterOperator(VectorFilterOperator vectorFilterOperator) {
+ public static Optional checkFilterOperator(VectorFilterOperator vectorFilterOperator) {
if (vectorFilterOperator == null) {
- return null;
+ return Optional.empty();
}
ExprNodeDesc nodeDesc = vectorFilterOperator.getConf().getPredicate();
if (nodeDesc instanceof ExprNodeGenericFuncDesc) {
- return (ExprNodeGenericFuncDesc) nodeDesc;
+ return Optional.of((ExprNodeGenericFuncDesc) nodeDesc);
}
LOG.info("FilterOperator failed to push down, since unsupported this ExprNodeDesc: [{}]",
nodeDesc.getClass().getSimpleName());
- return null;
+ return Optional.empty();
}
/**
@@ -281,17 +251,17 @@ public class NdpPlanChecker {
* @param vectorSelectOperator VectorSelectOperator
-     * @return true or false
+     * @return Optional of the VectorSelectDesc, empty if it cannot be pushed down
*/
- public static VectorSelectDesc checkSelectOperator(VectorSelectOperator vectorSelectOperator) {
+ public static Optional checkSelectOperator(VectorSelectOperator vectorSelectOperator) {
if (vectorSelectOperator == null) {
- return null;
+ return Optional.empty();
}
SelectDesc selectDesc = vectorSelectOperator.getConf();
if (selectDesc.getVectorDesc() instanceof VectorSelectDesc) {
- return (VectorSelectDesc) selectDesc.getVectorDesc();
+ return Optional.of((VectorSelectDesc) selectDesc.getVectorDesc());
}
LOG.info("VectorSelectOperator failed to push down, since unsupported this SelectDesc: [{}]",
selectDesc.getClass().getSimpleName());
- return null;
+ return Optional.empty();
}
/**
@@ -300,40 +270,39 @@ public class NdpPlanChecker {
* @param vectorGroupByOperator VectorGroupByOperator
-     * @return true or false
+     * @return Optional of the VectorGroupByDesc, empty if it cannot be pushed down
*/
- public static GroupByDesc checkGroupByOperator(VectorGroupByOperator vectorGroupByOperator) {
- if (vectorGroupByOperator != null) {
+ public static Optional checkGroupByOperator(VectorGroupByOperator vectorGroupByOperator) {
+ if (vectorGroupByOperator != null && vectorGroupByOperator.getVectorDesc() instanceof VectorGroupByDesc) {
VectorGroupByDesc aggVectorsDesc = (VectorGroupByDesc) vectorGroupByOperator.getVectorDesc();
// Agg or groupby can be pushed down only when agg or groupby exists.
if (aggVectorsDesc.getKeyExpressions().length > 0 || aggVectorsDesc.getVecAggrDescs().length > 0) {
for (VectorAggregationDesc agg : aggVectorsDesc.getVecAggrDescs()) {
if (!checkAggregationDesc(agg.getAggrDesc())) {
- return null;
+ return Optional.empty();
}
}
- return vectorGroupByOperator.getConf();
+ return Optional.of(aggVectorsDesc);
}
}
LOG.info("VectorGroupByOperator failed to push down");
- return null;
+ return Optional.empty();
}
-
/**
* Check whether Limit offset > 0
*
* @param vectorLimitOperator VectorLimitOperator
-     * @return true or false
+     * @return Optional of the LimitDesc, empty if it cannot be pushed down
*/
- public static LimitDesc checkLimitOperator(VectorLimitOperator vectorLimitOperator) {
+ public static Optional checkLimitOperator(VectorLimitOperator vectorLimitOperator) {
if (vectorLimitOperator == null) {
- return null;
+ return Optional.empty();
}
LimitDesc limitDesc = vectorLimitOperator.getConf();
if (limitDesc.getOffset() == null && limitDesc.getLimit() > 0) {
- return limitDesc;
+ return Optional.of(limitDesc);
}
LOG.info("VectorLimitOperator failed to push down, since unsupported Limit offset > 0");
- return null;
+ return Optional.empty();
}
public static boolean checkAggregationDesc(AggregationDesc agg) {
@@ -417,36 +386,20 @@ public class NdpPlanChecker {
return checkMinFunction(agg);
}
- public static double getSelectivity(TableScanOperator tableScanOp) {
- double selectivity = 0.5;
- if (tableScanOp.getConf().getStatistics() == null) {
- return selectivity;
- }
- try {
- long tableCount = tableScanOp.getConf().getStatistics().getNumRows();
- long filterCount = tableScanOp.getChildOperators().get(0).getConf().getStatistics().getNumRows();
- if (tableCount > 0) {
- selectivity = 1.0 * filterCount / tableCount;
- }
- } catch (Exception e) {
- LOG.error("Can't calculate the selectivity", e);
- }
- return selectivity;
- }
-
/**
* Check whether the filter selectivity is supported
*
* @param tableScanOp TableScanOperator
- * @param ndpConf NdpConf
+ * @param conf hive conf
* @return true or false
*/
- public static boolean checkSelectivity(TableScanOperator tableScanOp, NdpConf ndpConf) {
- if (ndpConf.getNdpFilterSelectivityEnable()) {
- double currentSelectivity = getSelectivity(tableScanOp);
- if (currentSelectivity > ndpConf.getNdpFilterSelectivity()) {
+ public static boolean checkSelectivity(TableScanOperator tableScanOp, Configuration conf) {
+ if (OmniDataConf.getOmniDataFilterSelectivityEnabled(conf)) {
+ double currentSelectivity = NdpStatisticsUtils.getSelectivity(tableScanOp);
+ double filterSelectivity = OmniDataConf.getOmniDataFilterSelectivity(conf);
+ if (currentSelectivity > filterSelectivity) {
LOG.info("Table [{}] failed to push down, since selectivity[{}] > threshold[{}]",
- tableScanOp.getConf().getAlias(), currentSelectivity, ndpConf.getNdpFilterSelectivity());
+ tableScanOp.getConf().getAlias(), currentSelectivity, filterSelectivity);
return false;
} else {
LOG.info("Table [{}] selectivity is {}", tableScanOp.getConf().getAlias(), currentSelectivity);
@@ -462,17 +415,17 @@ public class NdpPlanChecker {
* Check whether the table size is supported
*
* @param tableScanOp TableScanOperator
- * @param ndpConf NdpConf
+ * @param conf hive conf
* @return true or false
*/
- public static boolean checkTableSize(TableScanOperator tableScanOp, NdpConf ndpConf) {
+ public static boolean checkTableSize(TableScanOperator tableScanOp, Configuration conf) {
if (tableScanOp.getConf().getStatistics() == null) {
return false;
}
long currentTableSize = tableScanOp.getConf().getStatistics().getDataSize();
- if (currentTableSize < ndpConf.getNdpTablesSizeThreshold()) {
+ if (currentTableSize < OmniDataConf.getOmniDataTablesSizeThreshold(conf)) {
LOG.info("Table [{}] failed to push down, since table size[{}] < threshold[{}]",
- tableScanOp.getConf().getAlias(), currentTableSize, ndpConf.getNdpTablesSizeThreshold());
+ tableScanOp.getConf().getAlias(), currentTableSize, OmniDataConf.getOmniDataTablesSizeThreshold(conf));
return false;
}
return true;
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanResolver.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanResolver.java
index e27f0b59969ecfe3f16765c06c6b0cf40d152f0d..16b3071cad1a6f46969db5833adf82cd2780068f 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanResolver.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanResolver.java
@@ -19,9 +19,6 @@
package org.apache.hadoop.hive.ql.omnidata.physical;
-import static org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpEngineEnum.MR;
-import static org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpEngineEnum.Tez;
-
import com.huawei.boostkit.omnidata.model.AggregationInfo;
import com.huawei.boostkit.omnidata.model.Predicate;
@@ -29,27 +26,23 @@ import com.google.common.collect.ImmutableMap;
import io.prestosql.spi.relation.RowExpression;
-import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationDesc;
import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.omnidata.config.NdpConf;
+import org.apache.hadoop.hive.ql.omnidata.config.OmniDataConf;
import org.apache.hadoop.hive.ql.omnidata.operator.aggregation.OmniDataAggregation;
-import org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpEngineEnum;
import org.apache.hadoop.hive.ql.omnidata.operator.filter.NdpFilter;
import org.apache.hadoop.hive.ql.omnidata.operator.filter.OmniDataFilter;
import org.apache.hadoop.hive.ql.omnidata.operator.limit.OmniDataLimit;
@@ -60,16 +53,17 @@ import org.apache.hadoop.hive.ql.omnidata.status.NdpStatusInfo;
import org.apache.hadoop.hive.ql.omnidata.status.NdpStatusManager;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,9 +82,7 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
private Context context;
- private NdpConf ndpConf;
-
- private NdpEngineEnum engine;
+ private ParseContext parseContext;
private boolean isPushDownFilter = false;
@@ -120,7 +112,7 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
/**
* Hive agg && group by expression
*/
- private GroupByDesc aggDesc;
+ private VectorGroupByDesc aggDesc;
/**
* Hive limit expression
@@ -128,20 +120,26 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
private LimitDesc limitDesc;
/**
- * Hive agg pushDown optimize
+ * Table data format
+ */
+ private String dataFormat = "";
+
+ /**
+ * If a table in an SQL statement is pushed down, this parameter is set to true.
*/
- private boolean isAggOptimized = false;
+ private boolean existsTablePushDown = false;
+
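+ /**
+  * Vectorized row batch context of the pushed-down TableScan, passed into NdpPredicateInfo
+  */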
+ private NdpVectorizedRowBatchCtx ndpCtx;
@Override
public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
this.hiveConf = pctx.getConf();
- this.ndpConf = new NdpConf(hiveConf);
this.context = pctx.getContext();
+ this.parseContext = pctx.getParseContext();
Dispatcher dispatcher = new NdpDispatcher();
TaskGraphWalker walker = new TaskGraphWalker(dispatcher);
List topNodes = new ArrayList<>(pctx.getRootTasks());
walker.startWalking(topNodes, null);
- hiveConf.set(NdpConf.NDP_AGG_OPTIMIZED_ENABLE, String.valueOf(isAggOptimized));
return pctx;
}
@@ -151,13 +149,13 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
if (nodeOutputs == null || nodeOutputs.length == 0) {
throw new SemanticException("No Dispatch Context");
}
-
- if (!ndpConf.getNdpEnabled() || !NdpPlanChecker.checkRollUp(context.getCmd())) {
+ // OmniData Hive does not support the 'ROLLUP' operator
+ if (!NdpPlanChecker.checkRollUp(context.getCmd())) {
return null;
}
// host resources status map
- Map ndpStatusInfoMap = new HashMap<>(NdpStatusManager.getNdpZookeeperData(ndpConf));
+ Map ndpStatusInfoMap = new HashMap<>(NdpStatusManager.getNdpZookeeperData(hiveConf));
if (!NdpPlanChecker.checkHostResources(ndpStatusInfoMap)) {
return null;
}
@@ -168,100 +166,103 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
return null;
}
- List workIndexList = new ArrayList<>();
- List works = new ArrayList<>();
- List> topOp = new ArrayList<>();
+ // key: work name
+ Map mapWorkMap = new HashMap<>();
+ Map tableScanOperatorMap = new HashMap<>();
+ // use a TreeMap sorted by reduce work name, for example: Reduce1, Reduce2...
+ Map reduceWorkMap = new TreeMap<>();
if (currTask.getWork() instanceof TezWork) {
- // tez
- engine = Tez;
- TezWork tezWork = (TezWork) currTask.getWork();
- works.addAll(tezWork.getAllWork());
- for (int i = 0; i < works.size(); i++) {
- BaseWork work = works.get(i);
- if (work instanceof MapWork) {
- if (((MapWork) work).getAliasToWork().size() == 1) {
- topOp.addAll(((MapWork) work).getAliasToWork().values());
- workIndexList.add(i);
- }
- }
- }
+ NdpStatisticsUtils.generateMapReduceWork((TezWork) currTask.getWork(), mapWorkMap, tableScanOperatorMap,
+ reduceWorkMap);
} else {
// unsupported
return null;
}
- int index = 0;
- int ndpTableNums = 0;
- for (Operator extends OperatorDesc> operator : topOp) {
- if (operator instanceof TableScanOperator) {
- TableScanOperator tableScanOp = (TableScanOperator) operator;
- NdpPredicateInfo ndpPredicateInfo = new NdpPredicateInfo(false);
- // check TableScanOp
- if (checkTableScanOp(tableScanOp, works.get(workIndexList.get(index)))) {
- Optional filter = Optional.empty();
- Optional aggregation = Optional.empty();
- OptionalLong limit = OptionalLong.empty();
- OmniDataPredicate omniDataPredicate = new OmniDataPredicate(tableScanOp);
- omniDataPredicate.setSelectExpressions(selectDesc);
- // get OmniData filter expression
- if (isPushDownFilter) {
- filter = getOmniDataFilter(omniDataPredicate);
- if (!filter.isPresent()) {
- isPushDownFilter = false;
- isPushDownAgg = false;
- }
- }
- // get OmniData agg expression
- if (isPushDownAgg) {
- // The output column of the aggregation needs to be processed separately.
- aggregation = getOmniDataAggregation(omniDataPredicate);
- }
- // get OmniData select expression
- if (isPushDownSelect && !isPushDownAgg) {
- omniDataPredicate = new OmniDataPredicate(tableScanOp);
- omniDataPredicate.setSelectExpressions(selectDesc);
- omniDataPredicate.addProjectionsByTableScan(tableScanOp);
- }
- // get OmniData limit expression
- if (isPushDownLimit) {
- limit = getOmniDataLimit();
+ // deal with each TableScanOperator
+ tableScanOperatorMap.forEach((workName, tableScanOp) -> {
+ NdpPredicateInfo ndpPredicateInfo = new NdpPredicateInfo(false);
+ // set table's selectivity
+ tableScanOp.getConf().setOmniDataSelectivity(NdpStatisticsUtils.getSelectivity(tableScanOp));
+ // check TableScanOp
+ if (checkTableScanOp(tableScanOp, mapWorkMap.get(workName))) {
+ Optional filter = Optional.empty();
+ Optional aggregation = Optional.empty();
+ OptionalLong limit = OptionalLong.empty();
+ OmniDataPredicate omniDataPredicate = new OmniDataPredicate(tableScanOp);
+ omniDataPredicate.setSelectExpressions(selectDesc);
+ // get OmniData filter expression
+ if (isPushDownFilter) {
+ filter = getOmniDataFilter(omniDataPredicate, tableScanOp);
+ if (!filter.isPresent()) {
+ isPushDownFilter = false;
+ isPushDownAgg = false;
}
- // the decode type must exist
- if ((isPushDownFilter || isPushDownAgg) && omniDataPredicate.getDecodeTypes().size() > 0) {
- replaceTableScanOp(tableScanOp, works.get(workIndexList.get(index)));
- Predicate predicate = new Predicate(omniDataPredicate.getTypes(),
- omniDataPredicate.getColumns(), filter, omniDataPredicate.getProjections(),
- ImmutableMap.of(), ImmutableMap.of(), aggregation, limit);
- ndpPredicateInfo = new NdpPredicateInfo(true, isPushDownAgg, isPushDownFilter,
- omniDataPredicate.getHasPartitionColumns(), predicate,
- tableScanOp.getConf().getNeededColumnIDs(), omniDataPredicate.getDecodeTypes(),
- omniDataPredicate.getDecodeTypesWithAgg());
- NdpStatusManager.setOmniDataHostToConf(hiveConf, ndpStatusInfoMap);
- printPushDownInfo(tableScanOp.getConf().getAlias(), ndpStatusInfoMap);
- ndpTableNums++;
+ }
+ // get OmniData agg expression
+ if (isPushDownAgg) {
+ // The output column of the aggregation needs to be processed separately.
+ aggregation = getOmniDataAggregation(omniDataPredicate);
+ if (!aggregation.isPresent()) {
+ isPushDownAgg = false;
}
}
- initPushDown();
- // set ndpPredicateInfo after serialization
- tableScanOp.getConf()
- .setNdpPredicateInfoStr(NdpSerializationUtils.serializeNdpPredicateInfo(ndpPredicateInfo));
+ // get OmniData select expression
+ if (isPushDownSelect && !isPushDownAgg) {
+ omniDataPredicate = new OmniDataPredicate(tableScanOp);
+ omniDataPredicate.addProjectionsByTableScan(tableScanOp);
+ }
+ // get OmniData limit expression
+ if (isPushDownLimit) {
+ limit = getOmniDataLimit();
+ }
+ // the decode type must exist
+ if ((isPushDownFilter || isPushDownAgg) && omniDataPredicate.getDecodeTypes().size() > 0) {
+ replaceTableScanOp(tableScanOp, mapWorkMap.get(workName));
+ Predicate predicate = new Predicate(omniDataPredicate.getTypes(),
+ omniDataPredicate.getColumns(), filter, omniDataPredicate.getProjections(),
+ ImmutableMap.of(), ImmutableMap.of(), aggregation, limit);
+ ndpPredicateInfo = new NdpPredicateInfo(true, isPushDownAgg, isPushDownFilter,
+ omniDataPredicate.getHasPartitionColumns(), predicate,
+ tableScanOp.getConf().getNeededColumnIDs(), omniDataPredicate.getDecodeTypes(),
+ omniDataPredicate.getDecodeTypesWithAgg(), ndpCtx, dataFormat);
+ // print OmniData Hive log information
+ printPushDownInfo(tableScanOp.getConf().getAlias(), ndpStatusInfoMap);
+ existsTablePushDown = true;
+ }
+ }
+ // set ndpPredicateInfo after serialization
+ tableScanOp.getConf()
+ .setNdpPredicateInfoStr(NdpSerializationUtils.serializeNdpPredicateInfo(ndpPredicateInfo));
+ initPushDown();
+ });
+ if (existsTablePushDown) {
+ // set OmniData hosts info to Hive conf
+ NdpStatusManager.setOmniDataHostToConf(hiveConf, ndpStatusInfoMap);
+ // OmniData reduce work optimize
+ if (OmniDataConf.getOmniDataReduceOptimizedEnabled(hiveConf)) {
+ NdpStatisticsUtils.optimizeReduceWork(tableScanOperatorMap, reduceWorkMap, hiveConf, parseContext);
}
- index++;
- }
- if (ndpTableNums != 1) {
- isAggOptimized = false;
}
+ OmniDataConf.setOmniDataExistsTablePushDown(hiveConf, existsTablePushDown);
return null;
}
private boolean checkTableScanOp(TableScanOperator tableScanOp, BaseWork work) {
+ Optional format = NdpPlanChecker.getDataFormat(tableScanOp, work);
+ if (format.isPresent()) {
+ dataFormat = format.get();
+ } else {
+ LOG.info("Table [{}] failed to push down, only orc and parquet are supported",
+ tableScanOp.getConf().getAlias());
+ return false;
+ }
if (NdpPlanChecker.checkTableScanNumChild(tableScanOp) && NdpPlanChecker.checkHiveType(tableScanOp)
- && NdpPlanChecker.checkTableSize(tableScanOp, ndpConf) && NdpPlanChecker.checkSelectivity(tableScanOp,
- ndpConf) && NdpPlanChecker.checkDataFormat(tableScanOp, work)) {
+ && NdpPlanChecker.checkTableSize(tableScanOp, hiveConf) && NdpPlanChecker.checkSelectivity(tableScanOp,
+ hiveConf)) {
// scan operator: select agg filter limit
scanTableScanChildOperators(tableScanOp.getChildOperators());
- return isPushDownAgg || isPushDownFilter;
}
- return false;
+ return isPushDownAgg || isPushDownFilter;
}
/**
@@ -277,31 +278,57 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
Operator extends OperatorDesc> operator = operators.get(0);
if (operator instanceof VectorFilterOperator) {
// filter push down
- filterDesc = NdpPlanChecker.checkFilterOperator((VectorFilterOperator) operator);
- isPushDownFilter = (filterDesc != null);
+ Optional tmpFilterDesc = NdpPlanChecker.checkFilterOperator(
+ (VectorFilterOperator) operator);
+ if (tmpFilterDesc.isPresent()) {
+ filterDesc = tmpFilterDesc.get();
+ isPushDownFilter = true;
+ }
scanTableScanChildOperators(operator.getChildOperators());
} else if (operator instanceof VectorSelectOperator) {
// check select
- selectDesc = NdpPlanChecker.checkSelectOperator((VectorSelectOperator) operator);
- isPushDownSelect = (selectDesc != null);
+ Optional tmpSelectDesc = NdpPlanChecker.checkSelectOperator(
+ (VectorSelectOperator) operator);
+ if (tmpSelectDesc.isPresent()) {
+ selectDesc = tmpSelectDesc.get();
+ isPushDownSelect = true;
+ }
scanTableScanChildOperators(operator.getChildOperators());
} else if (operator instanceof VectorGroupByOperator) {
// check agg
- aggDesc = NdpPlanChecker.checkGroupByOperator((VectorGroupByOperator) operator);
- isPushDownAgg = (aggDesc != null);
+ Optional tmpAggDesc = NdpPlanChecker.checkGroupByOperator(
+ (VectorGroupByOperator) operator);
+ if (tmpAggDesc.isPresent()) {
+ aggDesc = tmpAggDesc.get();
+ isPushDownAgg = true;
+ }
scanTableScanChildOperators(operator.getChildOperators());
} else if (operator instanceof VectorLimitOperator) {
// check limit
- limitDesc = NdpPlanChecker.checkLimitOperator((VectorLimitOperator) operator);
- isPushDownLimit = (limitDesc != null);
+ Optional