diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java index 9fff88495390dc4e0b3dcc974dd6c10a8806f04d..a0c592f7e9de09f7ec0db9bc26c59f148258d9f3 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java @@ -109,10 +109,6 @@ import java.util.stream.Collectors; public class OmniMapJoinOperator extends AbstractMapJoinOperator implements Serializable, VectorizationContextRegion { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(OmniMapJoinOperator.class.getName()); - public static final Map JOIN_TYPE_MAP = new HashMap() { { put(JoinDesc.INNER_JOIN, JoinType.OMNI_JOIN_TYPE_INNER); @@ -122,7 +118,14 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator put(JoinDesc.LEFT_SEMI_JOIN, JoinType.OMNI_JOIN_TYPE_LEFT_SEMI); } }; + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(OmniMapJoinOperator.class.getName()); + private static boolean isAddedCloseThread; + private static Map countShareBuildIds = new HashMap<>(); + private static int buildNodeId; + private transient OmniOperatorFactory omniLookupJoinWithExprOperatorFactory; + private transient OmniHashBuilderWithExprOperatorFactory omniHashBuilderWithExprOperatorFactory; private transient OmniOperator joinOperator; @@ -144,10 +147,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator private transient List[] buildVecs; private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes; - private static boolean addedCloseThread; private transient Iterator output; - private static int buildNodeId; - private static Map countShareBuildIds = new HashMap<>(); + public OmniMapJoinOperator() { super(); @@ -202,11 +203,13 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator if (!conf.isDynamicPartitionHashJoin()) { OmniHashBuilderWithExprOperatorFactory.gLock.lock(); try { - omniHashBuilderWithExprOperatorFactory = OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(buildNodeId); + omniHashBuilderWithExprOperatorFactory = + OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(buildNodeId); Integer countShareBuildId = countShareBuildIds.getOrDefault(buildNodeId, 0); countShareBuildIds.put(buildNodeId, ++countShareBuildId); if (omniHashBuilderWithExprOperatorFactory == null) { - omniHashBuilderWithExprOperatorFactory = getOmniHashBuilderWithExprOperatorFactory(joinType, buildTypes, + omniHashBuilderWithExprOperatorFactory = + getOmniHashBuilderWithExprOperatorFactory(joinType, buildTypes, buildIndexes.get(buildIndexes.size() - 1)); buildOperator = omniHashBuilderWithExprOperatorFactory.createOperator(); OmniHashBuilderWithExprOperatorFactory.saveHashBuilderOperatorAndFactory(buildNodeId, @@ -239,7 +242,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator if (!canLoadCache && !isInputFileChangeSensitive(mapContext)) { loadBuildVec(mapContext, mrContext); } - if (!addedCloseThread) { + if (!isAddedCloseThread) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { for (Map.Entry entry : countShareBuildIds.entrySet()) { @@ -257,7 +260,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator LOG.error("release operatorFactor failed", e); } })); - addedCloseThread = true; + isAddedCloseThread = true; } } @@ -297,7 +300,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator // result as input of buildOperator List buildList = new ArrayList<>(); List probeVec; - OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories = new OmniHashBuilderWithExprOperatorFactory[buildIndexes.size() - 1]; + OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories = + new OmniHashBuilderWithExprOperatorFactory[buildIndexes.size() - 1]; OmniOperatorFactory[] innerJoinOperatorFactories = new OmniOperatorFactory[buildIndexes.size() - 1]; OmniOperator[] innerBuildOperators = new OmniOperator[buildIndexes.size() - 1]; OmniOperator[] innerJoinOperators = new OmniOperator[buildIndexes.size() - 1]; @@ -336,7 +340,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } } } - closeInnerOperators(innerBuildOperators, innerJoinOperators, innerBuildFactories, innerJoinOperatorFactories); + closeInnerOperators(innerBuildOperators, + innerJoinOperators, innerBuildFactories, innerJoinOperatorFactories); buildOperator.getOutput(); } else { List cacheVecBatches = getVectorFromCache(1 - posBigTable); @@ -390,7 +395,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator int rowLength = ObjectInspectorUtils.getStructSize(serde.getObjectInspector()); // if output includes key and key doesn't have expression, then the value of // hashMapWrapper will not contain key. - boolean needAddKey = rowLength < joinValuesObjectInspectors[pos].size(); + boolean isNeedAddKey = rowLength < joinValuesObjectInspectors[pos].size(); Object[] valueArray = new Object[rowLength]; int rowCount = 0; Object[] keyValueArray = new Object[buildInspectors[pos].size()]; @@ -407,7 +412,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator ObjectInspectorUtils.copyStructToArray(serde.deserialize(value), serde.getObjectInspector(), ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE, valueArray, 0); } - if (needAddKey) { + if (isNeedAddKey) { valueList = new ArrayList<>(Arrays.asList(valueArray)); for (Map.Entry keyValueEntry : valuePosToKeyPos[pos].entrySet()) { valueList.add(keyValueEntry.getKey(), key[keyValueEntry.getValue()]); @@ -465,7 +470,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator int rowLength = ObjectInspectorUtils.getStructSize(valueSerde.getObjectInspector()); // if output includes key and key doesn't have expression, then the value of // hashMapWrapper will not contain key. - boolean needAddKey = rowLength < joinValuesObjectInspectors[pos].size(); + boolean isNeedAddKey = rowLength < joinValuesObjectInspectors[pos].size(); int rowCount = 0; VecSerdeBody[] key; Writable currentKey; @@ -481,12 +486,12 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } rowCount++; if (rowCount == BATCH) { - setBuildVecBatches(pos, buildVecBatches, vecBufferCache, needAddKey, rowCount, keyLength); + setBuildVecBatches(pos, buildVecBatches, vecBufferCache, isNeedAddKey, rowCount, keyLength); rowCount = 0; } } if (rowCount > 0) { - setBuildVecBatches(pos, buildVecBatches, vecBufferCache, needAddKey, rowCount, keyLength); + setBuildVecBatches(pos, buildVecBatches, vecBufferCache, isNeedAddKey, rowCount, keyLength); } return buildVecBatches; } catch (Exception e) { @@ -495,7 +500,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } private void setBuildVecBatches(int pos, List buildVecBatches, VecBufferCache vecBufferCache, - boolean needAddKey, int rowCount, int keyLength) { + boolean isNeedAddKey, int rowCount, int keyLength) { Vec[] keyValueVecs = new Vec[buildInspectors[pos].size()]; Vec[] cachedVecs = vecBufferCache.getValueVecBatchCache(rowCount); System.arraycopy(cachedVecs, 0, keyValueVecs, 0, keyLength); @@ -503,7 +508,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator for (int i = keyLength; i < cachedVecs.length; i++) { valueVecs.add(cachedVecs[i]); } - if (needAddKey) { + if (isNeedAddKey) { for (Map.Entry keyValueEntry : valuePosToKeyPos[pos].entrySet()) { valueVecs.add(keyValueEntry.getKey(), cachedVecs[keyValueEntry.getValue()].slice(0, cachedVecs[keyValueEntry.getValue()].getSize())); @@ -591,7 +596,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator .getOriginalColumnNames(); List probeInspectors = probeInputfields.stream().map(StructField::getFieldObjectInspector) .collect(Collectors.toList()); - DataType[] probeTypes = getTypeFromInspectors(probeInspectors); Map probekeyColNameToId = new HashMap<>(); List probeOutputfieldsName = getExprNodeColumnEvaluator(joinValues[probeIndex]).stream() .map(evaluator -> ((ExprNodeColumnEvaluator) evaluator).getExpr().getColumn()) @@ -610,8 +614,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator if (probeOutputfieldsName.size() < probeOutputCols.length) { probeOutputCols[probeOutputCols.length - 1] = probeInputfields.size() - 1; } - String[] probeHashKeys = getExprFromExprNode(joinKeys[probeIndex], probekeyColNameToId, - inputObjInspectors[probeIndex], false); int[] buildOutputCols = new int[buildIndexes.stream() .mapToInt(buildIndex -> joinValuesObjectInspectors[buildIndex].size()).sum()]; DataType[] buildOutputTypes = new DataType[buildOutputCols.length]; @@ -625,13 +627,17 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } start = start + joinValuesObjectInspectors[buildIndex].size(); } + DataType[] probeTypes = getTypeFromInspectors(probeInspectors); + String[] probeHashKeys = getExprFromExprNode(joinKeys[probeIndex], probekeyColNameToId, + inputObjInspectors[probeIndex], false); JoinType joinType = JOIN_TYPE_MAP.get(condn[buildIndexes.size() - 1].getType()); if (joinType == JoinType.OMNI_JOIN_TYPE_FULL) { return new OmniLookupOuterJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashKeys, buildOutputCols, buildOutputTypes, omniHashBuilderWithExprOperatorFactory); } else { return new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashKeys, - buildOutputCols, buildOutputTypes, omniHashBuilderWithExprOperatorFactory, generateResidualFilter()); + buildOutputCols, buildOutputTypes, omniHashBuilderWithExprOperatorFactory, + generateResidualFilter()); } } @@ -642,7 +648,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator List probeInspectors = new ArrayList<>(); probeInspectors.addAll(joinKeysObjectInspectors[probeIndex]); probeInspectors.addAll(joinValuesObjectInspectors[probeIndex]); - DataType[] probeTypes = getTypeFromInspectors(probeInspectors); int[] probeOutputCols = new int[probeInspectors.size()]; for (int i = 0; i < probeInspectors.size(); i++) { probeOutputCols[i] = i; @@ -658,6 +663,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator buildOutputCols[i] = i; buildOutputTypes[i] = buildTypes[i]; } + DataType[] probeTypes = getTypeFromInspectors(probeInspectors); JoinType joinType = JOIN_TYPE_MAP.get(condn[buildIndexes.size() - 1].getType()); if (joinType == JoinType.OMNI_JOIN_TYPE_FULL) { return new OmniLookupOuterJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashKeys, @@ -669,7 +675,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } private void closeInnerOperators(OmniOperator[] innerBuildOperators, OmniOperator[] innerJoinOperators, - OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories, OmniOperatorFactory[] innerJoinOperatorFactories) { + OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories, + OmniOperatorFactory[] innerJoinOperatorFactories) { for (OmniOperator operator : innerBuildOperators) { operator.close(); } @@ -732,33 +739,42 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator return Optional.empty(); } Map> inputColNameToExprName = getInputColNameToExprName(); - List fields = ((StructObjectInspector) inputObjInspectors[posBigTable]).getAllStructFieldRefs(); + List fields = + ((StructObjectInspector) inputObjInspectors[posBigTable]).getAllStructFieldRefs(); List fieldNames = fields.stream().map(field -> { - String key = field.getFieldName().replace("value.", "VALUE.").replace("key.", "KEY."); + String key = field.getFieldName().replace("value.", "VALUE.") + .replace("key.", "KEY."); if (inputColNameToExprName.containsKey(key)) { List exprNames = inputColNameToExprName.get(key); - return exprNames.get(Math.min(exprNames.size() - 1, posBigTable)).replace("value.", "").replace("key.", ""); + return exprNames.get(Math.min(exprNames.size() - 1, posBigTable)) + .replace("value.", "").replace("key.", ""); } else { - return field.getFieldName().replace("value.", "").replace("key.", ""); + return field.getFieldName().replace("value.", "") + .replace("key.", ""); } } ).collect(Collectors.toList()); - List inspectors = fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList()); + List inspectors = + fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList()); for (int buildIndex : buildIndexes) { fields = ((StructObjectInspector) inputObjInspectors[buildIndex]).getAllStructFieldRefs(); fieldNames.addAll(fields.stream().map(field -> { - String key = field.getFieldName().replace("value.", "VALUE.").replace("key.", "KEY."); + String key = field.getFieldName().replace("value.", "VALUE.") + .replace("key.", "KEY."); if (inputColNameToExprName.containsKey(key)) { List exprNames = inputColNameToExprName.get(key); - return exprNames.get(Math.min(exprNames.size() - 1, buildIndex)).replace("value.", "").replace("key.", ""); + return exprNames.get(Math.min(exprNames.size() - 1, buildIndex)) + .replace("value.", "").replace("key.", ""); } else { - return field.getFieldName().replace("value.", "").replace("key.", ""); + return field.getFieldName().replace("value.", "") + .replace("key.", ""); } } ).collect(Collectors.toList())); inspectors.addAll(fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList())); } - StructObjectInspector exprObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, inspectors); + StructObjectInspector exprObjInspector = + ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, inspectors); ExprNodeGenericFuncDesc predicate; if (joinFilter == null) { predicate = (ExprNodeGenericFuncDesc) conf.getResidualFilterExprs().get(0); @@ -781,7 +797,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } private ExprNodeGenericFuncDesc getJoinFilter() { - List filters = Arrays.stream(joinFilters).flatMap(Collection::stream).map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); + List filters = Arrays.stream(joinFilters).flatMap(Collection::stream) + .map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); if (filters.isEmpty()) { return null; } @@ -875,7 +892,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } @Override - public void closeOp(boolean abort) throws HiveException { + public void closeOp(boolean isAbort) throws HiveException { joinOperator.close(); omniLookupJoinWithExprOperatorFactory.close(); if (conf.isDynamicPartitionHashJoin()) { @@ -883,7 +900,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator omniHashBuilderWithExprOperatorFactory.close(); } output = null; - super.closeOp(abort); + super.closeOp(isAbort); } @Override @@ -894,8 +911,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator public void endGroup() throws HiveException { } - public void publicSetDone(boolean done) { - this.done = done; + public void publicSetDone(boolean isDone) { + this.done = isDone; } private static class OmniReaderWrapper { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java index 089f5e5313d67d21191fc97de2d67abc4171b39c..1c38599829418e3476568c5e7bf51840e49c386b 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java @@ -18,9 +18,13 @@ package com.huawei.boostkit.hive; +import static com.huawei.boostkit.hive.JoinUtils.getExprNodeColumnEvaluator; +import static com.huawei.boostkit.hive.OmniMapJoinOperator.JOIN_TYPE_MAP; + import com.huawei.boostkit.hive.expression.BaseExpression; import com.huawei.boostkit.hive.expression.ExpressionUtils; import com.huawei.boostkit.hive.expression.TypeUtils; + import nova.hetu.omniruntime.constants.JoinType; import nova.hetu.omniruntime.operator.OmniOperator; import nova.hetu.omniruntime.operator.OmniOperatorFactory; @@ -65,7 +69,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitive import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -76,20 +79,16 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static com.huawei.boostkit.hive.JoinUtils.getExprNodeColumnEvaluator; -import static com.huawei.boostkit.hive.OmniMapJoinOperator.JOIN_TYPE_MAP; public class OmniMergeJoinOperator extends CommonJoinOperator implements Serializable { - private static final long serialVersionUID = 1L; - protected static final int SMJ_NEED_ADD_STREAM_TBL_DATA = 2; protected static final int SMJ_NEED_ADD_BUFFERED_TBL_DATA = 3; protected static final int SCAN_FINISH = 4; protected static final int RES_INIT = 0; protected static final int SMJ_FETCH_JOIN_DATA = 5; + private static final long serialVersionUID = 1L; + protected transient RecordSource[] sources; protected transient boolean[] fetchDone; @@ -107,13 +106,6 @@ public class OmniMergeJoinOperator extends CommonJoinOperator protected int posBigTable; private OmniVectorOperator omniVectorOperator; - /** - * Kryo ctor. - */ - protected OmniMergeJoinOperator() { - super(); - } - public OmniMergeJoinOperator(CompilationOpContext ctx) { super(ctx); } @@ -124,6 +116,13 @@ public class OmniMergeJoinOperator extends CommonJoinOperator this.posBigTable = commonMergeJoinDesc.getPosBigTable(); } + /** + * Kryo ctor. + */ + protected OmniMergeJoinOperator() { + super(); + } + @Override // If mergeJoinOperator has 3 tables, first join table0 and table1, and output // all columns of table0 and table1. @@ -163,23 +162,24 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } } - private void generateOmniOperator(int bufferIndex, boolean getAll) throws HiveException { + private void generateOmniOperator(int bufferIndex, boolean isGetAll) throws HiveException { int opIndex = bufferIndex - 1; List streamAliasList = new ArrayList<>(); for (int i = 0; i < bufferIndex; i++) { streamAliasList.add(i); } - streamFactories[opIndex] = (OmniSmjStreamedTableWithExprOperatorFactory) getFactory(streamAliasList, null, - getAll, opIndex); + streamFactories[opIndex] = + (OmniSmjStreamedTableWithExprOperatorFactory) getFactory(streamAliasList, null, + isGetAll, opIndex); streamOperators[opIndex] = streamFactories[opIndex].createOperator(); bufferFactories[opIndex] = (OmniSmjBufferedTableWithExprOperatorFactory) getFactory(Arrays.asList(bufferIndex), - streamFactories[opIndex], getAll, opIndex); + streamFactories[opIndex], isGetAll, opIndex); bufferOperators[opIndex] = bufferFactories[opIndex].createOperator(); } private OmniOperatorFactory getFactory(List aliasList, OmniSmjStreamedTableWithExprOperatorFactory streamFactory, - boolean getAll, int opIndex) throws HiveException { + boolean isGetAll, int opIndex) throws HiveException { List inputFields = aliasList.stream() .flatMap(alias -> ((StructObjectInspector) inputObjInspectors[alias]).getAllStructFieldRefs().stream() .flatMap(keyValue -> ((StructObjectInspector) keyValue.getFieldObjectInspector()) @@ -202,11 +202,12 @@ public class OmniMergeJoinOperator extends CommonJoinOperator if (i >= fieldNum[tagIndex]) { ++tagIndex; } - inputTypes[i] = TypeUtils.buildInputDataType(((AbstractPrimitiveObjectInspector) inputFields.get(i).getFieldObjectInspector()).getTypeInfo()); + inputTypes[i] = TypeUtils.buildInputDataType(((AbstractPrimitiveObjectInspector) inputFields.get(i) + .getFieldObjectInspector()).getTypeInfo()); colNameToId.get(tagIndex).put(inputFields.get(i).getFieldName(), i); } int[] outputCols; - if (getAll) { + if (isGetAll) { outputCols = new int[inputTypes.length]; for (int i = 0; i < inputTypes.length; i++) { outputCols[i] = i; @@ -249,30 +250,36 @@ public class OmniMergeJoinOperator extends CommonJoinOperator List fields = Utilities.constructVectorizedReduceRowOI((StructObjectInspector) ((StructObjectInspector) inputObjInspectors[opIndex]).getAllStructFieldRefs().get(0).getFieldObjectInspector(), (StructObjectInspector) ((StructObjectInspector) inputObjInspectors[opIndex]).getAllStructFieldRefs().get(1).getFieldObjectInspector()).getAllStructFieldRefs(); List fieldNames = fields.stream().map(field -> { - String key = field.getFieldName().replace("value.", "VALUE.").replace("key.", "KEY."); + String key = field.getFieldName() + .replace("value.", "VALUE.").replace("key.", "KEY."); if (inputColNameToExprName.containsKey(key)) { List exprNames = inputColNameToExprName.get(key); - return exprNames.get(Math.min(exprNames.size() - 1, opIndex)).replace("value.", "").replace("key.", ""); + return exprNames.get(Math.min(exprNames.size() - 1, opIndex)) + .replace("value.", "").replace("key.", ""); } else { - return field.getFieldName().replace("value.", "").replace("key.", ""); + return field.getFieldName() + .replace("value.", "").replace("key.", ""); } } ).collect(Collectors.toList()); - List inspectors = fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList()); - int bufferIndex = opIndex + 1; fields = Utilities.constructVectorizedReduceRowOI((StructObjectInspector) ((StructObjectInspector) inputObjInspectors[bufferIndex]).getAllStructFieldRefs().get(0).getFieldObjectInspector(), (StructObjectInspector) ((StructObjectInspector) inputObjInspectors[bufferIndex]).getAllStructFieldRefs().get(1).getFieldObjectInspector()).getAllStructFieldRefs(); fieldNames.addAll(fields.stream().map(field -> { - String key = field.getFieldName().replace("value.", "VALUE.").replace("key.", "KEY."); + String key = field.getFieldName() + .replace("value.", "VALUE.").replace("key.", "KEY."); if (inputColNameToExprName.containsKey(key)) { List exprNames = inputColNameToExprName.get(key); - return exprNames.get(Math.min(exprNames.size() - 1, bufferIndex)).replace("value.", "").replace("key.", ""); + return exprNames.get(Math.min(exprNames.size() - 1, bufferIndex)) + .replace("value.", "").replace("key.", ""); } else { - return field.getFieldName().replace("value.", "").replace("key.", ""); + return field.getFieldName() + .replace("value.", "").replace("key.", ""); } } ).collect(Collectors.toList())); + List inspectors = + fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList()); inspectors.addAll(fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList())); StructObjectInspector exprObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, @@ -281,10 +288,12 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } else { StructObjectInspector flattenInspector = Utilities.constructVectorizedReduceRowOI((StructObjectInspector) ((StructObjectInspector) inputObjInspectors[opIndex]).getAllStructFieldRefs().get(0).getFieldObjectInspector(), (StructObjectInspector) ((StructObjectInspector) inputObjInspectors[opIndex]).getAllStructFieldRefs().get(1).getFieldObjectInspector()); - List inspectors = flattenInspector.getAllStructFieldRefs().stream().map(field -> field.getFieldObjectInspector()).collect(Collectors.toList()); - List fieldNames = flattenInspector.getAllStructFieldRefs().stream().map(field -> field.getFieldName()).collect(Collectors.toList()); - StructObjectInspector exprObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, - inspectors); + List inspectors = flattenInspector.getAllStructFieldRefs() + .stream().map(field -> field.getFieldObjectInspector()).collect(Collectors.toList()); + List fieldNames = flattenInspector.getAllStructFieldRefs() + .stream().map(field -> field.getFieldName()).collect(Collectors.toList()); + StructObjectInspector exprObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(fieldNames, inspectors); root = ExpressionUtils.build(joinFilter, exprObjInspector); } return Optional.of(root.toString()); @@ -303,7 +312,8 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } private ExprNodeGenericFuncDesc getJoinFilter(int opIndex) { - List filters = joinFilters[opIndex].stream().map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); + List filters = + joinFilters[opIndex].stream().map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); if (filters.isEmpty()) { return null; } @@ -318,7 +328,8 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } private ExprNodeGenericFuncDesc getResidualFilter() { - List filters = residualJoinFilters.stream().map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); + List filters = + residualJoinFilters.stream().map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); if (filters.size() == 1) { return (ExprNodeGenericFuncDesc) filters.get(0); } @@ -378,9 +389,11 @@ public class OmniMergeJoinOperator extends CommonJoinOperator protected void processOmni(int opIndex, int bufferIndex) throws HiveException { if (flowControlCode[opIndex] != SCAN_FINISH && resCode[opIndex] == RES_INIT) { if (flowControlCode[opIndex] == SMJ_NEED_ADD_STREAM_TBL_DATA) { - processOmniSmj(opIndex, opIndex, streamData, streamOperators, SMJ_NEED_ADD_STREAM_TBL_DATA, streamTypes); + processOmniSmj(opIndex, opIndex, streamData, + streamOperators, SMJ_NEED_ADD_STREAM_TBL_DATA, streamTypes); } else { - processOmniSmj(opIndex, bufferIndex, bufferData, bufferOperators, SMJ_NEED_ADD_BUFFERED_TBL_DATA, bufferTypes); + processOmniSmj(opIndex, bufferIndex, bufferData, + bufferOperators, SMJ_NEED_ADD_BUFFERED_TBL_DATA, bufferTypes); } } if (resCode[opIndex] == SMJ_FETCH_JOIN_DATA) { @@ -457,8 +470,8 @@ public class OmniMergeJoinOperator extends CommonJoinOperator public void endGroup() throws HiveException { } - public void publicSetDone(boolean done) { - this.done = done; + public void publicSetDone(boolean isDone) { + this.done = isDone; } @Override @@ -472,7 +485,7 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } @Override - public void close(boolean abort) throws HiveException { + public void close(boolean isAbort) throws HiveException { if (!allInitializedParentsAreClosed()) { return; } @@ -494,7 +507,7 @@ public class OmniMergeJoinOperator extends CommonJoinOperator processOmni(opIndex, opIndex + 1); } } - super.close(abort); + super.close(isAbort); } protected VecBatch createEofVecBatch(DataType[] dataTypes) { @@ -564,7 +577,7 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } @Override - public void closeOp(boolean abort) throws HiveException { + public void closeOp(boolean isAbort) throws HiveException { for (int i = 0; i < streamOperators.length; i++) { streamOperators[i].close(); bufferOperators[i].close(); @@ -579,6 +592,6 @@ public class OmniMergeJoinOperator extends CommonJoinOperator vecBatch.close(); } } - super.closeOp(abort); + super.closeOp(isAbort); } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java index a3b41444b3b76a193575f78dd860c5e914337d61..d84ae3e256a158a7177caed5e7f6d11d6e46d997 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java @@ -33,6 +33,10 @@ import java.util.Queue; public class OmniMergeJoinWithSortOperator extends OmniMergeJoinOperator { private transient OmniVectorWithSortOperator omniVectorWithSortOperator; + public OmniMergeJoinWithSortOperator(CompilationOpContext ctx) { + super(ctx); + } + /** * Kryo ctor. */ @@ -40,10 +44,6 @@ public class OmniMergeJoinWithSortOperator extends OmniMergeJoinOperator { super(); } - public OmniMergeJoinWithSortOperator(CompilationOpContext ctx) { - super(ctx); - } - public OmniMergeJoinWithSortOperator(CompilationOpContext ctx, CommonMergeJoinDesc commonMergeJoinDesc) { super(ctx, commonMergeJoinDesc); } @@ -86,13 +86,13 @@ public class OmniMergeJoinWithSortOperator extends OmniMergeJoinOperator { } @Override - public void closeOp(boolean abort) throws HiveException { + public void closeOp(boolean isAbort) throws HiveException { for (OmniOperator sortOperator : omniVectorWithSortOperator.getSortOperators()) { sortOperator.close(); } for (OmniSortOperatorFactory sortOperatorFactory : omniVectorWithSortOperator.getSortOperatorFactories()) { sortOperatorFactory.close(); } - super.closeOp(abort); + super.closeOp(isAbort); } } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java index 93b8a209d7fce5e1637c0293f0f232d322ea200d..6fab01863595ffa1576e9cbbc7ebdf619462f780 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java @@ -21,6 +21,7 @@ package com.huawei.boostkit.hive; import com.huawei.boostkit.hive.expression.TypeUtils; import com.google.common.primitives.Ints; + import nova.hetu.omniruntime.constants.FunctionType; import nova.hetu.omniruntime.constants.OmniWindowFrameBoundType; import nova.hetu.omniruntime.constants.OmniWindowFrameType; @@ -31,6 +32,7 @@ import nova.hetu.omniruntime.operator.window.OmniWindowOperatorFactory; import nova.hetu.omniruntime.type.DataType; import nova.hetu.omniruntime.vector.Vec; import nova.hetu.omniruntime.vector.VecBatch; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -64,13 +66,13 @@ import java.util.List; public class OmniPTFOperator extends OmniHiveOperator implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(OmniPTFOperator.class.getName()); - private transient OmniWindowOperatorFactory omniWindowOperatorFactory; - private transient OmniOperator omniOperator; boolean isMapOperator; - transient Configuration hiveConf; + private transient OmniWindowOperatorFactory omniWindowOperatorFactory; + private transient OmniOperator omniOperator; + public OmniPTFOperator() { super(); } @@ -171,7 +173,8 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se private OmniWindowFrameType[] getWindowFrameType(List windowFunctionDefs) { return windowFunctionDefs.stream().map( - windowFunctionDef -> TypeUtils.getWindowFrameType(windowFunctionDef.getWindowFrame().getWindowType())) + windowFunctionDef -> + TypeUtils.getWindowFrameType(windowFunctionDef.getWindowFrame().getWindowType())) .toArray(OmniWindowFrameType[]::new); } @@ -274,9 +277,6 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se /** * Initialize the visitor to use the QueryDefDeserializer Use the order * defined in QueryDefWalker to visit the QueryDef - * - * @param hiveConf - * @throws HiveException */ protected void reconstructQueryDef(Configuration hiveConf) throws HiveException { PTFDeserializer ds = new PTFDeserializer(conf, (StructObjectInspector) inputObjInspectors[0], hiveConf); @@ -300,8 +300,8 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se } @Override - protected void closeOp(boolean abort) throws HiveException { - super.closeOp(abort); + protected void closeOp(boolean isAbort) throws HiveException { + super.closeOp(isAbort); Iterator output = this.omniOperator.getOutput(); while (output.hasNext()) { VecBatch next = output.next(); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java index 72cc75f08829ebc584aaa29580f822ce6f3c7156..84079c867572db59d08c835da93ccbeaaaae9d68 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java @@ -130,19 +130,6 @@ public class OmniReduceSinkOperator extends TerminalOperator implements Serializable, TopNHash.BinaryCollector { private static final long serialVersionUID = 1L; - private transient ObjectInspector[] partitionObjectInspectors; - private transient ObjectInspector[] bucketObjectInspectors; - private transient int buckColIdxInKey; - - /** - * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} - */ - private transient int buckColIdxInKeyForSdpo = -1; - private boolean firstRow; - private boolean skipTag = false; - private transient int[] valueIndex; // index for value(+ from keys, - from values) - private transient Set decimal128ConvertDecimal64Cols = new HashSet<>(); - protected transient OutputCollector out; /** @@ -177,7 +164,7 @@ public class OmniReduceSinkOperator extends TerminalOperator protected transient int numDistributionKeys; protected transient int numDistinctExprs; protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD) - protected transient boolean useUniformHash = false; + protected transient boolean isUseUniformHash = false; // picks topN K:V pairs from input. protected transient TopNHash reducerHash; @@ -211,17 +198,34 @@ public class OmniReduceSinkOperator extends TerminalOperator // multiple? protected transient Object[][] cachedKeys; - protected transient long cntr = 1; - protected transient long logEveryNRows = 0; + protected transient long cntr = 1L; + protected transient long logEveryNRows = 0L; + + private transient ObjectInspector[] partitionObjectInspectors; + private transient ObjectInspector[] bucketObjectInspectors; + private transient int buckColIdxInKey; + + /** + * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} + */ + private transient int buckColIdxInKeyForSdpo = -1; + private boolean isFirstRow; + private boolean isSkipTag = false; + private transient int[] valueIndex; // index for value(+ from keys, - from values) + private transient Set decimal128ConvertDecimal64Cols = new HashSet<>(); private long[] keyFieldId; private long[] valueFieldId; - private transient boolean needProject; + private transient boolean isNeedProject; private transient OmniOperator projectOperator; - private transient boolean reduceSinkCanReplaceKey; + private transient boolean isReduceSinkCanReplaceKey; private transient VecWrapper[] vecWrappers; + public OmniReduceSinkOperator(CompilationOpContext ctx) { + super(ctx); + } + /** * Kryo ctor. */ @@ -229,14 +233,10 @@ public class OmniReduceSinkOperator extends TerminalOperator super(); } - public OmniReduceSinkOperator(CompilationOpContext ctx) { - super(ctx); - } - - public OmniReduceSinkOperator(CompilationOpContext ctx, ReduceSinkDesc conf, boolean reduceSinkCanReplaceKey) { + public OmniReduceSinkOperator(CompilationOpContext ctx, ReduceSinkDesc conf, boolean isReduceSinkCanReplaceKey) { super(ctx); this.conf = conf; - this.reduceSinkCanReplaceKey = reduceSinkCanReplaceKey; + this.isReduceSinkCanReplaceKey = isReduceSinkCanReplaceKey; } @Override @@ -249,7 +249,7 @@ public class OmniReduceSinkOperator extends TerminalOperator ArrayList keyCols = conf.getKeyCols(); for (ExprNodeDesc valueCol : keyCols) { if (valueCol instanceof ExprNodeGenericFuncDesc || valueCol instanceof ExprNodeConstantDesc) { - needProject = true; + isNeedProject = true; } } keyFieldId = new long[keyCols.size()]; @@ -270,7 +270,7 @@ public class OmniReduceSinkOperator extends TerminalOperator ArrayList valueCols = this.conf.getValueCols(); for (ExprNodeDesc keyCol : valueCols) { if (keyCol instanceof ExprNodeGenericFuncDesc || keyCol instanceof ExprNodeConstantDesc) { - needProject = true; + isNeedProject = true; } } this.valueFieldId = new long[valueCols.size()]; @@ -338,7 +338,7 @@ public class OmniReduceSinkOperator extends TerminalOperator int tag = conf.getTag(); tagByte[0] = (byte) tag; - skipTag = conf.getSkipTag(); + isSkipTag = conf.getSkipTag(); if (LOG.isInfoEnabled()) { LOG.info("Using tag = " + tag); } @@ -362,18 +362,18 @@ public class OmniReduceSinkOperator extends TerminalOperator reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this, conf, hconf); } - useUniformHash = conf.getReducerTraits().contains(UNIFORM); + isUseUniformHash = conf.getReducerTraits().contains(UNIFORM); - firstRow = true; - // acidOp flag has to be checked to use JAVA hash which works like + isFirstRow = true; + // isAcidOp flag has to be checked to use JAVA hash which works like // identity function for integers, necessary to read RecordIdentifier // incase of ACID updates/deletes. - boolean acidOp = conf.getWriteType() == AcidUtils.Operation.UPDATE + boolean isAcidOp = conf.getWriteType() == AcidUtils.Operation.UPDATE || conf.getWriteType() == AcidUtils.Operation.DELETE; - hashFunc = bucketingVersion == 2 && !acidOp + hashFunc = bucketingVersion == 2 && !isAcidOp ? ObjectInspectorUtils::getBucketHashCode : ObjectInspectorUtils::getBucketHashCodeOld; - if (needProject) { + if (isNeedProject) { generatorProject(); } getDecimal128ConvertDecimal64Cols(); @@ -383,7 +383,7 @@ public class OmniReduceSinkOperator extends TerminalOperator throw new RuntimeException(e); } int inputFieldNum = ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs().size(); - vecWrappers = new VecWrapper[needProject + vecWrappers = new VecWrapper[isNeedProject ? conf.getKeyCols().size() + conf.getValueCols().size() : inputFieldNum]; for (int i = 0; i < vecWrappers.length; i++) { @@ -399,19 +399,21 @@ public class OmniReduceSinkOperator extends TerminalOperator private void getDecimal128ConvertDecimal64Cols() { ArrayList allCols = new ArrayList<>(conf.getKeyCols()); allCols.addAll(conf.getValueCols()); - List allStructFieldRefs = ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs(); - List sortedAllCols = allCols.stream().filter(col -> col instanceof ExprNodeColumnDesc). - filter(col -> !extractNumberPart((ExprNodeColumnDesc) col).isEmpty()). - sorted(Comparator.comparingInt(col -> Integer.parseInt(extractNumberPart((ExprNodeColumnDesc) col)))).collect(Collectors.toList()); + List allStructFieldRefs = + ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs(); + List sortedAllCols = allCols.stream().filter(col -> col instanceof ExprNodeColumnDesc) + .filter(col -> !extractNumberPart((ExprNodeColumnDesc) col).isEmpty()) + .sorted(Comparator.comparingInt(col -> Integer.parseInt(extractNumberPart((ExprNodeColumnDesc) col)))) + .collect(Collectors.toList()); if (allStructFieldRefs.size() != sortedAllCols.size()) { return; } for (int i = 0; i < allStructFieldRefs.size(); i++) { ObjectInspector fieldObjectInspector = allStructFieldRefs.get(i).getFieldObjectInspector(); TypeInfo typeInfo = sortedAllCols.get(i).getTypeInfo(); - if (fieldObjectInspector instanceof WritableHiveDecimalObjectInspector && - ((DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo()).getPrecision() > 18 && - (typeInfo instanceof DecimalTypeInfo) && ((DecimalTypeInfo) typeInfo).getPrecision() <= 18) { + if (fieldObjectInspector instanceof WritableHiveDecimalObjectInspector + && ((DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo()).getPrecision() > 18 + && (typeInfo instanceof DecimalTypeInfo) && ((DecimalTypeInfo) typeInfo).getPrecision() <= 18) { decimal128ConvertDecimal64Cols.add(i); } } @@ -465,7 +467,8 @@ public class OmniReduceSinkOperator extends TerminalOperator * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} */ protected static StructObjectInspector initEvaluatorsAndReturnStruct(ExprNodeEvaluator[] evals, - List> distinctColIndices, List outputColNames, int length, + List> distinctColIndices, + List outputColNames, int length, ObjectInspector rowInspector) throws HiveException { int inspectorLen = evals.length > length ? length + 1 : evals.length; List sois = new ArrayList(inspectorLen); @@ -521,7 +524,7 @@ public class OmniReduceSinkOperator extends TerminalOperator if (!decimal128ConvertDecimal64Cols.isEmpty()) { input = convertVec(input); } - if (needProject) { + if (isNeedProject) { this.projectOperator.addInput(input); Iterator output = this.projectOperator.getOutput(); while (output.hasNext()) { @@ -572,8 +575,8 @@ public class OmniReduceSinkOperator extends TerminalOperator private void perProcess(Vec[] row, int tag, int index) throws HiveException { try { ObjectInspector rowInspector = inputObjInspectors[tag]; - if (firstRow) { - firstRow = false; + if (isFirstRow) { + isFirstRow = false; // TODO: this is fishy - we init object inspectors based on first tag. We // should either init for each tag, or if rowInspector doesn't really // matter, then we can create this in ctor and get rid of firstRow. @@ -637,7 +640,7 @@ public class OmniReduceSinkOperator extends TerminalOperator final int hashCode; // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0] - if (useUniformHash && partitionEval.length > 0) { + if (isUseUniformHash && partitionEval.length > 0) { hashCode = computeMurmurHash(firstKey); } else { hashCode = computeHashCode(row, bucketNumber, index); @@ -648,14 +651,13 @@ public class OmniReduceSinkOperator extends TerminalOperator * in case of TopN for windowing, we need to distinguish between rows with * null partition keys and rows with value 0 for partition keys. */ - boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row, index); + boolean isPartKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row, index); // Try to store the first key. // if TopNHashes aren't active, always forward - // if TopNHashes are active, proceed if not already excluded (i.e order by - // limit) + // if TopNHashes are active, proceed if not already excluded (i.e order by limit) final int firstIndex = (reducerHash != null) - ? reducerHash.tryStoreKey(firstKey, partKeyNull) + ? reducerHash.tryStoreKey(firstKey, isPartKeyNull) : TopNHash.FORWARD; if (firstIndex == TopNHash.EXCLUDE) { return; // Nothing to do. @@ -767,9 +769,6 @@ public class OmniReduceSinkOperator extends TerminalOperator /** * Populate distinct keys part of cachedKeys for a particular row. - * - * @param row the row - * @param index the cachedKeys index to write to */ private void populateCachedDistinctKeys(Object row, int index) throws HiveException { StandardUnionObjectInspector.StandardUnion union; @@ -836,7 +835,8 @@ public class OmniReduceSinkOperator extends TerminalOperator } private Object getOriginValue(Vec vector, int index, ObjectInspector fieldObjectInspector) { - PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveCategory(); + PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = + ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveCategory(); DataType type = vector.getType(); switch (primitiveCategory) { case INT: @@ -852,7 +852,8 @@ public class OmniReduceSinkOperator extends TerminalOperator case SHORT: return ((ShortVec) vector).get(index); case DECIMAL: { - DecimalTypeInfo typeInfo = (DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo(); + DecimalTypeInfo typeInfo = + (DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo(); if (type.getId() == OMNI_LONG) { long value = ((LongVec) vector).get(index); return HiveDecimal.create(value, typeInfo.getScale()); @@ -875,7 +876,8 @@ public class OmniReduceSinkOperator extends TerminalOperator } private Object getWritableValue(Vec vector, int index, ObjectInspector fieldObjectInspector) { - PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveCategory(); + PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = + ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveCategory(); DataType type = vector.getType(); switch (primitiveCategory) { case INT: @@ -891,7 +893,8 @@ public class OmniReduceSinkOperator extends TerminalOperator case SHORT: return new ShortWritable(((ShortVec) vector).get(index)); case DECIMAL: { - DecimalTypeInfo typeInfo = (DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo(); + DecimalTypeInfo typeInfo = + (DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo(); if (type.getId() == OMNI_LONG) { long value = ((LongVec) vector).get(index); HiveDecimalWritable hiveDecimalWritable = new HiveDecimalWritable(); @@ -944,11 +947,13 @@ public class OmniReduceSinkOperator extends TerminalOperator return false; } - // Serialize the keys and append the tag + /** + * Serialize the keys and append the tag + */ protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException { BinaryComparable key = (BinaryComparable) keySerializer.serialize(obj, keyObjectInspector); int keyLength = key.getLength(); - if (tag == -1 || skipTag) { + if (tag == -1 || isSkipTag) { keyWritable.set(key.getBytes(), 0, keyLength); } else { keyWritable.setSize(keyLength); @@ -968,7 +973,7 @@ public class OmniReduceSinkOperator extends TerminalOperator protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException { // Since this is a terminal operator, update counters explicitly - // forward is not called - if (null != out) { + if (out != null) { numRows++; runTimeNumRows++; if (LOG.isTraceEnabled()) { @@ -998,12 +1003,12 @@ public class OmniReduceSinkOperator extends TerminalOperator } @Override - protected void closeOp(boolean abort) throws HiveException { - if (!abort && reducerHash != null) { + protected void closeOp(boolean isAbort) throws HiveException { + if (!isAbort && reducerHash != null) { reducerHash.flush(); } runTimeNumRows = numRows; - super.closeOp(abort); + super.closeOp(isAbort); out = null; random = null; reducerHash = null; @@ -1030,8 +1035,8 @@ public class OmniReduceSinkOperator extends TerminalOperator return false; } - public void setSkipTag(boolean value) { - this.skipTag = value; + public void setSkipTag(boolean isSkipTag) { + this.isSkipTag = isSkipTag; } public void setValueIndex(int[] valueIndex) { @@ -1061,8 +1066,8 @@ public class OmniReduceSinkOperator extends TerminalOperator } @Override - public void setOutputCollector(OutputCollector _out) { - this.out = _out; + public void setOutputCollector(OutputCollector out) { + this.out = out; } public ReduceSinkDesc getConf() { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java index 386ad368666e386eafcca1bfe46b4866a23b2df7..36313018f98cbdd08e40ae727248ee0bad3ed945 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java @@ -35,7 +35,6 @@ import nova.hetu.omniruntime.vector.Vec; import nova.hetu.omniruntime.vector.VecBatch; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -65,29 +64,27 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; public class OmniSelectOperator extends OmniHiveOperator implements Serializable { private static final long serialVersionUID = 1L; - private final transient Logger LOG = LoggerFactory.getLogger(getClass().getName()); + private static Cache cache = CacheBuilder.newBuilder().concurrencyLevel(8).initialCapacity(10) + .maximumSize(100).recordStats().removalListener(notification -> { + ((OmniProjectOperatorFactory) notification.getValue()).close(); + }).build(); + + private static boolean isAddedCloseThread; + private final transient Logger LOG = LoggerFactory.getLogger(getClass().getName()); private transient OmniProjectOperatorFactory projectOperatorFactory; private transient OmniOperator omniOperator; private transient boolean isSelectStarNoCompute = false; - private transient boolean needSliceVector = false; + private transient boolean isNeedSliceVector = false; private transient Iterator output; - private static Cache cache = CacheBuilder.newBuilder().concurrencyLevel(8).initialCapacity(10) - .maximumSize(100).recordStats().removalListener(notification -> { - ((OmniProjectOperatorFactory) notification.getValue()).close(); - }).build(); - - private static boolean addedCloseThread; - public OmniSelectOperator() { super(); } @@ -185,11 +182,11 @@ public class OmniSelectOperator extends OmniHiveOperator impleme SpillConfig.NONE, new OverflowConfig(OverflowConfig.OverflowConfigId.OVERFLOW_CONFIG_NULL), true)); this.omniOperator = this.projectOperatorFactory.createOperator(); cache.put(cacheKey, this.projectOperatorFactory); - if (!addedCloseThread) { + if (!isAddedCloseThread) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { cache.invalidateAll(); })); - addedCloseThread = true; + isAddedCloseThread = true; } } @@ -233,7 +230,7 @@ public class OmniSelectOperator extends OmniHiveOperator impleme List> parentOperators = this.getParentOperators(); List> childOperators = parentOperators.get(0).getChildOperators(); if (childOperators.get(childOperators.size() - 1) != this) { - needSliceVector = true; + isNeedSliceVector = true; } } @@ -244,7 +241,7 @@ public class OmniSelectOperator extends OmniHiveOperator impleme return; } VecBatch input = (VecBatch) row; - if (needSliceVector) { + if (isNeedSliceVector) { Vec[] vectors = input.getVectors(); Vec[] copyVectors = new Vec[vectors.length]; for (int i = 0; i < vectors.length; i++) { @@ -276,7 +273,7 @@ public class OmniSelectOperator extends OmniHiveOperator impleme } @Override - protected void closeOp(boolean abort) throws HiveException { + protected void closeOp(boolean isAbort) throws HiveException { if (projectOperatorFactory != null) { projectOperatorFactory.close(); } @@ -284,7 +281,7 @@ public class OmniSelectOperator extends OmniHiveOperator impleme omniOperator.close(); } output = null; - super.closeOp(abort); + super.closeOp(isAbort); } private TypeInfo createTypeinfo(ExprNodeGenericFuncDesc nodeDesc) { @@ -296,12 +293,12 @@ public class OmniSelectOperator extends OmniHiveOperator impleme } public static boolean isInvalidSelectColumn(List colList) { - for (ExprNodeDesc exprNodeDesc: colList) { - if (exprNodeDesc instanceof ExprNodeConstantDesc && exprNodeDesc.getTypeInfo().getTypeName().equals("smallint")) { + for (ExprNodeDesc exprNodeDesc : colList) { + if (exprNodeDesc instanceof ExprNodeConstantDesc + && exprNodeDesc.getTypeInfo().getTypeName().equals("smallint")) { return true; } } return false; } - } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java index b5e0373d559398d84c5559e374126f0f515e233a..945610bc9161cbe343c9a5bf5c355daf7cd51da8 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java @@ -105,7 +105,7 @@ public class OmniSortOperator extends OmniHiveOperator implements } @Override - protected void closeOp(boolean abort) throws HiveException { + protected void closeOp(boolean isAbort) throws HiveException { Iterator output = this.omniOperator.getOutput(); while (output.hasNext()) { forward(output.next(), outputObjInspector); @@ -116,6 +116,6 @@ public class OmniSortOperator extends OmniHiveOperator implements if (omniOperator != null) { omniOperator.close(); } - super.closeOp(abort); + super.closeOp(isAbort); } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java index 26020b38e90178c4168fdb8bd8f93ecf10f5ceba..c36c811864e38e7039f6f35a814de649cd355bc4 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java @@ -68,13 +68,6 @@ public class OmniTableScanOperator extends TableScanOperator implements Serializ protected transient PrimitiveTypeInfo[] partColTypeInfos; - /** - * Kryo ctor. - */ - protected OmniTableScanOperator() { - super(); - } - public OmniTableScanOperator(TableScanOperator tableScanOperator) { super(tableScanOperator.getCompilationOpContext()); this.conf = tableScanOperator.getConf(); @@ -83,6 +76,13 @@ public class OmniTableScanOperator extends TableScanOperator implements Serializ this.setSchema(tableScanOperator.getSchema()); } + /** + * Kryo ctor. + */ + protected OmniTableScanOperator() { + super(); + } + @Override public String getName() { return "OMNI_TS"; @@ -183,7 +183,7 @@ public class OmniTableScanOperator extends TableScanOperator implements Serializ } @Override - public void closeOp(boolean abort) throws HiveException { - super.closeOp(abort); + public void closeOp(boolean isAbort) throws HiveException { + super.closeOp(isAbort); } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java index 207680173496656f43d4963e539aaeca0626bd68..9cdd049a98e5fda137772be50484b72e143e747e 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java @@ -63,14 +63,14 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; public class OmniVectorOperator extends OmniHiveOperator { - private boolean isToVector; - - private transient VecConverter[][] converters; - public transient VectorCache[] vectorCache; public transient VecBufferCache[] vecBufferCaches; + private boolean isToVector; + + private transient VecConverter[][] converters; + private transient boolean isKeyValue; private transient List[] flatFields; @@ -106,7 +106,9 @@ public class OmniVectorOperator extends OmniHiveOperator { @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); - if (parentOperators.isEmpty() || (parentOperators.get(0) instanceof LimitOperator && parentOperators.get(0).getParentOperators().isEmpty())) { + if (parentOperators.isEmpty() + || (parentOperators.get(0) instanceof LimitOperator + && parentOperators.get(0).getParentOperators().isEmpty())) { isKeyValue = true; } converters = new VecConverter[inputObjInspectors.length][]; @@ -128,7 +130,8 @@ public class OmniVectorOperator extends OmniHiveOperator { } ReduceWork reduceWork = Utilities.getReduceWork(hconf); if (reduceWork != null && (reduceWork.getKeyDesc().getProperties().get(SERIALIZATION_LIB) - .equals(OmniVecBatchSerDe.class.getName()) || reduceWork.getKeyDesc().getProperties().get(SERIALIZATION_LIB) + .equals(OmniVecBatchSerDe.class.getName()) + || reduceWork.getKeyDesc().getProperties().get(SERIALIZATION_LIB) .equals(OmniVecBatchOrderSerDe.class.getName()))) { isVecBatchSerDe = true; } @@ -168,7 +171,8 @@ public class OmniVectorOperator extends OmniHiveOperator { } } if (!this.childOperators.isEmpty() && childOperators.get(0) instanceof OmniMergeJoinOperator - || childOperators.get(0) instanceof OmniMapJoinOperator && ((OmniMapJoinDesc) childOperators.get(0).getConf()).isDynamicPartitionHashJoin()) { + || childOperators.get(0) instanceof OmniMapJoinOperator + && ((OmniMapJoinDesc) childOperators.get(0).getConf()).isDynamicPartitionHashJoin()) { List fieldNames = new ArrayList<>(); for (int i = 0; i < inputObjInspectors.length; i++) { fieldNames.add(String.valueOf(i)); @@ -194,8 +198,8 @@ public class OmniVectorOperator extends OmniHiveOperator { case VARCHAR: case CHAR: case STRING: - LazyObjectInspectorParameters lazyParam = new LazyObjectInspectorParametersImpl(false, (byte) 0, - false, null, null, null); + LazyObjectInspectorParameters lazyParam = new LazyObjectInspectorParametersImpl(false, + (byte) 0, false, null, null, null); structFieldObjectInspectors.add(LazyPrimitiveObjectInspectorFactory.getLazyObjectInspector( ((PrimitiveObjectInspector) field.getFieldObjectInspector()).getTypeInfo(), lazyParam)); break; @@ -342,7 +346,7 @@ public class OmniVectorOperator extends OmniHiveOperator { } @Override - public void close(boolean abort) throws HiveException { + public void close(boolean isAbort) throws HiveException { // here to process the remaining data in the cache if (source != null) { for (int i = 0; i < source.length; i++) { @@ -361,7 +365,7 @@ public class OmniVectorOperator extends OmniHiveOperator { rowCount[i] = 0; } } - super.close(abort); + super.close(isAbort); } public void pushRestData(int tag) throws HiveException { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java index 686b88a0af8d7379dfa3daef939391d03e32bb01..80dfabd7c9dac4b2f54787985489e2440e0e9c3b 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java @@ -55,13 +55,15 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator public transient VecBufferCache[] vecBufferCaches; + public transient Iterator[] outputs; + private transient List[] flatFields; private transient RecordSource[] source; private transient boolean[] fetchDone; - private transient boolean allFetchDone; + private transient boolean isAllFetchDone; private int[] rowCount; @@ -73,8 +75,6 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator private transient OmniOperator[] sortOperators; - public transient Iterator[] outputs; - private int posBigTable; public OmniVectorWithSortOperator() { @@ -122,7 +122,7 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, Arrays.asList(soi)); generateSortOperator(); posBigTable = ((OmniMergeJoinOperator) childOperators.get(0)).getPosBigTable(); - allFetchDone = false; + isAllFetchDone = false; } private void generateSortOperator() { @@ -153,9 +153,9 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator @Override public void process(Object row, int tag) throws HiveException { - if (tag == posBigTable && !allFetchDone) { + if (tag == posBigTable && !isAllFetchDone) { dealSource(); - allFetchDone = true; + isAllFetchDone = true; } for (int i = 0; i < fields[tag].size(); i++) { Object structFieldData = soi[tag].getStructFieldData(row, fields[tag].get(i)); @@ -185,10 +185,10 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator } @Override - public void close(boolean abort) throws HiveException { - if (!allFetchDone) { + public void close(boolean isAbort) throws HiveException { + if (!isAllFetchDone) { dealSource(); - allFetchDone = true; + isAllFetchDone = true; } for (int i = 0; i < rowCount.length; i++) { if (rowCount[i] > 0) { @@ -204,7 +204,7 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator if (vecBatch != null) { forward(outputs[posBigTable].next(), posBigTable); } - super.close(abort); + super.close(isAbort); } public void pushRecord(int tag) throws HiveException { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java index b6afde6d89aa3bb56a491bffdd0bca0af11eb9d3..a986e048f77f2090cecc76a83d7fe4166fc4c7bc 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java @@ -26,11 +26,13 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import java.util.Iterator; - public class OmniVectorizedTableScanOperator extends OmniTableScanOperator { private static final long serialVersionUID = 1L; + public OmniVectorizedTableScanOperator(TableScanOperator tableScanOperator) { + super(tableScanOperator); + } + /** * Kryo ctor. */ @@ -38,10 +40,6 @@ public class OmniVectorizedTableScanOperator extends OmniTableScanOperator { super(); } - public OmniVectorizedTableScanOperator(TableScanOperator tableScanOperator) { - super(tableScanOperator); - } - @Override protected void forward(Object row, ObjectInspector rowInspector, boolean isVectorized) throws HiveException { this.runTimeNumRows += ((VecBatchWrapper) row).getVecBatch().getRowCount(); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java index 60fb4bb35aff9dd7863a7980f500568c5de20640..a13d997439db869ab2dd5b9b614c46bc00b717ab 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java @@ -127,7 +127,8 @@ public class OmniVectorizedVectorOperator extends OmniHiveOperator 0) { forwardNext(); rowCount = 0; } - super.close(abort); + super.close(isAbort); } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OperatorUtils.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OperatorUtils.java index a28eb7cfc89c4ef0e1b7bd42ca7f44fe735420a3..ae0c92680eda92807f943fe75c101d257e06022e 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OperatorUtils.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OperatorUtils.java @@ -1,3 +1,21 @@ +/* + * Copyright (C) 2023-2024. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.huawei.boostkit.hive; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;