From 295b2b2fde4d5648f10418ff4664bbf0605664a3 Mon Sep 17 00:00:00 2001 From: ycsongcs Date: Thu, 16 May 2024 10:48:00 +0800 Subject: [PATCH 1/2] [hive-extension] fix the agg number acquisition issue --- .../java/com/huawei/boostkit/hive/OmniGroupByOperator.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java index d88ffc24b..1dfa13b94 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java @@ -489,6 +489,13 @@ public class OmniGroupByOperator extends OmniHiveOperator imple Matcher matcherValue = patternValue.matcher(name); if (matcherValue.find()) { offset = numKeys; + List keyStructFieldRefs = + ((StandardStructObjectInspector) allStructFieldRefs.get(0).getFieldObjectInspector()) + .getAllStructFieldRefs(); + + if (numKeys != keyStructFieldRefs.size()) { + offset = keyStructFieldRefs.size(); + } } else { offset = 0; } -- Gitee From 17713f127f1b9318628fe94f13064d2e0af6da3e Mon Sep 17 00:00:00 2001 From: zhangyuxi <1434187877@qq.com> Date: Fri, 17 May 2024 14:17:06 +0800 Subject: [PATCH 2/2] [hive-extension] fix align the next vectors and outputObjectInspectors --- .../boostkit/hive/OmniGroupByOperator.java | 108 ++++++++++++------ 1 file changed, 73 insertions(+), 35 deletions(-) diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java index 1dfa13b94..eb5f3b6ac 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java @@ -514,48 +514,55 @@ public class OmniGroupByOperator extends OmniHiveOperator imple return structField.getFieldID(); } + private Vec setNewVec(Vec vec, Vec newVec, int vecIndex) { + int rowCount = vec.getSize(); + DataType.DataTypeId dataTypeId = vec.getType().getId(); + Vec flatVec = (vec instanceof DictionaryVec) ? ((DictionaryVec) vec).expandDictionary() : vec; + int[] rawValueOffset = (dataTypeId == OMNI_VARCHAR || dataTypeId == OMNI_CHAR) ? ((VarcharVec) flatVec).getRawValueOffset() : new int[0]; + switch (dataTypeId) { + case OMNI_INT: + case OMNI_DATE32: + ((IntVec) newVec).put(((IntVec) flatVec).get(0, rowCount), vecIndex * rowCount, 0, rowCount); + break; + case OMNI_LONG: + case OMNI_DATE64: + case OMNI_DECIMAL64: + ((LongVec) newVec).put(((LongVec) flatVec).get(0, rowCount), vecIndex * rowCount, 0, rowCount); + break; + case OMNI_DOUBLE: + ((DoubleVec) newVec).put(((DoubleVec) flatVec).get(0, rowCount), vecIndex * rowCount, 0, rowCount); + break; + case OMNI_BOOLEAN: + ((BooleanVec) newVec).put(((BooleanVec) flatVec).get(0, rowCount), vecIndex * rowCount, 0, rowCount); + break; + case OMNI_SHORT: + ((ShortVec) newVec).put(((ShortVec) flatVec).get(0, rowCount), vecIndex * rowCount, 0, rowCount); + break; + case OMNI_DECIMAL128: + long[] values = ((Decimal128Vec) flatVec).get(0, rowCount); + ((Decimal128Vec) newVec).put(values, vecIndex * rowCount, 0, values.length); + break; + case OMNI_VARCHAR: + case OMNI_CHAR: + ((VarcharVec) newVec).put(vecIndex * rowCount, ((VarcharVec) flatVec).get(0, rowCount), 0, + rawValueOffset, 0, rowCount); + break; + default: + throw new RuntimeException("Not support dataType, dataTypeId: " + dataTypeId); + } + + return newVec; + } + private Vec expandVec(Vec vec, long mask) { int rowCount = vec.getSize(); int groupingSetSize = groupingSets.size(); Vec newVec = VecFactory.createFlatVec(rowCount * groupingSetSize, vec.getType()); - Vec flatVec = (vec instanceof DictionaryVec) ? ((DictionaryVec) vec).expandDictionary() : vec; byte[] rawValueNulls = vec.getRawValueNulls(); - DataType.DataTypeId dataTypeId = vec.getType().getId(); - int[] rawValueOffset = (dataTypeId == OMNI_VARCHAR || dataTypeId == OMNI_CHAR) ? ((VarcharVec) flatVec).getRawValueOffset() : new int[0]; for (int i = 0; i < groupingSetSize; i++) { newVec.setNulls(i * rowCount, rawValueNulls, 0, rowCount); if ((groupingSets.get(i) & mask) == 0) { - switch (dataTypeId) { - case OMNI_INT: - case OMNI_DATE32: - ((IntVec) newVec).put(((IntVec) flatVec).get(0, rowCount), i * rowCount, 0, rowCount); - break; - case OMNI_LONG: - case OMNI_DATE64: - case OMNI_DECIMAL64: - ((LongVec) newVec).put(((LongVec) flatVec).get(0, rowCount), i * rowCount, 0, rowCount); - break; - case OMNI_DOUBLE: - ((DoubleVec) newVec).put(((DoubleVec) flatVec).get(0, rowCount), i * rowCount, 0, rowCount); - break; - case OMNI_BOOLEAN: - ((BooleanVec) newVec).put(((BooleanVec) flatVec).get(0, rowCount), i * rowCount, 0, rowCount); - break; - case OMNI_SHORT: - ((ShortVec) newVec).put(((ShortVec) flatVec).get(0, rowCount), i * rowCount, 0, rowCount); - break; - case OMNI_DECIMAL128: - long[] values = ((Decimal128Vec) flatVec).get(0, rowCount); - ((Decimal128Vec) newVec).put(values, i * rowCount, 0, values.length); - break; - case OMNI_VARCHAR: - case OMNI_CHAR: - ((VarcharVec) newVec).put(i * rowCount, ((VarcharVec) flatVec).get(0, rowCount), 0, - rawValueOffset, 0, rowCount); - break; - default: - throw new RuntimeException("Not support dataType, dataTypeId: " + dataTypeId); - } + setNewVec(vec, newVec, i); } else { byte[] nulls = new byte[rowCount]; Arrays.fill(nulls, (byte) 1); @@ -565,6 +572,12 @@ public class OmniGroupByOperator extends OmniHiveOperator imple return newVec; } + private Vec expandVec(Vec vec) { + int rowCount = vec.getSize(); + Vec newVec = VecFactory.createFlatVec(rowCount, vec.getType()); + return setNewVec(vec, newVec, 0); + } + private Set getAggChannels(List> nodes) { Set aggChannels = new HashSet(); for (List node : nodes) { @@ -742,6 +755,31 @@ public class OmniGroupByOperator extends OmniHiveOperator imple return vOutContext; } + private VecBatch getAlignedNextVecBatch(VecBatch vecBatchNext) { + List allStructFieldRefByOutput = ((StandardStructObjectInspector) outputObjInspector).getAllStructFieldRefs(); + int len = allStructFieldRefByOutput.size(); + if (len != vecBatchNext.getVectorCount()) { + Vec[] newVecBatchNext = new Vec[len]; + for (int nextIndex = 0, i = 0; i < len; i++) { + if (allStructFieldRefByOutput.get(i).getFieldObjectInspector().getTypeName().equals("void")) { + if (vecBatchNext.getVector(nextIndex).getType().getId().name().equals("OMNI_BOOLEAN")) { + newVecBatchNext[i] = expandVec(vecBatchNext.getVector(nextIndex)); + nextIndex++; + continue; + } + if (keyFields.get(i) instanceof ExprNodeConstantEvaluator) { + newVecBatchNext[i] = createConstantVec((ExprNodeConstantEvaluator) keyFields.get(i), VectorCache.BATCH); + } + continue; + } + newVecBatchNext[i] = expandVec(vecBatchNext.getVector(nextIndex)); + nextIndex++; + } + return new VecBatch(newVecBatchNext); + } + return vecBatchNext; + } + @Override protected void closeOp(boolean abort) throws HiveException { if (!abort) { @@ -754,7 +792,7 @@ public class OmniGroupByOperator extends OmniHiveOperator imple } else { Iterator output = this.omniOperator.getOutput(); while (output.hasNext()) { - VecBatch next = output.next(); + VecBatch next = getAlignedNextVecBatch(output.next()); if (outputObjInspector instanceof StandardStructObjectInspector && next.getVectorCount() != ((StandardStructObjectInspector) outputObjInspector).getAllStructFieldRefs().size()) { next = removeVector(next, numKeys - 1); -- Gitee