diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java index cb39f034cb97718f03bff74935915cfbcb3f303a..e873c7e8e6eeb2f693b5deaac49a58c880d1fdcc 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java @@ -157,6 +157,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { private Set reduceSinkReplaceableWorkName = new HashSet<>(); private final Set inputUnReplaceableWork = new HashSet<>(); + private final List reducesinkNeedSort = new ArrayList<>(); private TezWork tezWork; @@ -176,6 +177,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { reduceSinkReplaceableWorkName.clear(); inputUnReplaceableWork.clear(); needSortSerdeReduceSinkName.clear(); + reducesinkNeedSort.clear(); tezWork = null; } @@ -186,6 +188,15 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { return queryPlan.getRootTasks().get(0) instanceof ExplainTask; } + private static T getLastElement(final Iterable elements) { + T lastElement = null; + for (T element: elements) { + lastElement = element; + } + + return lastElement; + } + public void run(HookContext hookContext) throws Exception { this.hookContext = hookContext; omniHiveConf = new OmniHiveConf(hookContext.getConf()); @@ -346,6 +357,12 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { tezWork.getEdgeProperty(work, child).setEdgeType(TezEdgeProperty.EdgeType.CUSTOM_SIMPLE_EDGE); return true; } + + if (isReplaceable(reducer, true) && reducer.getType().equals(OperatorType.SELECT)) { + tezWork.getEdgeProperty(work, child).setEdgeType(TezEdgeProperty.EdgeType.SIMPLE_EDGE); + reducesinkNeedSort.add(getLastElement(work.getAllOperators())); + return true; + } return false; } @@ -1243,7 +1260,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { case REDUCESINK: OmniReduceSinkOperator omniReduceSinkOperator = new OmniReduceSinkOperator(current.getCompilationOpContext(), (ReduceSinkDesc) current.getConf(), reduceSinkCanReplace); ReduceSinkDesc conf = (ReduceSinkDesc) current.getConf(); - if (conf.getTopN() == -1) { + if (conf.getTopN() == -1 && !reducesinkNeedSort.contains(current)) { omniReduceSinkOperator.getConf().getKeySerializeInfo().getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); omniReduceSinkOperator.getConf().getValueSerializeInfo().getProperties().setProperty(SERIALIZATION_LIB, OmniVecBatchSerDe.class.getName()); } else {