From f73a0f61e96ef771955cdedf072f802b6569edf1 Mon Sep 17 00:00:00 2001 From: panmingyi Date: Mon, 17 Jun 2024 16:17:25 +0800 Subject: [PATCH 1/4] Clean Code --- .../boostkit/hive/OmniMapJoinOperator.java | 154 +++++++++++---- .../boostkit/hive/OmniMergeJoinOperator.java | 134 ++++++++----- .../hive/OmniMergeJoinWithSortOperator.java | 17 +- .../huawei/boostkit/hive/OmniPTFOperator.java | 30 ++- .../boostkit/hive/OmniReduceSinkOperator.java | 182 ++++++++++++------ .../boostkit/hive/OmniSelectOperator.java | 54 ++++-- .../boostkit/hive/OmniSortOperator.java | 15 +- .../boostkit/hive/OmniTableScanOperator.java | 27 ++- .../boostkit/hive/OmniVectorOperator.java | 49 +++-- .../hive/OmniVectorWithSortOperator.java | 37 +++- .../hive/OmniVectorizedTableScanOperator.java | 15 +- .../hive/OmniVectorizedVectorOperator.java | 27 ++- .../huawei/boostkit/hive/OperatorUtils.java | 29 +++ 13 files changed, 552 insertions(+), 218 deletions(-) diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java index 9fff88495..ff3b24e0f 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java @@ -107,12 +107,16 @@ import java.util.Optional; import java.util.TreeMap; import java.util.stream.Collectors; +/** + * OmniMapJoinOperator + * + * @since 2024-01-10 + */ public class OmniMapJoinOperator extends AbstractMapJoinOperator implements Serializable, VectorizationContextRegion { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(OmniMapJoinOperator.class.getName()); - + /** + * JOIN_TYPE_MAP + */ public static final Map JOIN_TYPE_MAP = new HashMap() { { put(JoinDesc.INNER_JOIN, JoinType.OMNI_JOIN_TYPE_INNER); @@ -122,7 +126,13 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator put(JoinDesc.LEFT_SEMI_JOIN, JoinType.OMNI_JOIN_TYPE_LEFT_SEMI); } }; + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(OmniMapJoinOperator.class.getName()); + private static boolean isAddedCloseThread; + private static Map countShareBuildIds = new HashMap<>(); + private transient OmniOperatorFactory omniLookupJoinWithExprOperatorFactory; + private transient OmniHashBuilderWithExprOperatorFactory omniHashBuilderWithExprOperatorFactory; private transient OmniOperator joinOperator; @@ -157,6 +167,12 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator super(ctx); } + /** + * OmniMapJoinOperator + * + * @param mjop mjop + * @param mapJoinDesc mapJoinDesc + */ public OmniMapJoinOperator(AbstractMapJoinOperator mjop, MapJoinDesc mapJoinDesc) { super(mjop); this.conf = new OmniMapJoinDesc(mapJoinDesc); @@ -164,6 +180,14 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator this.vectorizationContext = null; } + /** + * OmniMapJoinOperator + * + * @param mjop mjop + * @param mapJoinDesc mapJoinDesc + * @param changedCtx changedCtx + * @param vectorizationContext vectorizationContext + */ public OmniMapJoinOperator(AbstractMapJoinOperator mjop, MapJoinDesc mapJoinDesc, boolean changedCtx, VectorizationContext vectorizationContext) { super(mjop); @@ -202,11 +226,13 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator if (!conf.isDynamicPartitionHashJoin()) { OmniHashBuilderWithExprOperatorFactory.gLock.lock(); try { - omniHashBuilderWithExprOperatorFactory = OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(buildNodeId); + omniHashBuilderWithExprOperatorFactory = + OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(buildNodeId); Integer countShareBuildId = countShareBuildIds.getOrDefault(buildNodeId, 0); countShareBuildIds.put(buildNodeId, ++countShareBuildId); if (omniHashBuilderWithExprOperatorFactory == null) { - omniHashBuilderWithExprOperatorFactory = getOmniHashBuilderWithExprOperatorFactory(joinType, buildTypes, + omniHashBuilderWithExprOperatorFactory = + getOmniHashBuilderWithExprOperatorFactory(joinType, buildTypes, buildIndexes.get(buildIndexes.size() - 1)); buildOperator = omniHashBuilderWithExprOperatorFactory.createOperator(); OmniHashBuilderWithExprOperatorFactory.saveHashBuilderOperatorAndFactory(buildNodeId, @@ -239,15 +265,17 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator if (!canLoadCache && !isInputFileChangeSensitive(mapContext)) { loadBuildVec(mapContext, mrContext); } - if (!addedCloseThread) { + if (!isAddedCloseThread) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { for (Map.Entry entry : countShareBuildIds.entrySet()) { Integer value = entry.getValue(); for (int i = 0; i < value; i++) { - OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(entry.getKey()); + OmniHashBuilderWithExprOperatorFactory + .dereferenceHashBuilderOperatorAndFactory(entry.getKey()); } - if (OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(entry.getKey()) == null) { + if (OmniHashBuilderWithExprOperatorFactory + .getHashBuilderOperatorFactory(entry.getKey()) == null) { LOG.info("release operatorFactor of buildNodeId = " + entry.getKey() + " succeed"); } else { LOG.error("release operatorFactor of buildNodeId = " + entry.getKey() + " failed"); @@ -257,7 +285,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator LOG.error("release operatorFactor failed", e); } })); - addedCloseThread = true; + isAddedCloseThread = true; } } @@ -297,7 +325,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator // result as input of buildOperator List buildList = new ArrayList<>(); List probeVec; - OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories = new OmniHashBuilderWithExprOperatorFactory[buildIndexes.size() - 1]; + OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories = + new OmniHashBuilderWithExprOperatorFactory[buildIndexes.size() - 1]; OmniOperatorFactory[] innerJoinOperatorFactories = new OmniOperatorFactory[buildIndexes.size() - 1]; OmniOperator[] innerBuildOperators = new OmniOperator[buildIndexes.size() - 1]; OmniOperator[] innerJoinOperators = new OmniOperator[buildIndexes.size() - 1]; @@ -336,7 +365,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } } } - closeInnerOperators(innerBuildOperators, innerJoinOperators, innerBuildFactories, innerJoinOperatorFactories); + closeInnerOperators(innerBuildOperators, + innerJoinOperators, innerBuildFactories, innerJoinOperatorFactories); buildOperator.getOutput(); } else { List cacheVecBatches = getVectorFromCache(1 - posBigTable); @@ -390,7 +420,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator int rowLength = ObjectInspectorUtils.getStructSize(serde.getObjectInspector()); // if output includes key and key doesn't have expression, then the value of // hashMapWrapper will not contain key. - boolean needAddKey = rowLength < joinValuesObjectInspectors[pos].size(); + boolean isNeedAddKey = rowLength < joinValuesObjectInspectors[pos].size(); Object[] valueArray = new Object[rowLength]; int rowCount = 0; Object[] keyValueArray = new Object[buildInspectors[pos].size()]; @@ -407,7 +437,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator ObjectInspectorUtils.copyStructToArray(serde.deserialize(value), serde.getObjectInspector(), ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE, valueArray, 0); } - if (needAddKey) { + if (isNeedAddKey) { valueList = new ArrayList<>(Arrays.asList(valueArray)); for (Map.Entry keyValueEntry : valuePosToKeyPos[pos].entrySet()) { valueList.add(keyValueEntry.getKey(), key[keyValueEntry.getValue()]); @@ -465,7 +495,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator int rowLength = ObjectInspectorUtils.getStructSize(valueSerde.getObjectInspector()); // if output includes key and key doesn't have expression, then the value of // hashMapWrapper will not contain key. - boolean needAddKey = rowLength < joinValuesObjectInspectors[pos].size(); + boolean isNeedAddKey = rowLength < joinValuesObjectInspectors[pos].size(); int rowCount = 0; VecSerdeBody[] key; Writable currentKey; @@ -481,12 +511,12 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } rowCount++; if (rowCount == BATCH) { - setBuildVecBatches(pos, buildVecBatches, vecBufferCache, needAddKey, rowCount, keyLength); + setBuildVecBatches(pos, buildVecBatches, vecBufferCache, isNeedAddKey, rowCount, keyLength); rowCount = 0; } } if (rowCount > 0) { - setBuildVecBatches(pos, buildVecBatches, vecBufferCache, needAddKey, rowCount, keyLength); + setBuildVecBatches(pos, buildVecBatches, vecBufferCache, isNeedAddKey, rowCount, keyLength); } return buildVecBatches; } catch (Exception e) { @@ -495,7 +525,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } private void setBuildVecBatches(int pos, List buildVecBatches, VecBufferCache vecBufferCache, - boolean needAddKey, int rowCount, int keyLength) { + boolean isNeedAddKey, int rowCount, int keyLength) { Vec[] keyValueVecs = new Vec[buildInspectors[pos].size()]; Vec[] cachedVecs = vecBufferCache.getValueVecBatchCache(rowCount); System.arraycopy(cachedVecs, 0, keyValueVecs, 0, keyLength); @@ -503,7 +533,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator for (int i = keyLength; i < cachedVecs.length; i++) { valueVecs.add(cachedVecs[i]); } - if (needAddKey) { + if (isNeedAddKey) { for (Map.Entry keyValueEntry : valuePosToKeyPos[pos].entrySet()) { valueVecs.add(keyValueEntry.getKey(), cachedVecs[keyValueEntry.getValue()].slice(0, cachedVecs[keyValueEntry.getValue()].getSize())); @@ -514,6 +544,11 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator valueVecs = null; } + /** + * generateMapMetaData + * + * @throws HiveException HiveException + */ public void generateMapMetaData() throws HiveException { try { TableDesc keyTableDesc = conf.getKeyTblDesc(); @@ -591,7 +626,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator .getOriginalColumnNames(); List probeInspectors = probeInputfields.stream().map(StructField::getFieldObjectInspector) .collect(Collectors.toList()); - DataType[] probeTypes = getTypeFromInspectors(probeInspectors); Map probekeyColNameToId = new HashMap<>(); List probeOutputfieldsName = getExprNodeColumnEvaluator(joinValues[probeIndex]).stream() .map(evaluator -> ((ExprNodeColumnEvaluator) evaluator).getExpr().getColumn()) @@ -610,8 +644,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator if (probeOutputfieldsName.size() < probeOutputCols.length) { probeOutputCols[probeOutputCols.length - 1] = probeInputfields.size() - 1; } - String[] probeHashKeys = getExprFromExprNode(joinKeys[probeIndex], probekeyColNameToId, - inputObjInspectors[probeIndex], false); int[] buildOutputCols = new int[buildIndexes.stream() .mapToInt(buildIndex -> joinValuesObjectInspectors[buildIndex].size()).sum()]; DataType[] buildOutputTypes = new DataType[buildOutputCols.length]; @@ -625,13 +657,17 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } start = start + joinValuesObjectInspectors[buildIndex].size(); } + DataType[] probeTypes = getTypeFromInspectors(probeInspectors); + String[] probeHashKeys = getExprFromExprNode(joinKeys[probeIndex], probekeyColNameToId, + inputObjInspectors[probeIndex], false); JoinType joinType = JOIN_TYPE_MAP.get(condn[buildIndexes.size() - 1].getType()); if (joinType == JoinType.OMNI_JOIN_TYPE_FULL) { return new OmniLookupOuterJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashKeys, buildOutputCols, buildOutputTypes, omniHashBuilderWithExprOperatorFactory); } else { return new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashKeys, - buildOutputCols, buildOutputTypes, omniHashBuilderWithExprOperatorFactory, generateResidualFilter()); + buildOutputCols, buildOutputTypes, omniHashBuilderWithExprOperatorFactory, + generateResidualFilter()); } } @@ -642,7 +678,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator List probeInspectors = new ArrayList<>(); probeInspectors.addAll(joinKeysObjectInspectors[probeIndex]); probeInspectors.addAll(joinValuesObjectInspectors[probeIndex]); - DataType[] probeTypes = getTypeFromInspectors(probeInspectors); int[] probeOutputCols = new int[probeInspectors.size()]; for (int i = 0; i < probeInspectors.size(); i++) { probeOutputCols[i] = i; @@ -658,6 +693,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator buildOutputCols[i] = i; buildOutputTypes[i] = buildTypes[i]; } + DataType[] probeTypes = getTypeFromInspectors(probeInspectors); JoinType joinType = JOIN_TYPE_MAP.get(condn[buildIndexes.size() - 1].getType()); if (joinType == JoinType.OMNI_JOIN_TYPE_FULL) { return new OmniLookupOuterJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashKeys, @@ -669,7 +705,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } private void closeInnerOperators(OmniOperator[] innerBuildOperators, OmniOperator[] innerJoinOperators, - OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories, OmniOperatorFactory[] innerJoinOperatorFactories) { + OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories, + OmniOperatorFactory[] innerJoinOperatorFactories) { for (OmniOperator operator : innerBuildOperators) { operator.close(); } @@ -732,33 +769,42 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator return Optional.empty(); } Map> inputColNameToExprName = getInputColNameToExprName(); - List fields = ((StructObjectInspector) inputObjInspectors[posBigTable]).getAllStructFieldRefs(); + List fields = + ((StructObjectInspector) inputObjInspectors[posBigTable]).getAllStructFieldRefs(); List fieldNames = fields.stream().map(field -> { - String key = field.getFieldName().replace("value.", "VALUE.").replace("key.", "KEY."); + String key = field.getFieldName().replace("value.", "VALUE.") + .replace("key.", "KEY."); if (inputColNameToExprName.containsKey(key)) { List exprNames = inputColNameToExprName.get(key); - return exprNames.get(Math.min(exprNames.size() - 1, posBigTable)).replace("value.", "").replace("key.", ""); + return exprNames.get(Math.min(exprNames.size() - 1, posBigTable)) + .replace("value.", "").replace("key.", ""); } else { - return field.getFieldName().replace("value.", "").replace("key.", ""); + return field.getFieldName().replace("value.", "") + .replace("key.", ""); } } ).collect(Collectors.toList()); - List inspectors = fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList()); + List inspectors = + fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList()); for (int buildIndex : buildIndexes) { fields = ((StructObjectInspector) inputObjInspectors[buildIndex]).getAllStructFieldRefs(); fieldNames.addAll(fields.stream().map(field -> { - String key = field.getFieldName().replace("value.", "VALUE.").replace("key.", "KEY."); + String key = field.getFieldName().replace("value.", "VALUE.") + .replace("key.", "KEY."); if (inputColNameToExprName.containsKey(key)) { List exprNames = inputColNameToExprName.get(key); - return exprNames.get(Math.min(exprNames.size() - 1, buildIndex)).replace("value.", "").replace("key.", ""); + return exprNames.get(Math.min(exprNames.size() - 1, buildIndex)) + .replace("value.", "").replace("key.", ""); } else { - return field.getFieldName().replace("value.", "").replace("key.", ""); + return field.getFieldName().replace("value.", "") + .replace("key.", ""); } } ).collect(Collectors.toList())); inspectors.addAll(fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList())); } - StructObjectInspector exprObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, inspectors); + StructObjectInspector exprObjInspector = + ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, inspectors); ExprNodeGenericFuncDesc predicate; if (joinFilter == null) { predicate = (ExprNodeGenericFuncDesc) conf.getResidualFilterExprs().get(0); @@ -781,7 +827,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } private ExprNodeGenericFuncDesc getJoinFilter() { - List filters = Arrays.stream(joinFilters).flatMap(Collection::stream).map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); + List filters = Arrays.stream(joinFilters).flatMap(Collection::stream) + .map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); if (filters.isEmpty()) { return null; } @@ -875,7 +922,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } @Override - public void closeOp(boolean abort) throws HiveException { + public void closeOp(boolean isAbort) throws HiveException { joinOperator.close(); omniLookupJoinWithExprOperatorFactory.close(); if (conf.isDynamicPartitionHashJoin()) { @@ -883,7 +930,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator omniHashBuilderWithExprOperatorFactory.close(); } output = null; - super.closeOp(abort); + super.closeOp(isAbort); } @Override @@ -894,8 +941,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator public void endGroup() throws HiveException { } - public void publicSetDone(boolean done) { - this.done = done; + public void publicSetDone(boolean isDone) { + this.done = isDone; } private static class OmniReaderWrapper { @@ -903,6 +950,11 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator private KeyValueReader originValueReader; private KeyValuesReader originValuesReader; + /** + * OmniReaderWrapper + * + * @param reader reader + */ public OmniReaderWrapper(Reader reader) { if (reader instanceof KeyValueReader) { isSingleValue = true; @@ -912,6 +964,12 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } } + /** + * next + * + * @return boolean + * @throws IOException IOException + */ public boolean next() throws IOException { if (isSingleValue) { return originValueReader.next(); @@ -922,6 +980,12 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator return originValuesReader.next(); } + /** + * hasNextValue + * + * @return boolean + * @throws IOException IOException + */ public boolean hasNextValue() throws IOException { if (isSingleValue) { return false; @@ -929,6 +993,12 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator return originValuesReader.getCurrentValues().iterator().hasNext(); } + /** + * getCurrentKey + * + * @return Object + * @throws IOException IOException + */ public Object getCurrentKey() throws IOException { if (isSingleValue) { return originValueReader.getCurrentKey(); @@ -936,6 +1006,12 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator return originValuesReader.getCurrentKey(); } + /** + * getCurrentValue + * + * @return Object + * @throws IOException IOException + */ public Object getCurrentValue() throws IOException { if (isSingleValue) { return originValueReader.getCurrentValue(); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java index 089f5e531..2e34297bf 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java @@ -18,9 +18,13 @@ package com.huawei.boostkit.hive; +import static com.huawei.boostkit.hive.JoinUtils.getExprNodeColumnEvaluator; +import static com.huawei.boostkit.hive.OmniMapJoinOperator.JOIN_TYPE_MAP; + import com.huawei.boostkit.hive.expression.BaseExpression; import com.huawei.boostkit.hive.expression.ExpressionUtils; import com.huawei.boostkit.hive.expression.TypeUtils; + import nova.hetu.omniruntime.constants.JoinType; import nova.hetu.omniruntime.operator.OmniOperator; import nova.hetu.omniruntime.operator.OmniOperatorFactory; @@ -65,7 +69,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitive import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -76,20 +79,21 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static com.huawei.boostkit.hive.JoinUtils.getExprNodeColumnEvaluator; -import static com.huawei.boostkit.hive.OmniMapJoinOperator.JOIN_TYPE_MAP; +/** + * OmniMergeJoinOperator + * + * @since 2024-01-10 + */ public class OmniMergeJoinOperator extends CommonJoinOperator implements Serializable { - private static final long serialVersionUID = 1L; - protected static final int SMJ_NEED_ADD_STREAM_TBL_DATA = 2; protected static final int SMJ_NEED_ADD_BUFFERED_TBL_DATA = 3; protected static final int SCAN_FINISH = 4; protected static final int RES_INIT = 0; protected static final int SMJ_FETCH_JOIN_DATA = 5; + private static final long serialVersionUID = 1L; + protected transient RecordSource[] sources; protected transient boolean[] fetchDone; @@ -107,23 +111,29 @@ public class OmniMergeJoinOperator extends CommonJoinOperator protected int posBigTable; private OmniVectorOperator omniVectorOperator; - /** - * Kryo ctor. - */ - protected OmniMergeJoinOperator() { - super(); - } - public OmniMergeJoinOperator(CompilationOpContext ctx) { super(ctx); } + /** + * OmniMergeJoinOperator + * + * @param ctx ctx + * @param commonMergeJoinDesc commonMergeJoinDesc + */ public OmniMergeJoinOperator(CompilationOpContext ctx, CommonMergeJoinDesc commonMergeJoinDesc) { super(ctx); this.conf = new OmniMergeJoinDesc(commonMergeJoinDesc); this.posBigTable = commonMergeJoinDesc.getPosBigTable(); } + /** + * Kryo ctor. + */ + protected OmniMergeJoinOperator() { + super(); + } + @Override // If mergeJoinOperator has 3 tables, first join table0 and table1, and output // all columns of table0 and table1. @@ -163,23 +173,24 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } } - private void generateOmniOperator(int bufferIndex, boolean getAll) throws HiveException { + private void generateOmniOperator(int bufferIndex, boolean isGetAll) throws HiveException { int opIndex = bufferIndex - 1; List streamAliasList = new ArrayList<>(); for (int i = 0; i < bufferIndex; i++) { streamAliasList.add(i); } - streamFactories[opIndex] = (OmniSmjStreamedTableWithExprOperatorFactory) getFactory(streamAliasList, null, - getAll, opIndex); + streamFactories[opIndex] = + (OmniSmjStreamedTableWithExprOperatorFactory) getFactory(streamAliasList, null, + isGetAll, opIndex); streamOperators[opIndex] = streamFactories[opIndex].createOperator(); bufferFactories[opIndex] = (OmniSmjBufferedTableWithExprOperatorFactory) getFactory(Arrays.asList(bufferIndex), - streamFactories[opIndex], getAll, opIndex); + streamFactories[opIndex], isGetAll, opIndex); bufferOperators[opIndex] = bufferFactories[opIndex].createOperator(); } private OmniOperatorFactory getFactory(List aliasList, OmniSmjStreamedTableWithExprOperatorFactory streamFactory, - boolean getAll, int opIndex) throws HiveException { + boolean isGetAll, int opIndex) throws HiveException { List inputFields = aliasList.stream() .flatMap(alias -> ((StructObjectInspector) inputObjInspectors[alias]).getAllStructFieldRefs().stream() .flatMap(keyValue -> ((StructObjectInspector) keyValue.getFieldObjectInspector()) @@ -202,11 +213,12 @@ public class OmniMergeJoinOperator extends CommonJoinOperator if (i >= fieldNum[tagIndex]) { ++tagIndex; } - inputTypes[i] = TypeUtils.buildInputDataType(((AbstractPrimitiveObjectInspector) inputFields.get(i).getFieldObjectInspector()).getTypeInfo()); + inputTypes[i] = TypeUtils.buildInputDataType(((AbstractPrimitiveObjectInspector) inputFields.get(i) + .getFieldObjectInspector()).getTypeInfo()); colNameToId.get(tagIndex).put(inputFields.get(i).getFieldName(), i); } int[] outputCols; - if (getAll) { + if (isGetAll) { outputCols = new int[inputTypes.length]; for (int i = 0; i < inputTypes.length; i++) { outputCols[i] = i; @@ -249,30 +261,36 @@ public class OmniMergeJoinOperator extends CommonJoinOperator List fields = Utilities.constructVectorizedReduceRowOI((StructObjectInspector) ((StructObjectInspector) inputObjInspectors[opIndex]).getAllStructFieldRefs().get(0).getFieldObjectInspector(), (StructObjectInspector) ((StructObjectInspector) inputObjInspectors[opIndex]).getAllStructFieldRefs().get(1).getFieldObjectInspector()).getAllStructFieldRefs(); List fieldNames = fields.stream().map(field -> { - String key = field.getFieldName().replace("value.", "VALUE.").replace("key.", "KEY."); + String key = field.getFieldName() + .replace("value.", "VALUE.").replace("key.", "KEY."); if (inputColNameToExprName.containsKey(key)) { List exprNames = inputColNameToExprName.get(key); - return exprNames.get(Math.min(exprNames.size() - 1, opIndex)).replace("value.", "").replace("key.", ""); + return exprNames.get(Math.min(exprNames.size() - 1, opIndex)) + .replace("value.", "").replace("key.", ""); } else { - return field.getFieldName().replace("value.", "").replace("key.", ""); + return field.getFieldName() + .replace("value.", "").replace("key.", ""); } } ).collect(Collectors.toList()); - List inspectors = fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList()); - int bufferIndex = opIndex + 1; fields = Utilities.constructVectorizedReduceRowOI((StructObjectInspector) ((StructObjectInspector) inputObjInspectors[bufferIndex]).getAllStructFieldRefs().get(0).getFieldObjectInspector(), (StructObjectInspector) ((StructObjectInspector) inputObjInspectors[bufferIndex]).getAllStructFieldRefs().get(1).getFieldObjectInspector()).getAllStructFieldRefs(); fieldNames.addAll(fields.stream().map(field -> { - String key = field.getFieldName().replace("value.", "VALUE.").replace("key.", "KEY."); + String key = field.getFieldName() + .replace("value.", "VALUE.").replace("key.", "KEY."); if (inputColNameToExprName.containsKey(key)) { List exprNames = inputColNameToExprName.get(key); - return exprNames.get(Math.min(exprNames.size() - 1, bufferIndex)).replace("value.", "").replace("key.", ""); + return exprNames.get(Math.min(exprNames.size() - 1, bufferIndex)) + .replace("value.", "").replace("key.", ""); } else { - return field.getFieldName().replace("value.", "").replace("key.", ""); + return field.getFieldName() + .replace("value.", "").replace("key.", ""); } } ).collect(Collectors.toList())); + List inspectors = + fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList()); inspectors.addAll(fields.stream().map(StructField::getFieldObjectInspector).collect(Collectors.toList())); StructObjectInspector exprObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, @@ -281,10 +299,12 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } else { StructObjectInspector flattenInspector = Utilities.constructVectorizedReduceRowOI((StructObjectInspector) ((StructObjectInspector) inputObjInspectors[opIndex]).getAllStructFieldRefs().get(0).getFieldObjectInspector(), (StructObjectInspector) ((StructObjectInspector) inputObjInspectors[opIndex]).getAllStructFieldRefs().get(1).getFieldObjectInspector()); - List inspectors = flattenInspector.getAllStructFieldRefs().stream().map(field -> field.getFieldObjectInspector()).collect(Collectors.toList()); - List fieldNames = flattenInspector.getAllStructFieldRefs().stream().map(field -> field.getFieldName()).collect(Collectors.toList()); - StructObjectInspector exprObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, - inspectors); + List inspectors = flattenInspector.getAllStructFieldRefs() + .stream().map(field -> field.getFieldObjectInspector()).collect(Collectors.toList()); + List fieldNames = flattenInspector.getAllStructFieldRefs() + .stream().map(field -> field.getFieldName()).collect(Collectors.toList()); + StructObjectInspector exprObjInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(fieldNames, inspectors); root = ExpressionUtils.build(joinFilter, exprObjInspector); } return Optional.of(root.toString()); @@ -303,7 +323,8 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } private ExprNodeGenericFuncDesc getJoinFilter(int opIndex) { - List filters = joinFilters[opIndex].stream().map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); + List filters = + joinFilters[opIndex].stream().map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); if (filters.isEmpty()) { return null; } @@ -318,7 +339,8 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } private ExprNodeGenericFuncDesc getResidualFilter() { - List filters = residualJoinFilters.stream().map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); + List filters = + residualJoinFilters.stream().map(ExprNodeEvaluator::getExpr).collect(Collectors.toList()); if (filters.size() == 1) { return (ExprNodeGenericFuncDesc) filters.get(0); } @@ -375,12 +397,21 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } } + /** + * processOmni + * + * @param opIndex opIndex + * @param bufferIndex bufferIndex + * @throws HiveException HiveException + */ protected void processOmni(int opIndex, int bufferIndex) throws HiveException { if (flowControlCode[opIndex] != SCAN_FINISH && resCode[opIndex] == RES_INIT) { if (flowControlCode[opIndex] == SMJ_NEED_ADD_STREAM_TBL_DATA) { - processOmniSmj(opIndex, opIndex, streamData, streamOperators, SMJ_NEED_ADD_STREAM_TBL_DATA, streamTypes); + processOmniSmj(opIndex, opIndex, streamData, + streamOperators, SMJ_NEED_ADD_STREAM_TBL_DATA, streamTypes); } else { - processOmniSmj(opIndex, bufferIndex, bufferData, bufferOperators, SMJ_NEED_ADD_BUFFERED_TBL_DATA, bufferTypes); + processOmniSmj(opIndex, bufferIndex, bufferData, + bufferOperators, SMJ_NEED_ADD_BUFFERED_TBL_DATA, bufferTypes); } } if (resCode[opIndex] == SMJ_FETCH_JOIN_DATA) { @@ -409,6 +440,17 @@ public class OmniMergeJoinOperator extends CommonJoinOperator resCode[opIndex] = RES_INIT; } + /** + * processOmniSmj + * + * @param opIndex opIndex + * @param dataIndex dataIndex + * @param data data + * @param operators operators + * @param controlCode controlCode + * @param types types + * @throws HiveException HiveException + */ protected void processOmniSmj(int opIndex, int dataIndex, Queue[] data, OmniOperator[] operators, int controlCode, DataType[][] types) throws HiveException { if (!data[opIndex].isEmpty()) { @@ -457,8 +499,8 @@ public class OmniMergeJoinOperator extends CommonJoinOperator public void endGroup() throws HiveException { } - public void publicSetDone(boolean done) { - this.done = done; + public void publicSetDone(boolean isDone) { + this.done = isDone; } @Override @@ -472,7 +514,7 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } @Override - public void close(boolean abort) throws HiveException { + public void close(boolean isAbort) throws HiveException { if (!allInitializedParentsAreClosed()) { return; } @@ -494,9 +536,15 @@ public class OmniMergeJoinOperator extends CommonJoinOperator processOmni(opIndex, opIndex + 1); } } - super.close(abort); + super.close(isAbort); } + /** + * createEofVecBatch + * + * @param dataTypes dataTypes + * @return VecBatch VecBatch + */ protected VecBatch createEofVecBatch(DataType[] dataTypes) { Vec[] vecs = new Vec[dataTypes.length]; for (int i = 0; i < dataTypes.length; i++) { @@ -564,7 +612,7 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } @Override - public void closeOp(boolean abort) throws HiveException { + public void closeOp(boolean isAbort) throws HiveException { for (int i = 0; i < streamOperators.length; i++) { streamOperators[i].close(); bufferOperators[i].close(); @@ -579,6 +627,6 @@ public class OmniMergeJoinOperator extends CommonJoinOperator vecBatch.close(); } } - super.closeOp(abort); + super.closeOp(isAbort); } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java index a3b41444b..14dc3bc36 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java @@ -30,9 +30,18 @@ import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import java.util.Queue; +/** + * OmniMergeJoinWithSortOperator + * + * @since 2024-01-10 + */ public class OmniMergeJoinWithSortOperator extends OmniMergeJoinOperator { private transient OmniVectorWithSortOperator omniVectorWithSortOperator; + public OmniMergeJoinWithSortOperator(CompilationOpContext ctx) { + super(ctx); + } + /** * Kryo ctor. */ @@ -40,10 +49,6 @@ public class OmniMergeJoinWithSortOperator extends OmniMergeJoinOperator { super(); } - public OmniMergeJoinWithSortOperator(CompilationOpContext ctx) { - super(ctx); - } - public OmniMergeJoinWithSortOperator(CompilationOpContext ctx, CommonMergeJoinDesc commonMergeJoinDesc) { super(ctx, commonMergeJoinDesc); } @@ -86,13 +91,13 @@ public class OmniMergeJoinWithSortOperator extends OmniMergeJoinOperator { } @Override - public void closeOp(boolean abort) throws HiveException { + public void closeOp(boolean isAbort) throws HiveException { for (OmniOperator sortOperator : omniVectorWithSortOperator.getSortOperators()) { sortOperator.close(); } for (OmniSortOperatorFactory sortOperatorFactory : omniVectorWithSortOperator.getSortOperatorFactories()) { sortOperatorFactory.close(); } - super.closeOp(abort); + super.closeOp(isAbort); } } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java index 93b8a209d..e1ef3f4d1 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java @@ -21,6 +21,7 @@ package com.huawei.boostkit.hive; import com.huawei.boostkit.hive.expression.TypeUtils; import com.google.common.primitives.Ints; + import nova.hetu.omniruntime.constants.FunctionType; import nova.hetu.omniruntime.constants.OmniWindowFrameBoundType; import nova.hetu.omniruntime.constants.OmniWindowFrameType; @@ -31,6 +32,7 @@ import nova.hetu.omniruntime.operator.window.OmniWindowOperatorFactory; import nova.hetu.omniruntime.type.DataType; import nova.hetu.omniruntime.vector.Vec; import nova.hetu.omniruntime.vector.VecBatch; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -61,16 +63,21 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +/** + * OmniPTFOperator + * + * @since 2024-01-10 + */ public class OmniPTFOperator extends OmniHiveOperator implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(OmniPTFOperator.class.getName()); - private transient OmniWindowOperatorFactory omniWindowOperatorFactory; - private transient OmniOperator omniOperator; boolean isMapOperator; - transient Configuration hiveConf; + private transient OmniWindowOperatorFactory omniWindowOperatorFactory; + private transient OmniOperator omniOperator; + public OmniPTFOperator() { super(); } @@ -79,6 +86,12 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se super(ctx); } + /** + * OmniPTFOperator + * + * @param ctx ctx + * @param conf conf + */ public OmniPTFOperator(CompilationOpContext ctx, PTFDesc conf) { super(ctx); this.conf = new OmniPTFDesc(conf); @@ -171,7 +184,8 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se private OmniWindowFrameType[] getWindowFrameType(List windowFunctionDefs) { return windowFunctionDefs.stream().map( - windowFunctionDef -> TypeUtils.getWindowFrameType(windowFunctionDef.getWindowFrame().getWindowType())) + windowFunctionDef -> + TypeUtils.getWindowFrameType(windowFunctionDef.getWindowFrame().getWindowType())) .toArray(OmniWindowFrameType[]::new); } @@ -275,8 +289,8 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se * Initialize the visitor to use the QueryDefDeserializer Use the order * defined in QueryDefWalker to visit the QueryDef * - * @param hiveConf - * @throws HiveException + * @param hiveConf hiveConf + * @throws HiveException HiveException */ protected void reconstructQueryDef(Configuration hiveConf) throws HiveException { PTFDeserializer ds = new PTFDeserializer(conf, (StructObjectInspector) inputObjInspectors[0], hiveConf); @@ -300,8 +314,8 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se } @Override - protected void closeOp(boolean abort) throws HiveException { - super.closeOp(abort); + protected void closeOp(boolean isAbort) throws HiveException { + super.closeOp(isAbort); Iterator output = this.omniOperator.getOutput(); while (output.hasNext()) { VecBatch next = output.next(); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java index 72cc75f08..7ff9c638b 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java @@ -126,23 +126,18 @@ import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; +/** + * OmniReduceSinkOperator + * + * @since 2024-01-10 + */ public class OmniReduceSinkOperator extends TerminalOperator implements Serializable, TopNHash.BinaryCollector { private static final long serialVersionUID = 1L; - private transient ObjectInspector[] partitionObjectInspectors; - private transient ObjectInspector[] bucketObjectInspectors; - private transient int buckColIdxInKey; - /** - * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} + * OutPutCollector */ - private transient int buckColIdxInKeyForSdpo = -1; - private boolean firstRow; - private boolean skipTag = false; - private transient int[] valueIndex; // index for value(+ from keys, - from values) - private transient Set decimal128ConvertDecimal64Cols = new HashSet<>(); - protected transient OutputCollector out; /** @@ -177,7 +172,7 @@ public class OmniReduceSinkOperator extends TerminalOperator protected transient int numDistributionKeys; protected transient int numDistinctExprs; protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD) - protected transient boolean useUniformHash = false; + protected transient boolean isUseUniformHash = false; // picks topN K:V pairs from input. protected transient TopNHash reducerHash; @@ -211,17 +206,34 @@ public class OmniReduceSinkOperator extends TerminalOperator // multiple? protected transient Object[][] cachedKeys; - protected transient long cntr = 1; - protected transient long logEveryNRows = 0; + protected transient long cntr = 1L; + protected transient long logEveryNRows = 0L; + + private transient ObjectInspector[] partitionObjectInspectors; + private transient ObjectInspector[] bucketObjectInspectors; + private transient int buckColIdxInKey; + + /** + * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} + */ + private transient int buckColIdxInKeyForSdpo = -1; + private boolean isFirstRow; + private boolean isSkipTag = false; + private transient int[] valueIndex; // index for value(+ from keys, - from values) + private transient Set decimal128ConvertDecimal64Cols = new HashSet<>(); private long[] keyFieldId; private long[] valueFieldId; - private transient boolean needProject; + private transient boolean isNeedProject; private transient OmniOperator projectOperator; - private transient boolean reduceSinkCanReplaceKey; + private transient boolean isReduceSinkCanReplaceKey; private transient VecWrapper[] vecWrappers; + public OmniReduceSinkOperator(CompilationOpContext ctx) { + super(ctx); + } + /** * Kryo ctor. */ @@ -229,14 +241,17 @@ public class OmniReduceSinkOperator extends TerminalOperator super(); } - public OmniReduceSinkOperator(CompilationOpContext ctx) { - super(ctx); - } - - public OmniReduceSinkOperator(CompilationOpContext ctx, ReduceSinkDesc conf, boolean reduceSinkCanReplaceKey) { + /** + * OmniReduceSinkOperator + * + * @param ctx ctx + * @param conf conf + * @param isReduceSinkCanReplaceKey isReduceSinkCanReplaceKey + */ + public OmniReduceSinkOperator(CompilationOpContext ctx, ReduceSinkDesc conf, boolean isReduceSinkCanReplaceKey) { super(ctx); this.conf = conf; - this.reduceSinkCanReplaceKey = reduceSinkCanReplaceKey; + this.isReduceSinkCanReplaceKey = isReduceSinkCanReplaceKey; } @Override @@ -249,7 +264,7 @@ public class OmniReduceSinkOperator extends TerminalOperator ArrayList keyCols = conf.getKeyCols(); for (ExprNodeDesc valueCol : keyCols) { if (valueCol instanceof ExprNodeGenericFuncDesc || valueCol instanceof ExprNodeConstantDesc) { - needProject = true; + isNeedProject = true; } } keyFieldId = new long[keyCols.size()]; @@ -270,7 +285,7 @@ public class OmniReduceSinkOperator extends TerminalOperator ArrayList valueCols = this.conf.getValueCols(); for (ExprNodeDesc keyCol : valueCols) { if (keyCol instanceof ExprNodeGenericFuncDesc || keyCol instanceof ExprNodeConstantDesc) { - needProject = true; + isNeedProject = true; } } this.valueFieldId = new long[valueCols.size()]; @@ -338,7 +353,7 @@ public class OmniReduceSinkOperator extends TerminalOperator int tag = conf.getTag(); tagByte[0] = (byte) tag; - skipTag = conf.getSkipTag(); + isSkipTag = conf.getSkipTag(); if (LOG.isInfoEnabled()) { LOG.info("Using tag = " + tag); } @@ -362,18 +377,18 @@ public class OmniReduceSinkOperator extends TerminalOperator reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this, conf, hconf); } - useUniformHash = conf.getReducerTraits().contains(UNIFORM); + isUseUniformHash = conf.getReducerTraits().contains(UNIFORM); - firstRow = true; - // acidOp flag has to be checked to use JAVA hash which works like + isFirstRow = true; + // isAcidOp flag has to be checked to use JAVA hash which works like // identity function for integers, necessary to read RecordIdentifier // incase of ACID updates/deletes. - boolean acidOp = conf.getWriteType() == AcidUtils.Operation.UPDATE + boolean isAcidOp = conf.getWriteType() == AcidUtils.Operation.UPDATE || conf.getWriteType() == AcidUtils.Operation.DELETE; - hashFunc = bucketingVersion == 2 && !acidOp + hashFunc = bucketingVersion == 2 && !isAcidOp ? ObjectInspectorUtils::getBucketHashCode : ObjectInspectorUtils::getBucketHashCodeOld; - if (needProject) { + if (isNeedProject) { generatorProject(); } getDecimal128ConvertDecimal64Cols(); @@ -383,7 +398,7 @@ public class OmniReduceSinkOperator extends TerminalOperator throw new RuntimeException(e); } int inputFieldNum = ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs().size(); - vecWrappers = new VecWrapper[needProject + vecWrappers = new VecWrapper[isNeedProject ? conf.getKeyCols().size() + conf.getValueCols().size() : inputFieldNum]; for (int i = 0; i < vecWrappers.length; i++) { @@ -399,19 +414,21 @@ public class OmniReduceSinkOperator extends TerminalOperator private void getDecimal128ConvertDecimal64Cols() { ArrayList allCols = new ArrayList<>(conf.getKeyCols()); allCols.addAll(conf.getValueCols()); - List allStructFieldRefs = ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs(); - List sortedAllCols = allCols.stream().filter(col -> col instanceof ExprNodeColumnDesc). - filter(col -> !extractNumberPart((ExprNodeColumnDesc) col).isEmpty()). - sorted(Comparator.comparingInt(col -> Integer.parseInt(extractNumberPart((ExprNodeColumnDesc) col)))).collect(Collectors.toList()); + List allStructFieldRefs = + ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs(); + List sortedAllCols = allCols.stream().filter(col -> col instanceof ExprNodeColumnDesc) + .filter(col -> !extractNumberPart((ExprNodeColumnDesc) col).isEmpty()) + .sorted(Comparator.comparingInt(col -> Integer.parseInt(extractNumberPart((ExprNodeColumnDesc) col)))) + .collect(Collectors.toList()); if (allStructFieldRefs.size() != sortedAllCols.size()) { return; } for (int i = 0; i < allStructFieldRefs.size(); i++) { ObjectInspector fieldObjectInspector = allStructFieldRefs.get(i).getFieldObjectInspector(); TypeInfo typeInfo = sortedAllCols.get(i).getTypeInfo(); - if (fieldObjectInspector instanceof WritableHiveDecimalObjectInspector && - ((DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo()).getPrecision() > 18 && - (typeInfo instanceof DecimalTypeInfo) && ((DecimalTypeInfo) typeInfo).getPrecision() <= 18) { + if (fieldObjectInspector instanceof WritableHiveDecimalObjectInspector + && ((DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo()).getPrecision() > 18 + && (typeInfo instanceof DecimalTypeInfo) && ((DecimalTypeInfo) typeInfo).getPrecision() <= 18) { decimal128ConvertDecimal64Cols.add(i); } } @@ -463,9 +480,18 @@ public class OmniReduceSinkOperator extends TerminalOperator *

* If distinctColIndices is empty, the object inspector is same as * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} + * + * @param evals evals + * @param distinctColIndices distinctColIndices + * @param outputColNames outputColNames + * @param length length + * @param rowInspector rowInspector + * @return StructObjectInspector + * @throws HiveException HiveException */ protected static StructObjectInspector initEvaluatorsAndReturnStruct(ExprNodeEvaluator[] evals, - List> distinctColIndices, List outputColNames, int length, + List> distinctColIndices, + List outputColNames, int length, ObjectInspector rowInspector) throws HiveException { int inspectorLen = evals.length > length ? length + 1 : evals.length; List sois = new ArrayList(inspectorLen); @@ -521,7 +547,7 @@ public class OmniReduceSinkOperator extends TerminalOperator if (!decimal128ConvertDecimal64Cols.isEmpty()) { input = convertVec(input); } - if (needProject) { + if (isNeedProject) { this.projectOperator.addInput(input); Iterator output = this.projectOperator.getOutput(); while (output.hasNext()) { @@ -572,8 +598,8 @@ public class OmniReduceSinkOperator extends TerminalOperator private void perProcess(Vec[] row, int tag, int index) throws HiveException { try { ObjectInspector rowInspector = inputObjInspectors[tag]; - if (firstRow) { - firstRow = false; + if (isFirstRow) { + isFirstRow = false; // TODO: this is fishy - we init object inspectors based on first tag. We // should either init for each tag, or if rowInspector doesn't really // matter, then we can create this in ctor and get rid of firstRow. @@ -637,7 +663,7 @@ public class OmniReduceSinkOperator extends TerminalOperator final int hashCode; // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0] - if (useUniformHash && partitionEval.length > 0) { + if (isUseUniformHash && partitionEval.length > 0) { hashCode = computeMurmurHash(firstKey); } else { hashCode = computeHashCode(row, bucketNumber, index); @@ -648,14 +674,13 @@ public class OmniReduceSinkOperator extends TerminalOperator * in case of TopN for windowing, we need to distinguish between rows with * null partition keys and rows with value 0 for partition keys. */ - boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row, index); + boolean isPartKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row, index); // Try to store the first key. // if TopNHashes aren't active, always forward - // if TopNHashes are active, proceed if not already excluded (i.e order by - // limit) + // if TopNHashes are active, proceed if not already excluded (i.e order by limit) final int firstIndex = (reducerHash != null) - ? reducerHash.tryStoreKey(firstKey, partKeyNull) + ? reducerHash.tryStoreKey(firstKey, isPartKeyNull) : TopNHash.FORWARD; if (firstIndex == TopNHash.EXCLUDE) { return; // Nothing to do. @@ -770,6 +795,7 @@ public class OmniReduceSinkOperator extends TerminalOperator * * @param row the row * @param index the cachedKeys index to write to + * @throws HiveException HiveException */ private void populateCachedDistinctKeys(Object row, int index) throws HiveException { StandardUnionObjectInspector.StandardUnion union; @@ -792,6 +818,12 @@ public class OmniReduceSinkOperator extends TerminalOperator * UDFToInteger(ROW__ID) and buckNum == -1 so that the result of this method * is to return the bucketId extracted from ROW__ID unless it optimized by * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} + * + * @param row row + * @param buckNum buckNum + * @param index index + * @return int + * @throws HiveException HiveException */ private int computeHashCode(Object row, int buckNum, int index) throws HiveException { // Evaluate the HashCode @@ -836,7 +868,8 @@ public class OmniReduceSinkOperator extends TerminalOperator } private Object getOriginValue(Vec vector, int index, ObjectInspector fieldObjectInspector) { - PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveCategory(); + PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = + ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveCategory(); DataType type = vector.getType(); switch (primitiveCategory) { case INT: @@ -852,7 +885,8 @@ public class OmniReduceSinkOperator extends TerminalOperator case SHORT: return ((ShortVec) vector).get(index); case DECIMAL: { - DecimalTypeInfo typeInfo = (DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo(); + DecimalTypeInfo typeInfo = + (DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo(); if (type.getId() == OMNI_LONG) { long value = ((LongVec) vector).get(index); return HiveDecimal.create(value, typeInfo.getScale()); @@ -875,7 +909,8 @@ public class OmniReduceSinkOperator extends TerminalOperator } private Object getWritableValue(Vec vector, int index, ObjectInspector fieldObjectInspector) { - PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveCategory(); + PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = + ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveCategory(); DataType type = vector.getType(); switch (primitiveCategory) { case INT: @@ -891,7 +926,8 @@ public class OmniReduceSinkOperator extends TerminalOperator case SHORT: return new ShortWritable(((ShortVec) vector).get(index)); case DECIMAL: { - DecimalTypeInfo typeInfo = (DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo(); + DecimalTypeInfo typeInfo = + (DecimalTypeInfo) ((WritableHiveDecimalObjectInspector) fieldObjectInspector).getTypeInfo(); if (type.getId() == OMNI_LONG) { long value = ((LongVec) vector).get(index); HiveDecimalWritable hiveDecimalWritable = new HiveDecimalWritable(); @@ -944,11 +980,19 @@ public class OmniReduceSinkOperator extends TerminalOperator return false; } - // Serialize the keys and append the tag + /** + * Serialize the keys and append the tag + * + * @param obj obj + * @param tag tag + * @param distLength distLength + * @return HiveKey + * @throws SerDeException SerDeException + */ protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException { BinaryComparable key = (BinaryComparable) keySerializer.serialize(obj, keyObjectInspector); int keyLength = key.getLength(); - if (tag == -1 || skipTag) { + if (tag == -1 || isSkipTag) { keyWritable.set(key.getBytes(), 0, keyLength); } else { keyWritable.setSize(keyLength); @@ -965,10 +1009,17 @@ public class OmniReduceSinkOperator extends TerminalOperator collect(keyWritable, valueWritable); } + /** + * collect + * + * @param keyWritable keyWritable + * @param valueWritable valueWritable + * @throws IOException IOException + */ protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException { // Since this is a terminal operator, update counters explicitly - // forward is not called - if (null != out) { + if (out != null) { numRows++; runTimeNumRows++; if (LOG.isTraceEnabled()) { @@ -998,12 +1049,12 @@ public class OmniReduceSinkOperator extends TerminalOperator } @Override - protected void closeOp(boolean abort) throws HiveException { - if (!abort && reducerHash != null) { + protected void closeOp(boolean isAbort) throws HiveException { + if (!isAbort && reducerHash != null) { reducerHash.flush(); } runTimeNumRows = numRows; - super.closeOp(abort); + super.closeOp(isAbort); out = null; random = null; reducerHash = null; @@ -1013,6 +1064,8 @@ public class OmniReduceSinkOperator extends TerminalOperator } /** + * getName + * * @return the name of the operator */ @Override @@ -1030,8 +1083,8 @@ public class OmniReduceSinkOperator extends TerminalOperator return false; } - public void setSkipTag(boolean value) { - this.skipTag = value; + public void setSkipTag(boolean isSkipTag) { + this.isSkipTag = isSkipTag; } public void setValueIndex(int[] valueIndex) { @@ -1061,10 +1114,15 @@ public class OmniReduceSinkOperator extends TerminalOperator } @Override - public void setOutputCollector(OutputCollector _out) { - this.out = _out; + public void setOutputCollector(OutputCollector out) { + this.out = out; } + /** + * getConf + * + * @return ReduceSinkDesc + */ public ReduceSinkDesc getConf() { return new OmniReduceSinkDesc(this.conf); } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java index 386ad3686..99563da70 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java @@ -35,7 +35,6 @@ import nova.hetu.omniruntime.vector.Vec; import nova.hetu.omniruntime.vector.VecBatch; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -65,29 +64,32 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; +/** + * OmniSelectOperator + * + * @since 2024-01-10 + */ public class OmniSelectOperator extends OmniHiveOperator implements Serializable { private static final long serialVersionUID = 1L; - private final transient Logger LOG = LoggerFactory.getLogger(getClass().getName()); + private static Cache cache = CacheBuilder.newBuilder().concurrencyLevel(8).initialCapacity(10) + .maximumSize(100).recordStats().removalListener(notification -> { + ((OmniProjectOperatorFactory) notification.getValue()).close(); + }).build(); + private static boolean isAddedCloseThread; + + private final transient Logger LOG = LoggerFactory.getLogger(getClass().getName()); private transient OmniProjectOperatorFactory projectOperatorFactory; private transient OmniOperator omniOperator; private transient boolean isSelectStarNoCompute = false; - private transient boolean needSliceVector = false; + private transient boolean isNeedSliceVector = false; private transient Iterator output; - private static Cache cache = CacheBuilder.newBuilder().concurrencyLevel(8).initialCapacity(10) - .maximumSize(100).recordStats().removalListener(notification -> { - ((OmniProjectOperatorFactory) notification.getValue()).close(); - }).build(); - - private static boolean addedCloseThread; - public OmniSelectOperator() { super(); } @@ -96,6 +98,12 @@ public class OmniSelectOperator extends OmniHiveOperator impleme super(ctx); } + /** + * OmniSelectOperator + * + * @param ctx ctx + * @param conf conf + */ public OmniSelectOperator(CompilationOpContext ctx, SelectDesc conf) { super(ctx); this.conf = new OmniSelectDesc(conf); @@ -185,11 +193,11 @@ public class OmniSelectOperator extends OmniHiveOperator impleme SpillConfig.NONE, new OverflowConfig(OverflowConfig.OverflowConfigId.OVERFLOW_CONFIG_NULL), true)); this.omniOperator = this.projectOperatorFactory.createOperator(); cache.put(cacheKey, this.projectOperatorFactory); - if (!addedCloseThread) { + if (!isAddedCloseThread) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { cache.invalidateAll(); })); - addedCloseThread = true; + isAddedCloseThread = true; } } @@ -233,7 +241,7 @@ public class OmniSelectOperator extends OmniHiveOperator impleme List> parentOperators = this.getParentOperators(); List> childOperators = parentOperators.get(0).getChildOperators(); if (childOperators.get(childOperators.size() - 1) != this) { - needSliceVector = true; + isNeedSliceVector = true; } } @@ -244,7 +252,7 @@ public class OmniSelectOperator extends OmniHiveOperator impleme return; } VecBatch input = (VecBatch) row; - if (needSliceVector) { + if (isNeedSliceVector) { Vec[] vectors = input.getVectors(); Vec[] copyVectors = new Vec[vectors.length]; for (int i = 0; i < vectors.length; i++) { @@ -276,7 +284,7 @@ public class OmniSelectOperator extends OmniHiveOperator impleme } @Override - protected void closeOp(boolean abort) throws HiveException { + protected void closeOp(boolean isAbort) throws HiveException { if (projectOperatorFactory != null) { projectOperatorFactory.close(); } @@ -284,7 +292,7 @@ public class OmniSelectOperator extends OmniHiveOperator impleme omniOperator.close(); } output = null; - super.closeOp(abort); + super.closeOp(isAbort); } private TypeInfo createTypeinfo(ExprNodeGenericFuncDesc nodeDesc) { @@ -295,13 +303,19 @@ public class OmniSelectOperator extends OmniHiveOperator impleme return typeInfo; } + /** + * isInvalidSelectColumn + * + * @param colList colList + * @return boolean + */ public static boolean isInvalidSelectColumn(List colList) { - for (ExprNodeDesc exprNodeDesc: colList) { - if (exprNodeDesc instanceof ExprNodeConstantDesc && exprNodeDesc.getTypeInfo().getTypeName().equals("smallint")) { + for (ExprNodeDesc exprNodeDesc : colList) { + if (exprNodeDesc instanceof ExprNodeConstantDesc + && exprNodeDesc.getTypeInfo().getTypeName().equals("smallint")) { return true; } } return false; } - } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java index b5e0373d5..7e438ed7a 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java @@ -40,6 +40,11 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +/** + * OmniSortOperator + * + * @since 2024-01-10 + */ public class OmniSortOperator extends OmniHiveOperator implements Serializable { private static final long serialVersionUID = 1L; @@ -55,6 +60,12 @@ public class OmniSortOperator extends OmniHiveOperator implements super(ctx); } + /** + * OmniSortOperator + * + * @param ctx ctx + * @param groupByDesc groupByDesc + */ public OmniSortOperator(CompilationOpContext ctx, GroupByDesc groupByDesc) { super(ctx); this.conf = new OmniSortDesc(groupByDesc); @@ -105,7 +116,7 @@ public class OmniSortOperator extends OmniHiveOperator implements } @Override - protected void closeOp(boolean abort) throws HiveException { + protected void closeOp(boolean isAbort) throws HiveException { Iterator output = this.omniOperator.getOutput(); while (output.hasNext()) { forward(output.next(), outputObjInspector); @@ -116,6 +127,6 @@ public class OmniSortOperator extends OmniHiveOperator implements if (omniOperator != null) { omniOperator.close(); } - super.closeOp(abort); + super.closeOp(isAbort); } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java index 26020b38e..32e05c93a 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java @@ -54,6 +54,8 @@ import java.util.Set; * Table Scan Operator If the data is coming from the map-reduce framework, just * forward it. This will be needed as part of local work when data is not being * read as part of map-reduce framework + * + * @since 2024-01-10 **/ public class OmniTableScanOperator extends TableScanOperator implements Serializable { private static final long serialVersionUID = 1L; @@ -69,12 +71,10 @@ public class OmniTableScanOperator extends TableScanOperator implements Serializ protected transient PrimitiveTypeInfo[] partColTypeInfos; /** - * Kryo ctor. + * OmniTableScanOperator + * + * @param tableScanOperator tableScanOperator */ - protected OmniTableScanOperator() { - super(); - } - public OmniTableScanOperator(TableScanOperator tableScanOperator) { super(tableScanOperator.getCompilationOpContext()); this.conf = tableScanOperator.getConf(); @@ -83,6 +83,13 @@ public class OmniTableScanOperator extends TableScanOperator implements Serializ this.setSchema(tableScanOperator.getSchema()); } + /** + * Kryo ctor. + */ + protected OmniTableScanOperator() { + super(); + } + @Override public String getName() { return "OMNI_TS"; @@ -149,6 +156,12 @@ public class OmniTableScanOperator extends TableScanOperator implements Serializ forward(vecBatch); } + /** + * forward + * + * @param vecBatch vecBatch + * @throws HiveException HiveException + */ protected void forward(VecBatch vecBatch) throws HiveException { vecBatches[0] = vecBatch; this.runTimeNumRows += vecBatch.getRowCount(); @@ -183,7 +196,7 @@ public class OmniTableScanOperator extends TableScanOperator implements Serializ } @Override - public void closeOp(boolean abort) throws HiveException { - super.closeOp(abort); + public void closeOp(boolean isAbort) throws HiveException { + super.closeOp(isAbort); } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java index 207680173..dbeb4837a 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java @@ -62,15 +62,20 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +/** + * OmniVectorOperator + * + * @since 2024-01-10 + */ public class OmniVectorOperator extends OmniHiveOperator { - private boolean isToVector; - - private transient VecConverter[][] converters; - public transient VectorCache[] vectorCache; public transient VecBufferCache[] vecBufferCaches; + private boolean isToVector; + + private transient VecConverter[][] converters; + private transient boolean isKeyValue; private transient List[] flatFields; @@ -97,6 +102,12 @@ public class OmniVectorOperator extends OmniHiveOperator { super(ctx); } + /** + * OmniVectorOperator + * + * @param ctx ctx + * @param conf conf + */ public OmniVectorOperator(CompilationOpContext ctx, OmniVectorDesc conf) { super(ctx); this.conf = conf; @@ -106,7 +117,9 @@ public class OmniVectorOperator extends OmniHiveOperator { @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); - if (parentOperators.isEmpty() || (parentOperators.get(0) instanceof LimitOperator && parentOperators.get(0).getParentOperators().isEmpty())) { + if (parentOperators.isEmpty() + || (parentOperators.get(0) instanceof LimitOperator + && parentOperators.get(0).getParentOperators().isEmpty())) { isKeyValue = true; } converters = new VecConverter[inputObjInspectors.length][]; @@ -128,7 +141,8 @@ public class OmniVectorOperator extends OmniHiveOperator { } ReduceWork reduceWork = Utilities.getReduceWork(hconf); if (reduceWork != null && (reduceWork.getKeyDesc().getProperties().get(SERIALIZATION_LIB) - .equals(OmniVecBatchSerDe.class.getName()) || reduceWork.getKeyDesc().getProperties().get(SERIALIZATION_LIB) + .equals(OmniVecBatchSerDe.class.getName()) + || reduceWork.getKeyDesc().getProperties().get(SERIALIZATION_LIB) .equals(OmniVecBatchOrderSerDe.class.getName()))) { isVecBatchSerDe = true; } @@ -168,7 +182,8 @@ public class OmniVectorOperator extends OmniHiveOperator { } } if (!this.childOperators.isEmpty() && childOperators.get(0) instanceof OmniMergeJoinOperator - || childOperators.get(0) instanceof OmniMapJoinOperator && ((OmniMapJoinDesc) childOperators.get(0).getConf()).isDynamicPartitionHashJoin()) { + || childOperators.get(0) instanceof OmniMapJoinOperator + && ((OmniMapJoinDesc) childOperators.get(0).getConf()).isDynamicPartitionHashJoin()) { List fieldNames = new ArrayList<>(); for (int i = 0; i < inputObjInspectors.length; i++) { fieldNames.add(String.valueOf(i)); @@ -181,6 +196,12 @@ public class OmniVectorOperator extends OmniHiveOperator { } } + /** + * convertLazyToJavaInspector + * + * @param allStructFieldRefs allStructFieldRefs + * @return StandardStructObjectInspector + */ public static StandardStructObjectInspector convertLazyToJavaInspector( List allStructFieldRefs) { List structFieldNames = new ArrayList<>(); @@ -194,8 +215,8 @@ public class OmniVectorOperator extends OmniHiveOperator { case VARCHAR: case CHAR: case STRING: - LazyObjectInspectorParameters lazyParam = new LazyObjectInspectorParametersImpl(false, (byte) 0, - false, null, null, null); + LazyObjectInspectorParameters lazyParam = new LazyObjectInspectorParametersImpl(false, + (byte) 0, false, null, null, null); structFieldObjectInspectors.add(LazyPrimitiveObjectInspectorFactory.getLazyObjectInspector( ((PrimitiveObjectInspector) field.getFieldObjectInspector()).getTypeInfo(), lazyParam)); break; @@ -342,7 +363,7 @@ public class OmniVectorOperator extends OmniHiveOperator { } @Override - public void close(boolean abort) throws HiveException { + public void close(boolean isAbort) throws HiveException { // here to process the remaining data in the cache if (source != null) { for (int i = 0; i < source.length; i++) { @@ -361,9 +382,15 @@ public class OmniVectorOperator extends OmniHiveOperator { rowCount[i] = 0; } } - super.close(abort); + super.close(isAbort); } + /** + * pushRestData + * + * @param tag tag + * @throws HiveException HiveException + */ public void pushRestData(int tag) throws HiveException { if (rowCount[tag] > 0) { forwardNext(tag); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java index 686b88a0a..889bd0fe9 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java @@ -50,18 +50,25 @@ import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; +/** + * OmniVectorWithSortOperator + * + * @since 2024-01-10 + */ public class OmniVectorWithSortOperator extends OmniHiveOperator { public transient VectorCache[] vectorCache; public transient VecBufferCache[] vecBufferCaches; + public transient Iterator[] outputs; + private transient List[] flatFields; private transient RecordSource[] source; private transient boolean[] fetchDone; - private transient boolean allFetchDone; + private transient boolean isAllFetchDone; private int[] rowCount; @@ -73,14 +80,18 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator private transient OmniOperator[] sortOperators; - public transient Iterator[] outputs; - private int posBigTable; public OmniVectorWithSortOperator() { super(); } + /** + * OmniVectorWithSortOperator + * + * @param ctx ctx + * @param conf conf + */ public OmniVectorWithSortOperator(CompilationOpContext ctx, OmniVectorDesc conf) { super(ctx); this.conf = conf; @@ -122,7 +133,7 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, Arrays.asList(soi)); generateSortOperator(); posBigTable = ((OmniMergeJoinOperator) childOperators.get(0)).getPosBigTable(); - allFetchDone = false; + isAllFetchDone = false; } private void generateSortOperator() { @@ -153,9 +164,9 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator @Override public void process(Object row, int tag) throws HiveException { - if (tag == posBigTable && !allFetchDone) { + if (tag == posBigTable && !isAllFetchDone) { dealSource(); - allFetchDone = true; + isAllFetchDone = true; } for (int i = 0; i < fields[tag].size(); i++) { Object structFieldData = soi[tag].getStructFieldData(row, fields[tag].get(i)); @@ -185,10 +196,10 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator } @Override - public void close(boolean abort) throws HiveException { - if (!allFetchDone) { + public void close(boolean isAbort) throws HiveException { + if (!isAllFetchDone) { dealSource(); - allFetchDone = true; + isAllFetchDone = true; } for (int i = 0; i < rowCount.length; i++) { if (rowCount[i] > 0) { @@ -204,9 +215,15 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator if (vecBatch != null) { forward(outputs[posBigTable].next(), posBigTable); } - super.close(abort); + super.close(isAbort); } + /** + * pushRecord + * + * @param tag tag + * @throws HiveException HiveException + */ public void pushRecord(int tag) throws HiveException { forward(outputs[tag].next(), tag); } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java index b6afde6d8..25b3f5447 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java @@ -26,11 +26,18 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import java.util.Iterator; - +/** + * OmniVectorizedTableScanOperator + * + * @since 2024-01-10 + */ public class OmniVectorizedTableScanOperator extends OmniTableScanOperator { private static final long serialVersionUID = 1L; + public OmniVectorizedTableScanOperator(TableScanOperator tableScanOperator) { + super(tableScanOperator); + } + /** * Kryo ctor. */ @@ -38,10 +45,6 @@ public class OmniVectorizedTableScanOperator extends OmniTableScanOperator { super(); } - public OmniVectorizedTableScanOperator(TableScanOperator tableScanOperator) { - super(tableScanOperator); - } - @Override protected void forward(Object row, ObjectInspector rowInspector, boolean isVectorized) throws HiveException { this.runTimeNumRows += ((VecBatchWrapper) row).getVecBatch().getRowCount(); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java index 60fb4bb35..fe0c1928a 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java @@ -79,6 +79,11 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; +/** + * OmniVectorizedVectorOperator + * + * @since 2024-01-10 + */ public class OmniVectorizedVectorOperator extends OmniHiveOperator { private boolean isToVector; @@ -112,6 +117,14 @@ public class OmniVectorizedVectorOperator extends OmniHiveOperator 0) { forwardNext(); rowCount = 0; } - super.close(abort); + super.close(isAbort); } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OperatorUtils.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OperatorUtils.java index a28eb7cfc..541d10edc 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OperatorUtils.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OperatorUtils.java @@ -1,3 +1,21 @@ +/* + * Copyright (C) 2023-2024. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.huawei.boostkit.hive; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -9,7 +27,18 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +/** + * OperatorUtils + * + * @since 2024-01-10 + */ public class OperatorUtils { + /** + * expandInspector + * + * @param inspector inspector + * @return ObjectInspector + */ public static ObjectInspector expandInspector(ObjectInspector inspector) { List fields = new ArrayList<>(); List keyValueFields = ((StructObjectInspector) inspector).getAllStructFieldRefs(); -- Gitee From 8a2592a84539110957fe9860f00b3cb4e730151a Mon Sep 17 00:00:00 2001 From: panmingyi Date: Mon, 17 Jun 2024 16:51:39 +0800 Subject: [PATCH 2/4] Clean Code --- .../java/com/huawei/boostkit/hive/OmniMapJoinOperator.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java index ff3b24e0f..c62102f8b 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java @@ -130,6 +130,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator private static final Logger LOG = LoggerFactory.getLogger(OmniMapJoinOperator.class.getName()); private static boolean isAddedCloseThread; private static Map countShareBuildIds = new HashMap<>(); + private static int buildNodeId; private transient OmniOperatorFactory omniLookupJoinWithExprOperatorFactory; @@ -154,10 +155,8 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator private transient List[] buildVecs; private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes; - private static boolean addedCloseThread; private transient Iterator output; - private static int buildNodeId; - private static Map countShareBuildIds = new HashMap<>(); + public OmniMapJoinOperator() { super(); -- Gitee From 56a4ad922c3623f0328ffc5c86601689739bee6f Mon Sep 17 00:00:00 2001 From: panmingyi Date: Mon, 17 Jun 2024 19:18:03 +0800 Subject: [PATCH 3/4] Clean Code --- .../boostkit/hive/OmniMapJoinOperator.java | 56 ------------------- .../boostkit/hive/OmniMergeJoinOperator.java | 35 ------------ .../hive/OmniMergeJoinWithSortOperator.java | 5 -- .../huawei/boostkit/hive/OmniPTFOperator.java | 14 ----- .../boostkit/hive/OmniReduceSinkOperator.java | 53 ------------------ .../boostkit/hive/OmniSelectOperator.java | 17 ------ .../boostkit/hive/OmniSortOperator.java | 11 ---- .../boostkit/hive/OmniTableScanOperator.java | 13 ----- .../boostkit/hive/OmniVectorOperator.java | 23 -------- .../hive/OmniVectorWithSortOperator.java | 17 ------ .../hive/OmniVectorizedTableScanOperator.java | 5 -- .../hive/OmniVectorizedVectorOperator.java | 18 ------ .../huawei/boostkit/hive/OperatorUtils.java | 11 ---- 13 files changed, 278 deletions(-) diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java index c62102f8b..a14a05320 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java @@ -107,16 +107,8 @@ import java.util.Optional; import java.util.TreeMap; import java.util.stream.Collectors; -/** - * OmniMapJoinOperator - * - * @since 2024-01-10 - */ public class OmniMapJoinOperator extends AbstractMapJoinOperator implements Serializable, VectorizationContextRegion { - /** - * JOIN_TYPE_MAP - */ public static final Map JOIN_TYPE_MAP = new HashMap() { { put(JoinDesc.INNER_JOIN, JoinType.OMNI_JOIN_TYPE_INNER); @@ -166,12 +158,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator super(ctx); } - /** - * OmniMapJoinOperator - * - * @param mjop mjop - * @param mapJoinDesc mapJoinDesc - */ public OmniMapJoinOperator(AbstractMapJoinOperator mjop, MapJoinDesc mapJoinDesc) { super(mjop); this.conf = new OmniMapJoinDesc(mapJoinDesc); @@ -179,14 +165,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator this.vectorizationContext = null; } - /** - * OmniMapJoinOperator - * - * @param mjop mjop - * @param mapJoinDesc mapJoinDesc - * @param changedCtx changedCtx - * @param vectorizationContext vectorizationContext - */ public OmniMapJoinOperator(AbstractMapJoinOperator mjop, MapJoinDesc mapJoinDesc, boolean changedCtx, VectorizationContext vectorizationContext) { super(mjop); @@ -543,11 +521,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator valueVecs = null; } - /** - * generateMapMetaData - * - * @throws HiveException HiveException - */ public void generateMapMetaData() throws HiveException { try { TableDesc keyTableDesc = conf.getKeyTblDesc(); @@ -949,11 +922,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator private KeyValueReader originValueReader; private KeyValuesReader originValuesReader; - /** - * OmniReaderWrapper - * - * @param reader reader - */ public OmniReaderWrapper(Reader reader) { if (reader instanceof KeyValueReader) { isSingleValue = true; @@ -963,12 +931,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } } - /** - * next - * - * @return boolean - * @throws IOException IOException - */ public boolean next() throws IOException { if (isSingleValue) { return originValueReader.next(); @@ -979,12 +941,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator return originValuesReader.next(); } - /** - * hasNextValue - * - * @return boolean - * @throws IOException IOException - */ public boolean hasNextValue() throws IOException { if (isSingleValue) { return false; @@ -992,12 +948,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator return originValuesReader.getCurrentValues().iterator().hasNext(); } - /** - * getCurrentKey - * - * @return Object - * @throws IOException IOException - */ public Object getCurrentKey() throws IOException { if (isSingleValue) { return originValueReader.getCurrentKey(); @@ -1005,12 +955,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator return originValuesReader.getCurrentKey(); } - /** - * getCurrentValue - * - * @return Object - * @throws IOException IOException - */ public Object getCurrentValue() throws IOException { if (isSingleValue) { return originValueReader.getCurrentValue(); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java index 2e34297bf..1c3859982 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java @@ -80,11 +80,6 @@ import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; -/** - * OmniMergeJoinOperator - * - * @since 2024-01-10 - */ public class OmniMergeJoinOperator extends CommonJoinOperator implements Serializable { protected static final int SMJ_NEED_ADD_STREAM_TBL_DATA = 2; protected static final int SMJ_NEED_ADD_BUFFERED_TBL_DATA = 3; @@ -115,12 +110,6 @@ public class OmniMergeJoinOperator extends CommonJoinOperator super(ctx); } - /** - * OmniMergeJoinOperator - * - * @param ctx ctx - * @param commonMergeJoinDesc commonMergeJoinDesc - */ public OmniMergeJoinOperator(CompilationOpContext ctx, CommonMergeJoinDesc commonMergeJoinDesc) { super(ctx); this.conf = new OmniMergeJoinDesc(commonMergeJoinDesc); @@ -397,13 +386,6 @@ public class OmniMergeJoinOperator extends CommonJoinOperator } } - /** - * processOmni - * - * @param opIndex opIndex - * @param bufferIndex bufferIndex - * @throws HiveException HiveException - */ protected void processOmni(int opIndex, int bufferIndex) throws HiveException { if (flowControlCode[opIndex] != SCAN_FINISH && resCode[opIndex] == RES_INIT) { if (flowControlCode[opIndex] == SMJ_NEED_ADD_STREAM_TBL_DATA) { @@ -440,17 +422,6 @@ public class OmniMergeJoinOperator extends CommonJoinOperator resCode[opIndex] = RES_INIT; } - /** - * processOmniSmj - * - * @param opIndex opIndex - * @param dataIndex dataIndex - * @param data data - * @param operators operators - * @param controlCode controlCode - * @param types types - * @throws HiveException HiveException - */ protected void processOmniSmj(int opIndex, int dataIndex, Queue[] data, OmniOperator[] operators, int controlCode, DataType[][] types) throws HiveException { if (!data[opIndex].isEmpty()) { @@ -539,12 +510,6 @@ public class OmniMergeJoinOperator extends CommonJoinOperator super.close(isAbort); } - /** - * createEofVecBatch - * - * @param dataTypes dataTypes - * @return VecBatch VecBatch - */ protected VecBatch createEofVecBatch(DataType[] dataTypes) { Vec[] vecs = new Vec[dataTypes.length]; for (int i = 0; i < dataTypes.length; i++) { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java index 14dc3bc36..d84ae3e25 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinWithSortOperator.java @@ -30,11 +30,6 @@ import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import java.util.Queue; -/** - * OmniMergeJoinWithSortOperator - * - * @since 2024-01-10 - */ public class OmniMergeJoinWithSortOperator extends OmniMergeJoinOperator { private transient OmniVectorWithSortOperator omniVectorWithSortOperator; diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java index e1ef3f4d1..6fab01863 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java @@ -63,11 +63,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -/** - * OmniPTFOperator - * - * @since 2024-01-10 - */ public class OmniPTFOperator extends OmniHiveOperator implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(OmniPTFOperator.class.getName()); @@ -86,12 +81,6 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se super(ctx); } - /** - * OmniPTFOperator - * - * @param ctx ctx - * @param conf conf - */ public OmniPTFOperator(CompilationOpContext ctx, PTFDesc conf) { super(ctx); this.conf = new OmniPTFDesc(conf); @@ -288,9 +277,6 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se /** * Initialize the visitor to use the QueryDefDeserializer Use the order * defined in QueryDefWalker to visit the QueryDef - * - * @param hiveConf hiveConf - * @throws HiveException HiveException */ protected void reconstructQueryDef(Configuration hiveConf) throws HiveException { PTFDeserializer ds = new PTFDeserializer(conf, (StructObjectInspector) inputObjInspectors[0], hiveConf); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java index 7ff9c638b..84079c867 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java @@ -126,18 +126,10 @@ import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; -/** - * OmniReduceSinkOperator - * - * @since 2024-01-10 - */ public class OmniReduceSinkOperator extends TerminalOperator implements Serializable, TopNHash.BinaryCollector { private static final long serialVersionUID = 1L; - /** - * OutPutCollector - */ protected transient OutputCollector out; /** @@ -241,13 +233,6 @@ public class OmniReduceSinkOperator extends TerminalOperator super(); } - /** - * OmniReduceSinkOperator - * - * @param ctx ctx - * @param conf conf - * @param isReduceSinkCanReplaceKey isReduceSinkCanReplaceKey - */ public OmniReduceSinkOperator(CompilationOpContext ctx, ReduceSinkDesc conf, boolean isReduceSinkCanReplaceKey) { super(ctx); this.conf = conf; @@ -480,14 +465,6 @@ public class OmniReduceSinkOperator extends TerminalOperator *

* If distinctColIndices is empty, the object inspector is same as * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)} - * - * @param evals evals - * @param distinctColIndices distinctColIndices - * @param outputColNames outputColNames - * @param length length - * @param rowInspector rowInspector - * @return StructObjectInspector - * @throws HiveException HiveException */ protected static StructObjectInspector initEvaluatorsAndReturnStruct(ExprNodeEvaluator[] evals, List> distinctColIndices, @@ -792,10 +769,6 @@ public class OmniReduceSinkOperator extends TerminalOperator /** * Populate distinct keys part of cachedKeys for a particular row. - * - * @param row the row - * @param index the cachedKeys index to write to - * @throws HiveException HiveException */ private void populateCachedDistinctKeys(Object row, int index) throws HiveException { StandardUnionObjectInspector.StandardUnion union; @@ -818,12 +791,6 @@ public class OmniReduceSinkOperator extends TerminalOperator * UDFToInteger(ROW__ID) and buckNum == -1 so that the result of this method * is to return the bucketId extracted from ROW__ID unless it optimized by * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} - * - * @param row row - * @param buckNum buckNum - * @param index index - * @return int - * @throws HiveException HiveException */ private int computeHashCode(Object row, int buckNum, int index) throws HiveException { // Evaluate the HashCode @@ -982,12 +949,6 @@ public class OmniReduceSinkOperator extends TerminalOperator /** * Serialize the keys and append the tag - * - * @param obj obj - * @param tag tag - * @param distLength distLength - * @return HiveKey - * @throws SerDeException SerDeException */ protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException { BinaryComparable key = (BinaryComparable) keySerializer.serialize(obj, keyObjectInspector); @@ -1009,13 +970,6 @@ public class OmniReduceSinkOperator extends TerminalOperator collect(keyWritable, valueWritable); } - /** - * collect - * - * @param keyWritable keyWritable - * @param valueWritable valueWritable - * @throws IOException IOException - */ protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException { // Since this is a terminal operator, update counters explicitly - // forward is not called @@ -1064,8 +1018,6 @@ public class OmniReduceSinkOperator extends TerminalOperator } /** - * getName - * * @return the name of the operator */ @Override @@ -1118,11 +1070,6 @@ public class OmniReduceSinkOperator extends TerminalOperator this.out = out; } - /** - * getConf - * - * @return ReduceSinkDesc - */ public ReduceSinkDesc getConf() { return new OmniReduceSinkDesc(this.conf); } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java index 99563da70..36313018f 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSelectOperator.java @@ -65,11 +65,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -/** - * OmniSelectOperator - * - * @since 2024-01-10 - */ public class OmniSelectOperator extends OmniHiveOperator implements Serializable { private static final long serialVersionUID = 1L; private static Cache cache = CacheBuilder.newBuilder().concurrencyLevel(8).initialCapacity(10) @@ -98,12 +93,6 @@ public class OmniSelectOperator extends OmniHiveOperator impleme super(ctx); } - /** - * OmniSelectOperator - * - * @param ctx ctx - * @param conf conf - */ public OmniSelectOperator(CompilationOpContext ctx, SelectDesc conf) { super(ctx); this.conf = new OmniSelectDesc(conf); @@ -303,12 +292,6 @@ public class OmniSelectOperator extends OmniHiveOperator impleme return typeInfo; } - /** - * isInvalidSelectColumn - * - * @param colList colList - * @return boolean - */ public static boolean isInvalidSelectColumn(List colList) { for (ExprNodeDesc exprNodeDesc : colList) { if (exprNodeDesc instanceof ExprNodeConstantDesc diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java index 7e438ed7a..945610bc9 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniSortOperator.java @@ -40,11 +40,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -/** - * OmniSortOperator - * - * @since 2024-01-10 - */ public class OmniSortOperator extends OmniHiveOperator implements Serializable { private static final long serialVersionUID = 1L; @@ -60,12 +55,6 @@ public class OmniSortOperator extends OmniHiveOperator implements super(ctx); } - /** - * OmniSortOperator - * - * @param ctx ctx - * @param groupByDesc groupByDesc - */ public OmniSortOperator(CompilationOpContext ctx, GroupByDesc groupByDesc) { super(ctx); this.conf = new OmniSortDesc(groupByDesc); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java index 32e05c93a..c36c81186 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniTableScanOperator.java @@ -54,8 +54,6 @@ import java.util.Set; * Table Scan Operator If the data is coming from the map-reduce framework, just * forward it. This will be needed as part of local work when data is not being * read as part of map-reduce framework - * - * @since 2024-01-10 **/ public class OmniTableScanOperator extends TableScanOperator implements Serializable { private static final long serialVersionUID = 1L; @@ -70,11 +68,6 @@ public class OmniTableScanOperator extends TableScanOperator implements Serializ protected transient PrimitiveTypeInfo[] partColTypeInfos; - /** - * OmniTableScanOperator - * - * @param tableScanOperator tableScanOperator - */ public OmniTableScanOperator(TableScanOperator tableScanOperator) { super(tableScanOperator.getCompilationOpContext()); this.conf = tableScanOperator.getConf(); @@ -156,12 +149,6 @@ public class OmniTableScanOperator extends TableScanOperator implements Serializ forward(vecBatch); } - /** - * forward - * - * @param vecBatch vecBatch - * @throws HiveException HiveException - */ protected void forward(VecBatch vecBatch) throws HiveException { vecBatches[0] = vecBatch; this.runTimeNumRows += vecBatch.getRowCount(); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java index dbeb4837a..9cdd049a9 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorOperator.java @@ -62,11 +62,6 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; -/** - * OmniVectorOperator - * - * @since 2024-01-10 - */ public class OmniVectorOperator extends OmniHiveOperator { public transient VectorCache[] vectorCache; @@ -102,12 +97,6 @@ public class OmniVectorOperator extends OmniHiveOperator { super(ctx); } - /** - * OmniVectorOperator - * - * @param ctx ctx - * @param conf conf - */ public OmniVectorOperator(CompilationOpContext ctx, OmniVectorDesc conf) { super(ctx); this.conf = conf; @@ -196,12 +185,6 @@ public class OmniVectorOperator extends OmniHiveOperator { } } - /** - * convertLazyToJavaInspector - * - * @param allStructFieldRefs allStructFieldRefs - * @return StandardStructObjectInspector - */ public static StandardStructObjectInspector convertLazyToJavaInspector( List allStructFieldRefs) { List structFieldNames = new ArrayList<>(); @@ -385,12 +368,6 @@ public class OmniVectorOperator extends OmniHiveOperator { super.close(isAbort); } - /** - * pushRestData - * - * @param tag tag - * @throws HiveException HiveException - */ public void pushRestData(int tag) throws HiveException { if (rowCount[tag] > 0) { forwardNext(tag); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java index 889bd0fe9..80dfabd7c 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java @@ -50,11 +50,6 @@ import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; -/** - * OmniVectorWithSortOperator - * - * @since 2024-01-10 - */ public class OmniVectorWithSortOperator extends OmniHiveOperator { public transient VectorCache[] vectorCache; @@ -86,12 +81,6 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator super(); } - /** - * OmniVectorWithSortOperator - * - * @param ctx ctx - * @param conf conf - */ public OmniVectorWithSortOperator(CompilationOpContext ctx, OmniVectorDesc conf) { super(ctx); this.conf = conf; @@ -218,12 +207,6 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator super.close(isAbort); } - /** - * pushRecord - * - * @param tag tag - * @throws HiveException HiveException - */ public void pushRecord(int tag) throws HiveException { forward(outputs[tag].next(), tag); } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java index 25b3f5447..a986e048f 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedTableScanOperator.java @@ -26,11 +26,6 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -/** - * OmniVectorizedTableScanOperator - * - * @since 2024-01-10 - */ public class OmniVectorizedTableScanOperator extends OmniTableScanOperator { private static final long serialVersionUID = 1L; diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java index fe0c1928a..a13d99743 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java @@ -79,11 +79,6 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; -/** - * OmniVectorizedVectorOperator - * - * @since 2024-01-10 - */ public class OmniVectorizedVectorOperator extends OmniHiveOperator { private boolean isToVector; @@ -117,14 +112,6 @@ public class OmniVectorizedVectorOperator extends OmniHiveOperator fields = new ArrayList<>(); List keyValueFields = ((StructObjectInspector) inspector).getAllStructFieldRefs(); -- Gitee From 5a79b80dfc23106887581e51749bfd6a375d6fa9 Mon Sep 17 00:00:00 2001 From: panmingyi Date: Tue, 18 Jun 2024 14:22:12 +0800 Subject: [PATCH 4/4] Clean Code --- .../java/com/huawei/boostkit/hive/OmniMapJoinOperator.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java index a14a05320..a0c592f7e 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java @@ -248,11 +248,9 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator for (Map.Entry entry : countShareBuildIds.entrySet()) { Integer value = entry.getValue(); for (int i = 0; i < value; i++) { - OmniHashBuilderWithExprOperatorFactory - .dereferenceHashBuilderOperatorAndFactory(entry.getKey()); + OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(entry.getKey()); } - if (OmniHashBuilderWithExprOperatorFactory - .getHashBuilderOperatorFactory(entry.getKey()) == null) { + if (OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(entry.getKey()) == null) { LOG.info("release operatorFactor of buildNodeId = " + entry.getKey() + " succeed"); } else { LOG.error("release operatorFactor of buildNodeId = " + entry.getKey() + " failed"); -- Gitee