From e67255c310c7408baf5f78d78af7972364599836 Mon Sep 17 00:00:00 2001
From: suixiaoyu
Date: Thu, 13 Jun 2024 11:33:10 +0800
Subject: [PATCH] fix code check defects

---
 .../com/huawei/boostkit/hive/JoinUtils.java   |  36 +-
 .../hive/OmniExecuteWithHookContext.java      | 420 ++++++++++--------
 .../huawei/boostkit/hive/OmniFilterDesc.java  |  21 +-
 .../boostkit/hive/OmniFilterOperator.java     |  41 +-
 .../huawei/boostkit/hive/OmniGroupByDesc.java |   4 +-
 .../boostkit/hive/OmniGroupByOperator.java    | 101 +++--
 .../boostkit/hive/OmniHiveOperator.java       |   2 -
 .../boostkit/hive/OmniJoinOperator.java       |  34 +-
 .../hive/reader/OmniOrcRecordReader.java      |  20 +-
 .../hive/reader/OmniParquetInputFormat.java   |   1 -
 .../hive/reader/OmniParquetRecordReader.java  |  90 ++--
 .../OmniVectorizedParquetRecordReader.java    |  22 +-
 .../reader/OrcColumnarBatchScanReader.java    |  27 +-
 .../ParquetColumnarBatchScanReader.java       |  21 +-
 .../hive/reader/VecBatchWrapperSerde.java     |  13 +-
 .../hive/shuffle/FixedWidthColumnSerDe.java   |  37 +-
 .../shuffle/FixedWidthColumnSortSerDe.java    |  33 +-
 .../hive/shuffle/OmniVecBatchOrderSerDe.java  |   5 +-
 .../hive/shuffle/OmniVecBatchSerDe.java       |  81 ++--
 .../boostkit/hive/shuffle/SerDeUtils.java     |  42 +-
 .../shuffle/VariableWidthColumnAscSerDe.java  |  43 +-
 .../shuffle/VariableWidthColumnDescSerDe.java |  44 +-
 .../shuffle/VariableWidthColumnSerDe.java     |  33 +-
 23 files changed, 675 insertions(+), 496 deletions(-)

diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/JoinUtils.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/JoinUtils.java
index d83c1ba9f..c7bdd0041 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/JoinUtils.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/JoinUtils.java
@@ -25,11 +25,11 @@ import com.huawei.boostkit.hive.expression.ExpressionUtils;
 import com.huawei.boostkit.hive.expression.TypeUtils;
 
 import nova.hetu.omniruntime.type.DataType;
+
 import org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator;
 import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.ExprNodeGenericFuncEvaluator;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
@@ -48,10 +48,11 @@ public class JoinUtils {
         return getExprNodeColumnEvaluator(joinKeys, false);
     }
 
-    public static List<ExprNodeEvaluator> getExprNodeColumnEvaluator(List<ExprNodeEvaluator> joinKeys, boolean includeConstant) {
+    public static List<ExprNodeEvaluator> getExprNodeColumnEvaluator(List<ExprNodeEvaluator> joinKeys,
+            boolean isIncludedConstant) {
         List<ExprNodeEvaluator> exprNodeColumnEvaluators = new ArrayList<>();
         for (ExprNodeEvaluator joinKey : joinKeys) {
-            if (includeConstant && joinKey instanceof ExprNodeConstantEvaluator) {
+            if (isIncludedConstant && joinKey instanceof ExprNodeConstantEvaluator) {
                 exprNodeColumnEvaluators.add(joinKey);
             }
             dealChildren(joinKey, exprNodeColumnEvaluators);
@@ -70,34 +71,40 @@ public class JoinUtils {
         Arrays.stream(joinKey.getChildren()).forEach(child -> dealChildren(child, exprNodeColumnEvaluators));
     }
 
-    public static String[] getExprFromExprNode(List<ExprNodeEvaluator> nodes, Map<String, Integer> keyColNameToId, ObjectInspector inspector, boolean isBuildTable) {
+    public static String[] getExprFromExprNode(List<ExprNodeEvaluator> nodes, Map<String, Integer> keyColNameToId,
+            ObjectInspector inspector, boolean isBuildTable) {
         List<String> expressions = new ArrayList<>();
         for (ExprNodeEvaluator node : nodes) {
             if (node instanceof ExprNodeGenericFuncEvaluator) {
                 if (isBuildTable) {
                     expressions.add(TypeUtils.buildExpression(node.getExpr().getTypeInfo(), nodes.indexOf(node)));
                 } else {
-                    expressions.add(ExpressionUtils.buildSimplify((ExprNodeGenericFuncDesc) node.getExpr(), inspector).toString());
+                    expressions.add(ExpressionUtils.buildSimplify((ExprNodeGenericFuncDesc) node.getExpr(), inspector)
+                            .toString());
                 }
                 continue;
-            } else if (node instanceof ExprNodeConstantEvaluator) {
+            }
+            if (node instanceof ExprNodeConstantEvaluator) {
                 expressions.add(ExpressionUtils.createLiteralNode(node.getExpr()).toString());
                 continue;
             }
             PrimitiveTypeInfo keyType = (PrimitiveTypeInfo) node.getExpr().getTypeInfo();
             PrimitiveTypeInfo inputType = ((AbstractPrimitiveObjectInspector) node.getOutputOI()).getTypeInfo();
-            if (!keyType.getPrimitiveCategory().equals(PrimitiveObjectInspector.PrimitiveCategory.DECIMAL) || keyType.equals(inputType)) {
-                expressions.add(TypeUtils.buildExpression(((AbstractPrimitiveObjectInspector) node.getOutputOI()).getTypeInfo(),
-                        isBuildTable? nodes.indexOf(node): keyColNameToId.get(((ExprNodeColumnEvaluator) node).getExpr().getColumn())));
+            if (!keyType.getPrimitiveCategory().equals(PrimitiveObjectInspector.PrimitiveCategory.DECIMAL)
+                    || keyType.equals(inputType)) {
+                expressions.add(TypeUtils.buildExpression(((AbstractPrimitiveObjectInspector) node.getOutputOI())
+                        .getTypeInfo(), isBuildTable ? nodes.indexOf(node)
+                        : keyColNameToId.get(((ExprNodeColumnEvaluator) node).getExpr().getColumn())));
                 continue;
             }
             int returnType = TypeUtils.convertHiveTypeToOmniType(keyType);
             CastFunctionExpression cast = new CastFunctionExpression(returnType, TypeUtils.getCharWidth(node.getExpr()),
                     ((DecimalTypeInfo) keyType).getPrecision(), ((DecimalTypeInfo) keyType).getScale());
-            int fieldID = ((StructObjectInspector) inspector).getStructFieldRef(node.getExpr().getExprString()).getFieldID();
+            int fieldID = ((StructObjectInspector) inspector).getStructFieldRef(node.getExpr().getExprString())
+                    .getFieldID();
             int omniType = TypeUtils.convertHiveTypeToOmniType(inputType);
-            BaseExpression decimalReference = new DecimalReference(fieldID, omniType, ((DecimalTypeInfo) inputType).getPrecision(),
-                    ((DecimalTypeInfo) inputType).getScale());
+            BaseExpression decimalReference = new DecimalReference(fieldID, omniType, ((DecimalTypeInfo) inputType)
+                    .getPrecision(), ((DecimalTypeInfo) inputType).getScale());
             cast.add(decimalReference);
             expressions.add(cast.toString());
         }
@@ -105,8 +112,7 @@ public class JoinUtils {
     }
 
     public static DataType[] getTypeFromInspectors(List<ObjectInspector> inspectors) {
-        return inspectors.stream().map(
-                inspector -> TypeUtils.buildInputDataType(((AbstractPrimitiveObjectInspector) inspector).getTypeInfo()))
-                .toArray(DataType[]::new);
+        return inspectors.stream().map(inspector -> TypeUtils.buildInputDataType(
+                ((AbstractPrimitiveObjectInspector) inspector).getTypeInfo())).toArray(DataType[]::new);
     }
 }
diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java
index e3cfdcf52..a574a3712 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java
@@ -18,10 +18,10 @@ package
com.huawei.boostkit.hive; -import static com.huawei.boostkit.hive.expression.TypeUtils.checkUnsupportedTimestamp; import static com.huawei.boostkit.hive.expression.TypeUtils.checkOmniJsonWhiteList; import static com.huawei.boostkit.hive.expression.TypeUtils.checkUnsupportedArithmetic; import static com.huawei.boostkit.hive.expression.TypeUtils.checkUnsupportedCast; +import static com.huawei.boostkit.hive.expression.TypeUtils.checkUnsupportedTimestamp; import static com.huawei.boostkit.hive.expression.TypeUtils.convertHiveTypeToOmniType; import static com.huawei.boostkit.hive.expression.TypeUtils.isValidConversion; import static com.huawei.boostkit.hive.expression.TypeUtils.isValidFilterExpression; @@ -53,7 +53,6 @@ import com.huawei.boostkit.hive.shuffle.OmniVecBatchSerDe; import nova.hetu.omniruntime.constants.FunctionType; -import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.QueryPlan; @@ -75,7 +74,6 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; - import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.BaseWork; @@ -99,10 +97,10 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.UnionWork; +import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; @@ -119,7 +117,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -133,15 +130,16 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; public class OmniExecuteWithHookContext implements ExecuteWithHookContext { - private static final Logger LOG = LoggerFactory.getLogger(OmniExecuteWithHookContext.class.getName()); public static final Set OMNI_OPERATOR = new HashSet<>(Arrays.asList(OperatorType.JOIN, OperatorType.MAPJOIN, OperatorType.MERGEJOIN, OperatorType.GROUPBY, OperatorType.SELECT, OperatorType.FILTER, OperatorType.PTF, OperatorType.TABLESCAN, OperatorType.REDUCESINK)); public static final Set SUPPORTED_JOIN = new HashSet<>(Arrays.asList(JoinDesc.INNER_JOIN, JoinDesc.LEFT_OUTER_JOIN, JoinDesc.FULL_OUTER_JOIN, JoinDesc.LEFT_SEMI_JOIN)); public static int stringLength = 1024; - private static final Set SUPPORTED_TYPE = new HashSet<>(Arrays.asList(BOOLEAN, - SHORT, INT, LONG, DOUBLE, STRING, DATE, DECIMAL, VARCHAR, CHAR, VOID)); + + private static final Logger LOG = LoggerFactory.getLogger(OmniExecuteWithHookContext.class.getName()); + private static final Set SUPPORTED_TYPE = new HashSet<> + (Arrays.asList(BOOLEAN, SHORT, INT, LONG, DOUBLE, STRING, DATE, DECIMAL, VARCHAR, CHAR, VOID)); private static 
final int DECIMAL64_MAX_PRECISION = 18; @@ -181,7 +179,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } private boolean isSupportSqlType(QueryPlan queryPlan) { - if (queryPlan.getQueryProperties() != null && queryPlan.getQueryProperties().isQuery() && queryPlan.getOperationName().equals("QUERY")) { + if (queryPlan.getQueryProperties() != null && queryPlan.getQueryProperties().isQuery() + && queryPlan.getOperationName().equals("QUERY")) { return true; } return queryPlan.getRootTasks().get(0) instanceof ExplainTask; @@ -203,8 +202,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { if (engine.equals("mr")) { throw new RuntimeException("can't support mr engine"); } - boolean cboEnable = HiveConf.getBoolVar(hookContext.getConf(), HiveConf.ConfVars.HIVE_CBO_ENABLED); - if (!cboEnable) { + boolean isCboEnabled = HiveConf.getBoolVar(hookContext.getConf(), HiveConf.ConfVars.HIVE_CBO_ENABLED); + if (!isCboEnabled) { return; } setTezWork(queryPlan); @@ -243,7 +242,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { continue; } for (Operator op : work.getAllOperators()) { - if (op.getType().equals(OperatorType.MAPJOIN) && !isReplaceable(op, true)) { + if (op.getType().equals(OperatorType.MAPJOIN) && !isReplaceable(op, true) + && op.getConf() instanceof MapJoinDesc) { MapJoinDesc conf = (MapJoinDesc) op.getConf(); Map parentToInput = conf.getParentToInput(); parentOfMapJoinOpCannotReplace.addAll(parentToInput.values()); @@ -267,7 +267,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { dealUnionWork(work); } } - reduceSinkReplaceableWorkName = reduceSinkReplaceableWork.stream().map(BaseWork::getName).collect(Collectors.toSet()); + reduceSinkReplaceableWorkName = reduceSinkReplaceableWork.stream().map(BaseWork::getName) + .collect(Collectors.toSet()); } private void dealUnionWork(BaseWork work) { @@ -289,38 +290,39 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { if (!omniHiveConf.isEnableOperator(OperatorType.REDUCESINK)) { return false; } - boolean reduceSinkCanReplace = false; if (parentOfMapJoinOpCannotReplace.contains(work.getName()) || !reduceSinkOperatorReplaceable(work)) { return false; } for (Operator op : work.getAllOperators()) { if (stringLength > 1024 && op.getType() == OperatorType.REDUCESINK && op.getColumnExprMap() != null) { - List typeInfos = op.getColumnExprMap().values().stream(). 
- map(value -> value.getTypeInfo().getTypeName()).collect(Collectors.toList()); + List typeInfos = op.getColumnExprMap().values().stream() + .map(value -> value.getTypeInfo().getTypeName()).collect(Collectors.toList()); if (typeInfos.contains("string")) { return false; } } } + boolean isReduceSinkReplaceable = false; for (BaseWork child : tezWork.getChildren(work)) { TezEdgeProperty.EdgeType edgeType = tezWork.getEdgeType(work, child); if (edgeType == TezEdgeProperty.EdgeType.CONTAINS) { continue; } - reduceSinkCanReplace = replaceableForChild(work, child); - if (!reduceSinkCanReplace) { + isReduceSinkReplaceable = replaceableForChild(work, child); + if (!isReduceSinkReplaceable) { return false; } } // deal with Union work for (BaseWork parent : tezWork.getParents(work)) { TezEdgeProperty.EdgeType edgeType = tezWork.getEdgeType(parent, work); - if (edgeType == TezEdgeProperty.EdgeType.CONTAINS && !inputUnReplaceableWork.contains(parent) && getReduceSinkReplaceable(parent)) { - reduceSinkCanReplace = true; + if (edgeType == TezEdgeProperty.EdgeType.CONTAINS && !inputUnReplaceableWork.contains(parent) + && getReduceSinkReplaceable(parent)) { + isReduceSinkReplaceable = true; reduceSinkReplaceableWork.add(parent); } } - return reduceSinkCanReplace; + return isReduceSinkReplaceable; } private boolean replaceableForChild(BaseWork work, BaseWork child) { @@ -328,12 +330,12 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { return false; } TezEdgeProperty.EdgeType edgeType = tezWork.getEdgeType(work, child); - boolean reduceSinkReplaceable = false; + boolean isReduceSinkReplaceable = false; if (isSupportEdgeType(edgeType) && isReplaceable(child.getAnyRootOperator(), true)) { - reduceSinkReplaceable = true; + isReduceSinkReplaceable = true; } if (edgeType == TezEdgeProperty.EdgeType.SIMPLE_EDGE && child instanceof ReduceWork) { - reduceSinkReplaceable = groupByEdgeReplaceable(work, child); + isReduceSinkReplaceable = groupByEdgeReplaceable(work, child); } if (child instanceof MergeJoinWork) { Operator reducer = ((MergeJoinWork) child).getMainWork().getAnyRootOperator(); @@ -341,10 +343,10 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { reducer = (Operator) reducer.getChildOperators().get(0); } if (reducer.getType() == OperatorType.MERGEJOIN && isReplaceable(reducer, false)) { - reduceSinkReplaceable = true; + isReduceSinkReplaceable = true; } } - return reduceSinkReplaceable; + return isReduceSinkReplaceable; } private boolean groupByEdgeReplaceable(BaseWork work, BaseWork child) { @@ -382,7 +384,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } if (work instanceof MergeJoinWork) { BaseWork mainWork = ((MergeJoinWork) work).getMainWork(); - if (!(mainWork instanceof ReduceWork) || ((ReduceWork) ((MergeJoinWork) work).getMainWork()).getReducer().getType() != OperatorType.MERGEJOIN) { + if (!(mainWork instanceof ReduceWork) || ((ReduceWork) ((MergeJoinWork) work) + .getMainWork()).getReducer().getType() != OperatorType.MERGEJOIN) { return true; } } @@ -390,26 +393,26 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } private boolean reduceSinkDescUnReplaceable(ReduceSinkDesc reduceSinkDesc) { - boolean unReplaceable = false; - if (reduceSinkDesc.getKeyCols().isEmpty() && !reduceSinkDesc.getReducerTraits().contains(ReduceSinkDesc.ReducerTraits.UNIFORM) - && reduceSinkDesc.getPartitionCols().size() > 0) { - unReplaceable = true; + boolean isUnReplaceable = false; + if 
(reduceSinkDesc.getKeyCols().isEmpty() && !reduceSinkDesc.getReducerTraits() + .contains(ReduceSinkDesc.ReducerTraits.UNIFORM) && reduceSinkDesc.getPartitionCols().size() > 0) { + isUnReplaceable = true; } - if (!isUDFSupport(reduceSinkDesc.getKeyCols(), false) || - !isUDFSupport(reduceSinkDesc.getValueCols(), false) || - !isUDFSupport(reduceSinkDesc.getPartitionCols(), false)) { - unReplaceable = true; + if (!isUDFSupport(reduceSinkDesc.getKeyCols(), false) + || !isUDFSupport(reduceSinkDesc.getValueCols(), false) + || !isUDFSupport(reduceSinkDesc.getPartitionCols(), false)) { + isUnReplaceable = true; } if (!reduceSinkDesc.getDistinctColumnIndices().isEmpty()) { - unReplaceable = true; + isUnReplaceable = true; } for (ExprNodeDesc exprNodeDesc : reduceSinkDesc.getPartitionCols()) { if (exprNodeDesc instanceof ExprNodeConstantDesc) { - unReplaceable = true; + isUnReplaceable = true; } } - return unReplaceable; + return isUnReplaceable; } private void initMergeJoinNeedSort() { @@ -423,11 +426,11 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } for (BaseWork parent : tezWork.getParents(work)) { TezEdgeProperty.EdgeType edgeType = tezWork.getEdgeType(parent, work); - boolean needSort = reduceSinkReplaceableWork.contains(parent); + boolean isNeedSort = reduceSinkReplaceableWork.contains(parent); if (reduceSinkReplaceableWork.contains(parent)) { addMergeJoinNeedSort(edgeType, reducer); } - replaceSimpleEdge(needSort, work, edgeType); + replaceSimpleEdge(isNeedSort, work, edgeType); } } } @@ -447,15 +450,16 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } if (work instanceof MergeJoinWork) { BaseWork mainWork = ((MergeJoinWork) work).getMainWork(); - if (!(mainWork instanceof ReduceWork) || ((ReduceWork) ((MergeJoinWork) work).getMainWork()).getReducer().getType() != OperatorType.MERGEJOIN) { + if (!(mainWork instanceof ReduceWork) || ((ReduceWork) ((MergeJoinWork) work).getMainWork()) + .getReducer().getType() != OperatorType.MERGEJOIN) { return; } } - boolean groupNeedSort = getGroupNeedSort(work); - boolean reduceSinkCanReplace = reduceSinkReplaceableWork.contains(work); + boolean isGroupNeedSort = getGroupNeedSort(work); + boolean isReduceSinkReplaceable = reduceSinkReplaceableWork.contains(work); VectorizedRowBatchCtx vectorizedRowBatchCtx = work.getVectorizedRowBatchCtx(); for (Operator op : work.getAllOperators()) { - traverseTezOperator(op, work, groupNeedSort, reduceSinkCanReplace, vectorizedRowBatchCtx); + traverseTezOperator(op, work, isGroupNeedSort, isReduceSinkReplaceable, vectorizedRowBatchCtx); } } @@ -476,7 +480,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { if (hasUnsupported) { return true; } - // If project generates new type ,also need to check + // If project generates new type, also need to check hasUnsupported = selectHasUnsupported(work); return hasUnsupported; } @@ -501,17 +505,18 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { if (!op.getType().equals(OperatorType.TABLESCAN)) { continue; } - RowSchema rowSchema = op.getSchema(); TableScanDesc tableScanDesc = (TableScanDesc) op.getConf(); List fieldNames = new ArrayList<>(); fieldNames.addAll(tableScanDesc.getNeededColumns()); - LinkedHashMap partSpec = mapWork.getAliasToPartnInfo().get(tableScanDesc.getAlias()).getPartSpec(); + LinkedHashMap partSpec = mapWork.getAliasToPartnInfo().get(tableScanDesc.getAlias()) + .getPartSpec(); if (partSpec != null) { fieldNames.addAll(partSpec.keySet()); } 
if (fieldNames.isEmpty()) { continue; } + RowSchema rowSchema = op.getSchema(); for (String field : fieldNames) { TypeInfo typeInfo = rowSchema.getColumnInfo(field).getType(); if (fieldTypeUnsupported(typeInfo)) { @@ -551,11 +556,13 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { private boolean tableScanSupport(Operator op) { TableScanDesc tableScanDesc = (TableScanDesc) op.getConf(); - if (tableScanDesc.getNeededColumns().isEmpty() || !tableScanDesc.getVirtualCols().isEmpty() || tableScanDesc.getAcidOperationalProperties() != null) { + if (tableScanDesc.getNeededColumns().isEmpty() || !tableScanDesc.getVirtualCols().isEmpty() + || tableScanDesc.getAcidOperationalProperties() != null) { return false; } Table tableMetadata = tableScanDesc.getTableMetadata(); - if (tableMetadata != null && (!checkInputFormat(tableMetadata.getInputFormatClass().getName()) || tableMetadata.getParameters().getOrDefault("transactional", "").equals("true"))) { + if (tableMetadata != null && (!checkInputFormat(tableMetadata.getInputFormatClass().getName()) + || tableMetadata.getParameters().getOrDefault("transactional", "").equals("true"))) { return false; } for (String name : tableScanDesc.getNeededColumns()) { @@ -564,16 +571,17 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } } if (tableScanDesc.isVectorized()) { - TypeInfo[] columnTypeInfos = ((VectorTableScanDesc) tableScanDesc.getVectorDesc()).getProjectedColumnTypeInfos(); + TypeInfo[] columnTypeInfos = ((VectorTableScanDesc) tableScanDesc.getVectorDesc()) + .getProjectedColumnTypeInfos(); for (int id : tableScanDesc.getNeededColumnIDs()) { - if (columnTypeInfos[id].getTypeName() == "timestamp") { + if (columnTypeInfos[id].getTypeName().equals("timestamp")) { return false; } } } else if (tableMetadata != null && tableMetadata.getCols() != null) { List colList = tableMetadata.getCols(); for (int id : tableScanDesc.getNeededColumnIDs()) { - if (colList.get(id).getType() == "timestamp") { + if (colList.get(id).getType().equals("timestamp")) { return false; } } @@ -597,8 +605,9 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { return false; } - private void traverseTezOperator(Operator op, BaseWork work, boolean groupNeedSort, boolean reduceSinkCanReplace, VectorizedRowBatchCtx vectorizedRowBatchCtx) { - if (!currentReplaceable(op, reduceSinkCanReplace)) { + private void traverseTezOperator(Operator op, BaseWork work, boolean isGroupNeedSort, + boolean isReduceSinkReplaceable, VectorizedRowBatchCtx vectorizedRowBatchCtx) { + if (!currentReplaceable(op, isReduceSinkReplaceable)) { return; } if (work instanceof MapWork && op.getType().equals(OperatorType.TABLESCAN) && isReplaceable(op, false)) { @@ -606,104 +615,78 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { MapWork mapWork = (MapWork) work; for (Map.Entry> entry : mapWork.getAliasToWork().entrySet()) { if (entry.getValue().equals(op)) { - mapWork.getAliasToWork().put(entry.getKey(), replaceWithOmniOperator(new OperatorInfo(op, work.getVectorMode(), vectorizedRowBatchCtx, reduceSinkCanReplace), 0)); + mapWork.getAliasToWork().put(entry.getKey(), replaceWithOmniOperator( + new OperatorInfo(op, work.getVectorMode(), vectorizedRowBatchCtx, isReduceSinkReplaceable), 0)); replaceable.add(op); } } } else if (work instanceof ReduceWork && ((ReduceWork) work).getReducer() == op) { - if (op.getType() != null && op.getType().equals(OperatorType.GROUPBY) && groupNeedSort) { + if (op.getType() != 
null && op.getType().equals(OperatorType.GROUPBY) && isGroupNeedSort) { groupByOperatorNeedSort.add(op); } ReduceWork reduceWork = (ReduceWork) work; - reduceWork.setReducer(replaceWithOmniOperator(new OperatorInfo(op, work.getVectorMode(), vectorizedRowBatchCtx, reduceSinkCanReplace), reduceWork.getTagToInput().size())); + reduceWork.setReducer(replaceWithOmniOperator(new OperatorInfo(op, work.getVectorMode(), + vectorizedRowBatchCtx, isReduceSinkReplaceable), reduceWork.getTagToInput().size())); replaceable.add(op); - } else if (work instanceof MergeJoinWork && ((ReduceWork) ((MergeJoinWork) work).getMainWork()).getReducer() == op) { + } else if (work instanceof MergeJoinWork + && ((ReduceWork) ((MergeJoinWork) work).getMainWork()).getReducer() == op) { ReduceWork reduceWork = (ReduceWork) ((MergeJoinWork) work).getMainWork(); - reduceWork.setReducer(replaceWithOmniOperator(new OperatorInfo(op, work.getVectorMode(), vectorizedRowBatchCtx, reduceSinkCanReplace), reduceWork.getTagToInput().size())); + reduceWork.setReducer(replaceWithOmniOperator(new OperatorInfo(op, work.getVectorMode(), + vectorizedRowBatchCtx, isReduceSinkReplaceable), reduceWork.getTagToInput().size())); replaceable.add(op); - } else if (isReplaceable(op, reduceSinkCanReplace)) { - if (op.getType() != null && op.getType().equals(OperatorType.GROUPBY) && groupNeedSort) { + } else if (isReplaceable(op, isReduceSinkReplaceable)) { + if (op.getType() != null && op.getType().equals(OperatorType.GROUPBY) && isGroupNeedSort) { groupByOperatorNeedSort.add(op); } - replaceableInfo.add(new OperatorInfo(op, work.getVectorMode(), vectorizedRowBatchCtx, reduceSinkCanReplace)); + replaceableInfo.add(new OperatorInfo(op, work.getVectorMode(), vectorizedRowBatchCtx, + isReduceSinkReplaceable)); replaceable.add(op); } } - private boolean isReplaceable(Operator operator, boolean reduceSinkCanReplace) { + private boolean isReplaceable(Operator operator, boolean isReduceSinkReplaceable) { if (replaceable.contains(operator)) { return true; } - - return reCheckReplaceable(operator, reduceSinkCanReplace); + return reCheckReplaceable(operator, isReduceSinkReplaceable); } - private boolean reCheckReplaceable(Operator operator, boolean reduceSinkCanReplace) { + private boolean reCheckReplaceable(Operator operator, boolean isReduceSinkReplaceable) { if (operator.getType() != null && operator.getType().equals(OperatorType.MAPJOIN)) { - if (checkMapJoinInput(operator) && currentReplaceable(operator, reduceSinkCanReplace)) { + if (checkMapJoinInput(operator) && currentReplaceable(operator, isReduceSinkReplaceable)) { return true; } } - return currentReplaceable(operator, reduceSinkCanReplace); + return currentReplaceable(operator, isReduceSinkReplaceable); } - private boolean currentReplaceable(Operator operator, boolean reduceSinkCanReplace) { - if (operator.getName().contains("OMNI")) return true; - if (replaceable.contains(operator) || operator.getType() == null) { + private boolean currentReplaceable(Operator operator, boolean isReduceSinkReplaceable) { + if (operator.getName().contains("OMNI") || replaceable.contains(operator) || operator.getType() == null) { return true; } - if (!OMNI_OPERATOR.contains(operator.getType())) { - return false; - } - if (!omniHiveConf.isEnableOperator(operator.getType())) { + if (!OMNI_OPERATOR.contains(operator.getType()) || !omniHiveConf.isEnableOperator(operator.getType())) { return false; } - boolean cboEnable = HiveConf.getBoolVar(hookContext.getConf(), HiveConf.ConfVars.HIVE_CBO_ENABLED); - boolean 
cboRetPath = HiveConf.getBoolVar(hookContext.getConf(), HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP); + boolean isCboEnable = HiveConf.getBoolVar(hookContext.getConf(), HiveConf.ConfVars.HIVE_CBO_ENABLED); + boolean isCboRetPath = HiveConf.getBoolVar(hookContext.getConf(), HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP); switch (operator.getType()) { case MAPJOIN: return mapJoinReplaceable(operator); case MERGEJOIN: return joinReplaceable(operator); case SELECT: - if (!isUDFSupport(((SelectDesc) operator.getConf()).getColList(), false)) { - return false; - } - if (OmniSelectOperator.isInvalidSelectColumn(((SelectDesc) operator.getConf()).getColList())) { - return false; - } - boolean replaceable = true; - for (Operator parent : operator.getParentOperators()) { - replaceable = replaceable && currentReplaceable(parent, reduceSinkCanReplace); - } - if (!operator.getChildOperators().isEmpty() && operator.getChildOperators().get(0).getType().equals(OperatorType.PTF) - && !currentReplaceable(operator.getChildOperators().get(0), false)) { - replaceable = false; - } - return replaceable; + return selectReplaceable(operator, isReduceSinkReplaceable); case FILTER: - List colList = Collections.singletonList(((FilterDesc) operator.getConf()).getPredicate()); - if ((!isUDFSupport(colList, true) && !isLegalTimestamp(colList)) || !isLegalFilter(colList)) { - return false; - } - boolean result = true; - for (Operator parent : operator.getParentOperators()) { - result = result && currentReplaceable(parent, reduceSinkCanReplace); - } - for (Operator child : operator.getChildOperators()) { - if (child.getType() != null && child.getType().equals(OperatorType.SELECT)) { - SelectDesc conf = (SelectDesc) child.getConf(); - result = result && isUDFSupport(conf.getColList(), false) && isLegalFilter(conf.getColList()); - } - } - return result; + return filterReplaceable(operator, isReduceSinkReplaceable); case PTF: return PTFReplaceable(operator); case GROUPBY: - if (cboEnable && cboRetPath) return false; - return groupByReplaceable(operator, reduceSinkCanReplace); + if (isCboEnable && isCboRetPath) { + return false; + } + return groupByReplaceable(operator, isReduceSinkReplaceable); case REDUCESINK: - return reduceSinkCanReplace; + return isReduceSinkReplaceable; case TABLESCAN: return tableScanSupport(operator); default: @@ -711,6 +694,43 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } } + private boolean selectReplaceable(Operator op, boolean isReduceSinkReplaceable) { + if (!isUDFSupport(((SelectDesc) op.getConf()).getColList(), false)) { + return false; + } + if (OmniSelectOperator.isInvalidSelectColumn(((SelectDesc) op.getConf()).getColList())) { + return false; + } + boolean isReplaceable = true; + for (Operator parent : op.getParentOperators()) { + isReplaceable = isReplaceable && currentReplaceable(parent, isReduceSinkReplaceable); + } + if (!op.getChildOperators().isEmpty() && op.getChildOperators().get(0).getType().equals(OperatorType.PTF) + && !currentReplaceable(op.getChildOperators().get(0), false)) { + isReplaceable = false; + } + return isReplaceable; + } + + private boolean filterReplaceable(Operator op, boolean isReduceSinkReplaceable) { + List colList = Collections.singletonList(((FilterDesc) op.getConf()).getPredicate()); + if ((!isUDFSupport(colList, true) && !isLegalTimestamp(colList)) || !isLegalFilter(colList)) { + return false; + } + boolean isReplaceable = true; + for (Operator parent : op.getParentOperators()) { + isReplaceable = isReplaceable && 
currentReplaceable(parent, isReduceSinkReplaceable); + } + for (Operator child : op.getChildOperators()) { + if (child.getType() != null && child.getType().equals(OperatorType.SELECT)) { + SelectDesc conf = (SelectDesc) child.getConf(); + isReplaceable = isReplaceable && isUDFSupport(conf.getColList(), false) + && isLegalFilter(conf.getColList()); + } + } + return isReplaceable; + } + private boolean PTFSupportedExpression(List expressions) { for (PTFExpressionDef expression : expressions) { ExprNodeDesc exprNode = expression.getExprNode(); @@ -743,7 +763,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { return true; } - private boolean PTFSupportedFunction(FunctionType windowFunctionType, List args, boolean isCountAll) { + private boolean PTFSupportedFunction(FunctionType windowFunctionType, List args, + boolean isCountAll) { if (args != null) { if (windowFunctionType == null || isPTFAggUnsupported(windowFunctionType, args)) { return false; @@ -768,7 +789,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { continue; } for (PTFExpressionDef expressionDef : functionDef.getArgs()) { - if (expressionDef.getExprNode() != null && expressionDef.getExprNode().getTypeInfo().getTypeName() == "timestamp") { + if (expressionDef.getExprNode() != null + && expressionDef.getExprNode().getTypeInfo().getTypeName().equals("timestamp")) { return false; } } @@ -785,8 +807,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { return false; } if (mapJoinDesc.getNullSafes() != null) { - for (boolean nullsafe : mapJoinDesc.getNullSafes()) { - if (nullsafe) { + for (boolean isSafeNull : mapJoinDesc.getNullSafes()) { + if (isSafeNull) { return false; } } @@ -808,14 +830,15 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { private boolean joinCondUnsupported(MapJoinDesc mapJoinDesc) { JoinCondDesc[] joinCondDescs = mapJoinDesc.getConds(); if (joinCondDescs.length >= 2) { - if (joinCondDescs[0].getType() == JoinDesc.LEFT_OUTER_JOIN && joinCondDescs[1].getType() == JoinDesc.LEFT_SEMI_JOIN) { + if (joinCondDescs[0].getType() == JoinDesc.LEFT_OUTER_JOIN + && joinCondDescs[1].getType() == JoinDesc.LEFT_SEMI_JOIN) { return true; } } return false; } - private boolean groupByReplaceable(Operator operator, boolean reduceSinkCanReplace) { + private boolean groupByReplaceable(Operator operator, boolean isReduceSinkReplaceable) { GroupByDesc groupByDesc = (GroupByDesc) operator.getConf(); if (groupByDesc.isGroupingSetsPresent() && groupByDesc.getMode() == GroupByDesc.Mode.PARTIALS) { return false; @@ -823,7 +846,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { if (!isUDFSupport(groupByDesc.getKeys(), false)) { return false; } - if (isChildPTFOperatorUnReplaceable(operator, reduceSinkCanReplace)) { + if (isChildPTFOperatorUnReplaceable(operator, isReduceSinkReplaceable)) { return false; } for (AggregationDesc aggregator : groupByDesc.getAggregators()) { @@ -836,7 +859,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { if (!isUDFSupport(aggregator.getParameters(), false)) { return false; } - if (aggregator.getGenericUDAFName().equals("avg") && aggregator.getMode() == GenericUDAFEvaluator.Mode.PARTIAL1) { + if (aggregator.getGenericUDAFName().equals("avg") + && aggregator.getMode() == GenericUDAFEvaluator.Mode.PARTIAL1) { return false; } for (ExprNodeDesc parameter : aggregator.getParameters()) { @@ -848,11 +872,12 @@ public class 
OmniExecuteWithHookContext implements ExecuteWithHookContext { return true; } - private boolean isChildPTFOperatorUnReplaceable(Operator operator, boolean reduceSinkCanReplace) { + private boolean isChildPTFOperatorUnReplaceable(Operator operator, + boolean isReduceSinkReplaceable) { Queue> operators = new LinkedBlockingQueue<>(operator.getChildOperators()); while (!operators.isEmpty()) { Operator op = operators.poll(); - if (op instanceof PTFOperator && !currentReplaceable(op, reduceSinkCanReplace)) { + if (op instanceof PTFOperator && !currentReplaceable(op, isReduceSinkReplaceable)) { return true; } operators.addAll(op.getChildOperators()); @@ -862,7 +887,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { private boolean aggParameterNotSupport(AggregationDesc aggregator, ExprNodeDesc parameter) { if (aggregator.getGenericUDAFName().equals("sum") || aggregator.getGenericUDAFName().equals("avg")) { - PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) parameter.getTypeInfo()).getPrimitiveCategory(); + PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = + ((PrimitiveTypeInfo) parameter.getTypeInfo()).getPrimitiveCategory(); return primitiveCategory == STRING || primitiveCategory == CHAR || primitiveCategory == VARCHAR; } return false; @@ -870,7 +896,6 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { private boolean joinReplaceable(Operator operator) { JoinDesc joinDesc = (JoinDesc) operator.getConf(); - JoinCondDesc[] joinCondDescs = joinDesc.getConds(); Map> keys; if (joinDesc instanceof CommonMergeJoinDesc) { keys = ((CommonMergeJoinDesc) joinDesc).getKeys(); @@ -884,8 +909,10 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { if (!isUDFSupport(exprNodeDescList, false)) { return false; } + JoinCondDesc[] joinCondDescs = joinDesc.getConds(); for (JoinCondDesc joinCondDesc : joinCondDescs) { - if (!SUPPORTED_JOIN.contains(joinCondDesc.getType()) || joinCondDesc.getType() == JoinDesc.FULL_OUTER_JOIN && fullJoinUnReplaceable(joinDesc)) { + if (!SUPPORTED_JOIN.contains(joinCondDesc.getType()) || joinCondDesc.getType() == JoinDesc.FULL_OUTER_JOIN + && fullJoinUnReplaceable(joinDesc)) { return false; } } @@ -904,8 +931,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { return true; } if (joinDesc.getNullSafes() != null) { - for (boolean nullsafe : joinDesc.getNullSafes()) { - if (nullsafe) { + for (boolean isSafeNull : joinDesc.getNullSafes()) { + if (isSafeNull) { return true; } } @@ -933,7 +960,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { continue; } PrimitiveTypeInfo primitiveTypeInfo = ((PrimitiveObjectInspector) oi).getTypeInfo(); - if (windowFunctionType == OMNI_AGGREGATION_TYPE_AVG && primitiveTypeInfo.getPrimitiveCategory().equals(DECIMAL) + if (windowFunctionType == OMNI_AGGREGATION_TYPE_AVG + && primitiveTypeInfo.getPrimitiveCategory().equals(DECIMAL) && ((DecimalTypeInfo) primitiveTypeInfo).getPrecision() > DECIMAL64_MAX_PRECISION) { return true; } @@ -951,7 +979,9 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { ExprNodeDesc current = queue.poll(); if (current instanceof ExprNodeGenericFuncDesc) { BaseExpression expr = getBaseExpression(current); - if (expr == null) return false; + if (expr == null) { + return false; + } expressions.add(expr.toString()); current.getChildren().forEach(queue::offer); } else if ((current instanceof ExprNodeColumnDesc || current 
instanceof ExprNodeConstantDesc) @@ -963,12 +993,13 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } private BaseExpression getBaseExpression(ExprNodeDesc current) { - if (((ExprNodeGenericFuncDesc) current).getGenericUDF() instanceof GenericUDFConcat || - ((ExprNodeGenericFuncDesc) current).getGenericUDF() instanceof GenericUDFCoalesce && current.getChildren().size() >= 3) { + if (((ExprNodeGenericFuncDesc) current).getGenericUDF() instanceof GenericUDFConcat + || ((ExprNodeGenericFuncDesc) current).getGenericUDF() instanceof GenericUDFCoalesce + && current.getChildren().size() >= 3) { return null; } - boolean supportUDF = ExpressionUtils.isSupportUDF(((ExprNodeGenericFuncDesc) current).getGenericUDF()); - if (!supportUDF || checkUnsupportedArithmetic((ExprNodeGenericFuncDesc) current)) { + boolean isSupportedUDF = ExpressionUtils.isSupportUDF(((ExprNodeGenericFuncDesc) current).getGenericUDF()); + if (!isSupportedUDF || checkUnsupportedArithmetic((ExprNodeGenericFuncDesc) current)) { return null; } return getExpression(current); @@ -1055,11 +1086,13 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { for (PartitionDesc partitionDesc : ((MapWork) work).getPathToPartitionInfo().values()) { if (partitionDesc.getInputFileFormatClassName().equals(OrcInputFormat.class.getName())) { partitionDesc.setInputFileFormatClass(OmniOrcInputFormat.class); - } else if (partitionDesc.getInputFileFormatClassName().equals(MapredParquetInputFormat.class.getName())) { + } else if (partitionDesc.getInputFileFormatClassName() + .equals(MapredParquetInputFormat.class.getName())) { partitionDesc.setInputFileFormatClass(OmniParquetInputFormat.class); } partitionDesc.getProperties().setProperty(SERIALIZATION_LIB, VecBatchWrapperSerde.class.getName()); - partitionDesc.getTableDesc().getProperties().setProperty(SERIALIZATION_LIB, VecBatchWrapperSerde.class.getName()); + partitionDesc.getTableDesc().getProperties().setProperty(SERIALIZATION_LIB, + VecBatchWrapperSerde.class.getName()); } } } @@ -1080,9 +1113,9 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } } - private void replaceSimpleEdge(boolean reduceSinkCanReplace, BaseWork child, TezEdgeProperty.EdgeType edgeType) { + private void replaceSimpleEdge(boolean isReduceSinkReplaceable, BaseWork child, TezEdgeProperty.EdgeType edgeType) { if (edgeType == TezEdgeProperty.EdgeType.SIMPLE_EDGE) { - if (reduceSinkCanReplace) { + if (isReduceSinkReplaceable) { // replace all edge of merge join into CUSTOM_SIMPLE_EDGE setParentWorkToCurrentWorkEdge(child, TezEdgeProperty.EdgeType.CUSTOM_SIMPLE_EDGE); } else { @@ -1101,7 +1134,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { private void setParentWorkToCurrentWorkEdge(BaseWork work, TezEdgeProperty.EdgeType edgeType) { for (BaseWork parent : tezWork.getParents(work)) { - if (tezWork.getEdgeType(parent, work) == TezEdgeProperty.EdgeType.BROADCAST_EDGE || parent instanceof UnionWork) { + if (tezWork.getEdgeType(parent, work) == TezEdgeProperty.EdgeType.BROADCAST_EDGE + || parent instanceof UnionWork) { continue; } tezWork.getEdgeProperty(parent, work).setEdgeType(edgeType); @@ -1112,28 +1146,30 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { return replaceWithOmniOperator(operatorInfo, 0); } - private Operator replaceWithOmniOperator(OperatorInfo operatorInfo, int originalParentsNum) { - boolean reduceSinkCanReplace = operatorInfo.isReduceSinkCanReplace(); + private 
Operator replaceWithOmniOperator(OperatorInfo operatorInfo, + int originalParentsNum) { + boolean isReduceSinkReplaceable = operatorInfo.isReduceSinkCanReplace(); Operator current = operatorInfo.getOperator(); - boolean vectorized = operatorInfo.isVectorized(); + boolean isVectorized = operatorInfo.isVectorized(); VectorizedRowBatchCtx vectorizedRowBatchCtx = operatorInfo.getVectorizedRowBatchCtx(); - List> parents = current.getParentOperators(); List> childs = current.getChildOperators(); List> replacedChilds = new ArrayList<>(); - Operator omniOperator = createOmniOperator(current, vectorized, reduceSinkCanReplace); + Operator omniOperator = createOmniOperator(current, + isVectorized, isReduceSinkReplaceable); VectorizationContext outputVectorizationContext = null; - if (vectorized) { + if (isVectorized) { outputVectorizationContext = getOutputVectorizationContext(current); } - Operator vectorToRowOperator = vectorized ? new OmniVectorizedVectorOperator(current.getCompilationOpContext(), new OmniVectorDesc(false), - outputVectorizationContext, vectorizedRowBatchCtx) + Operator vectorToRowOperator = isVectorized + ? new OmniVectorizedVectorOperator(current.getCompilationOpContext(), new OmniVectorDesc(false), + outputVectorizationContext, vectorizedRowBatchCtx) : new OmniVectorOperator(current.getCompilationOpContext(), new OmniVectorDesc(false)); if (childs.size() == 0 && !current.getType().equals(OperatorType.REDUCESINK)) { vectorToRowOperator.setParentOperators(Arrays.asList(omniOperator)); replacedChilds.add(vectorToRowOperator); } for (Operator child : childs) { - if (isReplaceable(child, reduceSinkCanReplace)) { + if (isReplaceable(child, isReduceSinkReplaceable)) { replacedChilds.add(child); replaceOperatorList(child.getParentOperators(), current, omniOperator); } else { @@ -1151,17 +1187,19 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } List> replacedParents = new ArrayList<>(); VectorizationContext inputVectorizationContext = null; - if (vectorized) { - inputVectorizationContext = current instanceof VectorizationOperator ? - ((VectorizationOperator) current).getInputVectorizationContext() : null; + if (isVectorized) { + inputVectorizationContext = current instanceof VectorizationOperator + ? ((VectorizationOperator) current).getInputVectorizationContext() : null; } + List> parents = current.getParentOperators(); for (Operator parent : parents) { - if (isReplaceable(parent, reduceSinkCanReplace)) { + if (isReplaceable(parent, isReduceSinkReplaceable)) { replacedParents.add(parent); replaceOperatorList(parent.getChildOperators(), current, omniOperator); } else { - Operator rowToVectorOperator = vectorized ? new OmniVectorizedVectorOperator(current.getCompilationOpContext(), new OmniVectorDesc(true), - inputVectorizationContext, vectorizedRowBatchCtx) + Operator rowToVectorOperator = isVectorized + ? new OmniVectorizedVectorOperator(current.getCompilationOpContext(), new OmniVectorDesc(true), + inputVectorizationContext, vectorizedRowBatchCtx) : new OmniVectorOperator(parent.getCompilationOpContext(), new OmniVectorDesc(true)); rowToVectorOperator.setChildOperators(Arrays.asList(omniOperator)); rowToVectorOperator.setParentOperators(Arrays.asList(parent)); @@ -1170,11 +1208,13 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } } if (parents.isEmpty() && originalParentsNum > 0) { - Operator rowToVectorOperator = vectorized ? 
new OmniVectorizedVectorOperator(current.getCompilationOpContext(), new OmniVectorDesc(true), - inputVectorizationContext, vectorizedRowBatchCtx) + Operator rowToVectorOperator = isVectorized + ? new OmniVectorizedVectorOperator(current.getCompilationOpContext(), new OmniVectorDesc(true), + inputVectorizationContext, vectorizedRowBatchCtx) : new OmniVectorOperator(current.getCompilationOpContext(), new OmniVectorDesc(true)); if (mergeJoinNeedSort.contains(current)) { - rowToVectorOperator = new OmniVectorWithSortOperator(current.getCompilationOpContext(), new OmniVectorDesc(true)); + rowToVectorOperator = new OmniVectorWithSortOperator(current.getCompilationOpContext(), + new OmniVectorDesc(true)); } rowToVectorOperator.setChildOperators(Arrays.asList(omniOperator)); replacedParents.add(rowToVectorOperator); @@ -1186,7 +1226,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } private void replaceOperatorList(List> operatorList, - Operator current, Operator replaced) { + Operator current, Operator replaced) { for (int i = 0; i < operatorList.size(); i++) { if (operatorList.get(i) == current) { operatorList.set(i, replaced); @@ -1211,20 +1251,22 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { if (current instanceof TableScanOperator) { return ((VectorizationOperator) current.getChildOperators().get(0)).getInputVectorizationContext(); } else { - return current instanceof VectorizationContextRegion ? - ((VectorizationContextRegion) current).getOutputVectorizationContext() + return current instanceof VectorizationContextRegion + ? ((VectorizationContextRegion) current).getOutputVectorizationContext() : ((VectorizationOperator) current).getInputVectorizationContext(); } } - private Operator createOmniOperator(Operator current, boolean vectorized, boolean reduceSinkCanReplace) { + private Operator createOmniOperator(Operator current, boolean isVectorized, + boolean isReduceSinkReplaceable) { VectorizationContext vectorizationContext = null; - if (vectorized && current instanceof VectorizationOperator) { + if (isVectorized && current instanceof VectorizationOperator) { vectorizationContext = ((VectorizationOperator) current).getInputVectorizationContext(); } switch (current.getType()) { case SELECT: - OmniSelectOperator omniSelectOperator = new OmniSelectOperator(current.getCompilationOpContext(), (SelectDesc) current.getConf()); + OmniSelectOperator omniSelectOperator = new OmniSelectOperator(current.getCompilationOpContext(), + (SelectDesc) current.getConf()); omniSelectOperator.setSchema(current.getSchema()); return omniSelectOperator; case FILTER: @@ -1242,29 +1284,38 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } case MERGEJOIN: if (mergeJoinNeedSort.contains(current)) { - return new OmniMergeJoinWithSortOperator(current.getCompilationOpContext(), (CommonMergeJoinDesc) current.getConf()); + return new OmniMergeJoinWithSortOperator(current.getCompilationOpContext(), + (CommonMergeJoinDesc) current.getConf()); } else { - return new OmniMergeJoinOperator(current.getCompilationOpContext(), (CommonMergeJoinDesc) current.getConf()); + return new OmniMergeJoinOperator(current.getCompilationOpContext(), + (CommonMergeJoinDesc) current.getConf()); } case GROUPBY: - return new OmniGroupByOperator(current.getCompilationOpContext(), (GroupByDesc) current.getConf(), vectorizationContext, - vectorized ? 
((VectorGroupByOperator) current).getOutputVectorizationContext() : null); + return new OmniGroupByOperator(current.getCompilationOpContext(), + (GroupByDesc) current.getConf(), vectorizationContext, + isVectorized ? ((VectorGroupByOperator) current).getOutputVectorizationContext() : null); case PTF: return new OmniPTFOperator(current.getCompilationOpContext(), (PTFDesc) current.getConf()); case REDUCESINK: - OmniReduceSinkOperator omniReduceSinkOperator = new OmniReduceSinkOperator(current.getCompilationOpContext(), (ReduceSinkDesc) current.getConf(), reduceSinkCanReplace); + OmniReduceSinkOperator omniReduceSinkOperator = new OmniReduceSinkOperator( + current.getCompilationOpContext(), + (ReduceSinkDesc) current.getConf(), isReduceSinkReplaceable); ReduceSinkDesc conf = (ReduceSinkDesc) current.getConf(); if (conf.getTopN() == -1) { - omniReduceSinkOperator.getConf().getKeySerializeInfo().getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); - omniReduceSinkOperator.getConf().getValueSerializeInfo().getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); + omniReduceSinkOperator.getConf().getKeySerializeInfo().getProperties().setProperty( + SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); + omniReduceSinkOperator.getConf().getValueSerializeInfo().getProperties().setProperty( + SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); } else { needSortSerdeReduceSinkName.add(conf.getOutputName()); - omniReduceSinkOperator.getConf().getKeySerializeInfo().getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchOrderSerDe.class.getName()); - omniReduceSinkOperator.getConf().getValueSerializeInfo().getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchOrderSerDe.class.getName()); + omniReduceSinkOperator.getConf().getKeySerializeInfo().getProperties().setProperty( + SERIALIZATION_LIB, OmniVecBatchOrderSerDe.class.getName()); + omniReduceSinkOperator.getConf().getValueSerializeInfo().getProperties().setProperty( + SERIALIZATION_LIB, OmniVecBatchOrderSerDe.class.getName()); } return omniReduceSinkOperator; case TABLESCAN: - if (vectorized) { + if (isVectorized) { return new OmniVectorizedTableScanOperator((TableScanOperator) current); } else { return new OmniTableScanOperator((TableScanOperator) current); @@ -1296,10 +1347,12 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { TableDesc keyDesc = ((ReduceWork) work).getKeyDesc(); if (needSortSerdeReduceSinkName.contains(work.getName())) { keyDesc.getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchOrderSerDe.class.getName()); - tagToValueDesc.get(tag).getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchOrderSerDe.class.getName()); + tagToValueDesc.get(tag).getProperties().setProperty( + SERIALIZATION_LIB, OmniVecBatchOrderSerDe.class.getName()); } else { keyDesc.getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); - tagToValueDesc.get(tag).getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); + tagToValueDesc.get(tag).getProperties().setProperty( + SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); } } } @@ -1329,10 +1382,12 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { keyDesc.getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); if (conf.getNoOuterJoin()) { List valueDesc = conf.getValueTblDescs(); - valueDesc.get(entry.getKey()).getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); + 
valueDesc.get(entry.getKey()).getProperties().setProperty( + SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); } else { List valueDesc = conf.getValueFilteredTblDescs(); - valueDesc.get(entry.getKey()).getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); + valueDesc.get(entry.getKey()).getProperties().setProperty( + SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); } } } @@ -1341,19 +1396,19 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } } - private class OperatorInfo { private Operator operator; - private boolean vectorized; + private boolean isVectorized; private VectorizedRowBatchCtx vectorizedRowBatchCtx; - private boolean reduceSinkCanReplace; + private boolean isReduceSinkReplaceable; - public OperatorInfo(Operator operator, boolean vectorized, VectorizedRowBatchCtx vectorizedRowBatchCtx, boolean reduceSinkCanReplace) { + public OperatorInfo(Operator operator, boolean isVectorized, VectorizedRowBatchCtx vectorizedRowBatchCtx, + boolean isReduceSinkReplaceable) { this.operator = operator; - this.vectorized = vectorized; + this.isVectorized = isVectorized; this.vectorizedRowBatchCtx = vectorizedRowBatchCtx; - this.reduceSinkCanReplace = reduceSinkCanReplace; + this.isReduceSinkReplaceable = isReduceSinkReplaceable; } @Override @@ -1374,7 +1429,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } public boolean isVectorized() { - return vectorized; + return isVectorized; } public VectorizedRowBatchCtx getVectorizedRowBatchCtx() { @@ -1382,8 +1437,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { } public boolean isReduceSinkCanReplace() { - return reduceSinkCanReplace; + return isReduceSinkReplaceable; } } - } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterDesc.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterDesc.java index 20e44d520..7edd8c264 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterDesc.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterDesc.java @@ -1,9 +1,28 @@ +/* + * Copyright (C) 2023-2024. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package com.huawei.boostkit.hive; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.FilterDesc; -@Explain(displayName = "Omni Filter Operator", explainLevels = {Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED}) +@Explain(displayName = "Omni Filter Operator", explainLevels = {Explain.Level.USER, Explain.Level.DEFAULT, + Explain.Level.EXTENDED}) public class OmniFilterDesc extends FilterDesc { public OmniFilterDesc(FilterDesc filterDesc) { super(filterDesc.getPredicate(), filterDesc.getIsSamplingPred(), filterDesc.getSampleDescr()); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterOperator.java index ff83da16f..7468ff222 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniFilterOperator.java @@ -18,6 +18,8 @@ package com.huawei.boostkit.hive; +import static nova.hetu.omniruntime.operator.config.OverflowConfig.OverflowConfigId.OVERFLOW_CONFIG_NULL; + import com.huawei.boostkit.hive.expression.BaseExpression; import com.huawei.boostkit.hive.expression.ExpressionUtils; import com.huawei.boostkit.hive.expression.ReferenceFactor; @@ -35,9 +37,7 @@ import nova.hetu.omniruntime.type.DataType; import nova.hetu.omniruntime.vector.VecBatch; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -54,21 +54,17 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import static nova.hetu.omniruntime.operator.config.OverflowConfig.OverflowConfigId.OVERFLOW_CONFIG_NULL; - public class OmniFilterOperator extends OmniHiveOperator implements Serializable { - private transient OmniFilterAndProjectOperatorFactory filterAndProjectOperatorFactory; - - private transient OmniOperator omniOperator; - private transient Iterator output; - - private static Cache cache= CacheBuilder.newBuilder().concurrencyLevel(8).initialCapacity(10) - .maximumSize(100).recordStats().removalListener(notification ->{ + private static final long serialVersionUID = 1L; + private static Cache cache = CacheBuilder.newBuilder().concurrencyLevel(8).initialCapacity(10) + .maximumSize(100).recordStats().removalListener(notification -> { ((OmniFilterAndProjectOperatorFactory) notification.getValue()).close(); }).build(); + private static boolean hasAddedCloseThread = false; - private static boolean addCloseThread = false; - + private transient OmniFilterAndProjectOperatorFactory filterAndProjectOperatorFactory; + private transient OmniOperator omniOperator; + private transient Iterator output; public OmniFilterOperator() { super(); } @@ -89,17 +85,20 @@ public class OmniFilterOperator extends OmniHiveOperator impleme BaseExpression root; if (predicate instanceof ExprNodeGenericFuncDesc) { root = ExpressionUtils.build((ExprNodeGenericFuncDesc) predicate, inputObjInspectors[0]); - } else if (predicate instanceof ExprNodeColumnDesc) { - root = ExpressionUtils.wrapNotNullExpression((ReferenceFactor) ExpressionUtils.createReferenceNode(predicate, inputObjInspectors[0])); + } 
else if (predicate instanceof ExprNodeColumnDesc) { + root = ExpressionUtils.wrapNotNullExpression( + (ReferenceFactor) ExpressionUtils.createReferenceNode(predicate, inputObjInspectors[0])); } else { root = ExpressionUtils.createLiteralNode(predicate); } - List allStructFieldRefs = ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs(); + List allStructFieldRefs = ( + (StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs(); DataType[] inputTypes = new DataType[allStructFieldRefs.size()]; String[] projections = new String[allStructFieldRefs.size()]; for (int i = 0; i < allStructFieldRefs.size(); i++) { if (allStructFieldRefs.get(i).getFieldObjectInspector() instanceof PrimitiveObjectInspector) { - PrimitiveTypeInfo typeInfo = ((PrimitiveObjectInspector) allStructFieldRefs.get(i).getFieldObjectInspector()).getTypeInfo(); + PrimitiveTypeInfo typeInfo = ( + (PrimitiveObjectInspector) allStructFieldRefs.get(i).getFieldObjectInspector()).getTypeInfo(); int omniType = TypeUtils.convertHiveTypeToOmniType(typeInfo); inputTypes[i] = TypeUtils.buildInputDataType(typeInfo); projections[i] = TypeUtils.buildExpression(typeInfo, omniType, i); @@ -107,8 +106,8 @@ public class OmniFilterOperator extends OmniHiveOperator impleme } String cacheKey = root.toString() + Arrays.toString(inputTypes) + Arrays.toString(projections); - OmniFilterAndProjectOperatorFactory omniFilterAndProjectOperatorFactory = (OmniFilterAndProjectOperatorFactory) cache - .getIfPresent(cacheKey); + OmniFilterAndProjectOperatorFactory omniFilterAndProjectOperatorFactory = + (OmniFilterAndProjectOperatorFactory) cache.getIfPresent(cacheKey); if (omniFilterAndProjectOperatorFactory != null) { this.filterAndProjectOperatorFactory = omniFilterAndProjectOperatorFactory; this.omniOperator = this.filterAndProjectOperatorFactory.createOperator(); @@ -120,9 +119,9 @@ public class OmniFilterOperator extends OmniHiveOperator impleme this.omniOperator = this.filterAndProjectOperatorFactory.createOperator(); cache.put(cacheKey, this.filterAndProjectOperatorFactory); - if (!addCloseThread) { + if (!hasAddedCloseThread) { Runtime.getRuntime().addShutdownHook(new Thread(() -> cache.invalidateAll())); - addCloseThread = true; + hasAddedCloseThread = true; } } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByDesc.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByDesc.java index b27c006a5..79f7d06f8 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByDesc.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByDesc.java @@ -18,11 +18,11 @@ package com.huawei.boostkit.hive; - import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.GroupByDesc; -@Explain(displayName = "Omni Group By Operator", explainLevels = {Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED}) +@Explain(displayName = "Omni Group By Operator", explainLevels = {Explain.Level.USER, Explain.Level.DEFAULT, + Explain.Level.EXTENDED}) public class OmniGroupByDesc extends GroupByDesc { public OmniGroupByDesc(GroupByDesc groupByDesc) { super(groupByDesc.getMode(), groupByDesc.getOutputColumnNames(), groupByDesc.getKeys(), diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java index 
c1cc317dc..e808b5e89 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java @@ -28,6 +28,7 @@ import static org.apache.hadoop.hive.ql.exec.GroupByOperator.shouldEmitSummaryRo import com.huawei.boostkit.hive.cache.VectorCache; import com.huawei.boostkit.hive.expression.ExpressionUtils; import com.huawei.boostkit.hive.expression.TypeUtils; + import javolution.util.FastBitSet; import nova.hetu.omniruntime.constants.FunctionType; import nova.hetu.omniruntime.operator.OmniOperator; @@ -49,9 +50,11 @@ import nova.hetu.omniruntime.vector.VarcharVec; import nova.hetu.omniruntime.vector.Vec; import nova.hetu.omniruntime.vector.VecBatch; import nova.hetu.omniruntime.vector.VecFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.Timestamp; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator; import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; @@ -84,7 +87,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.common.type.Timestamp; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; @@ -102,13 +104,15 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; -public class OmniGroupByOperator extends OmniHiveOperator implements Serializable, VectorizationContextRegion, IConfigureJobConf { +public class OmniGroupByOperator extends OmniHiveOperator implements Serializable, + VectorizationContextRegion, IConfigureJobConf { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(OmniGroupByOperator.class.getName()); + private transient OmniOperatorFactory omniOperatorFactory; private transient OmniOperator omniOperator; private transient List keyFields; - private transient boolean firstRow; + private transient boolean isFirstRow; private transient List> aggChannelFields; private transient ObjectInspector[] keyObjectInspectors; @@ -118,7 +122,7 @@ public class OmniGroupByOperator extends OmniHiveOperator imple private transient ObjectInspector[][] aggregationParameterObjectInspectors; private transient List allStructFieldRefs; private transient int numKeys; - private transient boolean groupingSetsPresent; + private transient boolean isGroupingSetsPresent; private transient List groupingSets; private transient int outputKeyLength; private VectorizationContext vectorizationContext; @@ -141,7 +145,8 @@ public class OmniGroupByOperator extends OmniHiveOperator imple super(ctx); } - public OmniGroupByOperator(CompilationOpContext ctx, GroupByDesc conf, VectorizationContext vectorizationContext, VectorizationContext vOutContext) { + public OmniGroupByOperator(CompilationOpContext ctx, GroupByDesc conf, + VectorizationContext vectorizationContext, VectorizationContext vOutContext) { super(ctx); this.conf = new OmniGroupByDesc(conf); this.vectorizationContext = vectorizationContext; @@ -230,7 +235,8 @@ public class OmniGroupByOperator extends 
OmniHiveOperator imple } aggOutputObjectInspectors = new ObjectInspector[aggregateCount]; for (int i = 0; i < aggregateCount; i++) { - ObjectInspector objInsp = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(vecAggrDescs[i].getOutputTypeInfo()); + ObjectInspector objInsp = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo( + vecAggrDescs[i].getOutputTypeInfo()); aggOutputObjectInspectors[i] = objInsp; objectInspectors[i + outputKeyLength] = objInsp; } @@ -243,20 +249,9 @@ public class OmniGroupByOperator extends OmniHiveOperator imple .getStandardStructObjectInspector(aggOutputFieldsNames, Arrays.asList(aggOutputObjectInspectors)); List aggOutputFields = aggOutputObjInspector.getAllStructFieldRefs(); - FunctionType[] aggFunctionTypes = getFunctionTypeFromAggs(aggs); - DataType[][] aggOutputTypes = getTwoDimenOutputDataType(aggOutputFields); - String[] aggChannelsFilter = {null}; - OverflowConfig overflowConfig = new OverflowConfig(OverflowConfig.OverflowConfigId.OVERFLOW_CONFIG_NULL); - OperatorConfig operatorConfig = new OperatorConfig(overflowConfig); - boolean[] isInputRaws = getIsInputRaws(aggs); - boolean[] isOutputPartials = getIsOutputPartials(aggs); - String[] groupByChanel; - String[][] aggChannels; - DataType[] sourceTypes; - // Initialize the constants for the grouping sets, so that they can be re-used for each row - groupingSetsPresent = conf.isGroupingSetsPresent(); - if (groupingSetsPresent) { + isGroupingSetsPresent = conf.isGroupingSetsPresent(); + if (isGroupingSetsPresent) { groupingSets = conf.getListGroupingSets(); int groupingSetsPosition = conf.getGroupingSetPosition(); LongWritable[] newKeysGroupingSets = new LongWritable[groupingSets.size()]; @@ -270,8 +265,11 @@ public class OmniGroupByOperator extends OmniHiveOperator imple } } - if ((allStructFieldRefs.size() == 2) && - allStructFieldRefs.get(0).getFieldObjectInspector() instanceof StandardStructObjectInspector) { + String[] groupByChanel; + String[][] aggChannels; + DataType[] sourceTypes; + if ((allStructFieldRefs.size() == 2) + && allStructFieldRefs.get(0).getFieldObjectInspector() instanceof StandardStructObjectInspector) { List keyStructFieldRefs = ((StandardStructObjectInspector) allStructFieldRefs.get(0).getFieldObjectInspector()) .getAllStructFieldRefs(); @@ -289,6 +287,14 @@ public class OmniGroupByOperator extends OmniHiveOperator imple groupByChanel = getExprFromExprNode(keyFields); aggChannels = getTwoDimenExprFromExprNode(aggChannelFields); } + + FunctionType[] aggFunctionTypes = getFunctionTypeFromAggs(aggs); + DataType[][] aggOutputTypes = getTwoDimenOutputDataType(aggOutputFields); + String[] aggChannelsFilter = {null}; + OverflowConfig overflowConfig = new OverflowConfig(OverflowConfig.OverflowConfigId.OVERFLOW_CONFIG_NULL); + OperatorConfig operatorConfig = new OperatorConfig(overflowConfig); + boolean[] isInputRaws = getIsInputRaws(aggs); + boolean[] isOutputPartials = getIsOutputPartials(aggs); if (numKeys == 0) { omniOperatorFactory = new OmniAggregationWithExprOperatorFactory(groupByChanel, aggChannels, aggChannelsFilter, sourceTypes, aggFunctionTypes, aggOutputTypes, isInputRaws, isOutputPartials, @@ -314,7 +320,7 @@ public class OmniGroupByOperator extends OmniHiveOperator imple outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(outputFieldNames, Arrays.asList(objectInspectors)); createOmniOperator(); - firstRow = true; + isFirstRow = true; constantVec = new Vec[recordConstantColIds.size()]; for (int i = 0; i < constantVec.length; i++) { 
constantVec[i] = createConstantVec(constantEvaluators.get(i), VectorCache.BATCH); @@ -373,7 +379,6 @@ public class OmniGroupByOperator extends OmniHiveOperator imple private DataType[][] getTwoDimenOutputDataType(List structFields) { List twoDimenDataTypes = new ArrayList<>(); - // TODO Confirming the Multidimensional array processing flow for (StructField structField : structFields) { List dataTypes = new ArrayList<>(); PrimitiveTypeInfo typeInfo = ((PrimitiveObjectInspector) structField.getFieldObjectInspector()) @@ -414,12 +419,12 @@ public class OmniGroupByOperator extends OmniHiveOperator imple List dataTypes = new ArrayList<>(); dataTypes.addAll(getListDataTypeFromStructField(fieldRefs)); for (ExprNodeEvaluator keyField : keyFields) { - if (keyField instanceof ExprNodeConstantEvaluator && !groupingSetsPresent) { + if (keyField instanceof ExprNodeConstantEvaluator && !isGroupingSetsPresent) { constantColIds.offer(dataTypes.size()); dataTypes.add(buildInputDataType(keyField.getExpr().getTypeInfo())); } } - if (groupingSetsPresent) { + if (isGroupingSetsPresent) { dataTypes.add(LongDataType.LONG); } for (List aggChannelField : aggChannelFields) { @@ -434,7 +439,8 @@ public class OmniGroupByOperator extends OmniHiveOperator imple return dataTypes.toArray(new DataType[0]); } - private DataType[] getDataTypeFromStructField(List keyStructFieldRefs, List valueStructFieldRefs) { + private DataType[] getDataTypeFromStructField(List keyStructFieldRefs, + List valueStructFieldRefs) { List dataTypes = new ArrayList<>(); dataTypes.addAll(getListDataTypeFromStructField(keyStructFieldRefs)); dataTypes.addAll(getListDataTypeFromStructField(valueStructFieldRefs)); @@ -449,14 +455,17 @@ public class OmniGroupByOperator extends OmniHiveOperator imple List expressions = new ArrayList<>(); for (int i = 0; i < nodes.size(); i++) { if (nodes.get(i) instanceof ExprNodeGenericFuncEvaluator) { - expressions.add(ExpressionUtils.build((ExprNodeGenericFuncDesc) nodes.get(i).getExpr(), inputObjInspectors[0]).toString()); + expressions.add(ExpressionUtils.build( + (ExprNodeGenericFuncDesc) nodes.get(i).getExpr(), inputObjInspectors[0]).toString()); } else if (nodes.get(i) instanceof ExprNodeColumnEvaluator) { String exprStr = ((ExprNodeColumnEvaluator) nodes.get(i)).getExpr().getColumn(); - int columnId = isColumnIdFromExprStr ? getFieldIdFromExprStr(exprStr) : getFieldIdFromFieldName(exprStr); + int columnId = isColumnIdFromExprStr + ? getFieldIdFromExprStr(exprStr) : getFieldIdFromFieldName(exprStr); expressions.add(TypeUtils.buildExpression(nodes.get(i).getExpr().getTypeInfo(), columnId)); } else if (nodes.get(i) instanceof ExprNodeConstantEvaluator) { - if (groupingSetsPresent) { - expressions.add(TypeUtils.buildExpression(nodes.get(i).getExpr().getTypeInfo(), allStructFieldRefs.size())); + if (isGroupingSetsPresent) { + expressions.add(TypeUtils.buildExpression(nodes.get(i).getExpr().getTypeInfo(), + allStructFieldRefs.size())); } else { Integer colId = constantColIds.poll(); expressions.add(TypeUtils.buildExpression(nodes.get(i).getExpr().getTypeInfo(), colId)); @@ -514,7 +523,8 @@ public class OmniGroupByOperator extends OmniHiveOperator imple Vec flatVec = (vec instanceof DictionaryVec) ? ((DictionaryVec) vec).expandDictionary() : vec; byte[] rawValueNulls = vec.getRawValueNulls(); DataType.DataTypeId dataTypeId = vec.getType().getId(); - int[] rawValueOffset = (dataTypeId == OMNI_VARCHAR || dataTypeId == OMNI_CHAR) ? 
((VarcharVec) flatVec).getRawValueOffset() : new int[0]; + int[] rawValueOffset = (dataTypeId == OMNI_VARCHAR || dataTypeId == OMNI_CHAR) + ? ((VarcharVec) flatVec).getRawValueOffset() : new int[0]; for (int i = 0; i < groupingSetSize; i++) { newVec.setNulls(i * rowCount, rawValueNulls, 0, rowCount); if ((groupingSets.get(i) & mask) == 0) { @@ -564,7 +574,8 @@ public class OmniGroupByOperator extends OmniHiveOperator imple if (!node.isEmpty()) { for (ExprNodeEvaluator exprNodeEvaluator : node) { if (exprNodeEvaluator instanceof ExprNodeColumnEvaluator) { - aggChannels.add(getFieldIdFromFieldName(((ExprNodeColumnEvaluator) exprNodeEvaluator).getExpr().getColumn())); + aggChannels.add(getFieldIdFromFieldName( + ((ExprNodeColumnEvaluator) exprNodeEvaluator).getExpr().getColumn())); } } } @@ -643,8 +654,10 @@ public class OmniGroupByOperator extends OmniHiveOperator imple break; case OMNI_DECIMAL128: HiveDecimal hiveDecimal = (HiveDecimal) exprValue; - DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) exprNodeConstantEvaluator.getExpr().getTypeInfo(); - ((Decimal128Vec) newVec).setBigInteger(i, hiveDecimal.bigIntegerBytesScaled(decimalTypeInfo.getScale()), hiveDecimal.signum() == -1); + DecimalTypeInfo decimalTypeInfo = + (DecimalTypeInfo) exprNodeConstantEvaluator.getExpr().getTypeInfo(); + ((Decimal128Vec) newVec).setBigInteger(i, + hiveDecimal.bigIntegerBytesScaled(decimalTypeInfo.getScale()), hiveDecimal.signum() == -1); break; case OMNI_VARCHAR: case OMNI_CHAR: @@ -674,8 +687,8 @@ public class OmniGroupByOperator extends OmniHiveOperator imple @Override public void process(Object row, int tag) throws HiveException { VecBatch input = (VecBatch) row; - firstRow = false; - if (groupingSetsPresent) { + isFirstRow = false; + if (isGroupingSetsPresent) { input = expandVecBatch(input); } if (!recordConstantColIds.isEmpty()) { @@ -697,7 +710,8 @@ public class OmniGroupByOperator extends OmniHiveOperator imple private VecBatch removeVector(VecBatch vecBatch, int vecIndex) { int vecCount = vecBatch.getVectorCount(); if (vecIndex >= vecCount || vecIndex < 0) { - throw new IllegalArgumentException("The vecIndex exceeds the vecBatch size. vecCount: " + vecCount + ", vecIndex: " + vecIndex); + throw new IllegalArgumentException("The vecIndex exceeds the vecBatch size. vecCount: " + + vecCount + ", vecIndex: " + vecIndex); } Vec[] vecs = new Vec[vecCount - 1]; for (int i = 0, j = 0; j < vecCount; j++) { @@ -713,12 +727,14 @@ public class OmniGroupByOperator extends OmniHiveOperator imple private VecBatch createVecBatch(int pos) { // This VecBatch has only one row of data, and each column is NULL. 
- List structFields = ((StandardStructObjectInspector) outputObjInspector).getAllStructFieldRefs(); + List structFields = ( + (StandardStructObjectInspector) outputObjInspector).getAllStructFieldRefs(); int vectorCount = structFields.size(); Vec[] vecs = new Vec[vectorCount]; for (int i = 0; i < vectorCount; i++) { ObjectInspector fieldObjectInspector = structFields.get(i).getFieldObjectInspector(); - vecs[i] = VecFactory.createFlatVec(1, buildInputDataType(((PrimitiveObjectInspector) fieldObjectInspector).getTypeInfo())); + vecs[i] = VecFactory.createFlatVec(1, buildInputDataType( + ((PrimitiveObjectInspector) fieldObjectInspector).getTypeInfo())); if (i == pos && pos < outputKeyLength) { ((LongVec) vecs[i]).set(0, (1L << pos) - 1); } else if (i >= numKeys && aggs.get(i - numKeys).getGenericUDAFName().equals("count")) { @@ -739,8 +755,8 @@ public class OmniGroupByOperator extends OmniHiveOperator imple protected void closeOp(boolean abort) throws HiveException { if (!abort) { // If there is no grouping key and no row came to this operator - if (firstRow && GroupByOperator.shouldEmitSummaryRow(conf)) { - firstRow = false; + if (isFirstRow && GroupByOperator.shouldEmitSummaryRow(conf)) { + isFirstRow = false; int pos = conf.getGroupingSetPosition(); VecBatch vecBatch = createVecBatch(pos); forward(vecBatch, outputObjInspector); @@ -749,7 +765,8 @@ public class OmniGroupByOperator extends OmniHiveOperator imple while (output.hasNext()) { VecBatch next = output.next(); if (outputObjInspector instanceof StandardStructObjectInspector - && next.getVectorCount() != ((StandardStructObjectInspector) outputObjInspector).getAllStructFieldRefs().size()) { + && next.getVectorCount() != ((StandardStructObjectInspector) outputObjInspector) + .getAllStructFieldRefs().size()) { next = removeVector(next, numKeys - 1); } forward(next, outputObjInspector); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveOperator.java index 66c5fa7db..cb570d0c8 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveOperator.java @@ -22,7 +22,6 @@ import nova.hetu.omniruntime.vector.Vec; import nova.hetu.omniruntime.vector.VecBatch; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -136,7 +135,6 @@ public abstract class OmniHiveOperator extends Operator< parents.offer((Operator) parent); } } - } public static VecBatch copyVecBatch(VecBatch vecBatch) { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniJoinOperator.java index e17570433..16e82f993 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniJoinOperator.java @@ -80,7 +80,12 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; public class OmniJoinOperator extends CommonJoinOperator implements Serializable { + public OmniJoinOperator(CompilationOpContext ctx) { + super((ctx)); + } + private 
static final long serialVersionUID = 1L; + private transient Iterator output; protected static final int SMJ_NEED_ADD_STREAM_TBL_DATA = 2; protected static final int SMJ_NEED_ADD_BUFFERED_TBL_DATA = 3; @@ -102,16 +107,11 @@ public class OmniJoinOperator extends CommonJoinOperator impl protected transient Queue[] bufferData; protected transient DataType[][] streamTypes; protected transient DataType[][] bufferTypes; - private transient Iterator output; protected OmniJoinOperator() { super(); } - public OmniJoinOperator(CompilationOpContext ctx) { - super((ctx)); - } - public OmniJoinOperator(CompilationOpContext ctx, JoinDesc joinDesc) { super(ctx); this.conf = new OmniMergeJoinDesc(joinDesc); @@ -173,7 +173,6 @@ public class OmniJoinOperator extends CommonJoinOperator impl .flatMap(alias -> ((StructObjectInspector) inputObjInspectors[alias]).getAllStructFieldRefs().stream() .flatMap(keyValue -> ((StructObjectInspector) keyValue.getFieldObjectInspector()) .getAllStructFieldRefs().stream())).collect(Collectors.toList()); - DataType[] inputTypes = new DataType[inputFields.size()]; List> colNameToId = new ArrayList<>(); aliasList.forEach(a -> colNameToId.add(new HashMap<>())); int[] fieldNum = new int[aliasList.size()]; @@ -187,11 +186,13 @@ public class OmniJoinOperator extends CommonJoinOperator impl .getAllStructFieldRefs().size()).sum(); } int tagIndex = 0; + DataType[] inputTypes = new DataType[inputFields.size()]; for (int i = 0; i < inputFields.size(); i++) { if (i >= fieldNum[tagIndex]) { ++tagIndex; } - inputTypes[i] = TypeUtils.buildInputDataType(((AbstractPrimitiveObjectInspector) inputFields.get(i).getFieldObjectInspector()).getTypeInfo()); + inputTypes[i] = TypeUtils.buildInputDataType(((AbstractPrimitiveObjectInspector) inputFields.get(i) + .getFieldObjectInspector()).getTypeInfo()); colNameToId.get(tagIndex).put(inputFields.get(i).getFieldName(), i); } int[] outputCols; @@ -235,12 +236,13 @@ public class OmniJoinOperator extends CommonJoinOperator impl List inspectors = IntStream.range(0, bufferIndex + 1).boxed() .flatMap(tableIndex -> ((StructObjectInspector) inputObjInspectors[tableIndex]).getAllStructFieldRefs() .stream().flatMap(keyValue -> ((StructObjectInspector) keyValue.getFieldObjectInspector()) - .getAllStructFieldRefs().stream())).sorted(Comparator.comparing(StructField::getFieldName)) + .getAllStructFieldRefs().stream())).sorted(Comparator.comparing(StructField::getFieldName)) .map(field -> field.getFieldObjectInspector()).collect(Collectors.toList()); Map inputColNameToExprName = new HashMap<>(); for (Map.Entry entry : conf.getColumnExprMap().entrySet()) { ExprNodeColumnDesc exprNodeColumnDesc = (ExprNodeColumnDesc) entry.getValue(); - inputColNameToExprName.put(exprNodeColumnDesc.getColumn().replace("VALUE.", "").replace("KEY.", ""), entry.getKey()); + inputColNameToExprName.put(exprNodeColumnDesc.getColumn() + .replace("VALUE.", "").replace("KEY.", ""), entry.getKey()); } List fieldNames = conf.getColumnExprMap().keySet().stream().sorted().collect(Collectors.toList()); StructObjectInspector exprObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, @@ -250,8 +252,9 @@ public class OmniJoinOperator extends CommonJoinOperator impl } private ExprNodeGenericFuncDesc getResidualFilter() { - List filters = residualJoinFilters.stream().map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); - if (filters.size() ==1) { + List filters = residualJoinFilters.stream() + .map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); + if 
(filters.size() == 1) { return (ExprNodeGenericFuncDesc) filters.get(0); } try { @@ -310,9 +313,11 @@ public class OmniJoinOperator extends CommonJoinOperator impl protected void processOmni(int opIndex, int bufferIndex) throws HiveException { if (flowControlCode[opIndex] != SCAN_FINISH && resCode[opIndex] == RES_INIT) { if (flowControlCode[opIndex] == SMJ_NEED_ADD_STREAM_TBL_DATA) { - processOmniSmj(opIndex, opIndex, streamData, streamOperators, SMJ_NEED_ADD_STREAM_TBL_DATA, streamTypes); + processOmniSmj(opIndex, opIndex, streamData, streamOperators, + SMJ_NEED_ADD_STREAM_TBL_DATA, streamTypes); } else { - processOmniSmj(opIndex, bufferIndex, bufferData, bufferOperators, SMJ_NEED_ADD_BUFFERED_TBL_DATA, bufferTypes); + processOmniSmj(opIndex, bufferIndex, bufferData, bufferOperators, + SMJ_NEED_ADD_BUFFERED_TBL_DATA, bufferTypes); } } if (resCode[opIndex] == SMJ_FETCH_JOIN_DATA) { @@ -351,7 +356,8 @@ public class OmniJoinOperator extends CommonJoinOperator impl if (data[opIndex].isEmpty() && fetchDone[dataIndex]) { setStatus(operators[opIndex].addInput(createEofVecBatch(types[opIndex])), opIndex); } else { - while (flowControlCode[opIndex] == controlCode && resCode[opIndex] == RES_INIT && !data[opIndex].isEmpty()) { + while (flowControlCode[opIndex] == controlCode + && resCode[opIndex] == RES_INIT && !data[opIndex].isEmpty()) { setStatus(operators[opIndex].addInput(data[opIndex].poll()), opIndex); } } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniOrcRecordReader.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniOrcRecordReader.java index 39f11958c..d0fbf3f0b 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniOrcRecordReader.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniOrcRecordReader.java @@ -41,11 +41,9 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile; -import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileSplit; @@ -53,22 +51,20 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hive.com.esotericsoftware.kryo.Kryo; import org.apache.hive.com.esotericsoftware.kryo.io.Input; import org.apache.orc.OrcConf; -import org.apache.orc.OrcProto; -import org.apache.orc.OrcUtils; import org.apache.orc.TypeDescription; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; public class OmniOrcRecordReader implements RecordReader, StatsProvidingRecordReader { - private static final Map CATEGORY_TO_OMNI_TYPE = new HashMap() { + private static final Map CATEGORY_TO_OMNI_TYPE = + new HashMap() { { put(TypeDescription.Category.SHORT, ShortDataType.SHORT); put(TypeDescription.Category.INT, IntDataType.INTEGER); @@ -85,6 +81,7 @@ public class OmniOrcRecordReader implements RecordReader18) { + if 
(typeDescription.getPrecision() > 18) { typeIds[i] = Decimal128DataType.DECIMAL128.getId().toValue(); } else { typeIds[i] = Decimal64DataType.DECIMAL64.getId().toValue(); } } else { - typeIds[i] = CATEGORY_TO_OMNI_TYPE.get(requiredSchema.getChildren().get(i).getCategory()).getId().toValue(); + typeIds[i] = CATEGORY_TO_OMNI_TYPE.get(requiredSchema.getChildren() + .get(i).getCategory()).getId().toValue(); } } + OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf) + .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)).filesystem(split.getPath().getFileSystem(conf)); recordReader = new OrcColumnarBatchScanReader(); recordReader.initializeReaderJava(split.getPath().toUri(), readerOptions); recordReader.initializeRecordReaderJava(options); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniParquetInputFormat.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniParquetInputFormat.java index 76850baea..6bed74874 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniParquetInputFormat.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniParquetInputFormat.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniParquetRecordReader.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniParquetRecordReader.java index 5e3dcd42c..dfaacc5fc 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniParquetRecordReader.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniParquetRecordReader.java @@ -22,20 +22,20 @@ import static com.huawei.boostkit.hive.cache.VectorCache.BATCH; import com.google.common.base.Strings; -import nova.hetu.omniruntime.vector.IntVec; -import nova.hetu.omniruntime.vector.LongVec; -import nova.hetu.omniruntime.vector.Decimal128Vec; -import nova.hetu.omniruntime.vector.Vec; -import nova.hetu.omniruntime.vector.VecBatch; import nova.hetu.omniruntime.type.BooleanDataType; import nova.hetu.omniruntime.type.DataType; -import nova.hetu.omniruntime.type.Decimal64DataType; import nova.hetu.omniruntime.type.Decimal128DataType; +import nova.hetu.omniruntime.type.Decimal64DataType; import nova.hetu.omniruntime.type.DoubleDataType; import nova.hetu.omniruntime.type.IntDataType; -import nova.hetu.omniruntime.type.ShortDataType; import nova.hetu.omniruntime.type.LongDataType; +import nova.hetu.omniruntime.type.ShortDataType; import nova.hetu.omniruntime.type.VarcharDataType; +import nova.hetu.omniruntime.vector.LongVec; +import nova.hetu.omniruntime.vector.Decimal128Vec; +import nova.hetu.omniruntime.vector.IntVec; +import nova.hetu.omniruntime.vector.Vec; +import nova.hetu.omniruntime.vector.VecBatch; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -66,11 +66,6 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.parquet.HadoopReadOptions; import 
org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.Type; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.statisticslevel.StatisticsFilter; @@ -84,6 +79,10 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,15 +90,15 @@ import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; public class OmniParquetRecordReader implements RecordReader, StatsProvidingRecordReader { - public static final Logger LOG = LoggerFactory.getLogger(OmniParquetRecordReader.class); + private static final Logger LOG = LoggerFactory.getLogger(OmniParquetRecordReader.class); private static final Map PARQUET_TO_OMNI_TYPE = new HashMap() { { @@ -116,7 +115,8 @@ public class OmniParquetRecordReader } }; - private static final Map NULL_ORIGINAL_TO_OMNI_TYPE = new HashMap() { + private static final Map NULL_ORIGINAL_TO_OMNI_TYPE = + new HashMap() { { put(PrimitiveTypeName.INT32, IntDataType.INTEGER); put(PrimitiveTypeName.INT64, LongDataType.LONG); @@ -131,7 +131,7 @@ public class OmniParquetRecordReader protected Vec[] vecs; protected float progress = 0.0f; protected long splitLen; // for getPos() - protected boolean skipTimestampConversion = false; + protected boolean shouldSkipTimestampConversion = false; protected SerDeStats serDeStats; protected JobConf jobConf; protected ProjectionPusher projectionPusher; @@ -166,11 +166,11 @@ public class OmniParquetRecordReader } this.split = getSplit(oldSplit, jobConf); // create a TaskInputOutputContext - if (skipTimestampConversion + if (shouldSkipTimestampConversion ^ HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { conf = new JobConf(oldJobConf); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION, - skipTimestampConversion); + shouldSkipTimestampConversion); } this.isFilterPredicate = filter instanceof FilterCompat.FilterPredicateCompat; this.recordReader = new ParquetColumnarBatchScanReader(); @@ -221,7 +221,6 @@ public class OmniParquetRecordReader throws IOException { ParquetInputSplit split; if (oldSplit instanceof FileSplit) { - final Path finalPath = ((FileSplit) oldSplit).getPath(); final List blocks = fileFooter.getBlocks(); final FileMetaData fileMetaData = fileFooter.getFileMetaData(); @@ -244,8 +243,10 @@ public class OmniParquetRecordReader } } if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { - skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr"); + shouldSkipTimestampConversion = !Strings.nullToEmpty( + 
fileMetaData.getCreatedBy()).startsWith("parquet-mr"); } + final Path finalPath = ((FileSplit) oldSplit).getPath(); split = new ParquetInputSplit(finalPath, splitStart, splitStart + splitLength, splitLength, null, null); return split; } else { @@ -279,7 +280,7 @@ public class OmniParquetRecordReader List blocks = fileFooter.getBlocks(); for (int i = 0; i < blocks.size(); i++) { BlockMetaData block = blocks.get(i); - long totalSize = 0; + long totalSize = 0L; long startIndex = block.getStartingPos(); for (ColumnChunkMetaData col : block.getColumns()) { totalSize += col.getTotalSize(); @@ -287,9 +288,9 @@ public class OmniParquetRecordReader long midPoint = startIndex + totalSize / 2; if (midPoint >= start && midPoint < end) { if (isFilterPredicate) { - boolean drop = StatisticsFilter.canDrop( + boolean canDrop = StatisticsFilter.canDrop( ((FilterCompat.FilterPredicateCompat) filter).getFilterPredicate(), block.getColumns()); - if (!drop) { + if (!canDrop) { res.add(i); } } else { @@ -300,6 +301,12 @@ public class OmniParquetRecordReader return res; } + /** + * getSignedIntValue + * + * @param str value in string format + * @return the parsed Integer value + */ public Integer getSignedIntValue(String str) { try { return Integer.parseInt(str); @@ -308,6 +315,12 @@ public class OmniParquetRecordReader } } + /** + * getSignedLongValue + * + * @param str value in string format + * @return the parsed Long value + */ public Long getSignedLongValue(String str) { try { return Long.parseLong(str); @@ -316,6 +329,14 @@ public class OmniParquetRecordReader } } + /** + * convertUnsignedInt + * + * @param vec IntVec in which the unsigned int data is stored + * @param isDecimal whether the table column type is decimal + * @param decimalScale power of 10 used to obtain the true decimal value + * @return Vec vector of the true data type + */ public Vec convertUnsignedInt(IntVec vec, boolean isDecimal, BigInteger decimalScale) { if (isDecimal) { Decimal128Vec deciVec = new Decimal128Vec(vec.getSize()); @@ -324,7 +345,8 @@ public class OmniParquetRecordReader deciVec.setNull(j); continue; } - BigInteger trueValue = new BigInteger(getSignedIntValue(Integer.toUnsignedString(vec.get(j))).toString()); + BigInteger trueValue = new BigInteger(getSignedIntValue(Integer.toUnsignedString(vec.get(j))) + .toString()); trueValue = trueValue.multiply(decimalScale); if (trueValue == null || trueValue.compareTo(BigInteger.ZERO) < 0) { deciVec.setNull(j); @@ -349,6 +371,14 @@ public class OmniParquetRecordReader } } + /** + * convertUnsignedLong + * + * @param vec LongVec in which the unsigned long data is stored + * @param isDecimal whether the table column type is decimal + * @param decimalScale power of 10 used to obtain the true decimal value + * @return Vec vector of the true data type + */ public Vec convertUnsignedLong(LongVec vec, boolean isDecimal, BigInteger decimalScale) { if (isDecimal) { Decimal128Vec deciVec = new Decimal128Vec(vec.getSize()); @@ -357,7 +387,8 @@ public class OmniParquetRecordReader deciVec.setNull(j); continue; } - BigInteger trueValue = new BigInteger(getSignedLongValue(Long.toUnsignedString(vec.get(j))).toString()); + BigInteger trueValue = new BigInteger(getSignedLongValue(Long.toUnsignedString(vec.get(j))) + .toString()); trueValue = trueValue.multiply(decimalScale); if (trueValue == null || trueValue.compareTo(BigInteger.ZERO) < 0) { deciVec.setNull(j); @@ -382,7 +413,12 @@ public class OmniParquetRecordReader } } - public void processUnIntData() { + /** + * processUnsignedData + * resolves problems:
1. when reading unsignedInt/unsignedLong data from Parquet file + 2. when the real type stored in Parquet file is different from the type specified by the table structure + */ + public void processUnsignedData() { List types = this.fileSchema.getFields(); TableScanDesc desc = (TableScanDesc) this.tableScanOp.getConf(); ArrayList infoList = this.tableScanOp.getSchema().getSignature(); @@ -421,7 +457,7 @@ public class OmniParquetRecordReader if (batchSize == 0) { return false; } - processUnIntData(); + processUnsignedData(); value.setVecBatch(new VecBatch(vecs, batchSize)); return true; } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniVectorizedParquetRecordReader.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniVectorizedParquetRecordReader.java index 7ecb51e21..86ca14ec5 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniVectorizedParquetRecordReader.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OmniVectorizedParquetRecordReader.java @@ -23,25 +23,23 @@ import static com.huawei.boostkit.hive.converter.VecConverter.CONVERTER_MAP; import com.huawei.boostkit.hive.converter.VecConverter; -import nova.hetu.omniruntime.vector.Decimal128Vec; import nova.hetu.omniruntime.vector.IntVec; import nova.hetu.omniruntime.vector.LongVec; -import nova.hetu.omniruntime.vector.ShortVec; import nova.hetu.omniruntime.vector.Vec; import nova.hetu.omniruntime.vector.VecBatch; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.IOPrepareCache; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc; -import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.parquet.schema.PrimitiveType; @@ -50,11 +48,10 @@ import org.apache.parquet.schema.Type; import java.io.IOException; import java.math.BigInteger; import java.util.Arrays; -import java.util.Map; import java.util.List; +import java.util.Map; public class OmniVectorizedParquetRecordReader extends OmniParquetRecordReader { - private VecConverter[] partColumnConverters; private final Vec[] withPartCol; private final Object[] partitionValues; @@ -88,7 +85,12 @@ public class OmniVectorizedParquetRecordReader extends OmniParquetRecordReader { } } - public void processUnIntData() { + /** + * processUnsignedData + * resolves problems: 1. when reading unsignedInt/unsignedLong data from Parquet file + 2.
when the real type stored in Parquet file is different from the type specified by the table structure + */ + public void processUnsignedData() { List types = this.fileSchema.getFields(); TableScanDesc desc = (TableScanDesc) this.tableScanOp.getConf(); VectorTableScanDesc tableDesc = (VectorTableScanDesc) desc.getVectorDesc(); @@ -122,7 +124,7 @@ public class OmniVectorizedParquetRecordReader extends OmniParquetRecordReader { if (batchSize == 0) { return false; } - processUnIntData(); + processUnsignedData(); if (partitionValues != null) { for (int i = 0; i < partitionValues.length; i++) { Object[] partValue = new Object[batchSize]; diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OrcColumnarBatchScanReader.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OrcColumnarBatchScanReader.java index b7be77550..89c40ed4c 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OrcColumnarBatchScanReader.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/OrcColumnarBatchScanReader.java @@ -21,7 +21,6 @@ package com.huawei.boostkit.hive.reader; import com.huawei.boostkit.scan.jni.OrcColumnarBatchJniReader; import nova.hetu.omniruntime.type.DataType; - import nova.hetu.omniruntime.vector.BooleanVec; import nova.hetu.omniruntime.vector.Decimal128Vec; import nova.hetu.omniruntime.vector.DoubleVec; @@ -30,30 +29,21 @@ import nova.hetu.omniruntime.vector.LongVec; import nova.hetu.omniruntime.vector.ShortVec; import nova.hetu.omniruntime.vector.VarcharVec; import nova.hetu.omniruntime.vector.Vec; + import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.orc.OrcFile.ReaderOptions; import org.apache.orc.Reader.Options; +import org.apache.orc.TypeDescription; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.orc.TypeDescription; import java.net.URI; import java.sql.Date; -import java.time.LocalDate; -import java.time.temporal.ChronoField; import java.util.ArrayList; import java.util.Arrays; -import java.util.Calendar; import java.util.List; -import java.util.TimeZone; - -import static java.util.Calendar.DAY_OF_MONTH; -import static java.util.Calendar.ERA; -import static java.util.Calendar.MONTH; -import static java.util.Calendar.YEAR; - public class OrcColumnarBatchScanReader { private static final Logger LOGGER = LoggerFactory.getLogger(OrcColumnarBatchScanReader.class); @@ -105,7 +95,8 @@ public class OrcColumnarBatchScanReader { jsonObject.put("type", pl.getType().ordinal()); if (pl.getLiteral() != null) { if (pl.getType() == PredicateLeaf.Type.DATE) { - jsonObject.put("literal", ((int) Math.ceil(((Date) pl.getLiteral()).getTime() * 1.0 / 3600 / 24 / 1000)) + ""); + jsonObject.put("literal", + ((int) Math.ceil(((Date) pl.getLiteral()).getTime() * 1.0 / 3600 / 24 / 1000)) + ""); } else if (pl.getType() == PredicateLeaf.Type.DECIMAL) { int decimalP = schema.findSubtype(pl.getColumnName()).getPrecision(); int decimalS = schema.findSubtype(pl.getColumnName()).getScale(); @@ -114,7 +105,8 @@ public class OrcColumnarBatchScanReader { jsonObject.put("literal", spiltValues[0] + " " + decimalP + " " + decimalS); } else { String scalePadZeroStr = padZeroForDecimals(spiltValues, decimalS); - jsonObject.put("literal", spiltValues[0] + "."
+ scalePadZeroStr + " " + decimalP + " " + decimalS); + jsonObject.put("literal", + spiltValues[0] + "." + scalePadZeroStr + " " + decimalP + " " + decimalS); } } else { jsonObject.put("literal", pl.getLiteral().toString()); @@ -153,7 +145,9 @@ public class OrcColumnarBatchScanReader { /** * Init Orc reader. * + * @param uri the file URI * @param options split file options + * @return long */ public long initializeReaderJava(URI uri, ReaderOptions options) { JSONObject job = new JSONObject(); @@ -178,6 +172,7 @@ public class OrcColumnarBatchScanReader { * Init Orc RecordReader. * * @param options split file options + * @return long */ public long initializeRecordReaderJava(Options options) { JSONObject job = new JSONObject(); @@ -292,8 +287,8 @@ public class OrcColumnarBatchScanReader { break; } default: { - throw new RuntimeException("UnSupport type for ColumnarFileScan:" + - DataType.DataTypeId.values()[typeIds[i]]); + throw new RuntimeException("Unsupported type for ColumnarFileScan:" + + DataType.DataTypeId.values()[typeIds[i]]); } } nativeGetId++; diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/ParquetColumnarBatchScanReader.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/ParquetColumnarBatchScanReader.java index 72b61b303..a6dfd261c 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/ParquetColumnarBatchScanReader.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/ParquetColumnarBatchScanReader.java @@ -21,23 +21,23 @@ package com.huawei.boostkit.hive.reader; import com.huawei.boostkit.scan.jni.ParquetColumnarBatchJniReader; import nova.hetu.omniruntime.type.BooleanDataType; -import nova.hetu.omniruntime.type.IntDataType; -import nova.hetu.omniruntime.type.DoubleDataType; +import nova.hetu.omniruntime.type.CharDataType; +import nova.hetu.omniruntime.type.DataType; import nova.hetu.omniruntime.type.Date32DataType; import nova.hetu.omniruntime.type.Decimal128DataType; import nova.hetu.omniruntime.type.Decimal64DataType; +import nova.hetu.omniruntime.type.DoubleDataType; +import nova.hetu.omniruntime.type.IntDataType; import nova.hetu.omniruntime.type.LongDataType; import nova.hetu.omniruntime.type.ShortDataType; -import nova.hetu.omniruntime.type.CharDataType; import nova.hetu.omniruntime.type.VarcharDataType; -import nova.hetu.omniruntime.type.DataType; import nova.hetu.omniruntime.vector.BooleanVec; -import nova.hetu.omniruntime.vector.IntVec; import nova.hetu.omniruntime.vector.Decimal128Vec; +import nova.hetu.omniruntime.vector.DoubleVec; +import nova.hetu.omniruntime.vector.IntVec; import nova.hetu.omniruntime.vector.LongVec; import nova.hetu.omniruntime.vector.ShortVec; import nova.hetu.omniruntime.vector.VarcharVec; -import nova.hetu.omniruntime.vector.DoubleVec; import nova.hetu.omniruntime.vector.Vec; import org.apache.hadoop.fs.Path; @@ -45,9 +45,8 @@ import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.ArrayList; import java.net.URI; +import java.util.List; public class ParquetColumnarBatchScanReader { private static final Logger LOGGER = LoggerFactory.getLogger(ParquetColumnarBatchScanReader.class); @@ -61,7 +60,7 @@ public class ParquetColumnarBatchScanReader { } public long initializeReaderJava(Path path, int capacity, - List rowgroupIndices, List columnIndices, String ugi) { + List rowgroupIndices, List columnIndices, String ugi) { JSONObject
job = new JSONObject(); URI uri = path.toUri(); job.put("uri", path.toString()); @@ -110,10 +109,10 @@ public class ParquetColumnarBatchScanReader { } else if (type instanceof Date32DataType) { vecList[i] = new IntVec(vecNativeIds[i]); } else { - throw new RuntimeException("Unsupport type for ColumnarFileScan: " +type.getId()); + throw new RuntimeException("Unsupported type for ColumnarFileScan: " + type.getId()); } } - return (int)rtn; + return (int) rtn; } public void close() { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/VecBatchWrapperSerde.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/VecBatchWrapperSerde.java index 5cc881570..2e3932a43 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/VecBatchWrapperSerde.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/VecBatchWrapperSerde.java @@ -42,17 +42,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; -/** - * A serde class for VecBatchWrapper. - */ @SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES, COMPRESSION}) public class VecBatchWrapperSerde extends AbstractSerDe { + public static final String COMPRESSION = ""; + private static final Logger LOG = LoggerFactory.getLogger(VecBatchWrapperSerde.class); private ObjectInspector inspector; - public static final String COMPRESSION = ""; - @Override public void initialize(Configuration conf, Properties table) { String columnNameProperty = table.getProperty(serdeConstants.LIST_COLUMNS); @@ -85,9 +82,11 @@ public class VecBatchWrapperSerde extends AbstractSerDe { fieldTypes.forEach(fieldType -> { PrimitiveObjectInspector primitiveObjectInspector; if (fieldType instanceof PrimitiveTypeInfo) { - primitiveObjectInspector = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector((PrimitiveTypeInfo) fieldType); + primitiveObjectInspector = PrimitiveObjectInspectorFactory + .getPrimitiveJavaObjectInspector((PrimitiveTypeInfo) fieldType); } else { - primitiveObjectInspector = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.VOID); + primitiveObjectInspector = PrimitiveObjectInspectorFactory + .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.VOID); } structFieldObjectInspectors.add(primitiveObjectInspector); }); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java index 663d9d9d1..b0895a855 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java @@ -19,8 +19,9 @@ package com.huawei.boostkit.hive.shuffle; public class FixedWidthColumnSerDe implements ColumnSerDe { - protected int columnTypeLen; protected static byte[] EMPTY = new byte[16]; + + protected int columnTypeLen; protected transient byte columnNullMarker; public FixedWidthColumnSerDe(int columnTypeLen) { @@ -30,35 +31,37 @@ public class FixedWidthColumnSerDe implements ColumnSerDe { @Override public int serialize(byte[] writeBytes, VecWrapper vecWrapper, int offset) { int index = vecWrapper.index; + int totalLen = offset; if (vecWrapper.isNull[index] == 1) {
- writeBytes[offset] = -1; - ++offset; - return offset; + writeBytes[totalLen] = -1; + ++totalLen; + return totalLen; } int valueLen = trimBytes(vecWrapper.value, index * columnTypeLen, columnTypeLen); - writeBytes[offset] = (byte) valueLen; - ++offset; + writeBytes[totalLen] = (byte) valueLen; + ++totalLen; // write value array - System.arraycopy(vecWrapper.value, index * columnTypeLen, writeBytes, offset, valueLen); - offset = offset + valueLen; - return offset; + System.arraycopy(vecWrapper.value, index * columnTypeLen, writeBytes, totalLen, valueLen); + totalLen = totalLen + valueLen; + return totalLen; } @Override public int deserialize(VecSerdeBody vecSerdeBody, byte[] bytes, int offset) { - if (bytes[offset] == -1) { + int totalLen = offset; + if (bytes[totalLen] == -1) { vecSerdeBody.isNull = 1; - ++offset; + ++totalLen; System.arraycopy(EMPTY, 0, vecSerdeBody.value, 0, columnTypeLen); - return offset; + return totalLen; } vecSerdeBody.isNull = 0; - int length = bytes[offset]; - ++offset; - System.arraycopy(bytes, offset, vecSerdeBody.value, 0, length); + int length = bytes[totalLen]; + ++totalLen; + System.arraycopy(bytes, totalLen, vecSerdeBody.value, 0, length); System.arraycopy(EMPTY, 0, vecSerdeBody.value, length, columnTypeLen - length); - offset = offset + length; - return offset; + totalLen = totalLen + length; + return totalLen; } private int trimBytes(byte[] bytes, int start, int length) { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSortSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSortSerDe.java index 4cbe44573..f69a22589 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSortSerDe.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSortSerDe.java @@ -24,7 +24,8 @@ public class FixedWidthColumnSortSerDe extends FixedWidthColumnSerDe { private final DataType.DataTypeId dataTypeId; private final boolean invert; - public FixedWidthColumnSortSerDe(int columnTypeLen, byte columnNullMarker, DataType.DataTypeId dataTypeId, boolean invert) { + public FixedWidthColumnSortSerDe(int columnTypeLen, byte columnNullMarker, + DataType.DataTypeId dataTypeId, boolean invert) { super(columnTypeLen); this.columnNullMarker = columnNullMarker; this.dataTypeId = dataTypeId; @@ -33,33 +34,35 @@ public class FixedWidthColumnSortSerDe extends FixedWidthColumnSerDe { @Override public int serialize(byte[] writeBytes, VecWrapper vecWrapper, int offset) { + int totalLen = offset; int index = vecWrapper.index; if (vecWrapper.isNull[index] == 1) { if (columnNullMarker == 1) { - writeBytes[offset++] = (byte) (columnTypeLen - 1); + writeBytes[totalLen++] = (byte) (columnTypeLen - 1); } else { - writeBytes[offset++] = (byte) (columnTypeLen + 1); + writeBytes[totalLen++] = (byte) (columnTypeLen + 1); } - return offset; + return totalLen; } - writeBytes[offset++] = (byte) columnTypeLen; - SerDeUtils.getSerializeByte(vecWrapper.value, index * columnTypeLen, writeBytes, offset, dataTypeId, invert); - offset = offset + columnTypeLen; - return offset; + writeBytes[totalLen++] = (byte) columnTypeLen; + SerDeUtils.getSerializeByte(vecWrapper.value, index * columnTypeLen, writeBytes, totalLen, dataTypeId, invert); + totalLen = totalLen + columnTypeLen; + return totalLen; } @Override public int deserialize(VecSerdeBody vecSerdeBody, byte[] bytes, int offset) { - if 
(bytes[offset] == columnTypeLen + 1 || bytes[offset] == columnTypeLen - 1) { + int totalLen = offset; + if (bytes[totalLen] == columnTypeLen + 1 || bytes[totalLen] == columnTypeLen - 1) { vecSerdeBody.isNull = 1; - ++offset; + ++totalLen; System.arraycopy(EMPTY, 0, vecSerdeBody.value, 0, columnTypeLen); - return offset; + return totalLen; } vecSerdeBody.isNull = 0; - offset++; - SerDeUtils.getDeserializeByte(bytes, offset, vecSerdeBody.value, 0, dataTypeId, invert); - offset = offset + columnTypeLen; - return offset; + totalLen++; + SerDeUtils.getDeserializeByte(bytes, totalLen, vecSerdeBody.value, 0, dataTypeId, invert); + totalLen = totalLen + columnTypeLen; + return totalLen; } } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchOrderSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchOrderSerDe.java index a97cbeb1e..27267cba4 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchOrderSerDe.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchOrderSerDe.java @@ -21,10 +21,11 @@ package com.huawei.boostkit.hive.shuffle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.SerDeException; -import javax.annotation.Nullable; import java.util.Properties; -public class OmniVecBatchOrderSerDe extends OmniVecBatchSerDe{ +import javax.annotation.Nullable; + +public class OmniVecBatchOrderSerDe extends OmniVecBatchSerDe { public OmniVecBatchOrderSerDe() throws SerDeException { } public void initialize(@Nullable Configuration configuration, Properties properties) throws SerDeException { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java index fdaa58253..fe3834ed5 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java @@ -18,8 +18,11 @@ package com.huawei.boostkit.hive.shuffle; +import static com.huawei.boostkit.hive.expression.TypeUtils.HIVE_TO_OMNI_TYPE; + import nova.hetu.omniruntime.type.DataType; import nova.hetu.omniruntime.type.ShortDataType; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -48,33 +51,14 @@ import java.util.Properties; import javax.annotation.Nullable; -import static com.huawei.boostkit.hive.expression.TypeUtils.HIVE_TO_OMNI_TYPE; - public class OmniVecBatchSerDe extends AbstractSerDe { - public static final Logger LOG = LoggerFactory.getLogger(OmniVecBatchSerDe.class.getName()); - List columnNames; - List columnTypes; - TypeInfo rowTypeInfo; - ObjectInspector cachedObjectInspector; - int serializedSize; - int deserializedSize; - SerDeStats stats; - boolean lastOperationSerialize; - boolean lastOperationDeserialize; - BytesWritable serializeBytesWritable = new BytesWritable(); - private transient VecSerdeBody[] deserializeResult; - - private transient int[] columnTypeLen; + private static final Logger LOG = LoggerFactory.getLogger(OmniVecBatchSerDe.class.getName()); - private transient ColumnSerDe[] columnSerDes; - - private transient boolean[] columnSortOrderIsDesc; - private transient 
diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchOrderSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchOrderSerDe.java
index a97cbeb1e..27267cba4 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchOrderSerDe.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchOrderSerDe.java
@@ -21,10 +21,11 @@ package com.huawei.boostkit.hive.shuffle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.SerDeException; -import javax.annotation.Nullable; import java.util.Properties; -public class OmniVecBatchOrderSerDe extends OmniVecBatchSerDe{ +import javax.annotation.Nullable; + +public class OmniVecBatchOrderSerDe extends OmniVecBatchSerDe { public OmniVecBatchOrderSerDe() throws SerDeException { } public void initialize(@Nullable Configuration configuration, Properties properties) throws SerDeException {
diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java
index fdaa58253..fe3834ed5 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java
@@ -18,8 +18,11 @@ package com.huawei.boostkit.hive.shuffle; +import static com.huawei.boostkit.hive.expression.TypeUtils.HIVE_TO_OMNI_TYPE; + import nova.hetu.omniruntime.type.DataType; import nova.hetu.omniruntime.type.ShortDataType; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -48,33 +51,14 @@ import java.util.Properties; import javax.annotation.Nullable; -import static com.huawei.boostkit.hive.expression.TypeUtils.HIVE_TO_OMNI_TYPE; - public class OmniVecBatchSerDe extends AbstractSerDe { - public static final Logger LOG = LoggerFactory.getLogger(OmniVecBatchSerDe.class.getName()); - List<String> columnNames; - List<TypeInfo> columnTypes; - TypeInfo rowTypeInfo; - ObjectInspector cachedObjectInspector; - int serializedSize; - int deserializedSize; - SerDeStats stats; - boolean lastOperationSerialize; - boolean lastOperationDeserialize; - BytesWritable serializeBytesWritable = new BytesWritable(); - private transient VecSerdeBody[] deserializeResult; - - private transient int[] columnTypeLen; - - private transient ColumnSerDe[] columnSerDes; - - private transient boolean[] columnSortOrderIsDesc; - private transient byte[] columnNullMarker; - protected transient boolean isTopN = false; public static final byte ZERO = (byte) 0; public static final byte ONE = (byte) 1; - public static final Map<PrimitiveObjectInspector.PrimitiveCategory, Integer> TYPE_LEN = new HashMap<PrimitiveObjectInspector.PrimitiveCategory, Integer>() { + public static final Map<PrimitiveObjectInspector.PrimitiveCategory, Integer> TYPE_LEN = + new HashMap<PrimitiveObjectInspector.PrimitiveCategory, Integer>() { { put(PrimitiveObjectInspector.PrimitiveCategory.BYTE, 1); put(PrimitiveObjectInspector.PrimitiveCategory.SHORT, 2);
@@ -88,6 +72,26 @@ public class OmniVecBatchSerDe extends AbstractSerDe { } }; + protected transient boolean isTopN = false; + + List<String> columnNames; + List<TypeInfo> columnTypes; + TypeInfo rowTypeInfo; + ObjectInspector cachedObjectInspector; + int serializedSize; + int deserializedSize; + SerDeStats stats; + boolean lastOperationSerialize; + boolean lastOperationDeserialize; + BytesWritable serializeBytesWritable = new BytesWritable(); + + private transient VecSerdeBody[] deserializeResult; + private transient int[] columnTypeLen; + private transient ColumnSerDe[] columnSerDes; + private transient boolean[] columnSortOrderIsDesc; + private transient byte[] columnNullMarker; + private long[] fieldId; + public OmniVecBatchSerDe() throws SerDeException { }
@@ -113,8 +117,10 @@ assert this.columnNames.size() == this.columnTypes.size(); this.rowTypeInfo = TypeInfoFactory.getStructTypeInfo(this.columnNames, this.columnTypes); this.cachedObjectInspector = LazyBinaryUtils.getLazyBinaryObjectInspectorFromTypeInfo(this.rowTypeInfo); - LOG.debug("LazyBinarySerDe initialized with: columnNames=" + this.columnNames + " columnTypes=" + this.columnTypes); + if (LOG.isDebugEnabled()) { + LOG.debug("LazyBinarySerDe initialized with: columnNames=" + this.columnNames + " columnTypes=" + + this.columnTypes); + } this.serializedSize = 0; this.stats = new SerDeStats(); this.lastOperationSerialize = false;
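The initialize() change above wraps LOG.debug in an isDebugEnabled() guard, a routine code-check fix: the guarded form skips the string concatenation entirely when debug logging is off. A minimal sketch, assuming only the SLF4J API the class already imports (LogGuardDemo itself is an illustrative class):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LogGuardDemo {
        private static final Logger LOG = LoggerFactory.getLogger(LogGuardDemo.class);

        public static void main(String[] args) {
            String columnNames = "a,b";
            // Guarded: the concatenation runs only when debug is enabled.
            if (LOG.isDebugEnabled()) {
                LOG.debug("initialized with: columnNames=" + columnNames);
            }
            // Equivalent without a guard: placeholders defer the formatting.
            LOG.debug("initialized with: columnNames={}", columnNames);
        }
    }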
@@ -185,7 +191,8 @@ if (primitiveCategory == PrimitiveObjectInspector.PrimitiveCategory.DECIMAL) { DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) columnTypes.get(i); columnTypeLen[i] = decimalTypeInfo.getPrecision() > 18 ? 16 : 8; - dataTypeId = decimalTypeInfo.getPrecision() > 18 ? DataType.DataTypeId.OMNI_DECIMAL128 : DataType.DataTypeId.OMNI_DECIMAL64; + dataTypeId = decimalTypeInfo.getPrecision() > 18 + ? DataType.DataTypeId.OMNI_DECIMAL128 : DataType.DataTypeId.OMNI_DECIMAL64; } else { columnTypeLen[i] = TYPE_LEN.getOrDefault(primitiveCategory, 0); dataTypeId = HIVE_TO_OMNI_TYPE.getOrDefault(primitiveCategory, ShortDataType.SHORT).getId();
@@ -204,7 +211,8 @@ } else { writeLen = writeLen + columnTypeLen[i] + 1; if (isTopN) { - columnSerDes[i] = new FixedWidthColumnSortSerDe(columnTypeLen[i], columnNullMarker[i], dataTypeId, columnSortOrderIsDesc[i]); + columnSerDes[i] = new FixedWidthColumnSortSerDe(columnTypeLen[i], + columnNullMarker[i], dataTypeId, columnSortOrderIsDesc[i]); } else { columnSerDes[i] = new FixedWidthColumnSerDe(columnTypeLen[i]); }
@@ -229,12 +237,18 @@ return BytesWritable.class; } - private long[] fieldId; - public void setFieldId(long[] fieldId) { this.fieldId = fieldId; } + /** + * serialize + * + * @param obj obj + * @param objectInspector objectInspector + * @return Writable + * @throws SerDeException SerDeException + */ public Writable serialize(Object obj, ObjectInspector objectInspector) throws SerDeException { if (fieldId.length == 0) { return this.serializeBytesWritable; }
@@ -252,6 +266,11 @@ return this.serializeBytesWritable; } + /** + * getSerDeStats + * + * @return SerDeStats + */ public SerDeStats getSerDeStats() { assert this.lastOperationSerialize != this.lastOperationDeserialize;
@@ -264,6 +283,12 @@ return this.stats; } + /** + * deserialize + * + * @param field field + * @return Object + */ public Object deserialize(Writable field) { BytesWritable b = (BytesWritable) field; this.deserializedSize = b.getLength();
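The decimal hunk above picks the storage width from the declared precision: at most 18 decimal digits always fit in a signed 64-bit unscaled value, so such columns can use the 8-byte OMNI_DECIMAL64 layout, while anything wider needs the 16-byte OMNI_DECIMAL128 layout. A small check of the arithmetic behind that cutoff; bytesForPrecision is an illustrative helper, not part of the patch:

    import java.math.BigInteger;

    public class DecimalWidthDemo {
        // Mirrors the patch's rule: precision > 18 forces the 16-byte layout.
        static int bytesForPrecision(int precision) {
            return precision > 18 ? 16 : 8;
        }

        public static void main(String[] args) {
            // The largest 18-digit unscaled value fits in a signed long ...
            BigInteger d18 = BigInteger.TEN.pow(18).subtract(BigInteger.ONE);
            System.out.println(d18.bitLength() <= 63);  // prints: true
            // ... while 19 digits can overflow it.
            BigInteger d19 = BigInteger.TEN.pow(19).subtract(BigInteger.ONE);
            System.out.println(d19.bitLength() <= 63);  // prints: false
            System.out.println(bytesForPrecision(18) + " " + bytesForPrecision(19));  // prints: 8 16
        }
    }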
diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/SerDeUtils.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/SerDeUtils.java
index 5fb157564..edd87ccd0 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/SerDeUtils.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/SerDeUtils.java
@@ -21,7 +21,8 @@ package com.huawei.boostkit.hive.shuffle; import nova.hetu.omniruntime.type.DataType; public class SerDeUtils { - public static void getSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, DataType.DataTypeId typeId, boolean invert) { + public static void getSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, DataType.DataTypeId typeId, boolean invert) { switch (typeId) { case OMNI_BOOLEAN: getBooleanSerializeByte(originalByte, startOriginal, resultByte, startResult, invert);
@@ -47,7 +48,8 @@ } } - public static void getDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, DataType.DataTypeId typeId, boolean invert) { + public static void getDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, DataType.DataTypeId typeId, boolean invert) { switch (typeId) { case OMNI_BOOLEAN: getBooleanDeserializeByte(originalByte, startOriginal, resultByte, startResult, invert);
@@ -73,39 +75,46 @@ } } - public static void getBooleanSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getBooleanSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte(originalByte[startOriginal], invert); } - public static void getBooleanDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getBooleanDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte(originalByte[startOriginal], invert); } - public static void getIntSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getIntSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte((byte) (originalByte[startOriginal + 3] ^ 0x80), invert); resultByte[startResult + 1] = dealByte(originalByte[startOriginal + 2], invert); resultByte[startResult + 2] = dealByte(originalByte[startOriginal + 1], invert); resultByte[startResult + 3] = dealByte(originalByte[startOriginal], invert); } - public static void getIntDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getIntDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte(originalByte[startOriginal + 3], invert); resultByte[startResult + 1] = dealByte(originalByte[startOriginal + 2], invert); resultByte[startResult + 2] = dealByte(originalByte[startOriginal + 1], invert); resultByte[startResult + 3] = dealByte((byte) (originalByte[startOriginal] ^ 0x80), invert); } - public static void getShortSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getShortSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte((byte) (originalByte[startOriginal + 1] ^ 0x80), invert); resultByte[startResult + 1] = dealByte(originalByte[startOriginal], invert); } - public static void getShortDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getShortDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte(originalByte[startOriginal + 1], invert); resultByte[startResult + 1] = dealByte((byte) (originalByte[startOriginal] ^ 0x80), invert); } - public static void getLongSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getLongSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte((byte) (originalByte[startOriginal + 7] ^ 0x80), invert); resultByte[startResult + 1] = dealByte(originalByte[startOriginal + 6], invert); resultByte[startResult + 2] = dealByte(originalByte[startOriginal + 5], invert);
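The fixed-width helpers above all follow one pattern: read the value's little-endian bytes, emit them big-endian, and XOR the most significant byte with 0x80 so that unsigned lexicographic comparison of the output matches signed numeric order. The dealByte helper is not shown in these hunks; from its call sites it appears to complement each byte when invert is set, which reverses the order for descending sort keys. A standalone sketch of the same idea for int, with illustrative names throughout:

    import java.util.Arrays;

    public class SortableIntEncodingDemo {
        // Big-endian layout with the sign bit flipped; complementing every
        // byte (the role the invert flag plays) reverses the order.
        static byte[] encode(int v, boolean descending) {
            byte[] out = new byte[4];
            out[0] = (byte) ((v >>> 24) ^ 0x80);  // flip the sign bit on the MSB
            out[1] = (byte) (v >>> 16);
            out[2] = (byte) (v >>> 8);
            out[3] = (byte) v;
            if (descending) {
                for (int i = 0; i < 4; i++) {
                    out[i] = (byte) ~out[i];
                }
            }
            return out;
        }

        static int compareUnsigned(byte[] a, byte[] b) {
            for (int i = 0; i < a.length; i++) {
                int d = (a[i] & 0xff) - (b[i] & 0xff);
                if (d != 0) {
                    return d;
                }
            }
            return 0;
        }

        public static void main(String[] args) {
            int[] samples = {-100, -1, 0, 1, 100};
            boolean ascOk = true;
            boolean descOk = true;
            for (int i = 1; i < samples.length; i++) {
                ascOk &= compareUnsigned(encode(samples[i - 1], false), encode(samples[i], false)) < 0;
                descOk &= compareUnsigned(encode(samples[i - 1], true), encode(samples[i], true)) > 0;
            }
            System.out.println(ascOk + " " + descOk);  // prints: true true
            System.out.println(Arrays.toString(encode(1, false)));  // prints: [-128, 0, 0, 1]
        }
    }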
@@ -116,7 +125,8 @@ resultByte[startResult + 7] = dealByte(originalByte[startOriginal], invert); } - public static void getLongDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getLongDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte(originalByte[startOriginal + 7], invert); resultByte[startResult + 1] = dealByte(originalByte[startOriginal + 6], invert); resultByte[startResult + 2] = dealByte(originalByte[startOriginal + 5], invert);
@@ -127,7 +137,8 @@ resultByte[startResult + 7] = dealByte((byte) (originalByte[startOriginal] ^ 0x80), invert); } - public static void getDoubleSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getDoubleSerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { if ((originalByte[startOriginal + 7] & ((byte) 1 << 7)) != 0) { originalByte[startOriginal + 7] = (byte) (0xff ^ originalByte[startOriginal + 7]); originalByte[startOriginal + 6] = (byte) (0xff ^ originalByte[startOriginal + 6]);
@@ -150,7 +161,8 @@ resultByte[startResult + 7] = dealByte(originalByte[startOriginal], invert); } - public static void getDoubleDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getDoubleDeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { if ((dealByte(originalByte[startOriginal], invert) & ((byte) 1 << 7)) == 0) { originalByte[startOriginal] = (byte) (0xff ^ originalByte[startOriginal]); originalByte[startOriginal + 1] = (byte) (0xff ^ originalByte[startOriginal + 1]);
@@ -173,7 +185,8 @@ resultByte[startResult + 7] = dealByte(originalByte[startOriginal], invert); } - public static void getDecimal128SerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getDecimal128SerializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte((byte) (originalByte[startOriginal + 15] ^ 0x80), invert); resultByte[startResult + 1] = dealByte(originalByte[startOriginal + 14], invert); resultByte[startResult + 2] = dealByte(originalByte[startOriginal + 13], invert);
@@ -192,7 +205,8 @@ resultByte[startResult + 15] = dealByte(originalByte[startOriginal], invert); } - public static void getDecimal128DeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, int startResult, boolean invert) { + public static void getDecimal128DeserializeByte(byte[] originalByte, int startOriginal, byte[] resultByte, + int startResult, boolean invert) { resultByte[startResult] = dealByte(originalByte[startOriginal + 15], invert); resultByte[startResult + 1] = dealByte(originalByte[startOriginal + 14], invert); resultByte[startResult + 2] = dealByte(originalByte[startOriginal + 13], invert);
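The double variants differ from the integer ones because IEEE 754 negatives compare in reverse bit order: getDoubleSerializeByte complements every byte of a negative value before the big-endian rewrite, and the non-negative branch (elided from the shown context) presumably applies only the sign-bit flip, which is the standard total-order trick. The same mapping condensed to a single long, as a sketch:

    public class SortableDoubleEncodingDemo {
        // Negatives: complement all bits (reverses their order).
        // Non-negatives: flip only the sign bit.
        static long sortableBits(double d) {
            long bits = Double.doubleToLongBits(d);
            return bits < 0 ? ~bits : (bits ^ Long.MIN_VALUE);
        }

        public static void main(String[] args) {
            double[] samples = {-3.5, -0.0, 0.0, 1.25, Double.MAX_VALUE};
            boolean ordered = true;
            for (int i = 1; i < samples.length; i++) {
                ordered &= Long.compareUnsigned(sortableBits(samples[i - 1]),
                        sortableBits(samples[i])) <= 0;
            }
            System.out.println(ordered);  // prints: true
        }
    }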
diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnAscSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnAscSerDe.java
index 68c87f13f..9c19fc115 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnAscSerDe.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnAscSerDe.java
@@ -18,7 +18,7 @@ package com.huawei.boostkit.hive.shuffle; -public class VariableWidthColumnAscSerDe implements ColumnSerDe{ +public class VariableWidthColumnAscSerDe implements ColumnSerDe { private transient byte columnNullMarker; public VariableWidthColumnAscSerDe(byte columnNullMarker) {
@@ -26,42 +26,44 @@ public class VariableWidthColumnAscSerDe implements ColumnSerDe{ } @Override public int serialize(byte[] writeBytes, VecWrapper vecWrapper, int offset) { + int totalLen = offset; int index = vecWrapper.index; if (vecWrapper.isNull[index] == 1) { if (columnNullMarker == 1) { - writeBytes[offset++] = (byte) 0x7; + writeBytes[totalLen++] = (byte) 0x7; } else { - writeBytes[offset++] = (byte) 0x9; + writeBytes[totalLen++] = (byte) 0x9; } - writeBytes[offset++] = (byte) 0; - return offset; + writeBytes[totalLen++] = (byte) 0; + return totalLen; } int valueLen = vecWrapper.offset[index + 1] - vecWrapper.offset[index]; - writeBytes[offset++] = (byte) 0x8; + writeBytes[totalLen++] = (byte) 0x8; for (int i = 0; i < valueLen; i++) { - offset = writeByte(offset, writeBytes, vecWrapper.value[vecWrapper.offset[index] + i]); + totalLen = writeByte(totalLen, writeBytes, vecWrapper.value[vecWrapper.offset[index] + i]); } - writeBytes[offset++] = 0; - return offset; + writeBytes[totalLen++] = 0; + return totalLen; } @Override public int deserialize(VecSerdeBody vecSerdeBody, byte[] bytes, int offset) { - offset++; - if (bytes[offset] == 0) { - if (bytes[offset - 1] == 0x8) { + int totalLen = offset; + totalLen++; + if (bytes[totalLen] == 0) { + if (bytes[totalLen - 1] == 0x8) { vecSerdeBody.isNull = 0; } else { vecSerdeBody.isNull = 1; } vecSerdeBody.length = 0; - ++offset; - return offset; + ++totalLen; + return totalLen; } vecSerdeBody.isNull = 0; int vecSerdeBodyValueIndex = 0; int escapeCharNum = 0; - for (int i = offset; i < bytes.length && bytes[i] != 0; i++) { + for (int i = totalLen; i < bytes.length && bytes[i] != 0; i++) { if (bytes[i] == 1) { vecSerdeBody.value[vecSerdeBodyValueIndex++] = (byte) (bytes[++i] - 1); escapeCharNum++;
@@ -70,16 +72,17 @@ } } vecSerdeBody.length = vecSerdeBodyValueIndex; - return (offset + vecSerdeBodyValueIndex + escapeCharNum + 1); + return (totalLen + vecSerdeBodyValueIndex + escapeCharNum + 1); } public int writeByte(int offset, byte[] writeBytes, byte byteValue) { + int totalLen = offset; if (byteValue == 0 || byteValue == 1) { - writeBytes[offset++] = (byte) (1); - writeBytes[offset++] = (byte) (byteValue + 1); + writeBytes[totalLen++] = (byte) (1); + writeBytes[totalLen++] = (byte) (byteValue + 1); } else { - writeBytes[offset++] = byteValue; + writeBytes[totalLen++] = byteValue; } - return offset; + return totalLen; } }
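The ascending variable-width scheme above terminates every value with a 0x00 byte; so that the terminator can never occur inside a value, the payload bytes 0x00 and 0x01 are written as the escape pair (0x01, b + 1), which also keeps prefix ordering intact and is why deserialize can undo it with bytes[++i] - 1. The headers 0x7/0x8/0x9 play the same null-first/value/null-last role as in the fixed-width sort serde. A writer-side sketch; writeEscaped is an illustrative name:

    import java.util.Arrays;

    public class VarWidthEscapeDemo {
        // 0x8 header, escaped payload, 0x00 terminator.
        static int writeEscaped(byte[] out, int cursor, byte[] value) {
            out[cursor++] = 0x8;
            for (byte b : value) {
                if (b == 0 || b == 1) {
                    out[cursor++] = 1;             // escape marker
                    out[cursor++] = (byte) (b + 1);
                } else {
                    out[cursor++] = b;
                }
            }
            out[cursor++] = 0;                     // terminator, never in payload
            return cursor;
        }

        public static void main(String[] args) {
            byte[] out = new byte[16];
            int end = writeEscaped(out, 0, new byte[] {5, 0, 1, 9});
            System.out.println(end + " " + Arrays.toString(Arrays.copyOf(out, end)));
            // prints: 8 [8, 5, 1, 1, 1, 2, 9, 0]
        }
    }

The descending serde that follows applies the identical scheme through a 0xff complement, so its terminator (0xff) and escape pairs are the bitwise inverses of these.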
diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnDescSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnDescSerDe.java
index 6461978cf..5bdaac8ba 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnDescSerDe.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnDescSerDe.java
@@ -16,10 +16,9 @@ * limitations under the License. */ - package com.huawei.boostkit.hive.shuffle; -public class VariableWidthColumnDescSerDe implements ColumnSerDe{ +public class VariableWidthColumnDescSerDe implements ColumnSerDe { private transient byte columnNullMarker; public VariableWidthColumnDescSerDe(byte columnNullMarker) {
@@ -27,42 +26,44 @@ public class VariableWidthColumnDescSerDe implements ColumnSerDe{ } @Override public int serialize(byte[] writeBytes, VecWrapper vecWrapper, int offset) { + int totalLen = offset; int index = vecWrapper.index; if (vecWrapper.isNull[index] == 1) { if (columnNullMarker == 1) { - writeBytes[offset++] = (byte) 0x7; + writeBytes[totalLen++] = (byte) 0x7; } else { - writeBytes[offset++] = (byte) 0x9; + writeBytes[totalLen++] = (byte) 0x9; } - writeBytes[offset++] = (byte) 0xff; - return offset; + writeBytes[totalLen++] = (byte) 0xff; + return totalLen; } int valueLen = vecWrapper.offset[index + 1] - vecWrapper.offset[index]; - writeBytes[offset++] = (byte) 0x8; + writeBytes[totalLen++] = (byte) 0x8; for (int i = 0; i < valueLen; i++) { - offset = writeByte(offset, writeBytes, vecWrapper.value[vecWrapper.offset[index] + i]); + totalLen = writeByte(totalLen, writeBytes, vecWrapper.value[vecWrapper.offset[index] + i]); } - writeBytes[offset++] = (byte) 0xff; - return offset; + writeBytes[totalLen++] = (byte) 0xff; + return totalLen; } @Override public int deserialize(VecSerdeBody vecSerdeBody, byte[] bytes, int offset) { - offset++; - if (bytes[offset] == (byte) 0xff) { - if (bytes[offset - 1] == 0x8) { + int totalLen = offset; + totalLen++; + if (bytes[totalLen] == (byte) 0xff) { + if (bytes[totalLen - 1] == 0x8) { vecSerdeBody.isNull = 0; } else { vecSerdeBody.isNull = 1; } vecSerdeBody.length = 0; - ++offset; - return offset; + ++totalLen; + return totalLen; } vecSerdeBody.isNull = 0; int vecSerdeBodyValueIndex = 0; int escapeCharNum = 0; - for (int i = offset; i < bytes.length && bytes[i] != (byte) 0xff; i++) { + for (int i = totalLen; i < bytes.length && bytes[i] != (byte) 0xff; i++) { if ((bytes[i] ^ 0xff) == 1) { vecSerdeBody.value[vecSerdeBodyValueIndex++] = (byte) ((0xff ^ bytes[++i]) - 1); escapeCharNum++;
@@ -71,16 +72,17 @@ } } vecSerdeBody.length = vecSerdeBodyValueIndex; - return (offset + vecSerdeBodyValueIndex + escapeCharNum + 1); + return (totalLen + vecSerdeBodyValueIndex + escapeCharNum + 1); } public int writeByte(int offset, byte[] writeBytes, byte byteValue) { + int totalLen = offset; if (byteValue == (byte) 0xff || byteValue == 1) { - writeBytes[offset++] = (byte) (0xff ^ 1); - writeBytes[offset++] = (byte) (0xff ^ (byteValue + 1)); + writeBytes[totalLen++] = (byte) (0xff ^ 1); + writeBytes[totalLen++] = (byte) (0xff ^ (byteValue + 1)); } else { - writeBytes[offset++] = (byte) (0xff ^ byteValue); + writeBytes[totalLen++] = (byte) (0xff ^ byteValue); } - return offset; + return totalLen; } }
diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnSerDe.java
index 91133d1d5..c70b73804 100644
--- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnSerDe.java
+++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/VariableWidthColumnSerDe.java
@@ -24,39 +24,40 @@ public class VariableWidthColumnSerDe implements ColumnSerDe { private byte[] lengthBytes = new byte[4];
private LazyBinaryUtils.VInt vInt = new LazyBinaryUtils.VInt(); - @Override public int serialize(byte[] writeBytes, VecWrapper vecWrapper, int offset) { + int totalLen = offset; int index = vecWrapper.index; if (vecWrapper.isNull[index] == 1) { - writeBytes[offset] = -1; - ++offset; - return offset; + writeBytes[totalLen] = -1; + ++totalLen; + return totalLen; } // write length int valueLen = vecWrapper.offset[index + 1] - vecWrapper.offset[index]; int len = LazyBinaryUtils.writeVLongToByteArray(lengthBytes, valueLen); - System.arraycopy(lengthBytes, 0, writeBytes, offset, len); - offset = offset + len; + System.arraycopy(lengthBytes, 0, writeBytes, totalLen, len); + totalLen = totalLen + len; // write value array - System.arraycopy(vecWrapper.value, vecWrapper.offset[index], writeBytes, offset, valueLen); - offset = offset + valueLen; - return offset; + System.arraycopy(vecWrapper.value, vecWrapper.offset[index], writeBytes, totalLen, valueLen); + totalLen = totalLen + valueLen; + return totalLen; } @Override public int deserialize(VecSerdeBody vecSerdeBody, byte[] bytes, int offset) { - if (bytes[offset] == -1) { + int totalLen = offset; + if (bytes[totalLen] == -1) { vecSerdeBody.isNull = 1; vecSerdeBody.length = 0; - ++offset; - return offset; + ++totalLen; + return totalLen; } vecSerdeBody.isNull = 0; - LazyBinaryUtils.readVInt(bytes, offset, vInt); + LazyBinaryUtils.readVInt(bytes, totalLen, vInt); vecSerdeBody.length = vInt.value; - System.arraycopy(bytes, offset + vInt.length, vecSerdeBody.value, 0, vInt.value); - offset = offset + vInt.length + vInt.value; - return offset; + System.arraycopy(bytes, totalLen + vInt.length, vecSerdeBody.value, 0, vInt.value); + totalLen = totalLen + vInt.length + vInt.value; + return totalLen; } } -- Gitee