exprNodeDescList) {
+ for (ExprNodeDesc desc : exprNodeDescList) {
+ for (ExprNodeDesc child : desc.getChildren()) {
+ if (child instanceof ExprNodeDynamicValueDesc || child instanceof ExprNodeDynamicListDesc) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
/**
* TODO: needs optimization
*
@@ -156,7 +174,7 @@ public class NdpFilter {
return newChildren.get(0);
} else {
return new ExprNodeGenericFuncDesc(oldFuncDesc.getTypeInfo(), oldFuncDesc.getGenericUDF(),
- oldFuncDesc.getFuncText(), newChildren);
+ oldFuncDesc.getFuncText(), newChildren);
}
}
@@ -176,8 +194,8 @@ public class NdpFilter {
break;
case UNSUPPORTED:
// operator unsupported
- LOG.info("OmniData Hive Part Filter failed to push down, since unsupported this Operator class: [{}]",
- operator);
+ LOG.info("OmniData Hive Filter do not to all push down, since unsupported this Operator class: [{}]",
+ funcDesc.getGenericUDF().getClass().getSimpleName());
unsupportedFilterDescList.add(funcDesc);
break;
default:
@@ -259,6 +277,10 @@ public class NdpFilter {
private boolean checkUdf(ExprNodeGenericFuncDesc funcDesc, NdpHiveOperatorEnum operator) {
int argumentIndex = 0;
if (operator.equals(NdpHiveOperatorEnum.BETWEEN)) {
+ // BETWEEN does not support non-constant bounds such as ExprNodeDynamicValueDesc
+ if (!checkBetween(funcDesc)) {
+ return false;
+ }
// first argument for BETWEEN should be boolean type
argumentIndex = 1;
}
@@ -269,8 +291,8 @@ public class NdpFilter {
if (udfFuncDesc instanceof ExprNodeGenericFuncDesc) {
// check whether the UDF supports push down
if (!NdpUdfEnum.checkUdfSupported(((ExprNodeGenericFuncDesc) udfFuncDesc).getGenericUDF())) {
- LOG.info("OmniData Hive Part Filter failed to push down, since unsupported this UDF class: [{}]",
- udfFuncDesc.getClass());
+ LOG.info("OmniData Hive Filter failed to all push down, since unsupported this UDF class: [{}]",
+ ((ExprNodeGenericFuncDesc) udfFuncDesc).getGenericUDF());
return false;
}
}
@@ -278,6 +300,11 @@ public class NdpFilter {
return true;
}
+ private boolean checkBetween(ExprNodeGenericFuncDesc funcDesc) {
+ return funcDesc.getChildren().get(2) instanceof ExprNodeConstantDesc && funcDesc.getChildren()
+ .get(3) instanceof ExprNodeConstantDesc;
+ }
+
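For orientation, a minimal stand-alone sketch of the child layout this check assumes for a BETWEEN predicate, inferred from checkBetween() above and from parseFilterOperator() below (which reads child 0 as the invert flag); the class, names, and values are illustrative only, not part of the patch:

    import java.util.Arrays;
    import java.util.List;

    public class BetweenLayoutSketch {
        public static void main(String[] args) {
            // Assumed layout: child 0 = invert flag (true for NOT BETWEEN),
            // child 1 = tested column, child 2 = lower bound, child 3 = upper bound.
            // "c NOT BETWEEN 1 AND 10" modelled as plain strings for illustration:
            List<String> children = Arrays.asList("invert=true", "column:c", "const:1", "const:10");
            // mirrors checkBetween(): push down only when both bounds are constants
            boolean pushable = children.get(2).startsWith("const:") && children.get(3).startsWith("const:");
            System.out.println(pushable); // true
        }
    }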
/**
* Converting Hive Operators to Omnidata Operators
*
@@ -318,7 +345,7 @@ public class NdpFilter {
}
private NdpFilterBinaryTree(NdpOperator ndpOperator, List exprNodes,
- List ndpFilterLeafList) {
+ List ndpFilterLeafList) {
if (exprNodes.size() >= 2) {
this.ndpOperator = ndpOperator;
leftChild = new NdpFilterBinaryTree(exprNodes.get(0), ndpFilterLeafList);
@@ -375,8 +402,8 @@ public class NdpFilter {
restChildren.add((ExprNodeGenericFuncDesc) expr);
} else {
LOG.info(
- "OmniData Hive Filter failed to push down, since Method parseAndOrOperator() unsupported this [{}]",
- expr.getClass());
+ "OmniData Hive Filter failed to push down, since Method parseAndOrOperator() unsupported this [{}]",
+ expr.getClass());
setNdpLeafOperatorUnsupported();
}
}
@@ -388,8 +415,8 @@ public class NdpFilter {
}
} else {
LOG.info(
- "OmniData Hive Filter failed to push down, since Method parseAndOrOperator() unsupported this [{}]",
- leftExpr.getClass());
+ "OmniData Hive Filter failed to push down, since Method parseAndOrOperator() unsupported this [{}]",
+ leftExpr.getClass());
setNdpLeafOperatorUnsupported();
}
}
@@ -402,8 +429,8 @@ public class NdpFilter {
leftChild = new NdpFilterBinaryTree((ExprNodeGenericFuncDesc) expr, ndpFilterLeafList);
} else {
LOG.info(
- "OmniData Hive Filter failed to push down, since Method parseNotOperator() unsupported this [{}]",
- expr.getClass());
+ "OmniData Hive Filter failed to push down, since Method parseNotOperator() unsupported this [{}]",
+ expr.getClass());
setNdpLeafOperatorUnsupported();
}
} else {
@@ -428,8 +455,8 @@ public class NdpFilter {
public void parseFilterOperator(ExprNodeGenericFuncDesc exprNode) {
Class operator = (exprNode.getGenericUDF() instanceof GenericUDFBridge)
- ? ((GenericUDFBridge) exprNode.getGenericUDF()).getUdfClass()
- : exprNode.getGenericUDF().getClass();
+ ? ((GenericUDFBridge) exprNode.getGenericUDF()).getUdfClass()
+ : exprNode.getGenericUDF().getClass();
if (operator == GenericUDFOPAnd.class) {
ndpOperator = NdpOperator.AND;
} else if (operator == GenericUDFOPOr.class) {
@@ -462,8 +489,8 @@ public class NdpFilter {
ndpLeafOperator = NdpLeafOperator.LESS_THAN_OR_EQUAL;
} else if (operator == GenericUDFBetween.class) {
ndpOperator = (Boolean) ((ExprNodeConstantDesc) exprNode.getChildren().get(0)).getValue()
- ? NdpOperator.NOT
- : NdpOperator.LEAF;
+ ? NdpOperator.NOT
+ : NdpOperator.LEAF;
ndpLeafOperator = NdpLeafOperator.BETWEEN;
} else if (operator == GenericUDFIn.class) {
ndpOperator = NdpOperator.LEAF;
@@ -474,8 +501,8 @@ public class NdpFilter {
} else {
ndpOperator = NdpOperator.LEAF;
LOG.info(
- "OmniData Hive Filter failed to push down, since Method parseFilterOperator() unsupported this [{}]",
- operator);
+ "OmniData Hive Filter failed to push down, since Method parseFilterOperator() unsupported this [{}]",
+ operator);
setNdpLeafOperatorUnsupported();
}
}
@@ -516,4 +543,4 @@ public class NdpFilter {
UNSUPPORTED
}
-}
+}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/operator/predicate/NdpPredicateInfo.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/operator/predicate/NdpPredicateInfo.java
index 4c42daa06716b31000fa076c87edb99f6579d4a3..825c122d3f39bfc2a2a93314f44431b1dcc2cea3 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/operator/predicate/NdpPredicateInfo.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/operator/predicate/NdpPredicateInfo.java
@@ -24,10 +24,10 @@ import com.huawei.boostkit.omnidata.model.Predicate;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.hive.ql.omnidata.physical.NdpVectorizedRowBatchCtx;
+
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
/**
* Ndp Predicate Info
@@ -54,6 +54,10 @@ public class NdpPredicateInfo implements Serializable {
private List decodeTypesWithAgg;
+ private NdpVectorizedRowBatchCtx ndpVectorizedRowBatchCtx;
+
+ private String dataFormat;
+
public NdpPredicateInfo() {
}
@@ -63,12 +67,14 @@ public class NdpPredicateInfo implements Serializable {
@JsonCreator
public NdpPredicateInfo(@JsonProperty("isPushDown") boolean isPushDown,
- @JsonProperty("isPushDownAgg") boolean isPushDownAgg,
- @JsonProperty("isPushDownFilter") boolean isPushDownFilter,
- @JsonProperty("hasPartitionColumn") boolean hasPartitionColumn, @JsonProperty("predicate") Predicate predicate,
- @JsonProperty("outputColumns") List outputColumns,
- @JsonProperty("decodeTypes") List decodeTypes,
- @JsonProperty("decodeTypesWithAgg") List decodeTypesWithAgg) {
+ @JsonProperty("isPushDownAgg") boolean isPushDownAgg,
+ @JsonProperty("isPushDownFilter") boolean isPushDownFilter,
+ @JsonProperty("hasPartitionColumn") boolean hasPartitionColumn, @JsonProperty("predicate") Predicate predicate,
+ @JsonProperty("outputColumns") List outputColumns,
+ @JsonProperty("decodeTypes") List decodeTypes,
+ @JsonProperty("decodeTypesWithAgg") List decodeTypesWithAgg,
+ @JsonProperty("ndpVectorizedRowBatchCtx") NdpVectorizedRowBatchCtx ndpVectorizedRowBatchCtx,
+ @JsonProperty("dataFormat") String dataFormat) {
this.isPushDown = isPushDown;
this.isPushDownAgg = isPushDownAgg;
this.isPushDownFilter = isPushDownFilter;
@@ -77,6 +83,8 @@ public class NdpPredicateInfo implements Serializable {
this.outputColumns = outputColumns;
this.decodeTypes = decodeTypes;
this.decodeTypesWithAgg = decodeTypesWithAgg;
+ this.ndpVectorizedRowBatchCtx = ndpVectorizedRowBatchCtx;
+ this.dataFormat = dataFormat;
}
@JsonProperty
@@ -119,24 +127,37 @@ public class NdpPredicateInfo implements Serializable {
return decodeTypesWithAgg;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- NdpPredicateInfo that = (NdpPredicateInfo) o;
- return isPushDown == that.isPushDown && isPushDownAgg == that.isPushDownAgg
- && isPushDownFilter == that.isPushDownFilter && hasPartitionColumn == that.hasPartitionColumn
- && Objects.equals(predicate, that.predicate) && Objects.equals(outputColumns, that.outputColumns) && Objects
- .equals(decodeTypes, that.decodeTypes) && Objects.equals(decodeTypesWithAgg, that.decodeTypesWithAgg);
+ @JsonProperty
+ public NdpVectorizedRowBatchCtx getNdpVectorizedRowBatchCtx() {
+ return ndpVectorizedRowBatchCtx;
+ }
+
+ @JsonProperty
+ public String getDataFormat() {
+ return dataFormat;
+ }
+
+ public void setPredicate(Predicate predicate) {
+ this.predicate = predicate;
+ }
+
+ public void setOutputColumns(List outputColumns) {
+ this.outputColumns = outputColumns;
+ }
+
+ public void setDecodeTypes(List decodeTypes) {
+ this.decodeTypes = decodeTypes;
+ }
+
+ public void setDecodeTypesWithAgg(List decodeTypesWithAgg) {
+ this.decodeTypesWithAgg = decodeTypesWithAgg;
+ }
+
+ public void setNdpVectorizedRowBatchCtx(NdpVectorizedRowBatchCtx ndpVectorizedRowBatchCtx) {
+ this.ndpVectorizedRowBatchCtx = ndpVectorizedRowBatchCtx;
}
- @Override
- public int hashCode() {
- return Objects.hash(isPushDown, isPushDownAgg, isPushDownFilter, hasPartitionColumn, predicate, outputColumns,
- decodeTypes, decodeTypesWithAgg);
+ public void setDataFormat(String dataFormat) {
+ this.dataFormat = dataFormat;
}
-}
+}
\ No newline at end of file
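The two new fields (ndpVectorizedRowBatchCtx, dataFormat) are wired through the @JsonCreator constructor and @JsonProperty getters, so they travel with the serialized predicate (the project helper appears to be NdpSerializationUtils.serializeNdpPredicateInfo). Below is a minimal, self-contained sketch of the same annotation pattern using a simplified stand-in class rather than the real NdpPredicateInfo:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class PredicateInfoJsonSketch {
        // simplified stand-in mirroring the @JsonCreator/@JsonProperty pattern of NdpPredicateInfo
        public static class MiniPredicateInfo {
            private final boolean isPushDown;
            private final String dataFormat;

            @JsonCreator
            public MiniPredicateInfo(@JsonProperty("isPushDown") boolean isPushDown,
                    @JsonProperty("dataFormat") String dataFormat) {
                this.isPushDown = isPushDown;
                this.dataFormat = dataFormat;
            }

            @JsonProperty("isPushDown")
            public boolean getIsPushDown() {
                return isPushDown;
            }

            @JsonProperty("dataFormat")
            public String getDataFormat() {
                return dataFormat;
            }
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(new MiniPredicateInfo(true, "orc"));
            System.out.println(json); // e.g. {"isPushDown":true,"dataFormat":"orc"}
            MiniPredicateInfo copy = mapper.readValue(json, MiniPredicateInfo.class);
            System.out.println(copy.getDataFormat()); // orc
        }
    }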
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanChecker.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanChecker.java
index 56fcecc36de9041737b6c20f73aeb71a3e8a8d6c..87e34f2399a2a55af7bddd3a0ef4f766a8009926 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanChecker.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanChecker.java
@@ -31,18 +31,18 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
-import org.apache.hadoop.hive.ql.omnidata.config.NdpConf;
+import org.apache.hadoop.hive.ql.omnidata.config.OmniDataConf;
import org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpHiveOperatorEnum;
import org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpUdfEnum;
import org.apache.hadoop.hive.ql.omnidata.operator.filter.NdpFilter.*;
import org.apache.hadoop.hive.ql.omnidata.status.NdpStatusInfo;
-import org.apache.hadoop.hive.ql.omnidata.status.NdpStatusManager;
import org.apache.hadoop.hive.ql.plan.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
/**
* Used to check the validity of the operation during the Ndp planning phase.
@@ -112,57 +112,32 @@ public class NdpPlanChecker {
* Currently, two data formats are supported: Orc and Parquet.
*
* @param tableScanOp TableScanOperator
- * @return true or false
+ * @param work mapWork
+ * @return the data format: 'orc' or 'parquet'; empty if the format is unsupported
*/
- public static boolean checkDataFormat(TableScanOperator tableScanOp, BaseWork work) {
+ public static Optional<String> getDataFormat(TableScanOperator tableScanOp, BaseWork work) {
String tableName = tableScanOp.getConf().getAlias();
+ String format = "";
// TableMetadata may be 'null'
if (tableScanOp.getConf().getTableMetadata() == null) {
if (work instanceof MapWork) {
PartitionDesc desc = ((MapWork) work).getAliasToPartnInfo().get(tableScanOp.getConf().getAlias());
if (desc != null) {
- String inputFormat = desc.getInputFileFormatClass().getSimpleName();
- String outputFormat = desc.getOutputFileFormatClass().getSimpleName();
- return checkDataFormat(inputFormat, outputFormat, tableName);
+ format = desc.getInputFileFormatClass().getSimpleName();
} else {
- LOG.info("Table [{}] failed to push down, since PartitionDesc is null",
- tableName);
- return false;
+ LOG.info("Table [{}] failed to push down, since PartitionDesc is null", tableName);
}
- } else {
- LOG.info("Table [{}] failed to push down, since unsupported this work: [{}]",
- tableName, work.getClass().getSimpleName());
- return false;
}
} else {
- String inputFormat = tableScanOp.getConf().getTableMetadata().getInputFormatClass().getSimpleName();
- String outputFormat = tableScanOp.getConf().getTableMetadata().getOutputFormatClass().getSimpleName();
- return checkDataFormat(inputFormat, outputFormat, tableName);
- }
- }
-
- /**
- * Currently, two data formats are supported: Orc and Parquet.
- *
- * @param inputFormat hive input data format
- * @param outputFormat hive output data format
- * @param tableName hive table name
- * @return true or false
- */
- public static boolean checkDataFormat(String inputFormat, String outputFormat, String tableName) {
- if (!(inputFormat.toLowerCase(Locale.ENGLISH).contains("orc") || inputFormat.toLowerCase(Locale.ENGLISH)
- .contains("parquet"))) {
- LOG.info("Table [{}] failed to push down, since unsupported this input data format: [{}]", tableName,
- inputFormat);
- return false;
+ format = tableScanOp.getConf().getTableMetadata().getInputFormatClass().getSimpleName();
}
- if (!(outputFormat.toLowerCase(Locale.ENGLISH).contains("orc") || outputFormat.toLowerCase(Locale.ENGLISH)
- .contains("parquet"))) {
- LOG.info("Table [{}] failed to push down, since unsupported this output data format: [{}]", tableName,
- inputFormat);
- return false;
+ if (format.toLowerCase(Locale.ENGLISH).contains("orc")) {
+ return Optional.of("orc");
+ } else if (format.toLowerCase(Locale.ENGLISH).contains("parquet")) {
+ return Optional.of("parquet");
+ } else {
+ return Optional.empty();
}
- return true;
}
/**
@@ -194,11 +169,6 @@ public class NdpPlanChecker {
return true;
}
- public static boolean checkPushDown(Configuration conf, boolean isPushDown) {
- return isPushDown && conf.get(NdpStatusManager.NDP_DATANODE_HOSTNAMES) != null
- && conf.get(NdpStatusManager.NDP_DATANODE_HOSTNAMES).length() > 0;
- }
-
/**
* Check whether the Udf in white list
*
@@ -317,7 +287,6 @@ public class NdpPlanChecker {
return null;
}
-
/**
* Check whether Limit offset > 0
*
@@ -417,36 +386,20 @@ public class NdpPlanChecker {
return checkMinFunction(agg);
}
- public static double getSelectivity(TableScanOperator tableScanOp) {
- double selectivity = 0.5;
- if (tableScanOp.getConf().getStatistics() == null) {
- return selectivity;
- }
- try {
- long tableCount = tableScanOp.getConf().getStatistics().getNumRows();
- long filterCount = tableScanOp.getChildOperators().get(0).getConf().getStatistics().getNumRows();
- if (tableCount > 0) {
- selectivity = 1.0 * filterCount / tableCount;
- }
- } catch (Exception e) {
- LOG.error("Can't calculate the selectivity", e);
- }
- return selectivity;
- }
-
/**
* Check whether the filter selectivity is supported
*
* @param tableScanOp TableScanOperator
- * @param ndpConf NdpConf
+ * @param conf OmniDataConf
* @return true or false
*/
- public static boolean checkSelectivity(TableScanOperator tableScanOp, NdpConf ndpConf) {
- if (ndpConf.getNdpFilterSelectivityEnable()) {
- double currentSelectivity = getSelectivity(tableScanOp);
- if (currentSelectivity > ndpConf.getNdpFilterSelectivity()) {
+ public static boolean checkSelectivity(TableScanOperator tableScanOp, Configuration conf) {
+ if (OmniDataConf.getOmniDataFilterSelectivityEnabled(conf)) {
+ double currentSelectivity = NdpStatisticsUtils.getSelectivity(tableScanOp);
+ double filterSelectivity = OmniDataConf.getOmniDataFilterSelectivity(conf);
+ if (currentSelectivity > filterSelectivity) {
LOG.info("Table [{}] failed to push down, since selectivity[{}] > threshold[{}]",
- tableScanOp.getConf().getAlias(), currentSelectivity, ndpConf.getNdpFilterSelectivity());
+ tableScanOp.getConf().getAlias(), currentSelectivity, filterSelectivity);
return false;
} else {
LOG.info("Table [{}] selectivity is {}", tableScanOp.getConf().getAlias(), currentSelectivity);
@@ -462,17 +415,17 @@ public class NdpPlanChecker {
* Check whether the table size is supported
*
* @param tableScanOp TableScanOperator
- * @param ndpConf NdpConf
+ * @param conf hive conf
* @return true or false
*/
- public static boolean checkTableSize(TableScanOperator tableScanOp, NdpConf ndpConf) {
+ public static boolean checkTableSize(TableScanOperator tableScanOp, Configuration conf) {
if (tableScanOp.getConf().getStatistics() == null) {
return false;
}
long currentTableSize = tableScanOp.getConf().getStatistics().getDataSize();
- if (currentTableSize < ndpConf.getNdpTablesSizeThreshold()) {
+ if (currentTableSize < OmniDataConf.getOmniDataTablesSizeThreshold(conf)) {
LOG.info("Table [{}] failed to push down, since table size[{}] < threshold[{}]",
- tableScanOp.getConf().getAlias(), currentTableSize, ndpConf.getNdpTablesSizeThreshold());
+ tableScanOp.getConf().getAlias(), currentTableSize, OmniDataConf.getOmniDataTablesSizeThreshold(conf));
return false;
}
return true;
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanResolver.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanResolver.java
index e27f0b59969ecfe3f16765c06c6b0cf40d152f0d..8c4e067713e599431bd67b6ede566e1730f614cf 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanResolver.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpPlanResolver.java
@@ -19,9 +19,6 @@
package org.apache.hadoop.hive.ql.omnidata.physical;
-import static org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpEngineEnum.MR;
-import static org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpEngineEnum.Tez;
-
import com.huawei.boostkit.omnidata.model.AggregationInfo;
import com.huawei.boostkit.omnidata.model.Predicate;
@@ -29,27 +26,22 @@ import com.google.common.collect.ImmutableMap;
import io.prestosql.spi.relation.RowExpression;
-import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.omnidata.config.NdpConf;
+import org.apache.hadoop.hive.ql.omnidata.config.OmniDataConf;
import org.apache.hadoop.hive.ql.omnidata.operator.aggregation.OmniDataAggregation;
-import org.apache.hadoop.hive.ql.omnidata.operator.enums.NdpEngineEnum;
import org.apache.hadoop.hive.ql.omnidata.operator.filter.NdpFilter;
import org.apache.hadoop.hive.ql.omnidata.operator.filter.OmniDataFilter;
import org.apache.hadoop.hive.ql.omnidata.operator.limit.OmniDataLimit;
@@ -60,6 +52,7 @@ import org.apache.hadoop.hive.ql.omnidata.status.NdpStatusInfo;
import org.apache.hadoop.hive.ql.omnidata.status.NdpStatusManager;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -69,6 +62,7 @@ import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
import org.slf4j.Logger;
@@ -88,9 +82,7 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
private Context context;
- private NdpConf ndpConf;
-
- private NdpEngineEnum engine;
+ private ParseContext parseContext;
private boolean isPushDownFilter = false;
@@ -128,20 +120,26 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
private LimitDesc limitDesc;
/**
- * Hive agg pushDown optimize
+ * Table data format
+ */
+ private String dataFormat = "";
+
+ /**
+ * If a table in an SQL statement is pushed down, this parameter is set to true.
*/
- private boolean isAggOptimized = false;
+ private boolean existsTablePushDown = false;
+
+ private NdpVectorizedRowBatchCtx ndpCtx;
@Override
public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
this.hiveConf = pctx.getConf();
- this.ndpConf = new NdpConf(hiveConf);
this.context = pctx.getContext();
+ this.parseContext = pctx.getParseContext();
Dispatcher dispatcher = new NdpDispatcher();
TaskGraphWalker walker = new TaskGraphWalker(dispatcher);
List topNodes = new ArrayList<>(pctx.getRootTasks());
walker.startWalking(topNodes, null);
- hiveConf.set(NdpConf.NDP_AGG_OPTIMIZED_ENABLE, String.valueOf(isAggOptimized));
return pctx;
}
@@ -151,13 +149,13 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
if (nodeOutputs == null || nodeOutputs.length == 0) {
throw new SemanticException("No Dispatch Context");
}
-
- if (!ndpConf.getNdpEnabled() || !NdpPlanChecker.checkRollUp(context.getCmd())) {
+ // OmniData Hive does not support the 'ROLLUP' operator
+ if (!NdpPlanChecker.checkRollUp(context.getCmd())) {
return null;
}
// host resources status map
- Map ndpStatusInfoMap = new HashMap<>(NdpStatusManager.getNdpZookeeperData(ndpConf));
+ Map ndpStatusInfoMap = new HashMap<>(NdpStatusManager.getNdpZookeeperData(hiveConf));
if (!NdpPlanChecker.checkHostResources(ndpStatusInfoMap)) {
return null;
}
@@ -168,100 +166,103 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
return null;
}
- List workIndexList = new ArrayList<>();
- List works = new ArrayList<>();
- List> topOp = new ArrayList<>();
+ // key: work name
+ Map<String, MapWork> mapWorkMap = new HashMap<>();
+ Map<String, TableScanOperator> tableScanOperatorMap = new HashMap<>();
+ // use TreeMap, sort by reduce name, for example: Reduce1, Reduce2...
+ Map<String, ReduceWork> reduceWorkMap = new TreeMap<>();
if (currTask.getWork() instanceof TezWork) {
- // tez
- engine = Tez;
- TezWork tezWork = (TezWork) currTask.getWork();
- works.addAll(tezWork.getAllWork());
- for (int i = 0; i < works.size(); i++) {
- BaseWork work = works.get(i);
- if (work instanceof MapWork) {
- if (((MapWork) work).getAliasToWork().size() == 1) {
- topOp.addAll(((MapWork) work).getAliasToWork().values());
- workIndexList.add(i);
- }
- }
- }
+ NdpStatisticsUtils.generateMapReduceWork((TezWork) currTask.getWork(), mapWorkMap, tableScanOperatorMap,
+ reduceWorkMap);
} else {
// unsupported
return null;
}
- int index = 0;
- int ndpTableNums = 0;
- for (Operator extends OperatorDesc> operator : topOp) {
- if (operator instanceof TableScanOperator) {
- TableScanOperator tableScanOp = (TableScanOperator) operator;
- NdpPredicateInfo ndpPredicateInfo = new NdpPredicateInfo(false);
- // check TableScanOp
- if (checkTableScanOp(tableScanOp, works.get(workIndexList.get(index)))) {
- Optional filter = Optional.empty();
- Optional aggregation = Optional.empty();
- OptionalLong limit = OptionalLong.empty();
- OmniDataPredicate omniDataPredicate = new OmniDataPredicate(tableScanOp);
- omniDataPredicate.setSelectExpressions(selectDesc);
- // get OmniData filter expression
- if (isPushDownFilter) {
- filter = getOmniDataFilter(omniDataPredicate);
- if (!filter.isPresent()) {
- isPushDownFilter = false;
- isPushDownAgg = false;
- }
- }
- // get OmniData agg expression
- if (isPushDownAgg) {
- // The output column of the aggregation needs to be processed separately.
- aggregation = getOmniDataAggregation(omniDataPredicate);
- }
- // get OmniData select expression
- if (isPushDownSelect && !isPushDownAgg) {
- omniDataPredicate = new OmniDataPredicate(tableScanOp);
- omniDataPredicate.setSelectExpressions(selectDesc);
- omniDataPredicate.addProjectionsByTableScan(tableScanOp);
+ // deal with each TableScanOperator
+ tableScanOperatorMap.forEach((workName, tableScanOp) -> {
+ NdpPredicateInfo ndpPredicateInfo = new NdpPredicateInfo(false);
+ // set table's selectivity
+ tableScanOp.getConf().setOmniDataSelectivity(NdpStatisticsUtils.getSelectivity(tableScanOp));
+ // check TableScanOp
+ if (checkTableScanOp(tableScanOp, mapWorkMap.get(workName))) {
+ Optional<RowExpression> filter = Optional.empty();
+ Optional<AggregationInfo> aggregation = Optional.empty();
+ OptionalLong limit = OptionalLong.empty();
+ OmniDataPredicate omniDataPredicate = new OmniDataPredicate(tableScanOp);
+ omniDataPredicate.setSelectExpressions(selectDesc);
+ // get OmniData filter expression
+ if (isPushDownFilter) {
+ filter = getOmniDataFilter(omniDataPredicate, tableScanOp);
+ if (!filter.isPresent()) {
+ isPushDownFilter = false;
+ isPushDownAgg = false;
}
- // get OmniData limit expression
- if (isPushDownLimit) {
- limit = getOmniDataLimit();
- }
- // the decode type must exist
- if ((isPushDownFilter || isPushDownAgg) && omniDataPredicate.getDecodeTypes().size() > 0) {
- replaceTableScanOp(tableScanOp, works.get(workIndexList.get(index)));
- Predicate predicate = new Predicate(omniDataPredicate.getTypes(),
- omniDataPredicate.getColumns(), filter, omniDataPredicate.getProjections(),
- ImmutableMap.of(), ImmutableMap.of(), aggregation, limit);
- ndpPredicateInfo = new NdpPredicateInfo(true, isPushDownAgg, isPushDownFilter,
- omniDataPredicate.getHasPartitionColumns(), predicate,
- tableScanOp.getConf().getNeededColumnIDs(), omniDataPredicate.getDecodeTypes(),
- omniDataPredicate.getDecodeTypesWithAgg());
- NdpStatusManager.setOmniDataHostToConf(hiveConf, ndpStatusInfoMap);
- printPushDownInfo(tableScanOp.getConf().getAlias(), ndpStatusInfoMap);
- ndpTableNums++;
+ }
+ // get OmniData agg expression
+ if (isPushDownAgg) {
+ // The output column of the aggregation needs to be processed separately.
+ aggregation = getOmniDataAggregation(omniDataPredicate);
+ if (!aggregation.isPresent()) {
+ isPushDownAgg = false;
}
}
- initPushDown();
- // set ndpPredicateInfo after serialization
- tableScanOp.getConf()
- .setNdpPredicateInfoStr(NdpSerializationUtils.serializeNdpPredicateInfo(ndpPredicateInfo));
+ // get OmniData select expression
+ if (isPushDownSelect && !isPushDownAgg) {
+ omniDataPredicate = new OmniDataPredicate(tableScanOp);
+ omniDataPredicate.addProjectionsByTableScan(tableScanOp);
+ }
+ // get OmniData limit expression
+ if (isPushDownLimit) {
+ limit = getOmniDataLimit();
+ }
+ // the decode type must exist
+ if ((isPushDownFilter || isPushDownAgg) && omniDataPredicate.getDecodeTypes().size() > 0) {
+ replaceTableScanOp(tableScanOp, mapWorkMap.get(workName));
+ Predicate predicate = new Predicate(omniDataPredicate.getTypes(),
+ omniDataPredicate.getColumns(), filter, omniDataPredicate.getProjections(),
+ ImmutableMap.of(), ImmutableMap.of(), aggregation, limit);
+ ndpPredicateInfo = new NdpPredicateInfo(true, isPushDownAgg, isPushDownFilter,
+ omniDataPredicate.getHasPartitionColumns(), predicate,
+ tableScanOp.getConf().getNeededColumnIDs(), omniDataPredicate.getDecodeTypes(),
+ omniDataPredicate.getDecodeTypesWithAgg(), ndpCtx, dataFormat);
+ // print OmniData Hive log information
+ printPushDownInfo(tableScanOp.getConf().getAlias(), ndpStatusInfoMap);
+ existsTablePushDown = true;
+ }
+ }
+ // set ndpPredicateInfo after serialization
+ tableScanOp.getConf()
+ .setNdpPredicateInfoStr(NdpSerializationUtils.serializeNdpPredicateInfo(ndpPredicateInfo));
+ initPushDown();
+ });
+ if (existsTablePushDown) {
+ // set OmniData hosts info to Hive conf
+ NdpStatusManager.setOmniDataHostToConf(hiveConf, ndpStatusInfoMap);
+ // OmniData reduce work optimize
+ if (OmniDataConf.getOmniDataReduceOptimizedEnabled(hiveConf)) {
+ NdpStatisticsUtils.optimizeReduceWork(tableScanOperatorMap, reduceWorkMap, hiveConf, parseContext);
}
- index++;
- }
- if (ndpTableNums != 1) {
- isAggOptimized = false;
}
+ OmniDataConf.setOmniDataExistsTablePushDown(hiveConf, existsTablePushDown);
return null;
}
private boolean checkTableScanOp(TableScanOperator tableScanOp, BaseWork work) {
+ Optional<String> format = NdpPlanChecker.getDataFormat(tableScanOp, work);
+ if (format.isPresent()) {
+ dataFormat = format.get();
+ } else {
+ LOG.info("Table [{}] failed to push down, only orc and parquet are supported",
+ tableScanOp.getConf().getAlias());
+ return false;
+ }
if (NdpPlanChecker.checkTableScanNumChild(tableScanOp) && NdpPlanChecker.checkHiveType(tableScanOp)
- && NdpPlanChecker.checkTableSize(tableScanOp, ndpConf) && NdpPlanChecker.checkSelectivity(tableScanOp,
- ndpConf) && NdpPlanChecker.checkDataFormat(tableScanOp, work)) {
+ && NdpPlanChecker.checkTableSize(tableScanOp, hiveConf) && NdpPlanChecker.checkSelectivity(tableScanOp,
+ hiveConf)) {
// scan operator: select agg filter limit
scanTableScanChildOperators(tableScanOp.getChildOperators());
- return isPushDownAgg || isPushDownFilter;
}
- return false;
+ return isPushDownAgg || isPushDownFilter;
}
/**
@@ -298,10 +299,21 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
}
}
- private Optional getOmniDataFilter(OmniDataPredicate omniDataPredicate) {
+ /**
+ * Converts the filter expression of Hive to that of the OmniData Server.
+ *
+ * @param omniDataPredicate OmniData Predicate
+ * @param tableScanOperator TableScanOperator used to evaluate the selectivity of a partial push down
+ * @return Filter expression for the OmniData server
+ */
+ private Optional<RowExpression> getOmniDataFilter(OmniDataPredicate omniDataPredicate,
+ TableScanOperator tableScanOperator) {
NdpFilter ndpFilter = new NdpFilter(filterDesc);
NdpFilter.NdpFilterMode mode = ndpFilter.getMode();
if (mode.equals(NdpFilter.NdpFilterMode.PART)) {
+ if (!NdpStatisticsUtils.evaluatePartFilterSelectivity(tableScanOperator,
+ ndpFilter.getUnPushDownFuncDesc(), parseContext, hiveConf)) {
+ return Optional.empty();
+ }
isPushDownPartFilter = true;
unsupportedFilterDesc = ndpFilter.getUnPushDownFuncDesc();
filterDesc = (ExprNodeGenericFuncDesc) ndpFilter.getPushDownFuncDesc();
@@ -312,69 +324,43 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
}
OmniDataFilter omniDataFilter = new OmniDataFilter(omniDataPredicate);
// ExprNodeGenericFuncDesc need to clone
- RowExpression filterRowExpression = omniDataFilter.getFilterExpression(
- (ExprNodeGenericFuncDesc) filterDesc.clone(), ndpFilter);
- if (filterRowExpression == null) {
- return Optional.empty();
- }
- return Optional.of(filterRowExpression);
+ return Optional.ofNullable(
+ omniDataFilter.getFilterExpression((ExprNodeGenericFuncDesc) filterDesc.clone(), ndpFilter));
}
+ /**
+ * Converts the aggregation expression of Hive to that of the OmniData Server.
+ *
+ * @param omniDataPredicate OmniData Predicate
+ * @return Aggregation expression of OmniData Server
+ */
private Optional getOmniDataAggregation(OmniDataPredicate omniDataPredicate) {
OmniDataAggregation omniDataAggregation = new OmniDataAggregation(omniDataPredicate);
- AggregationInfo aggregationInfo = omniDataAggregation.getAggregation(aggDesc);
- if (aggregationInfo == null) {
- isPushDownAgg = false;
- return Optional.empty();
- }
- return Optional.of(aggregationInfo);
+ return Optional.ofNullable(omniDataAggregation.getAggregation(aggDesc));
}
private OptionalLong getOmniDataLimit() {
return OmniDataLimit.getOmniDataLimit(limitDesc.getLimit());
}
- private DataTypePhysicalVariation[] getDataTypePhysicalVariation(VectorizationContext vOutContext) {
- List variations = new ArrayList<>();
- vOutContext.getProjectedColumns().forEach(c -> {
- try {
- variations.add(vOutContext.getDataTypePhysicalVariation(c));
- } catch (HiveException e) {
- e.printStackTrace();
- }
- });
- return variations.toArray(new DataTypePhysicalVariation[0]);
- }
-
- private void replaceTableScanOp(TableScanOperator tableScanOp, BaseWork work) {
+ private void replaceTableScanOp(TableScanOperator tableScanOp, BaseWork work) {
if (isPushDownFilter) {
if (isPushDownPartFilter) {
- replaceTableScanRawFilter(tableScanOp, tableScanOp.getChildOperators(), unsupportedFilterDesc);
replaceRawVectorizedRowBatchCtx(tableScanOp, work);
} else {
removeTableScanRawFilter(tableScanOp.getChildOperators());
}
- } else {
- filterDesc = null;
+ ndpCtx = new NdpVectorizedRowBatchCtx(work.getVectorizedRowBatchCtx());
+ // set rowColumnTypeInfos to TableScanDesc
+ tableScanOp.getConf().setRowColumnTypeInfos(work.getVectorizedRowBatchCtx().getRowColumnTypeInfos());
}
if (isPushDownAgg) {
removeTableScanRawAggregation(tableScanOp.getChildOperators(), work);
removeTableScanRawSelect(tableScanOp.getChildOperators());
- isAggOptimized = isPushDownAgg;
- } else {
- aggDesc = null;
}
- }
-
- private void replaceAggRawVectorizedRowBatchCtx(BaseWork work, VectorizationContext vOutContext) {
- VectorizedRowBatchCtx oldCtx = work.getVectorizedRowBatchCtx();
- VectorizedRowBatchCtx ndpCtx = new VectorizedRowBatchCtx(
- vOutContext.getInitialColumnNames().toArray(new String[0]), vOutContext.getInitialTypeInfos(),
- getDataTypePhysicalVariation(vOutContext),
- vOutContext.getProjectedColumns().stream().mapToInt(Integer::valueOf).toArray(),
- oldCtx.getPartitionColumnCount(), 0, oldCtx.getNeededVirtualColumns(),
- vOutContext.getScratchColumnTypeNames(), vOutContext.getScratchDataTypePhysicalVariations());
- work.setVectorizedRowBatchCtx(ndpCtx);
+ // set whether to push down
+ tableScanOp.getConf().setPushDownFilter(isPushDownFilter);
+ tableScanOp.getConf().setPushDownAgg(isPushDownAgg);
}
/**
@@ -385,33 +371,12 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
*/
private void replaceRawVectorizedRowBatchCtx(TableScanOperator tableScanOp, BaseWork work) {
VectorizedRowBatchCtx oldCtx = work.getVectorizedRowBatchCtx();
- VectorizedRowBatchCtx ndpCtx = new VectorizedRowBatchCtx(oldCtx.getRowColumnNames(),
+ VectorizedRowBatchCtx newCtx = new VectorizedRowBatchCtx(oldCtx.getRowColumnNames(),
oldCtx.getRowColumnTypeInfos(), oldCtx.getRowdataTypePhysicalVariations(), oldCtx.getDataColumnNums(),
oldCtx.getPartitionColumnCount(), oldCtx.getVirtualColumnCount(), oldCtx.getNeededVirtualColumns(),
tableScanOp.getOutputVectorizationContext().getScratchColumnTypeNames(),
tableScanOp.getOutputVectorizationContext().getScratchDataTypePhysicalVariations());
- work.setVectorizedRowBatchCtx(ndpCtx);
- }
-
- private void replaceTableScanRawFilter(TableScanOperator tableScanOp,
- List> operators, ExprNodeDesc newExprDesc) {
- try {
- for (Operator extends OperatorDesc> child : operators) {
- if (child instanceof VectorFilterOperator) {
- // Convert 'ExprNodeDesc' to 'VectorExpression' via VectorizationContext, mode is 'FILTER'
- VectorExpression ndpExpr = tableScanOp.getOutputVectorizationContext()
- .getVectorExpression(newExprDesc, VectorExpressionDescriptor.Mode.FILTER);
- // Replace with new filter expression
- ((VectorFilterOperator) child).setFilterCondition(ndpExpr);
- return;
- }
- replaceTableScanRawFilter(tableScanOp, child.getChildOperators(), newExprDesc);
- }
- } catch (SemanticException e) {
- LOG.error("Exception when trying to remove partition predicates: fail to find child from parent", e);
- } catch (HiveException e) {
- e.printStackTrace();
- }
+ work.setVectorizedRowBatchCtx(newCtx);
}
private void removeTableScanRawFilter(List> operators) {
@@ -433,10 +398,6 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
try {
for (Operator extends OperatorDesc> child : operators) {
if (child instanceof VectorGroupByOperator) {
- if (engine.equals(MR)) {
- replaceAggRawVectorizedRowBatchCtx(work,
- ((VectorGroupByOperator) child).getOutputVectorizationContext());
- }
// remove raw VectorGroupByOperator
child.getParentOperators().get(0).removeChildAndAdoptItsChildren(child);
return;
@@ -449,22 +410,13 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
}
private void removeTableScanRawSelect(List> operators) {
- try {
- for (Operator extends OperatorDesc> child : operators) {
- if (child instanceof VectorSelectOperator) {
- if (engine.equals(MR)) {
- // remove raw VectorSelectOperator
- child.getParentOperators().get(0).removeChildAndAdoptItsChildren(child);
- } else {
- // remove raw VectorExpression
- ((VectorSelectOperator) child).setvExpressions(new VectorExpression[] {});
- }
- return;
- }
- removeTableScanRawSelect(child.getChildOperators());
+ for (Operator<? extends OperatorDesc> child : operators) {
+ if (child instanceof VectorSelectOperator) {
+ // remove raw VectorExpression
+ ((VectorSelectOperator) child).setvExpressions(new VectorExpression[] {});
+ return;
}
- } catch (SemanticException e) {
- LOG.error("Exception when trying to remove partition predicates: fail to find child from parent", e);
+ removeTableScanRawSelect(child.getChildOperators());
}
}
@@ -473,13 +425,13 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
"Table [%s] Push Down Info [ Select:[%s], Filter:[%s], Aggregation:[%s], Group By:[%s], Limit:[%s], Raw Filter:[%s], Host Map:[%s]",
tableName, ((selectDesc != null) && isPushDownAgg) ? Arrays.stream(selectDesc.getSelectExpressions())
.map(VectorExpression::toString)
- .collect(Collectors.joining(", ")) : "", (filterDesc != null) ? filterDesc.toString() : "",
- (aggDesc != null) ? (aggDesc.getAggregators()
+ .collect(Collectors.joining(", ")) : "", isPushDownFilter ? filterDesc.toString() : "",
+ isPushDownAgg ? (aggDesc.getAggregators()
.stream()
.map(AggregationDesc::getExprString)
.collect(Collectors.joining(", "))) : "",
- (aggDesc != null && aggDesc.getKeyString() != null) ? aggDesc.getKeyString() : "",
- (limitDesc != null) ? limitDesc.getLimit() : "",
+ (isPushDownAgg && aggDesc.getKeyString() != null) ? aggDesc.getKeyString() : "",
+ isPushDownLimit ? limitDesc.getLimit() : "",
(unsupportedFilterDesc != null) ? unsupportedFilterDesc.toString() : "",
((ndpStatusInfoMap.size() > 0) ? ndpStatusInfoMap.entrySet()
.stream()
@@ -501,6 +453,8 @@ public class NdpPlanResolver implements PhysicalPlanResolver {
selectDesc = null;
aggDesc = null;
limitDesc = null;
+ ndpCtx = null;
+ dataFormat = "";
}
}
}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpStatisticsUtils.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpStatisticsUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..9285f44d0f910d8c9b6db55676989232f74f8879
--- /dev/null
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpStatisticsUtils.java
@@ -0,0 +1,509 @@
+/*
+ * Copyright (C) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.omnidata.physical;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.LevelOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.omnidata.config.OmniDataConf;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateStatsProcCtx;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.StatsRulesProcFactory;
+import org.apache.hadoop.hive.ql.parse.ColumnStatsList;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * OmniData Hive statistics tool
+ * Used to collect statistics for optimization
+ *
+ * @since 2022-07-12
+ */
+public class NdpStatisticsUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(NdpStatisticsUtils.class);
+
+ /**
+ * Baseline number of reducers; also used as a protective lower bound
+ */
+ public static final int BASED_REDUCES = 50;
+
+ /**
+ * Baseline selectivity; used as a protective lower bound
+ */
+ public static final double BASED_SELECTIVITY = 0.025d;
+
+ /**
+ * Lower bound for the ratio of the needed columns' total length to the full row length
+ */
+ public static final double BASED_COL_LENGTH_PROPORTION = 0.025d;
+
+ /**
+ * Optimization can be performed only when the selectivity is lower than this threshold
+ */
+ public static final double SELECTIVITY_THRESHOLD = 0.5d;
+
+ /**
+ * Number of decimal places reserved
+ */
+ public static final int OMNIDATA_SCALE = 4;
+
+ /**
+ * Calculate the estimated selectivity from the table's total row count and the filter's output row count
+ *
+ * @param tableScanOp TableScanOperator
+ * @return selectivity
+ */
+ public static double getSelectivity(TableScanOperator tableScanOp) {
+ double selectivity = 1.0d;
+ if (tableScanOp.getConf().getStatistics() == null) {
+ return selectivity;
+ }
+ long tableCount = tableScanOp.getConf().getStatistics().getNumRows();
+ long filterCount = tableScanOp.getChildOperators().get(0).getConf().getStatistics().getNumRows();
+ if (tableCount > 0) {
+ BigDecimal tableCountBig = new BigDecimal(tableCount);
+ BigDecimal filterCountBig = new BigDecimal(filterCount);
+ selectivity = filterCountBig.divide(tableCountBig, OMNIDATA_SCALE, BigDecimal.ROUND_HALF_UP).doubleValue();
+ }
+ return selectivity;
+ }
+
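A worked example of the division above with illustrative numbers (not taken from the patch): 123 filtered rows out of 10,000 table rows, rounded HALF_UP to OMNIDATA_SCALE = 4 decimal places, gives 0.0123.

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class SelectivityExample {
        public static void main(String[] args) {
            long tableCount = 10_000L;  // hypothetical total row count of the table
            long filterCount = 123L;    // hypothetical row count surviving the filter
            // same arithmetic as getSelectivity(): divide at scale 4 with HALF_UP rounding
            double selectivity = new BigDecimal(filterCount)
                    .divide(new BigDecimal(tableCount), 4, RoundingMode.HALF_UP)
                    .doubleValue();
            System.out.println(selectivity); // 0.0123
        }
    }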
+ /**
+ * Through TezWork, get MapWork, ReduceWork and TableScanOperator
+ *
+ * @param tezWork TezWork
+ * @param mapWorkMap MapWork's map
+ * @param tableScanOperatorMap TableScanOperator's map
+ * @param reduceWorkMap ReduceWork's map
+ */
+ public static void generateMapReduceWork(TezWork tezWork, Map<String, MapWork> mapWorkMap,
+ Map<String, TableScanOperator> tableScanOperatorMap, Map<String, ReduceWork> reduceWorkMap) {
+ for (BaseWork work : tezWork.getAllWork()) {
+ if (work instanceof MapWork) {
+ MapWork mapWork = (MapWork) work;
+ if (mapWork.getAliasToWork().size() != 1) {
+ continue;
+ }
+ // support only one operator
+ generateMapWork(mapWork, mapWorkMap, tableScanOperatorMap);
+ }
+ if (work instanceof ReduceWork) {
+ reduceWorkMap.put(work.getName(), (ReduceWork) work);
+ }
+ }
+ }
+
+ /**
+ * Analyze the table in advance to collect statistics.
+ * Estimate the optimized size based on the selectivity and needed column proportion.
+ *
+ * @param tableScanOp tableScanOp
+ * @param parseContext The current parse context.
+ * @return optimized size
+ */
+ public static long estimateOptimizedSize(TableScanOperator tableScanOp, ParseContext parseContext) {
+ long dataSize = tableScanOp.getConf().getStatistics().getDataSize();
+ long avgRowSize = tableScanOp.getConf().getStatistics().getAvgRowSize();
+ double omniDataSelectivity = Math.max(tableScanOp.getConf().getOmniDataSelectivity(), BASED_SELECTIVITY);
+ // get the average length of the needed column
+ double totalAvgColLen = getTotalAvgColLen(tableScanOp, parseContext);
+ if (totalAvgColLen <= 0 || avgRowSize <= 0) {
+ return (long) ((double) dataSize * omniDataSelectivity);
+ } else {
+ double colLenProportion = Math.max(totalAvgColLen / (double) avgRowSize, BASED_COL_LENGTH_PROPORTION);
+ return (long) ((double) dataSize * omniDataSelectivity * colLenProportion);
+ }
+ }
+
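To make the estimate concrete, a small sketch with hypothetical numbers: a 1 GB scan with selectivity 0.10 and needed columns covering 40 of 400 bytes per row shrinks to roughly 10 MB; the 0.025 floors (BASED_SELECTIVITY, BASED_COL_LENGTH_PROPORTION) guard against underestimating.

    public class OptimizedSizeExample {
        public static void main(String[] args) {
            long dataSize = 1_000_000_000L; // hypothetical table scan size in bytes
            double selectivity = 0.10;      // value stored via setOmniDataSelectivity()
            long avgRowSize = 400L;         // hypothetical average row width in bytes
            double totalAvgColLen = 40.0;   // summed avgColLen of the needed columns

            // same formula as estimateOptimizedSize(), with floors at 0.025
            double sel = Math.max(selectivity, 0.025);
            double colProportion = Math.max(totalAvgColLen / (double) avgRowSize, 0.025);
            long optimized = (long) ((double) dataSize * sel * colProportion);
            System.out.println(optimized);  // 10000000, i.e. roughly 10 MB
        }
    }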
+ /**
+ * Get the total average length of the needed columns
+ *
+ * @param tableScanOp tableScanOp
+ * @param parseContext parseContext
+ * @return total average column length of the needed columns
+ */
+ public static double getTotalAvgColLen(TableScanOperator tableScanOp, ParseContext parseContext) {
+ Table table = tableScanOp.getConf().getTableMetadata();
+ List<ColStatistics> colStats;
+ if (table.isPartitioned()) {
+ Optional<Statistics> statistics = collectStatistics(parseContext, tableScanOp);
+ if (statistics.isPresent()) {
+ colStats = statistics.get().getColumnStats();
+ } else {
+ colStats = new ArrayList<>();
+ }
+ } else {
+ // Get table level column statistics from metastore for needed columns
+ colStats = StatsUtils.getTableColumnStats(tableScanOp.getConf().getTableMetadata(),
+ tableScanOp.getSchema().getSignature(), tableScanOp.getConf().getNeededColumns(), null);
+ }
+ double totalAvgColLen = 0d;
+ // add the avgColLen for each column, colStats's size may be null
+ for (ColStatistics colStat : colStats) {
+ totalAvgColLen += colStat.getAvgColLen();
+ }
+ return totalAvgColLen;
+ }
+
+ /**
+ * Statistics: Describes the output of an operator in terms of size, rows, etc based on estimates.
+ * use ParseContext to collect Table's statistics
+ *
+ * @param pctx The current parse context.
+ * @param tableScanOp TableScanOperator
+ * @return Statistics
+ */
+ public static Optional<Statistics> collectStatistics(ParseContext pctx, TableScanOperator tableScanOp) {
+ try {
+ AnnotateStatsProcCtx aspCtx = new AnnotateStatsProcCtx(pctx);
+ PrunedPartitionList partList = aspCtx.getParseContext().getPrunedPartitions(tableScanOp);
+ ColumnStatsList colStatsCached = aspCtx.getParseContext().getColStatsCached(partList);
+ Table table = tableScanOp.getConf().getTableMetadata();
+ // column level statistics are required only for the columns that are needed
+ List<ColumnInfo> schema = tableScanOp.getSchema().getSignature();
+ List<String> neededColumns = tableScanOp.getNeededColumns();
+ List<String> referencedColumns = tableScanOp.getReferencedColumns();
+ // gather statistics for the first time and the attach it to table scan operator
+ return Optional.of(
+ StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, schema, neededColumns, colStatsCached,
+ referencedColumns, true));
+ } catch (HiveException e) {
+ LOG.error("OmniData Hive failed to retrieve stats ", e);
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * estimate reducer number
+ *
+ * @param totalInputFileSize total input size
+ * @param bytesPerReducer Size of bytes available for each reduce
+ * @param oldReducers old reduce number
+ * @return new reduce number
+ */
+ public static int estimateReducersByFileSize(long totalInputFileSize, long bytesPerReducer, int oldReducers) {
+ if (totalInputFileSize <= 0) {
+ return 0;
+ }
+ double bytes = Math.max(totalInputFileSize, bytesPerReducer);
+ int newReducers = (int) Math.ceil(bytes / (double) bytesPerReducer);
+ // The new reduce number cannot be less than the basic reduce number
+ newReducers = Math.max(BASED_REDUCES, newReducers);
+ // The new reduce number cannot be greater than the old reduce number
+ newReducers = Math.min(oldReducers, newReducers);
+ return newReducers;
+ }
+
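A worked example with hypothetical sizes: 10 GB of optimized input at 256 MB per reducer suggests 40 reducers, which the BASED_REDUCES floor raises to 50 and the old estimate of 200 leaves untouched.

    public class ReducerEstimateExample {
        public static void main(String[] args) {
            long totalInputFileSize = 10L * 1024 * 1024 * 1024; // 10 GB, hypothetical
            long bytesPerReducer = 256L * 1024 * 1024;          // 256 MB per reducer
            int oldReducers = 200;                              // planner's original reducer count

            // same steps as estimateReducersByFileSize()
            double bytes = Math.max(totalInputFileSize, bytesPerReducer);
            int newReducers = (int) Math.ceil(bytes / (double) bytesPerReducer); // 40
            newReducers = Math.max(50, newReducers);            // BASED_REDUCES floor -> 50
            newReducers = Math.min(oldReducers, newReducers);   // never above the old estimate -> 50
            System.out.println(newReducers); // 50
        }
    }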
+ /**
+ * Update TezEdgeProperty and set reduce tasks in ReduceWork
+ *
+ * @param reduceWork ReduceWork
+ * @param optimizedNumReduces optimized reduce numbers
+ * @param bytesPerReducer size per reducer
+ */
+ public static void setOptimizedNumReduceTasks(ReduceWork reduceWork, int optimizedNumReduces,
+ long bytesPerReducer) {
+ // limit for reducers
+ final int maxReducers = reduceWork.getNumReduceTasks();
+ if (!reduceWork.isAutoReduceParallelism()) {
+ // set optimized reduce number
+ reduceWork.setNumReduceTasks(Math.min(optimizedNumReduces, maxReducers));
+ return;
+ }
+ // tez auto reduce parallelism should be set to the 'minPartition' and 'maxPartition'
+ float minPartitionFactor = 0.5f;
+ float maxPartitionFactor = 1.0f;
+
+ // min we allow tez to pick
+ int minPartition = Math.max(1, (int) ((float) optimizedNumReduces * minPartitionFactor));
+ minPartition = Math.min(minPartition, maxReducers);
+
+ // max we allow tez to pick
+ int maxPartition = Math.max(1, (int) ((float) optimizedNumReduces * maxPartitionFactor));
+ maxPartition = Math.min(maxPartition, maxReducers);
+
+ // set optimized reduce number
+ reduceWork.setNumReduceTasks(optimizedNumReduces);
+ reduceWork.setMinReduceTasks(minPartition);
+ reduceWork.setMaxReduceTasks(maxPartition);
+
+ // update TezEdgeProperty
+ reduceWork.getEdgePropRef()
+ .setAutoReduce(reduceWork.getEdgePropRef().getHiveConf(), true, minPartition, maxPartition,
+ bytesPerReducer);
+ }
+
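For the auto-parallelism branch, a quick sketch of the partition window with hypothetical numbers: an optimized estimate of 120 reducers against an original cap of 80 yields a Tez window of 60 to 80.

    public class AutoReducePartitionExample {
        public static void main(String[] args) {
            int optimizedNumReduces = 120;  // hypothetical optimized estimate
            int maxReducers = 80;           // reduceWork.getNumReduceTasks() before optimization
            float minPartitionFactor = 0.5f;
            float maxPartitionFactor = 1.0f;

            // same clamping as setOptimizedNumReduceTasks()
            int minPartition = Math.min(Math.max(1, (int) (optimizedNumReduces * minPartitionFactor)), maxReducers);
            int maxPartition = Math.min(Math.max(1, (int) (optimizedNumReduces * maxPartitionFactor)), maxReducers);
            System.out.println(minPartition + " .. " + maxPartition); // 60 .. 80
        }
    }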
+ /**
+ * Entry point for ReduceWork optimization
+ *
+ * @param tableScanOperatorMap TableScanOperator's map
+ * @param reduceWorkMap ReduceWork's map
+ * @param hiveConf hive conf
+ * @param pctx The current parse context.
+ */
+ public static void optimizeReduceWork(Map<String, TableScanOperator> tableScanOperatorMap,
+ Map<String, ReduceWork> reduceWorkMap, HiveConf hiveConf, ParseContext pctx) {
+ // use 'tez.grouping.max-size' to optimize bytesPerReducer
+ long bytesPerReducer = Math.max(hiveConf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER),
+ TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
+ for (ReduceWork reduceWork : reduceWorkMap.values()) {
+ if (!reduceWork.isAutoReduceParallelism() || reduceWork.getTagToInput().values().size() <= 0) {
+ continue;
+ }
+ // supported 'isAutoReduceParallelism' is true
+ int optimizedNumReduces = estimateNumReducesByInput(reduceWork, tableScanOperatorMap, reduceWorkMap, pctx,
+ bytesPerReducer);
+ if (optimizedNumReduces > 0) {
+ NdpStatisticsUtils.setOptimizedNumReduceTasks(reduceWork, optimizedNumReduces, bytesPerReducer);
+ }
+ }
+ }
+
+ /**
+ * The LengthPerGroup is optimized based on the selectivity of Agg or Filter
+ * to increase the number of splits processed by a Tez Task.
+ *
+ * @param conf hive conf
+ * @return optimized LengthPerGroup
+ */
+ public static double optimizeLengthPerGroup(Configuration conf) {
+ double selectivity = OmniDataConf.getOmniDataTableOptimizedSelectivity(conf);
+ double coefficient = 1d;
+ double configCoefficient = OmniDataConf.getOmniDataGroupOptimizedCoefficient(conf);
+ if (OmniDataConf.getOmniDataAggOptimizedEnabled(conf)) {
+ if (configCoefficient > 0) {
+ LOG.info("Desired OmniData optimized coefficient overridden by config to: {}", configCoefficient);
+ return configCoefficient;
+ }
+ // OmniData agg optimized
+ if (selectivity <= SELECTIVITY_THRESHOLD) {
+ int maxAggCoefficient = 10;
+ coefficient = Math.min((new BigDecimal("1").divide(new BigDecimal(selectivity), OMNIDATA_SCALE,
+ BigDecimal.ROUND_HALF_UP)).add(new BigDecimal("1")).doubleValue(), maxAggCoefficient);
+ }
+ } else if (OmniDataConf.getOmniDataFilterOptimizedEnabled(conf)) {
+ if (configCoefficient > 0) {
+ LOG.info("Desired OmniData optimized coefficient overridden by config to: {}", configCoefficient);
+ return configCoefficient;
+ }
+ // OmniData filter optimized
+ if (selectivity <= SELECTIVITY_THRESHOLD) {
+ int maxFilterCoefficient = 10;
+ coefficient = Math.min((new BigDecimal("0.5").divide(new BigDecimal(selectivity), OMNIDATA_SCALE,
+ BigDecimal.ROUND_HALF_UP)).add(new BigDecimal("0.5")).doubleValue(), maxFilterCoefficient);
+ }
+ } else {
+ if (OmniDataConf.getOmniDataGroupOptimizedEnabled(conf)) {
+ double maxCoefficient = 2.5d;
+ coefficient = Math.min((new BigDecimal("0.2").divide(new BigDecimal(selectivity), OMNIDATA_SCALE,
+ BigDecimal.ROUND_HALF_UP)).add(new BigDecimal("1")).doubleValue(), maxCoefficient);
+ }
+ }
+ return coefficient;
+ }
+
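A worked example of the coefficient arithmetic for a hypothetical optimized selectivity of 0.2, which is below SELECTIVITY_THRESHOLD = 0.5: the aggregation path gives min(1/0.2 + 1, 10) = 6.0 and the filter path gives min(0.5/0.2 + 0.5, 10) = 3.0.

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class GroupCoefficientExample {
        public static void main(String[] args) {
            BigDecimal selectivity = new BigDecimal("0.2"); // hypothetical, below the 0.5 threshold

            // agg-optimized path: min(1/selectivity + 1, 10)
            double aggCoefficient = Math.min(new BigDecimal("1")
                    .divide(selectivity, 4, RoundingMode.HALF_UP)
                    .add(new BigDecimal("1")).doubleValue(), 10);
            // filter-optimized path: min(0.5/selectivity + 0.5, 10)
            double filterCoefficient = Math.min(new BigDecimal("0.5")
                    .divide(selectivity, 4, RoundingMode.HALF_UP)
                    .add(new BigDecimal("0.5")).doubleValue(), 10);
            System.out.println(aggCoefficient + " / " + filterCoefficient); // 6.0 / 3.0
        }
    }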
+ /**
+ * The Filter in TableScanOperator is changed. The Stats of the Filter needs to be updated.
+ *
+ * @param pctx The current parse context.
+ * @param tableScanOp TableScanOperator
+ * @throws SemanticException Exception from SemanticAnalyzer.
+ */
+ public static void updateFilterStats(ParseContext pctx, TableScanOperator tableScanOp) throws SemanticException {
+ AnnotateStatsProcCtx aspCtx = new AnnotateStatsProcCtx(pctx);
+
+ // create a walker which walks the tree in a BFS manner while maintaining the
+ // operator stack. The dispatcher generates the plan from the operator tree
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<>();
+ opRules.put(new RuleRegExp("FIL", FilterOperator.getOperatorName() + "%"),
+ StatsRulesProcFactory.getFilterRule());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher dispatcher = new DefaultRuleDispatcher(StatsRulesProcFactory.getDefaultRule(), opRules, aspCtx);
+ GraphWalker ogw = new LevelOrderWalker(dispatcher, 0);
+
+ // Create a list of topOp nodes
+ ArrayList<Node> topNodes = new ArrayList<>();
+ topNodes.add(tableScanOp);
+ ogw.startWalking(topNodes, null);
+ }
+
+ /**
+ * get DataTypePhysicalVariations
+ *
+ * @param vOutContext Context class for vectorization execution.
+ * @return DataTypePhysicalVariation[]
+ */
+ public static DataTypePhysicalVariation[] getDataTypePhysicalVariation(VectorizationContext vOutContext) {
+ List<DataTypePhysicalVariation> variations = new ArrayList<>();
+ vOutContext.getProjectedColumns().forEach(c -> {
+ try {
+ variations.add(vOutContext.getDataTypePhysicalVariation(c));
+ } catch (HiveException e) {
+ LOG.error("OmniData Hive failed to get DataTypePhysicalVariation ", e);
+ }
+ });
+ return variations.toArray(new DataTypePhysicalVariation[0]);
+ }
+
+ /**
+ * Evaluate whether the part pushdown selectivity meets the requirements. If yes, update the filter and selectivity.
+ *
+ * @param tableScanOp TableScanOperator
+ * @param newExprDesc new Filter ExprNodeDesc
+ * @param parseContext The current parse context.
+ * @param conf hive conf
+ * @return true or false
+ */
+ public static boolean evaluatePartFilterSelectivity(TableScanOperator tableScanOp, ExprNodeDesc newExprDesc,
+ ParseContext parseContext, Configuration conf) {
+ Operator<? extends OperatorDesc> operator = tableScanOp.getChildOperators().get(0);
+ if (!(operator instanceof VectorFilterOperator)) {
+ return false;
+ }
+ VectorFilterOperator vectorFilterOp = (VectorFilterOperator) operator;
+ VectorExpression oldExpr = vectorFilterOp.getPredicateExpression();
+ ExprNodeDesc oldExprNodeDesc = vectorFilterOp.getConf().getPredicate();
+ try {
+ VectorExpression ndpExpr = tableScanOp.getOutputVectorizationContext()
+ .getVectorExpression(newExprDesc, VectorExpressionDescriptor.Mode.FILTER);
+ // set new filter expression
+ vectorFilterOp.setFilterCondition(ndpExpr);
+ // set new filter desc
+ vectorFilterOp.getConf().setPredicate(newExprDesc);
+ // update filter's statistics
+ NdpStatisticsUtils.updateFilterStats(parseContext, tableScanOp);
+ } catch (HiveException e) {
+ LOG.error(String.valueOf(e));
+ return false;
+ }
+ // calculate the new selectivity
+ double newSelectivity = tableScanOp.getConf().getOmniDataSelectivity() / NdpStatisticsUtils.getSelectivity(
+ tableScanOp);
+ if (newSelectivity <= OmniDataConf.getOmniDataFilterSelectivity(conf)) {
+ // set table's selectivity
+ tableScanOp.getConf().setOmniDataSelectivity(newSelectivity);
+ LOG.info("Table [{}] part selectivity is {}", tableScanOp.getConf().getAlias(), newSelectivity);
+ return true;
+ }
+ vectorFilterOp.setFilterCondition(oldExpr);
+ vectorFilterOp.getConf().setPredicate(oldExprNodeDesc);
+ try {
+ NdpStatisticsUtils.updateFilterStats(parseContext, tableScanOp);
+ } catch (SemanticException e) {
+ LOG.error("OmniData Hive failed to get updateFilterStats ", e);
+ }
+ LOG.info("Table [{}] failed to part push down, since selectivity[{}] > threshold[{}]",
+ tableScanOp.getConf().getAlias(), newSelectivity, OmniDataConf.getOmniDataFilterSelectivity(conf));
+ return false;
+ }
+
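+ // Worked example (illustrative): if the table's current OmniData selectivity is 0.4 and the
+ // re-annotated statistics give getSelectivity(tableScanOp) = 0.8, the partial-pushdown selectivity
+ // becomes 0.4 / 0.8 = 0.5, which is accepted only if it does not exceed the configured threshold.
+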
+ private static void generateMapWork(MapWork mapWork, Map<String, MapWork> mapWorkMap,
+ Map<String, TableScanOperator> tableScanOperatorMap) {
+ mapWork.getAliasToWork().values().forEach(operator -> {
+ if (operator instanceof TableScanOperator) {
+ tableScanOperatorMap.put(mapWork.getName(), (TableScanOperator) operator);
+ mapWorkMap.put(mapWork.getName(), mapWork);
+ }
+ });
+ }
+
+ private static int estimateNumReducesByInput(ReduceWork reduceWork,
+ Map<String, TableScanOperator> tableScanOperatorMap, Map<String, ReduceWork> reduceWorkMap, ParseContext pctx,
+ long bytesPerReducer) {
+ boolean isSupported = true;
+ long totalOptimizedSize = 0L;
+ int reduces = 0;
+ int totalReduces = 0;
+ // we need to add up all the estimates from reduceWork's input
+ for (String inputWorkName : reduceWork.getTagToInput().values()) {
+ if (tableScanOperatorMap.containsKey(inputWorkName)) {
+ TableScanOperator tableScanOp = tableScanOperatorMap.get(inputWorkName);
+ if (tableScanOp.getConf().getStatistics() != null) {
+ // add the optimized input sizes
+ totalOptimizedSize += NdpStatisticsUtils.estimateOptimizedSize(tableScanOp, pctx);
+ } else {
+ isSupported = false;
+ break;
+ }
+ } else if (reduceWorkMap.containsKey(inputWorkName)) {
+ reduces++;
+ // add the child's reduce number
+ totalReduces += reduceWorkMap.get(inputWorkName).getNumReduceTasks();
+ } else {
+ // unsupported MergeJoinWork
+ isSupported = false;
+ break;
+ }
+ }
+ int optimizedNumReduces = -1;
+ if (isSupported) {
+ optimizedNumReduces = NdpStatisticsUtils.estimateReducersByFileSize(totalOptimizedSize, bytesPerReducer,
+ reduceWork.getNumReduceTasks());
+ // reduce work exists in the input
+ if (reduces > 0) {
+ int avgReduces = (int) (totalReduces / reduces);
+ // When the reducer estimate derived from the map work inputs does not exceed BASED_REDUCES,
+ // it is ignored and only the average of the upstream reduce works is used.
+ optimizedNumReduces = (optimizedNumReduces > NdpStatisticsUtils.BASED_REDUCES) ? optimizedNumReduces
+ + avgReduces : avgReduces;
+ }
+ }
+ return optimizedNumReduces;
+ }
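+
+ // Worked example (illustrative, assuming BASED_REDUCES is a small constant such as 2): with two
+ // upstream reduce works totaling 8 reducers, avgReduces = 8 / 2 = 4; if the size-based estimate is
+ // 10 (> BASED_REDUCES) the result is 10 + 4 = 14, otherwise the estimate falls back to 4.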
+}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpVectorizedRowBatchCtx.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpVectorizedRowBatchCtx.java
new file mode 100644
index 0000000000000000000000000000000000000000..4827cae0210ed30b780aa694f2bdce1b2340b23b
--- /dev/null
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/physical/NdpVectorizedRowBatchCtx.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.omnidata.physical;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.IOPrepareCache;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde2.io.DateWritableV2;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Context for Ndp Vectorized row batch.
+ *
+ * @since 2022-05-28
+ */
+public class NdpVectorizedRowBatchCtx implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private String[] rowColumnNames;
+
+ private DataTypePhysicalVariation[] rowDataTypePhysicalVariations;
+
+ private int[] dataColumnNums;
+
+ private int dataColumnCount;
+
+ private int partitionColumnCount;
+
+ private int virtualColumnCount;
+
+ private VirtualColumn[] neededVirtualColumns;
+
+ private String[] scratchColumnTypeNames;
+
+ private DataTypePhysicalVariation[] scratchDataTypePhysicalVariations;
+
+ /**
+ * Constructor for NdpVectorizedRowBatchCtx
+ */
+ public NdpVectorizedRowBatchCtx() {
+ }
+
+ public NdpVectorizedRowBatchCtx(VectorizedRowBatchCtx vectorizedRowBatchCtx) {
+ this(vectorizedRowBatchCtx.getRowColumnNames(), vectorizedRowBatchCtx.getRowdataTypePhysicalVariations(),
+ vectorizedRowBatchCtx.getDataColumnNums(), vectorizedRowBatchCtx.getDataColumnCount(),
+ vectorizedRowBatchCtx.getPartitionColumnCount(), vectorizedRowBatchCtx.getVirtualColumnCount(),
+ vectorizedRowBatchCtx.getNeededVirtualColumns(), vectorizedRowBatchCtx.getScratchColumnTypeNames(),
+ vectorizedRowBatchCtx.getScratchDataTypePhysicalVariations());
+ }
+
+ @JsonCreator
+ public NdpVectorizedRowBatchCtx(@JsonProperty("rowColumnNames") String[] rowColumnNames,
+ @JsonProperty("rowDataTypePhysicalVariations") DataTypePhysicalVariation[] rowDataTypePhysicalVariations,
+ @JsonProperty("dataColumnNums") int[] dataColumnNums, @JsonProperty("dataColumnCount") int dataColumnCount,
+ @JsonProperty("partitionColumnCount") int partitionColumnCount,
+ @JsonProperty("virtualColumnCount") int virtualColumnCount,
+ @JsonProperty("neededVirtualColumns") VirtualColumn[] neededVirtualColumns,
+ @JsonProperty("scratchColumnTypeNames") String[] scratchColumnTypeNames,
+ @JsonProperty("scratchDataTypePhysicalVariations")
+ DataTypePhysicalVariation[] scratchDataTypePhysicalVariations) {
+ this.rowColumnNames = rowColumnNames;
+ this.rowDataTypePhysicalVariations = rowDataTypePhysicalVariations;
+ this.dataColumnNums = dataColumnNums;
+ this.dataColumnCount = dataColumnCount;
+ this.partitionColumnCount = partitionColumnCount;
+ this.virtualColumnCount = virtualColumnCount;
+ this.neededVirtualColumns = neededVirtualColumns;
+ this.scratchColumnTypeNames = scratchColumnTypeNames;
+ this.scratchDataTypePhysicalVariations = scratchDataTypePhysicalVariations;
+ }
+
+ private ColumnVector createColumnVectorFromRowColumnTypeInfos(int columnNum, TypeInfo[] rowColumnTypeInfos) {
+ TypeInfo typeInfo = rowColumnTypeInfos[columnNum];
+ DataTypePhysicalVariation dataTypePhysicalVariation = DataTypePhysicalVariation.NONE;
+ if (rowDataTypePhysicalVariations != null) {
+ dataTypePhysicalVariation = rowDataTypePhysicalVariations[columnNum];
+ }
+ return VectorizedBatchUtil.createColumnVector(typeInfo, dataTypePhysicalVariation);
+ }
+
+ public boolean isVirtualColumnNeeded(String virtualColumnName) {
+ for (VirtualColumn neededVirtualColumn : neededVirtualColumns) {
+ if (neededVirtualColumn.getName().equals(virtualColumnName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Creates a Vectorized row batch and the column vectors.
+ *
+ * @return VectorizedRowBatch
+ */
+ public VectorizedRowBatch createVectorizedRowBatch(TypeInfo[] rowColumnTypeInfos) {
+ final int nonScratchColumnCount = rowColumnTypeInfos.length;
+ final int totalColumnCount = nonScratchColumnCount + scratchColumnTypeNames.length;
+ VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
+
+ if (dataColumnNums == null) {
+ // All data and partition columns.
+ for (int i = 0; i < nonScratchColumnCount; i++) {
+ result.cols[i] = createColumnVectorFromRowColumnTypeInfos(i, rowColumnTypeInfos);
+ }
+ } else {
+ // Create only the needed/included data columns.
+ for (int i = 0; i < dataColumnNums.length; i++) {
+ int columnNum = dataColumnNums[i];
+ Preconditions.checkState(columnNum < nonScratchColumnCount);
+ result.cols[columnNum] = createColumnVectorFromRowColumnTypeInfos(columnNum, rowColumnTypeInfos);
+ }
+ // Always create partition and virtual columns.
+ final int partitionEndColumnNum = dataColumnCount + partitionColumnCount;
+ for (int partitionColumnNum = dataColumnCount;
+ partitionColumnNum < partitionEndColumnNum; partitionColumnNum++) {
+ result.cols[partitionColumnNum] = VectorizedBatchUtil.createColumnVector(
+ rowColumnTypeInfos[partitionColumnNum]);
+ }
+ final int virtualEndColumnNum = partitionEndColumnNum + virtualColumnCount;
+ for (int virtualColumnNum = partitionEndColumnNum;
+ virtualColumnNum < virtualEndColumnNum; virtualColumnNum++) {
+ String virtualColumnName = rowColumnNames[virtualColumnNum];
+ if (!isVirtualColumnNeeded(virtualColumnName)) {
+ continue;
+ }
+ result.cols[virtualColumnNum] = VectorizedBatchUtil.createColumnVector(
+ rowColumnTypeInfos[virtualColumnNum]);
+ }
+ }
+
+ for (int i = 0; i < scratchColumnTypeNames.length; i++) {
+ String typeName = scratchColumnTypeNames[i];
+ DataTypePhysicalVariation dataTypePhysicalVariation = scratchDataTypePhysicalVariations[i];
+ result.cols[nonScratchColumnCount + i] = VectorizedBatchUtil.createColumnVector(typeName,
+ dataTypePhysicalVariation);
+ }
+
+ // UNDONE: Also remember virtualColumnCount...
+ result.setPartitionInfo(dataColumnCount, partitionColumnCount);
+
+ result.reset();
+ return result;
+ }
+
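+ // Usage sketch (illustrative): build the context from the planner's VectorizedRowBatchCtx and
+ // create an empty batch whose data, partition and scratch columns match the table schema:
+ //   NdpVectorizedRowBatchCtx ndpCtx = new NdpVectorizedRowBatchCtx(vectorizedRowBatchCtx);
+ //   VectorizedRowBatch batch = ndpCtx.createVectorizedRowBatch(rowColumnTypeInfos);
+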
+ public static void getPartitionValues(NdpVectorizedRowBatchCtx vrbCtx, Configuration hiveConf, FileSplit split,
+ Object[] partitionValues, TypeInfo[] rowColumnTypeInfos) {
+ MapWork mapWork = Utilities.getMapWork(hiveConf);
+ getPartitionValues(vrbCtx, mapWork, split, partitionValues, rowColumnTypeInfos);
+ }
+
+ public static void getPartitionValues(NdpVectorizedRowBatchCtx vrbCtx, MapWork mapWork, FileSplit split,
+ Object[] partitionValues, TypeInfo[] rowColumnTypeInfos) {
+ Map<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+ try {
+ PartitionDesc partDesc = HiveFileFormatUtils.getFromPathRecursively(pathToPartitionInfo, split.getPath(),
+ IOPrepareCache.get().getPartitionDescMap());
+ getPartitionValues(vrbCtx, partDesc, partitionValues, rowColumnTypeInfos);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void getPartitionValues(NdpVectorizedRowBatchCtx vrbCtx, PartitionDesc partDesc,
+ Object[] partitionValues, TypeInfo[] rowColumnTypeInfos) {
+
+ LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+
+ for (int i = 0; i < vrbCtx.partitionColumnCount; i++) {
+ Object objectValue;
+ if (partSpec == null) {
+ // For a partition-less table, initialize the partition value to null.
+ // We can have a partition-less table even if we have partition keys
+ // when there is only one partition selected and the partition key is not
+ // part of the projection/include list.
+ objectValue = null;
+ } else {
+ String key = vrbCtx.rowColumnNames[vrbCtx.dataColumnCount + i];
+
+ // Create a Standard java object Inspector
+ TypeInfo partColTypeInfo = rowColumnTypeInfos[vrbCtx.dataColumnCount + i];
+ ObjectInspector objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+ partColTypeInfo);
+ objectValue = ObjectInspectorConverters.
+ getConverter(PrimitiveObjectInspectorFactory.
+ javaStringObjectInspector, objectInspector).
+ convert(partSpec.get(key));
+ if (partColTypeInfo instanceof CharTypeInfo) {
+ objectValue = ((HiveChar) objectValue).getStrippedValue();
+ }
+ }
+ partitionValues[i] = objectValue;
+ }
+ }
+
+ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partitionValues,
+ TypeInfo[] rowColumnTypeInfos) {
+ addPartitionColsToBatch(batch.cols, partitionValues, rowColumnTypeInfos);
+ }
+
+ public void addPartitionColsToBatch(ColumnVector[] cols, Object[] partitionValues, TypeInfo[] rowColumnTypeInfos) {
+ if (partitionValues != null) {
+ for (int i = 0; i < partitionColumnCount; i++) {
+ Object value = partitionValues[i];
+
+ int colIndex = dataColumnCount + i;
+ String partitionColumnName = rowColumnNames[colIndex];
+ PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex];
+ switch (primitiveTypeInfo.getPrimitiveCategory()) {
+ case BOOLEAN: {
+ LongColumnVector lcv = (LongColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(lcv);
+ } else {
+ if (value instanceof Boolean) {
+ lcv.fill((Boolean) value ? 1 : 0);
+ }
+ }
+ }
+ break;
+ case BYTE: {
+ LongColumnVector lcv = (LongColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(lcv);
+ } else {
+ lcv.fill((Byte) value);
+ }
+ }
+ break;
+ case SHORT: {
+ LongColumnVector lcv = (LongColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(lcv);
+ } else {
+ lcv.fill((Short) value);
+ }
+ }
+ break;
+ case INT: {
+ LongColumnVector lcv = (LongColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(lcv);
+ } else {
+ lcv.fill((Integer) value);
+ }
+ }
+ break;
+ case LONG: {
+ LongColumnVector lcv = (LongColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(lcv);
+ } else {
+ lcv.fill((Long) value);
+ }
+ }
+ break;
+ case DATE: {
+ LongColumnVector lcv = (LongColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(lcv);
+ } else {
+ lcv.fill(DateWritableV2.dateToDays((Date) value));
+ }
+ }
+ break;
+ case TIMESTAMP: {
+ TimestampColumnVector lcv = (TimestampColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(lcv);
+ } else {
+ lcv.fill(((Timestamp) value).toSqlTimestamp());
+ }
+ }
+ break;
+ case INTERVAL_YEAR_MONTH: {
+ LongColumnVector lcv = (LongColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(lcv);
+ } else {
+ lcv.fill(((HiveIntervalYearMonth) value).getTotalMonths());
+ }
+ }
+ break;
+ case INTERVAL_DAY_TIME: {
+ IntervalDayTimeColumnVector icv = (IntervalDayTimeColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(icv);
+ } else {
+ icv.fill(((HiveIntervalDayTime) value));
+ }
+ }
+ break;
+ case FLOAT: {
+ DoubleColumnVector dcv = (DoubleColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(dcv);
+ } else {
+ dcv.fill((Float) value);
+ }
+ }
+ break;
+ case DOUBLE: {
+ DoubleColumnVector dcv = (DoubleColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(dcv);
+ } else {
+ dcv.fill((Double) value);
+ }
+ }
+ break;
+ case DECIMAL: {
+ DecimalColumnVector dv = (DecimalColumnVector) cols[colIndex];
+ if (value == null) {
+ setNull(dv);
+ } else {
+ dv.fill((HiveDecimal) value);
+ }
+ }
+ break;
+ case BINARY: {
+ BytesColumnVector bcv = (BytesColumnVector) cols[colIndex];
+ byte[] bytes = (byte[]) value;
+ if (bytes == null) {
+ setNull(bcv);
+ } else {
+ bcv.fill(bytes);
+ }
+ }
+ break;
+ case STRING:
+ case CHAR:
+ case VARCHAR: {
+ BytesColumnVector bcv = (BytesColumnVector) cols[colIndex];
+ // check value itself before calling toString() to avoid an NPE on a null partition value
+ if (value == null) {
+ setNull(bcv);
+ } else {
+ bcv.fill(value.toString().getBytes());
+ }
+ }
+ break;
+ default:
+ throw new RuntimeException(
+ "Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory()
+ + " for column " + partitionColumnName);
+ }
+ }
+ }
+ }
+
+ private void setNull(ColumnVector cv) {
+ cv.noNulls = false;
+ cv.isNull[0] = true;
+ cv.isRepeating = true;
+ }
+
+ @JsonProperty
+ public String[] getRowColumnNames() {
+ return rowColumnNames;
+ }
+
+ @JsonProperty
+ public DataTypePhysicalVariation[] getRowDataTypePhysicalVariations() {
+ return rowDataTypePhysicalVariations;
+ }
+
+ @JsonProperty
+ public int[] getDataColumnNums() {
+ return dataColumnNums;
+ }
+
+ @JsonProperty
+ public int getDataColumnCount() {
+ return dataColumnCount;
+ }
+
+ @JsonProperty
+ public int getPartitionColumnCount() {
+ return partitionColumnCount;
+ }
+
+ @JsonProperty
+ public int getVirtualColumnCount() {
+ return virtualColumnCount;
+ }
+
+ @JsonProperty
+ public VirtualColumn[] getNeededVirtualColumns() {
+ return neededVirtualColumns;
+ }
+
+ @JsonProperty
+ public String[] getScratchColumnTypeNames() {
+ return scratchColumnTypeNames;
+ }
+
+ @JsonProperty
+ public DataTypePhysicalVariation[] getScratchDataTypePhysicalVariations() {
+ return scratchDataTypePhysicalVariations;
+ }
+}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataAdapter.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataAdapter.java
index 6833bda2945e082e0b3fd0ddd47335e2a6845d68..a45e60bccf7f3df65dae9b32aa6d28aafba27945 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataAdapter.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataAdapter.java
@@ -21,153 +21,91 @@ package org.apache.hadoop.hive.ql.omnidata.reader;
import static org.apache.hadoop.hive.ql.omnidata.OmniDataUtils.addPartitionValues;
-
import com.huawei.boostkit.omnidata.exception.OmniDataException;
import com.huawei.boostkit.omnidata.exception.OmniErrorCode;
-import com.huawei.boostkit.omnidata.model.Predicate;
import com.huawei.boostkit.omnidata.model.TaskSource;
import com.huawei.boostkit.omnidata.model.datasource.DataSource;
-import com.huawei.boostkit.omnidata.model.datasource.hdfs.HdfsDataSource;
+import com.huawei.boostkit.omnidata.model.datasource.hdfs.HdfsOrcDataSource;
+import com.huawei.boostkit.omnidata.model.datasource.hdfs.HdfsParquetDataSource;
import com.huawei.boostkit.omnidata.reader.impl.DataReaderImpl;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.omnidata.OmniDataUtils;
-import org.apache.hadoop.hive.ql.omnidata.config.NdpConf;
+import org.apache.hadoop.hive.ql.omnidata.config.OmniDataConf;
import org.apache.hadoop.hive.ql.omnidata.decode.PageDeserializer;
-import org.apache.hadoop.hive.ql.omnidata.decode.type.DecodeType;
import org.apache.hadoop.hive.ql.omnidata.operator.predicate.NdpPredicateInfo;
import org.apache.hadoop.hive.ql.omnidata.status.NdpStatusManager;
import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.*;
-
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Queue;
/**
* Obtains data from OmniData through OmniDataAdapter and converts the data into Hive List.
- * If the OmniData task fails due to an exception, the task will be retried.
- * The maximum number of retry times is 4.
*/
public class OmniDataAdapter implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(OmniDataAdapter.class);
+ /**
+ * The maximum number of retries is 4.
+ */
private static final int TASK_FAILED_TIMES = 4;
- private DataSource dataSource;
-
- private Queue<ColumnVector[]> batchVectors;
-
- private NdpPredicateInfo ndpPredicateInfo;
+ private TaskSource taskSource;
private List<String> omniDataHosts;
- private int ndpReplicationNum;
-
- public OmniDataAdapter(DataSource dataSource, Configuration conf, FileSplit fileSplit,
- NdpPredicateInfo ndpPredicateInfo) {
- this.dataSource = dataSource;
- if (dataSource instanceof HdfsDataSource && ndpPredicateInfo.getHasPartitionColumn()) {
- this.ndpPredicateInfo = addPartitionValues(ndpPredicateInfo, ((HdfsDataSource) dataSource).getPath(),
- HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
+ private PageDeserializer deserializer;
+
+ public OmniDataAdapter(Configuration conf, FileSplit fileSplit, NdpPredicateInfo ndpPredicateInfo,
+ PageDeserializer deserializer) {
+ this.deserializer = deserializer;
+ String path = fileSplit.getPath().toString();
+ long start = fileSplit.getStart();
+ long length = fileSplit.getLength();
+ // data source information, for connecting to data source.
+ DataSource dataSource;
+ if (ndpPredicateInfo.getDataFormat().toLowerCase(Locale.ENGLISH).contains("parquet")) {
+ dataSource = new HdfsParquetDataSource(path, start, length, false);
} else {
- this.ndpPredicateInfo = ndpPredicateInfo;
+ dataSource = new HdfsOrcDataSource(path, start, length, false);
}
- ndpReplicationNum = NdpConf.getNdpReplicationNum(conf);
- omniDataHosts = getOmniDataHosts(conf, fileSplit);
- }
-
- private List getOmniDataHosts(Configuration conf, FileSplit fileSplit) {
- List<String> omniDataHosts = new ArrayList<>();
- List<String> dataNodeHosts = getDataNodeHosts(conf, fileSplit);
- // shuffle
- Collections.shuffle(dataNodeHosts);
- dataNodeHosts.forEach(dn -> {
- // possibly null
- if (conf.get(dn) != null) {
- omniDataHosts.add(conf.get(dn));
- }
- });
- // add a random available datanode
- String randomDataNodeHost = NdpStatusManager.getRandomAvailableDataNodeHost(conf, dataNodeHosts);
- if (randomDataNodeHost.length() > 0 && conf.get(randomDataNodeHost) != null) {
- omniDataHosts.add(conf.get(randomDataNodeHost));
- }
- return omniDataHosts;
- }
-
- private List<String> getDataNodeHosts(Configuration conf, FileSplit fileSplit) {
- List<String> hosts = new ArrayList<>();
- try {
- BlockLocation[] blockLocations = fileSplit.getPath()
- .getFileSystem(conf)
- .getFileBlockLocations(fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength());
- for (BlockLocation block : blockLocations) {
- for (String host : block.getHosts()) {
- if ("localhost".equals(host)) {
- List<String> dataNodeHosts = new ArrayList<>(
- Arrays.asList(conf.get(NdpStatusManager.NDP_DATANODE_HOSTNAMES)
- .split(NdpStatusManager.NDP_DATANODE_HOSTNAME_SEPARATOR)));
- if (dataNodeHosts.size() > ndpReplicationNum) {
- hosts.addAll(dataNodeHosts.subList(0, ndpReplicationNum));
- } else {
- hosts.addAll(dataNodeHosts);
- }
- return hosts;
- } else {
- hosts.add(host);
- }
- if (ndpReplicationNum == hosts.size()) {
- return hosts;
- }
- }
- }
- } catch (IOException e) {
- LOGGER.error("OmniDataAdapter getDataNodeHosts() failed", e);
+ if (ndpPredicateInfo.getHasPartitionColumn()) {
+ addPartitionValues(ndpPredicateInfo, path, HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
}
- return hosts;
+ this.omniDataHosts = NdpStatusManager.getOmniDataHosts(conf, fileSplit,
+ OmniDataConf.getOmniDataReplicationNum(conf));
+ this.taskSource = new TaskSource(dataSource, ndpPredicateInfo.getPredicate(), 1048576);
}
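+ // Usage sketch (illustrative): callers such as OmniDataReader construct one adapter per split and
+ // drain the pushed-down pages:
+ //   OmniDataAdapter adapter = new OmniDataAdapter(conf, fileSplit, ndpPredicateInfo, deserializer);
+ //   Queue<ColumnVector[]> pages = adapter.getBatchFromOmniData();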
public Queue<ColumnVector[]> getBatchFromOmniData() throws UnknownHostException {
- Predicate predicate = ndpPredicateInfo.getPredicate();
- TaskSource taskSource = new TaskSource(dataSource, predicate, 1048576);
- DecodeType[] columnTypes = new DecodeType[ndpPredicateInfo.getDecodeTypes().size()];
- for (int index = 0; index < columnTypes.length; index++) {
- String codeType = ndpPredicateInfo.getDecodeTypes().get(index);
- if (ndpPredicateInfo.getDecodeTypesWithAgg().get(index)) {
- columnTypes[index] = OmniDataUtils.transOmniDataAggDecodeType(codeType);
- } else {
- columnTypes[index] = OmniDataUtils.transOmniDataDecodeType(codeType);
- }
- }
-
- PageDeserializer deserializer = new PageDeserializer(columnTypes);
-
Queue<ColumnVector[]> pages = new LinkedList<>();
int failedTimes = 0;
Properties properties = new Properties();
+ // If the OmniData task fails due to an exception, the task will look for the next available OmniData host
for (String omniDataHost : omniDataHosts) {
String ipAddress = InetAddress.getByName(omniDataHost).getHostAddress();
properties.put("omnidata.client.target.list", ipAddress);
DataReaderImpl<List<ColumnVector[]>> dataReader = null;
try {
- dataReader = new DataReaderImpl<>(properties, taskSource,
- deserializer);
+ dataReader = new DataReaderImpl<>(properties, taskSource, deserializer);
do {
List<ColumnVector[]> page = dataReader.getNextPageBlocking();
if (page != null) {
pages.addAll(page);
}
} while (!dataReader.isFinished());
+ dataReader.close();
break;
} catch (OmniDataException omniDataException) {
LOGGER.warn("OmniDataAdapter failed node info [hostname :{}]", omniDataHost);
@@ -199,10 +137,14 @@ public class OmniDataAdapter implements Serializable {
LOGGER.warn("OmniDataException: OMNIDATA_ERROR.");
}
failedTimes++;
+ pages.clear();
+ if (dataReader != null) {
+ dataReader.close();
+ }
} catch (Exception e) {
LOGGER.error("OmniDataAdapter getBatchFromOmnidata() has error:", e);
failedTimes++;
- } finally {
+ pages.clear();
if (dataReader != null) {
dataReader.close();
}
@@ -215,29 +157,4 @@ public class OmniDataAdapter implements Serializable {
}
return pages;
}
-
- public boolean nextBatchFromOmniData(VectorizedRowBatch batch) throws UnknownHostException {
- if (batchVectors == null) {
- batchVectors = getBatchFromOmniData();
- }
- if (!batchVectors.isEmpty()) {
- ColumnVector[] batchVector = batchVectors.poll();
- // channelCount: column, positionCount: row
- int channelCount = batchVector.length;
- int positionCount = batchVector[0].isNull.length;
- if (ndpPredicateInfo.getIsPushDownAgg()) {
- // agg raw return
- System.arraycopy(batchVector, 0, batch.cols, 0, channelCount);
- } else {
- for (int i = 0; i < channelCount; i++) {
- int columnId = ndpPredicateInfo.getOutputColumns().get(i);
- batch.cols[columnId] = batchVector[i];
- }
- }
- batch.size = positionCount;
- return true;
- }
- return false;
- }
-
}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataOrcRecordReader.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataOrcRecordReader.java
deleted file mode 100644
index b7296ec6505309ed6168ef074536ce768752c8ff..0000000000000000000000000000000000000000
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataOrcRecordReader.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (C) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved.
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.omnidata.reader;
-
-import com.huawei.boostkit.omnidata.model.datasource.DataSource;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.RecordReader;
-import org.apache.hadoop.hive.ql.omnidata.operator.predicate.NdpPredicateInfo;
-import org.apache.hadoop.mapred.FileSplit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * OmniDataOrcRecordReader
- */
-public class OmniDataOrcRecordReader implements RecordReader {
- static final Logger LOG = LoggerFactory.getLogger(OmniDataOrcRecordReader.class);
-
- private OmniDataAdapter dataAdapter;
-
- public OmniDataOrcRecordReader(Configuration conf, FileSplit fileSplit, DataSource dataSource,
- NdpPredicateInfo ndpPredicateInfo) {
- this.dataAdapter = new OmniDataAdapter(dataSource, conf, fileSplit, ndpPredicateInfo);
- }
-
- private boolean ensureBatch() {
- throw new UnsupportedOperationException("unsupported");
- }
-
- @Override
- public long getRowNumber() {
- return 0;
- }
-
- @Override
- public float getProgress() throws IOException {
- return 0;
- }
-
- @Override
- public boolean hasNext() {
- return ensureBatch();
- }
-
- @Override
- public void seekToRow(long row) {
- throw new UnsupportedOperationException("unsupported");
- }
-
- @Override
- public Object next(Object previous) throws IOException {
- throw new UnsupportedOperationException("unsupported");
- }
-
- @Override
- public boolean nextBatch(VectorizedRowBatch theirBatch) {
- boolean ret = false;
- try {
- ret = dataAdapter.nextBatchFromOmniData(theirBatch);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return ret;
- }
-
- @Override
- public void close() throws IOException {
- }
-
-}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataParquetRecordReader.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataParquetRecordReader.java
deleted file mode 100644
index dfe600dbfcff84ddbab9e97f093e13e3c6300f7a..0000000000000000000000000000000000000000
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataParquetRecordReader.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved.
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.omnidata.reader;
-
-import com.huawei.boostkit.omnidata.model.datasource.DataSource;
-import com.huawei.boostkit.omnidata.model.datasource.hdfs.HdfsParquetDataSource;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.io.DataCache;
-import org.apache.hadoop.hive.common.io.FileMetadataCache;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
-import org.apache.hadoop.hive.ql.omnidata.operator.predicate.NdpPredicateInfo;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * OmniDataParquetRecordReader
- */
-public class OmniDataParquetRecordReader extends VectorizedParquetRecordReader {
- static final Logger LOG = LoggerFactory.getLogger(OmniDataParquetRecordReader.class);
-
- private OmniDataAdapter dataAdapter;
-
- public OmniDataParquetRecordReader(InputSplit oldInputSplit, JobConf conf, FileMetadataCache metadataCache,
- DataCache dataCache, Configuration cacheConf, NdpPredicateInfo ndpPredicateInfo) {
- super(oldInputSplit, conf, metadataCache, dataCache, cacheConf);
-
- String path = ((FileSplit) oldInputSplit).getPath().toString();
- long start = ((FileSplit) oldInputSplit).getStart();
- long length = ((FileSplit) oldInputSplit).getLength();
- DataSource dataSource = new HdfsParquetDataSource(path, start, length, false);
-
- this.dataAdapter = new OmniDataAdapter(dataSource, conf, (FileSplit) oldInputSplit, ndpPredicateInfo);
- }
-
- @Override
- public boolean next(NullWritable nullWritable, VectorizedRowBatch vectorizedRowBatch) throws IOException {
- if (fileSchema == null) {
- return false;
- } else {
- return dataAdapter.nextBatchFromOmniData(vectorizedRowBatch);
- }
- }
-}
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataReader.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataReader.java
index 26b4ffdd3f9a9375281ba55f7708bf00ba010ef8..395b3c1430b3d1a2e5407bbff7929d25aa258183 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataReader.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/reader/OmniDataReader.java
@@ -19,60 +19,99 @@
package org.apache.hadoop.hive.ql.omnidata.reader;
-import com.huawei.boostkit.omnidata.model.datasource.DataSource;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.omnidata.decode.PageDeserializer;
import org.apache.hadoop.hive.ql.omnidata.operator.predicate.NdpPredicateInfo;
+import org.apache.hadoop.hive.ql.omnidata.physical.NdpVectorizedRowBatchCtx;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Callable;
/**
- * OmniDataReader for agg optimization
+ * OmniDataReader for filter and agg optimization
*
* @since 2022-03-07
*/
-public class OmniDataReader implements Callable<List<VectorizedRowBatch>> {
+public class OmniDataReader implements Callable<Queue<VectorizedRowBatch>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OmniDataReader.class);
- private DataSource dataSource;
+ private NdpPredicateInfo ndpPredicateInfo;
- private Configuration conf;
+ private NdpVectorizedRowBatchCtx ndpVectorizedRowBatchCtx;
- private FileSplit fileSplit;
+ private TypeInfo[] rowColumnTypeInfos;
- private NdpPredicateInfo ndpPredicateInfo;
+ private OmniDataAdapter omniDataAdapter;
- public OmniDataReader(DataSource dataSource, Configuration conf, FileSplit fileSplit,
- NdpPredicateInfo ndpPredicateInfo) {
- this.dataSource = dataSource;
- this.conf = conf;
- this.fileSplit = fileSplit;
+ private Object[] partitionValues = null;
+
+ public OmniDataReader(Configuration conf, FileSplit fileSplit, PageDeserializer deserializer,
+ NdpPredicateInfo ndpPredicateInfo, TypeInfo[] rowColumnTypeInfos) {
this.ndpPredicateInfo = ndpPredicateInfo;
+ this.rowColumnTypeInfos = rowColumnTypeInfos;
+ this.omniDataAdapter = new OmniDataAdapter(conf, fileSplit, ndpPredicateInfo, deserializer);
+ if (ndpPredicateInfo.getNdpVectorizedRowBatchCtx() != null) {
+ this.ndpVectorizedRowBatchCtx = ndpPredicateInfo.getNdpVectorizedRowBatchCtx();
+ int partitionColumnCount = ndpVectorizedRowBatchCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ // set partitionValues
+ NdpVectorizedRowBatchCtx.getPartitionValues(ndpVectorizedRowBatchCtx, conf, fileSplit, partitionValues,
+ this.rowColumnTypeInfos);
+ }
+ }
}
@Override
- public List<VectorizedRowBatch> call() throws UnknownHostException {
- OmniDataAdapter omniDataAdapter = new OmniDataAdapter(dataSource, conf, fileSplit, ndpPredicateInfo);
+ public Queue<VectorizedRowBatch> call() throws UnknownHostException {
Queue<ColumnVector[]> pages = omniDataAdapter.getBatchFromOmniData();
- return getVectorizedRowBatch(pages);
+ return ndpPredicateInfo.getIsPushDownAgg()
+ ? createVectorizedRowBatchWithAgg(pages)
+ : createVectorizedRowBatch(pages);
}
- private List<VectorizedRowBatch> getVectorizedRowBatch(Queue<ColumnVector[]> pages) {
- List<VectorizedRowBatch> rowBatches = new ArrayList<>();
- if (!pages.isEmpty()) {
+ private Queue<VectorizedRowBatch> createVectorizedRowBatchWithAgg(Queue<ColumnVector[]> pages) {
+ Queue<VectorizedRowBatch> rowBatches = new LinkedList<>();
+ while (!pages.isEmpty()) {
ColumnVector[] columnVectors = pages.poll();
int channelCount = columnVectors.length;
int positionCount = columnVectors[0].isNull.length;
VectorizedRowBatch rowBatch = new VectorizedRowBatch(channelCount, positionCount);
+ // agg: copy columnVectors to rowBatch.cols
System.arraycopy(columnVectors, 0, rowBatch.cols, 0, channelCount);
rowBatches.add(rowBatch);
}
return rowBatches;
}
+
+ private Queue<VectorizedRowBatch> createVectorizedRowBatch(Queue<ColumnVector[]> pages) {
+ Queue<VectorizedRowBatch> rowBatches = new LinkedList<>();
+ while (!pages.isEmpty()) {
+ ColumnVector[] columnVectors = pages.poll();
+ int channelCount = columnVectors.length;
+ int positionCount = columnVectors[0].isNull.length;
+ // creates a vectorized row batch and the column vectors
+ VectorizedRowBatch rowBatch = ndpVectorizedRowBatchCtx.createVectorizedRowBatch(rowColumnTypeInfos);
+ for (int i = 0; i < channelCount; i++) {
+ int columnId = ndpPredicateInfo.getOutputColumns().get(i);
+ rowBatch.cols[columnId] = columnVectors[i];
+ }
+ rowBatch.size = positionCount;
+ if (partitionValues != null) {
+ ndpVectorizedRowBatchCtx.addPartitionColsToBatch(rowBatch, partitionValues, rowColumnTypeInfos);
+ }
+ rowBatches.add(rowBatch);
+ }
+ return rowBatches;
+ }
+
}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/status/NdpStatusManager.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/status/NdpStatusManager.java
index 5ab221859c5b05bf9a8602f38cb36f33c4740f45..ac999284b938928b99c63916a6fbeb364b265ccd 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/status/NdpStatusManager.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/omnidata/status/NdpStatusManager.java
@@ -26,13 +26,17 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryForever;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.omnidata.config.NdpConf;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.hive.ql.omnidata.config.OmniDataConf;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -67,44 +71,46 @@ public class NdpStatusManager {
/**
* Get OmniData host resources data from ZooKeeper
*
- * @param ndpConf hive conf
+ * @param conf hive conf
* @return hostname -> ndp status
*/
- public static Map<String, NdpStatusInfo> getNdpZookeeperData(NdpConf ndpConf) {
- if (ndpConf.getNdpZookeeperSecurityEnabled()) {
- enableKrb5(ndpConf);
+ public static Map<String, NdpStatusInfo> getNdpZookeeperData(Configuration conf) {
+ Map<String, NdpStatusInfo> ndpMap = new HashMap<>();
+ String parentPath = OmniDataConf.getOmniDataZookeeperStatusNode(conf);
+ String quorumServer = OmniDataConf.getOmniDataZookeeperQuorumServer(conf);
+ String confPath = OmniDataConf.getOmniDataZookeeperConfPath(conf);
+ if (parentPath == null || quorumServer == null || confPath == null) {
+ LOG.error("OmniData Hive failed to get Zookeeper parameters, "
+ + "please set the following parameters: {} {} {}",
+ OmniDataConf.OMNIDATA_HIVE_ZOOKEEPER_QUORUM_SERVER, OmniDataConf.OMNIDATA_HIVE_ZOOKEEPER_STATUS_NODE,
+ OmniDataConf.OMNIDATA_HIVE_ZOOKEEPER_CONF_PATH);
+ return ndpMap;
+ }
+ if (OmniDataConf.getOmniDataZookeeperSecurityEnabled(conf)) {
+ enableKrb5(confPath);
}
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
- .connectString(ndpConf.getNdpZookeeperQuorumServer())
- .sessionTimeoutMs(ndpConf.getNdpZookeeperSessionTimeout())
- .connectionTimeoutMs(ndpConf.getNdpZookeeperConnectionTimeout())
- .retryPolicy(new RetryForever(ndpConf.getNdpZookeeperRetryInterval()))
+ .connectString(quorumServer)
+ .sessionTimeoutMs(OmniDataConf.getOmniDataZookeeperSessionTimeout(conf))
+ .connectionTimeoutMs(OmniDataConf.getOmniDataZookeeperConnectionTimeout(conf))
+ .retryPolicy(new RetryForever(OmniDataConf.getOmniDataZookeeperRetryInterval(conf)))
.build();
zkClient.start();
- Map<String, NdpStatusInfo> ndpMap = new HashMap<>();
- String parentPath = ndpConf.getNdpZookeeperStatusNode();
- try {
- // verify the path from ZooKeeper
- Stat stat = zkClient.checkExists().forPath(parentPath);
- if (stat == null) {
- LOG.error("OmniData Hive failed to get parent node from ZooKeeper");
- return ndpMap;
- }
- } catch (Exception e) {
- LOG.error("OmniData Hive failed to get host resources data from ZooKeeper", e);
+ if (!verifyZookeeperPath(zkClient, parentPath)) {
return ndpMap;
}
InterProcessMutex lock = new InterProcessMutex(zkClient, parentPath);
try {
- if (lock.acquire(ndpConf.getNdpZookeeperRetryInterval(), TimeUnit.MILLISECONDS)) {
+ if (lock.acquire(OmniDataConf.getOmniDataZookeeperRetryInterval(conf), TimeUnit.MILLISECONDS)) {
List<String> childrenPaths = zkClient.getChildren().forPath(parentPath);
ObjectMapper mapper = new ObjectMapper();
for (String path : childrenPaths) {
- if (!path.contains("-lock-")) {
- byte[] data = zkClient.getData().forPath(parentPath + "/" + path);
- NdpStatusInfo statusInfo = mapper.readValue(data, NdpStatusInfo.class);
- ndpMap.put(path, statusInfo);
+ if (path.contains("-lock-")) {
+ continue;
}
+ byte[] data = zkClient.getData().forPath(parentPath + "/" + path);
+ NdpStatusInfo statusInfo = mapper.readValue(data, NdpStatusInfo.class);
+ ndpMap.put(path, statusInfo);
}
}
} catch (Exception e) {
@@ -120,9 +126,24 @@ public class NdpStatusManager {
return ndpMap;
}
- private static void enableKrb5(NdpConf ndpConf) {
- System.setProperty(KRB5_LOGIN_CONF_KEY, ndpConf.getNdpZookeeperConfPath() + "/" + LOGIN_CONFIG_FILE);
- System.setProperty(KRB5_CONF_KEY, ndpConf.getNdpZookeeperConfPath() + "/" + KRB5_CONFIG_FILE);
+ private static boolean verifyZookeeperPath(CuratorFramework zkClient, String path) {
+ try {
+ // verify the path from ZooKeeper
+ Stat stat = zkClient.checkExists().forPath(path);
+ if (stat == null) {
+ LOG.error("OmniData Hive failed to get parent node from ZooKeeper");
+ return false;
+ }
+ } catch (Exception e) {
+ LOG.error("OmniData Hive failed to get host resources data from ZooKeeper", e);
+ return false;
+ }
+ return true;
+ }
+
+ private static void enableKrb5(String confPath) {
+ System.setProperty(KRB5_LOGIN_CONF_KEY, confPath + "/" + LOGIN_CONFIG_FILE);
+ System.setProperty(KRB5_CONF_KEY, confPath + "/" + KRB5_CONFIG_FILE);
System.setProperty(KRB5_SASL_CLIENT_CONF_KEY, "true");
}
@@ -172,4 +193,94 @@ public class NdpStatusManager {
return dataNodeHosts.get(randomIndex);
}
+ /**
+ * If the replication number is not specified, the default value 3 is used.
+ *
+ * @param conf hive config
+ * @param fileSplit fileSplit
+ * @return OmniData hosts
+ */
+ public static List<String> getOmniDataHosts(Configuration conf, FileSplit fileSplit) {
+ return getOmniDataHosts(conf, fileSplit, 3);
+ }
+
+ /**
+ * get the OmniData hosts.
+ *
+ * @param conf hive config
+ * @param fileSplit fileSplit
+ * @param ndpReplicationNum hdfs replication
+ * @return OmniData hosts
+ */
+ public static List<String> getOmniDataHosts(Configuration conf, FileSplit fileSplit, int ndpReplicationNum) {
+ List<String> omniDataHosts = new ArrayList<>();
+ List<String> dataNodeHosts = getDataNodeHosts(conf, fileSplit, ndpReplicationNum);
+ // shuffle
+ Collections.shuffle(dataNodeHosts);
+ dataNodeHosts.forEach(dn -> {
+ // possibly null
+ if (conf.get(dn) != null) {
+ omniDataHosts.add(conf.get(dn));
+ }
+ });
+ // add a random available datanode
+ String randomDataNodeHost = NdpStatusManager.getRandomAvailableDataNodeHost(conf, dataNodeHosts);
+ if (randomDataNodeHost.length() > 0 && conf.get(randomDataNodeHost) != null) {
+ omniDataHosts.add(conf.get(randomDataNodeHost));
+ }
+ return omniDataHosts;
+ }
+
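+ // Example (illustrative): the Configuration is assumed to map each datanode hostname to its
+ // co-located OmniData host, so for a block on datanodes [dn1, dn2, dn3] the returned list holds
+ // the shuffled values of conf.get("dn1"), conf.get("dn2"), conf.get("dn3"), plus one random
+ // available datanode's mapping as a fallback.
+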
+ /**
+ * get the DataNode hosts.
+ *
+ * @param conf hive config
+ * @param fileSplit fileSplit
+ * @param ndpReplicationNum hdfs replication
+ * @return DataNode hosts
+ */
+ public static List<String> getDataNodeHosts(Configuration conf, FileSplit fileSplit, int ndpReplicationNum) {
+ List<String> hosts = new ArrayList<>();
+ try {
+ BlockLocation[] blockLocations = fileSplit.getPath()
+ .getFileSystem(conf)
+ .getFileBlockLocations(fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength());
+ for (BlockLocation block : blockLocations) {
+ addHostsByBlock(conf, ndpReplicationNum, block, hosts);
+ }
+ } catch (IOException e) {
+ LOG.error("NdpStatusManager getDataNodeHosts() failed", e);
+ }
+ return hosts;
+ }
+
+ /**
+ * Add DataNode hosts for a block
+ *
+ * @param conf hive config
+ * @param ndpReplicationNum hdfs replication
+ * @param block hdfs block
+ * @param hosts DataNode hosts
+ * @throws IOException if block.getHosts() fails
+ */
+ public static void addHostsByBlock(Configuration conf, int ndpReplicationNum, BlockLocation block,
+ List<String> hosts) throws IOException {
+ for (String host : block.getHosts()) {
+ if ("localhost".equals(host)) {
+ List<String> dataNodeHosts = new ArrayList<>(Arrays.asList(
+ conf.get(NdpStatusManager.NDP_DATANODE_HOSTNAMES)
+ .split(NdpStatusManager.NDP_DATANODE_HOSTNAME_SEPARATOR)));
+ if (dataNodeHosts.size() > ndpReplicationNum) {
+ hosts.addAll(dataNodeHosts.subList(0, ndpReplicationNum));
+ } else {
+ hosts.addAll(dataNodeHosts);
+ }
+ } else {
+ hosts.add(host);
+ }
+ if (ndpReplicationNum == hosts.size()) {
+ return;
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index b171fbe7ad0062d2c6fb8c9085be851248c8e9e9..52e7003c818a74c0f05b353f629f9eefad043d0f 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.omnidata.config.OmniDataConf;
import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
import org.apache.hadoop.hive.ql.optimizer.ConstantPropagateProcCtx.ConstantPropagateOption;
import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
@@ -711,9 +712,14 @@ public class TezCompiler extends TaskCompiler {
new AnnotateRunTimeStatsOptimizer().resolve(physicalCtx);
}
- // tez push down entrance
- NdpPlanResolver ndpPlanResolver = new NdpPlanResolver();
- ndpPlanResolver.resolve(physicalCtx);
+ if (OmniDataConf.getOmniDataEnabled(conf)) {
+ // Tez OmniData entrance
+ NdpPlanResolver ndpPlanResolver = new NdpPlanResolver();
+ ndpPlanResolver.resolve(physicalCtx);
+ } else {
+ LOG.debug("Skipping OmniData pushdown");
+ }
+
perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "optimizeTaskPlan");
}
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 062ece416435cd1b2f519a78fa26de6418cd025c..994305f26d66d6fad66129d42cec20cf0baf5e4c 100644
--- a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -75,6 +75,10 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
private String tmpStatsDir;
private String ndpPredicateInfoStr;
+ private TypeInfo[] rowColumnTypeInfos;
+ private boolean isPushDownFilter = false;
+ private boolean isPushDownAgg = false;
+ private double omniDataSelectivity = 1.0d;
private ExprNodeGenericFuncDesc filterExpr;
private Serializable filterObject;
private String serializedFilterExpr;
@@ -93,16 +97,13 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
private transient List<String> referencedColumns;
public static final String FILTER_EXPR_CONF_STR =
- "hive.io.filter.expr.serialized";
-
- public static final String NDP_PREDICATE_EXPR_CONF_STR =
- "hive.io.ndp.predicate.expr.serialized";
+ "hive.io.filter.expr.serialized";
public static final String FILTER_TEXT_CONF_STR =
- "hive.io.filter.text";
+ "hive.io.filter.text";
public static final String FILTER_OBJECT_CONF_STR =
- "hive.io.filter.object";
+ "hive.io.filter.object";
// input file name (big) to bucket number
private Map<String, Integer> bucketFileNameMapping;
@@ -240,6 +241,38 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
this.ndpPredicateInfoStr = ndpPredicateInfoStr;
}
+ public TypeInfo[] getRowColumnTypeInfos() {
+ return rowColumnTypeInfos;
+ }
+
+ public void setRowColumnTypeInfos(TypeInfo[] rowColumnTypeInfos) {
+ this.rowColumnTypeInfos = rowColumnTypeInfos;
+ }
+
+ public boolean isPushDownFilter() {
+ return isPushDownFilter;
+ }
+
+ public void setPushDownFilter(boolean isPushDownFilter) {
+ this.isPushDownFilter = isPushDownFilter;
+ }
+
+ public boolean isPushDownAgg() {
+ return isPushDownAgg;
+ }
+
+ public void setPushDownAgg(boolean isPushDownAgg) {
+ this.isPushDownAgg = isPushDownAgg;
+ }
+
+ public double getOmniDataSelectivity() {
+ return omniDataSelectivity;
+ }
+
+ public void setOmniDataSelectivity(double omniDataSelectivity) {
+ this.omniDataSelectivity = omniDataSelectivity;
+ }
+
// @Signature // XXX
public ExprNodeGenericFuncDesc getFilterExpr() {
return filterExpr;
@@ -492,7 +525,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
private final VectorTableScanDesc vectorTableScanDesc;
public TableScanOperatorExplainVectorization(TableScanDesc tableScanDesc,
- VectorTableScanDesc vectorTableScanDesc) {
+ VectorTableScanDesc vectorTableScanDesc) {
// Native vectorization supported.
super(vectorTableScanDesc, true);
this.tableScanDesc = tableScanDesc;
@@ -513,13 +546,13 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
}
DataTypePhysicalVariation[] projectedColumnDataTypePhysicalVariations =
- vectorTableScanDesc.getProjectedColumnDataTypePhysicalVariations();
+ vectorTableScanDesc.getProjectedColumnDataTypePhysicalVariations();
return BaseExplainVectorization.getColumnAndTypes(
- projectionColumns,
- projectedColumnNames,
- projectedColumnTypeInfos,
- projectedColumnDataTypePhysicalVariations).toString();
+ projectionColumns,
+ projectedColumnNames,
+ projectedColumnTypeInfos,
+ projectedColumnDataTypePhysicalVariations).toString();
}
}
@@ -549,9 +582,9 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
if (getClass().getName().equals(other.getClass().getName())) {
TableScanDesc otherDesc = (TableScanDesc) other;
return Objects.equals(getQualifiedTable(), otherDesc.getQualifiedTable()) &&
- ExprNodeDescUtils.isSame(getFilterExpr(), otherDesc.getFilterExpr()) &&
- getRowLimit() == otherDesc.getRowLimit() &&
- isGatherStats() == otherDesc.isGatherStats();
+ ExprNodeDescUtils.isSame(getFilterExpr(), otherDesc.getFilterExpr()) &&
+ getRowLimit() == otherDesc.getRowLimit() &&
+ isGatherStats() == otherDesc.isGatherStats();
}
return false;
}
@@ -559,4 +592,4 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
public boolean isFullAcidTable() {
return isTranscationalTable() && !getAcidOperationalProperties().isInsertOnly();
}
-}
+}
\ No newline at end of file
diff --git a/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
new file mode 100644
index 0000000000000000000000000000000000000000..6a2a0f075f80f6d1bb3d1c871194657945107a71
--- /dev/null
+++ b/omnidata/omnidata-hive-connector/connector/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.omnidata.config.OmniDataConf;
+import org.apache.hadoop.hive.ql.omnidata.physical.NdpStatisticsUtils;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.common.Preconditions;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+public abstract class TezSplitGrouper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TezSplitGrouper.class);
+
+ /**
+ * Specify the number of splits desired to be created
+ */
+ public static final String TEZ_GROUPING_SPLIT_COUNT = "tez.grouping.split-count";
+ /**
+ * Limit the number of splits in a group by the total length of the splits in the group
+ */
+ public static final String TEZ_GROUPING_SPLIT_BY_LENGTH = "tez.grouping.by-length";
+ public static final boolean TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT = true;
+ /**
+ * Limit the number of splits in a group by the number of splits in the group
+ */
+ public static final String TEZ_GROUPING_SPLIT_BY_COUNT = "tez.grouping.by-count";
+ public static final boolean TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT = false;
+
+ /**
+ * The multiplier for available queue capacity when determining number of
+ * tasks for a Vertex. 1.7 with 100% queue available implies generating a
+ * number of tasks roughly equal to 170% of the available containers on the
+ * queue. This enables multiple waves of mappers where the final wave is slightly smaller
+ * than the remaining waves. The gap helps overlap the final wave with any slower tasks
+ * from previous waves and tries to hide the delays from the slower tasks. Good values for
+ * this are 1.7, 2.7, 3.7 etc. Increase the number of waves to make the tasks smaller or
+ * shorter.
+ */
+ public static final String TEZ_GROUPING_SPLIT_WAVES = "tez.grouping.split-waves";
+ public static final float TEZ_GROUPING_SPLIT_WAVES_DEFAULT = 1.7f;
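+ // Illustrative arithmetic (hypothetical numbers, not from the original source): with 200 containers
+ // available and the default wave factor of 1.7, roughly 340 tasks are requested, i.e. one full wave
+ // of 200 followed by a smaller final wave of ~140 that can overlap stragglers from the first wave.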
+
+ /**
+ * Upper bound on the size (in bytes) of a grouped split, to avoid generating excessively large splits.
+ */
+ public static final String TEZ_GROUPING_SPLIT_MAX_SIZE = "tez.grouping.max-size";
+ public static final long TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT = 1024*1024*1024L;
+
+ /**
+ * Lower bound on the size (in bytes) of a grouped split, to avoid generating too many small splits.
+ */
+ public static final String TEZ_GROUPING_SPLIT_MIN_SIZE = "tez.grouping.min-size";
+ public static final long TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT = 50*1024*1024L;
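+ // With the defaults above, grouping therefore aims for roughly 50 MB to 1 GB of data per grouped
+ // split; the min/max checks in getGroupedSplits() below enforce these bounds.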
+
+ /**
+ * This factor is used to decrease the per group desired (length and count) limits for groups
+ * created by combining splits within a rack. Since reading this split involves reading data intra
+ * rack, the group is made smaller to cover up for the increased latencies in doing intra rack
+ * reads. The value should be a fraction <= 1.
+ */
+ public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION =
+ "tez.grouping.rack-split-reduction";
+ public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = 0.75f;
+
+ /**
+ * Repeated invocations of grouping on the same splits with the same parameters will produce the
+ * same groups. This may help in cache reuse but may cause hot-spotting on nodes when there are a
+ * large number of jobs reading the same hot data. True by default.
+ */
+ public static final String TEZ_GROUPING_REPEATABLE = "tez.grouping.repeatable";
+ public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = true;
+
+ /**
+ * Generate node local splits only. This prevents fallback to rack locality etc, and overrides
+ * the target size for small splits.
+ */
+ public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only";
+ public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;
+
+
+ static class LocationHolder {
+ List<SplitContainer> splits;
+ int headIndex = 0;
+ LocationHolder(int capacity) {
+ splits = new ArrayList<SplitContainer>(capacity);
+ }
+ boolean isEmpty() {
+ return (headIndex == splits.size());
+ }
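+ // Returns the first split at this location that has not yet been grouped, skipping entries that
+ // were already processed via another location; returns null once every split here is consumed.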
+ SplitContainer getUnprocessedHeadSplit() {
+ while (!isEmpty()) {
+ SplitContainer holder = splits.get(headIndex);
+ if (!holder.isProcessed()) {
+ return holder;
+ }
+ incrementHeadIndex();
+ }
+ return null;
+ }
+ void incrementHeadIndex() {
+ headIndex++;
+ }
+ }
+
+ private static final SplitSizeEstimatorWrapper DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimatorWrapper();
+
+ static final class DefaultSplitSizeEstimatorWrapper implements SplitSizeEstimatorWrapper {
+
+ @Override
+ public long getEstimatedSize(SplitContainer splitContainer) throws IOException,
+ InterruptedException {
+ return splitContainer.getLength();
+ }
+ }
+
+ private static final SplitLocationProviderWrapper DEFAULT_SPLIT_LOCATION_PROVIDER = new DefaultSplitLocationProvider();
+
+ static final class DefaultSplitLocationProvider implements SplitLocationProviderWrapper {
+
+ @Override
+ public String[] getPreferredLocations(SplitContainer splitContainer) throws IOException,
+ InterruptedException {
+ return splitContainer.getPreferredLocations();
+ }
+ }
+
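+ // When tez.grouping.repeatable is enabled (the default), a TreeMap keeps location iteration order
+ // deterministic so repeated runs over the same splits form identical groups; otherwise a HashMap is used.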
+ Map<String, LocationHolder> createLocationsMap(Configuration conf) {
+ if (conf.getBoolean(TEZ_GROUPING_REPEATABLE,
+ TEZ_GROUPING_REPEATABLE_DEFAULT)) {
+ return new TreeMap<String, LocationHolder>();
+ }
+ return new HashMap<String, LocationHolder>();
+ }
+
+
+
+ public List<GroupedSplitContainer> getGroupedSplits(Configuration conf,
+ List<SplitContainer> originalSplits,
+ int desiredNumSplits,
+ String wrappedInputFormatName,
+ SplitSizeEstimatorWrapper estimator,
+ SplitLocationProviderWrapper locationProvider) throws
+ IOException, InterruptedException {
+ LOG.info("Grouping splits in Tez");
+ Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
+
+ int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0);
+ if (configNumSplits > 0) {
+ // always use config override if specified
+ desiredNumSplits = configNumSplits;
+ LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
+ }
+
+ if (estimator == null) {
+ estimator = DEFAULT_SPLIT_ESTIMATOR;
+ }
+ if (locationProvider == null) {
+ locationProvider = DEFAULT_SPLIT_LOCATION_PROVIDER;
+ }
+
+ List<GroupedSplitContainer> groupedSplits = null;
+ String emptyLocation = "EmptyLocation";
+ String localhost = "localhost";
+ String[] emptyLocations = {emptyLocation};
+ groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits);
+
+ boolean allSplitsHaveLocalhost = true;
+
+ long totalLength = 0;
+ Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
+ // go through splits and add them to locations
+ for (SplitContainer split : originalSplits) {
+ totalLength += estimator.getEstimatedSize(split);
+ String[] locations = locationProvider.getPreferredLocations(split);
+ if (locations == null || locations.length == 0) {
+ locations = emptyLocations;
+ allSplitsHaveLocalhost = false;
+ }
+ for (String location : locations ) {
+ if (location == null) {
+ location = emptyLocation;
+ allSplitsHaveLocalhost = false;
+ }
+ if (!location.equalsIgnoreCase(localhost)) {
+ allSplitsHaveLocalhost = false;
+ }
+ distinctLocations.put(location, null);
+ }
+ }
+
+ if (! (configNumSplits > 0 ||
+ originalSplits.size() == 0)) {
+ // numSplits has not been overridden by config
+ // numSplits has been set at runtime
+ // there are splits generated
+ // desired splits is less than number of splits generated
+ // Do sanity checks
+
+ int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size();
+ long lengthPerGroup = totalLength/splitCount;
+
+ long maxLengthPerGroup = conf.getLong(
+ TEZ_GROUPING_SPLIT_MAX_SIZE,
+ TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
+ long minLengthPerGroup = conf.getLong(
+ TEZ_GROUPING_SPLIT_MIN_SIZE,
+ TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
+ if (maxLengthPerGroup < minLengthPerGroup ||
+ minLengthPerGroup <=0) {
+ throw new TezUncheckedException(
+ "Invalid max/min group lengths. Required min>0, max>=min. " +
+ " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
+ }
+ if (lengthPerGroup > maxLengthPerGroup) {
+ // splits too big to work. Need to override with max size.
+ int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
+ LOG.info("Desired splits: " + desiredNumSplits + " too small. " +
+ " Desired splitLength: " + lengthPerGroup +
+ " Max splitLength: " + maxLengthPerGroup +
+ " New desired splits: " + newDesiredNumSplits +
+ " Total length: " + totalLength +
+ " Original splits: " + originalSplits.size());
+
+ desiredNumSplits = newDesiredNumSplits;
+ } else if (lengthPerGroup < minLengthPerGroup) {
+ // splits too small to work. Need to override with min size.
+ int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
+ /**
+ * This is a workaround for systems like S3 that pass the same
+ * fake hostname for all splits.
+ */
+ if (!allSplitsHaveLocalhost) {
+ desiredNumSplits = newDesiredNumSplits;
+ }
+
+ LOG.info("Desired splits: " + desiredNumSplits + " too large. " +
+ " Desired splitLength: " + lengthPerGroup +
+ " Min splitLength: " + minLengthPerGroup +
+ " New desired splits: " + newDesiredNumSplits +
+ " Final desired splits: " + desiredNumSplits +
+ " All splits have localhost: " + allSplitsHaveLocalhost +
+ " Total length: " + totalLength +
+ " Original splits: " + originalSplits.size());
+ }
+ }
+ List<GroupedSplitContainer> tmpGroupedSplits = null;
+ if (desiredNumSplits == 0 ||
+ originalSplits.size() == 0 ||
+ desiredNumSplits >= originalSplits.size()) {
+ // nothing set. so return all the splits as is
+ LOG.info("Using original number of splits: " + originalSplits.size() +
+ " desired splits: " + desiredNumSplits);
+ tmpGroupedSplits = new ArrayList<GroupedSplitContainer>(originalSplits.size());
+ for (SplitContainer split : originalSplits) {
+ GroupedSplitContainer newSplit =
+ new GroupedSplitContainer(1, wrappedInputFormatName, cleanupLocations(locationProvider.getPreferredLocations(split)),
+ null);
+ newSplit.addSplit(split);
+ tmpGroupedSplits.add(newSplit);
+ }
+ }
+
+ long lengthPerGroup = totalLength/desiredNumSplits;
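+ // Illustrative arithmetic (hypothetical numbers): 100 GB of input and 1000 desired splits yield a
+ // target of roughly 100 MB per group, before the OmniData adjustment below.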
+
+ // OmniData optimized
+ if (configNumSplits <= 0 && OmniDataConf.getOmniDataExistsTablePushDown(conf)) {
+ double newLengthPerGroup = NdpStatisticsUtils.optimizeLengthPerGroup(conf) * (double) lengthPerGroup;
+ lengthPerGroup = (long) newLengthPerGroup;
+ }
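+ // Assumption for illustration only: if NdpStatisticsUtils.optimizeLengthPerGroup(conf) returned 2.0,
+ // each group would target twice as much data, roughly halving the number of grouped splits.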
+
+ int numNodeLocations = distinctLocations.size();
+ int numSplitsPerLocation = originalSplits.size()/numNodeLocations;
+ int numSplitsInGroup = originalSplits.size()/desiredNumSplits;
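+ // Rough sizing hints: the average number of splits per location pre-sizes the per-location lists,
+ // and numSplitsInGroup is the per-group cap used when grouping by count.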
+
+ // allocation loop here so that we have a good initial size for the lists
+ for (String location : distinctLocations.keySet()) {
+ distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
+ }
+
+ Set<String> locSet = new HashSet<String>();
+ for (SplitContainer split : originalSplits) {
+ locSet.clear();
+ String[] locations = locationProvider.getPreferredLocations(split);
+ if (locations == null || locations.length == 0) {
+ locations = emptyLocations;
+ }
+ for (String location : locations) {
+ if (location == null) {
+ location = emptyLocation;
+ }
+ locSet.add(location);
+ }
+ for (String location : locSet) {
+ LocationHolder holder = distinctLocations.get(location);
+ holder.splits.add(split);
+ }
+ }
+
+ boolean groupByLength = conf.getBoolean(
+ TEZ_GROUPING_SPLIT_BY_LENGTH,
+ TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
+ boolean groupByCount = conf.getBoolean(
+ TEZ_GROUPING_SPLIT_BY_COUNT,
+ TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT);
+ boolean nodeLocalOnly = conf.getBoolean(
+ TEZ_GROUPING_NODE_LOCAL_ONLY,
+ TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT);
+ if (!(groupByLength || groupByCount)) {
+ throw new TezUncheckedException(
+ "None of the grouping parameters are true: "
+ + TEZ_GROUPING_SPLIT_BY_LENGTH + ", "
+ + TEZ_GROUPING_SPLIT_BY_COUNT);
+ }
+ LOG.info("Desired numSplits: " + desiredNumSplits +
+ " lengthPerGroup: " + lengthPerGroup +
+ " numLocations: " + numNodeLocations +
+ " numSplitsPerLocation: " + numSplitsPerLocation +
+ " numSplitsInGroup: " + numSplitsInGroup +
+ " totalLength: " + totalLength +
+ " numOriginalSplits: " + originalSplits.size() +
+ " . Grouping by length: " + groupByLength +
+ " count: " + groupByCount +
+ " nodeLocalOnly: " + nodeLocalOnly);
+
+ // go through locations and group splits
+ int splitsProcessed = 0;
+ List<SplitContainer> group = new ArrayList<SplitContainer>(numSplitsInGroup);
+ Set<String> groupLocationSet = new HashSet<String>(10);
+ boolean allowSmallGroups = false;
+ boolean doingRackLocal = false;
+ int iterations = 0;
+ while (splitsProcessed < originalSplits.size()) {
+ iterations++;
+ int numFullGroupsCreated = 0;
+ for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
+ group.clear();
+ groupLocationSet.clear();
+ String location = entry.getKey();
+ LocationHolder holder = entry.getValue();
+ SplitContainer splitContainer = holder.getUnprocessedHeadSplit();
+ if (splitContainer == null) {
+ // all splits on node processed
+ continue;
+ }
+ int oldHeadIndex = holder.headIndex;
+ long groupLength = 0;
+ int groupNumSplits = 0;
+ do {
+ group.add(splitContainer);
+ groupLength += estimator.getEstimatedSize(splitContainer);
+ groupNumSplits++;
+ holder.incrementHeadIndex();
+ splitContainer = holder.getUnprocessedHeadSplit();
+ } while(splitContainer != null
+ && (!groupByLength ||
+ (groupLength + estimator.getEstimatedSize(splitContainer) <= lengthPerGroup))
+ && (!groupByCount ||
+ (groupNumSplits + 1 <= numSplitsInGroup)));
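+ // The loop above keeps appending node-local splits while adding the next one would still keep the
+ // group within the length target (when grouping by length) and the count limit (when grouping by count).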
+
+ if (holder.isEmpty()
+ && !allowSmallGroups
+ && (!groupByLength || groupLength < lengthPerGroup/2)
+ && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
+ // group too small, reset it
+ holder.headIndex = oldHeadIndex;
+ continue;
+ }
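+ // Splits handed back by resetting headIndex stay unprocessed and are retried in a later iteration,
+ // either as part of a larger group, a rack-local group, or once small groups are allowed.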
+
+ numFullGroupsCreated++;
+
+ // One split group created
+ String[] groupLocation = {location};
+ if (location == emptyLocation) {
+ groupLocation = null;
+ } else if (doingRackLocal) {
+ for (SplitContainer splitH : group) {
+ String[] locations = locationProvider.getPreferredLocations(splitH);
+ if (locations != null) {
+ for (String loc : locations) {
+ if (loc != null) {
+ groupLocationSet.add(loc);
+ }
+ }
+ }
+ }
+ groupLocation = groupLocationSet.toArray(groupLocation);
+ }
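+ // For a rack-local group, the advertised locations become the union of the member splits' preferred
+ // hosts; the rack itself is passed separately as the rack-local hint in the constructor below.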
+ GroupedSplitContainer groupedSplit =
+ new GroupedSplitContainer(group.size(), wrappedInputFormatName,
+ groupLocation,
+ // pass rack local hint directly to AM
+ ((doingRackLocal && location != emptyLocation)?location:null));
+ for (SplitContainer groupedSplitContainer : group) {
+ groupedSplit.addSplit(groupedSplitContainer);
+ Preconditions.checkState(groupedSplitContainer.isProcessed() == false,
+ "Duplicates in grouping at location: " + location);
+ groupedSplitContainer.setIsProcessed(true);
+ splitsProcessed++;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Grouped " + group.size()
+ + " length: " + groupedSplit.getLength()
+ + " split at: " + location);
+ }
+ groupedSplits.add(groupedSplit);
+ }
+
+ if (!doingRackLocal && numFullGroupsCreated < 1) {
+ // no node could create a regular node-local group.
+
+ // Allow small groups if that is configured.
+ if (nodeLocalOnly && !allowSmallGroups) {
+ LOG.info(
+ "Allowing small groups early after attempting to create full groups at iteration: {}, groupsCreatedSoFar={}",
+ iterations, groupedSplits.size());
+ allowSmallGroups = true;
+ continue;
+ }
+
+ // else go rack-local
+ doingRackLocal = true;
+ // re-create locations
+ int numRemainingSplits = originalSplits.size() - splitsProcessed;
+ Set<SplitContainer> remainingSplits = new HashSet<SplitContainer>(numRemainingSplits);
+ // gather remaining splits.
+ for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
+ LocationHolder locHolder = entry.getValue();
+ while (!locHolder.isEmpty()) {
+ SplitContainer splitHolder = locHolder.getUnprocessedHeadSplit();
+ if (splitHolder != null) {
+ remainingSplits.add(splitHolder);
+ locHolder.incrementHeadIndex();
+ }
+ }
+ }
+ if (remainingSplits.size() != numRemainingSplits) {
+ throw new TezUncheckedException("Expected: " + numRemainingSplits
+ + " got: " + remainingSplits.size());
+ }
+
+ // doing all this now instead of up front because the number of remaining
+ // splits is expected to be much smaller
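+ // Re-bucket the leftover splits by rack (via RackResolver) so the next pass can build rack-local groups.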
+ RackResolver.init(conf);
+ Map