From b071d7fd988c0df0ec6acb696996987c9c226346 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=AC=A3=E4=BC=9F?= <11737428+xiang-xinwei@user.noreply.gitee.com> Date: Tue, 17 Oct 2023 06:17:46 +0000 Subject: [PATCH 1/2] adapter decimal128 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 向欣伟 <11737428+xiang-xinwei@user.noreply.gitee.com> --- .../nova/hetu/olk/block/Int128ArrayOmniBlock.java | 12 ++++++++++++ .../java/nova/hetu/olk/tool/OperatorUtils.java | 14 +++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/block/Int128ArrayOmniBlock.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/block/Int128ArrayOmniBlock.java index c1ee727cf..311f98a73 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/block/Int128ArrayOmniBlock.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/block/Int128ArrayOmniBlock.java @@ -84,6 +84,18 @@ public class Int128ArrayOmniBlock public Int128ArrayOmniBlock(int positionOffset, int positionCount, byte[] valueIsNull, long[] values) { + for (int i = positionOffset; i < positionCount; i++) { + int first = 2 * i; + int second = first + 1; + if (values[second] < 0) { + values[first] = ~values[first] + 1; + values[second] = values[second] ^ 0x7FFFFFFFFFFFFFFFL; + if (values[first] == 0) { + values[second] = values[second] + 1; + } + } + } + if (positionOffset < 0) { throw new IllegalArgumentException("positionOffset is negative"); } diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/tool/OperatorUtils.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/tool/OperatorUtils.java index a95e2e9d8..fdb0449a5 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/tool/OperatorUtils.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/tool/OperatorUtils.java @@ -916,8 +916,20 @@ public final class OperatorUtils private static Block buildInt128ArrayBlock(Block block, int positionCount) { Decimal128Vec decimal128Vec = (Decimal128Vec) block.getValues(); + long[] values = decimal128Vec.get(0, positionCount); + for (int i = 0; i < positionCount; i++) { + int first = 2 * i; + int second = first + 1; + if (values[second] < 0) { + values[first] = ~values[first] + 1; + values[second] = values[second] ^ 0x7FFFFFFFFFFFFFFFL; + if (values[first] == 0) { + values[second] = values[second] + 1; + } + } + } return new Int128ArrayBlock(positionCount, Optional.of(decimal128Vec.getValuesNulls(0, positionCount)), - decimal128Vec.get(0, positionCount)); + values); } private static Block buildDoubleArrayBLock(Block block, int positionCount) -- Gitee From 7012e04d7933ece2ec85f93904193bff0f2f269d Mon Sep 17 00:00:00 2001 From: x30027624 Date: Mon, 16 Oct 2023 17:02:59 +0800 Subject: [PATCH 2/2] adapter moving join filter from hash builder to lookup join for sharing hash table --- .../hetu/olk/OmniLocalExecutionPlanner.java | 25 ++++++++++--------- .../olk/operator/HashBuilderOmniOperator.java | 2 +- .../olk/operator/LookupJoinOmniOperator.java | 4 +-- .../olk/operator/LookupJoinOmniOperators.java | 14 +++++------ .../operator/LookupJoinOmniOperatorTest.java | 2 +- .../BenchmarkHashJoinOmniOperators.java | 4 +-- 6 files changed, 26 insertions(+), 25 deletions(-) diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/OmniLocalExecutionPlanner.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/OmniLocalExecutionPlanner.java index b7a4718da..291e47830 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/OmniLocalExecutionPlanner.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/OmniLocalExecutionPlanner.java @@ -1355,13 +1355,10 @@ public class OmniLocalExecutionPlanner } } - public JoinBridgeManager createLookupSourceFactory(JoinNode node, + public JoinBridgeManager createLookupSourceFactory(JoinNode node, LocalExecutionPlanContext buildContext, PhysicalOperation buildSource, PlanNode buildNode, List buildSymbols, Optional buildHashSymbol, PhysicalOperation probeSource, LocalExecutionPlanContext context, boolean spillEnabled) { - LocalExecutionPlanContext buildContext = context.createSubContext(); - PhysicalOperation buildSource = buildNode.accept(this, buildContext); - if (buildSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION) { checkState(probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION, "Build execution is GROUPED_EXECUTION. Probe execution is expected be GROUPED_EXECUTION, but is UNGROUPED_EXECUTION."); @@ -1502,11 +1499,15 @@ public class OmniLocalExecutionPlanner // Plan build boolean spillEnabled = isSpillEnabled(session) && node.isSpillable().orElseThrow(() -> new IllegalArgumentException("spillable not yet set")); - JoinBridgeManager lookupSourceFactory = createLookupSourceFactory(node, + LocalExecutionPlanContext buildContext = context.createSubContext(); + PhysicalOperation buildSource = buildNode.accept(this, buildContext); + JoinBridgeManager lookupSourceFactory = createLookupSourceFactory(node, buildContext, buildSource, buildNode, buildSymbols, buildHashSymbol, probeSource, context, spillEnabled); - + Optional filterFunction = node.getFilter() + .map(filterExpression -> getTranslatedExpression(context, buildSource, probeSource, + filterExpression)); OperatorFactory operator = createLookupJoin(node, probeSource, probeSymbols, probeHashSymbol, - lookupSourceFactory, context, spillEnabled); + lookupSourceFactory, context, spillEnabled, filterFunction); ImmutableMap.Builder outputMappings = ImmutableMap.builder(); List outputSymbols = node.getOutputSymbols(); @@ -1520,7 +1521,7 @@ public class OmniLocalExecutionPlanner public OperatorFactory createLookupJoin(JoinNode node, PhysicalOperation probeSource, List probeSymbols, Optional probeHashSymbol, JoinBridgeManager lookupSourceFactoryManager, - LocalExecutionPlanContext context, boolean spillEnabled) + LocalExecutionPlanContext context, boolean spillEnabled, Optional filter) { List probeTypes = probeSource.getTypes(); List probeOutputSymbols = node.getOutputSymbols().stream() @@ -1539,7 +1540,7 @@ public class OmniLocalExecutionPlanner boolean buildOuter = node.getType() == RIGHT || node.getType() == FULL; if (!buildOuter) { return createOmniLookupJoin(node, lookupSourceFactoryManager, context, probeTypes, probeOutputChannels, - probeJoinChannels, probeHashChannel, totalOperatorsCount); + probeJoinChannels, probeHashChannel, totalOperatorsCount, filter); } return getLookUpJoinOperatorFactory(node, lookupSourceFactoryManager, context, probeTypes, probeOutputChannels, probeJoinChannels, probeHashChannel, totalOperatorsCount); @@ -1561,19 +1562,19 @@ public class OmniLocalExecutionPlanner public OperatorFactory createOmniLookupJoin(JoinNode node, JoinBridgeManager lookupSourceFactoryManager, LocalExecutionPlanContext context, List probeTypes, List probeOutputChannels, - List probeJoinChannels, OptionalInt probeHashChannel, OptionalInt totalOperatorsCount) + List probeJoinChannels, OptionalInt probeHashChannel, OptionalInt totalOperatorsCount, Optional filter) { List driverFactories = context.getDriverFactories(); DriverFactory driverFactory = driverFactories.get(driverFactories.size() - 1); List operatorFactories = driverFactory.getOperatorFactories(); OperatorFactory buildOperatorFactory = operatorFactories.get(operatorFactories.size() - 1); - + System.out.println(node.getType()); switch (node.getType()) { case INNER: return LookupJoinOmniOperators.innerJoin(context.getNextOperatorId(), node.getId(), lookupSourceFactoryManager, probeTypes, probeJoinChannels, probeHashChannel, Optional.of(probeOutputChannels), totalOperatorsCount, - (HashBuilderOmniOperatorFactory) buildOperatorFactory); + (HashBuilderOmniOperatorFactory) buildOperatorFactory, filter); case LEFT: return LookupJoinOmniOperators.probeOuterJoin(context.getNextOperatorId(), node.getId(), lookupSourceFactoryManager, probeTypes, probeJoinChannels, probeHashChannel, diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/HashBuilderOmniOperator.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/HashBuilderOmniOperator.java index f1e9195f8..1e0bf7b76 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/HashBuilderOmniOperator.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/HashBuilderOmniOperator.java @@ -122,7 +122,7 @@ public class HashBuilderOmniOperator DataType[] omniBuildTypes = OperatorUtils.toDataTypes(buildTypes); String[] omniSearchFunctions = searchFunctions.stream().toArray(String[]::new); this.omniHashBuilderOperatorFactory = new OmniHashBuilderOperatorFactory(omniBuildTypes, - Ints.toArray(hashChannels), filterFunction, sortChannel, omniSearchFunctions, operatorCount); + Ints.toArray(hashChannels), operatorCount); } @Override diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperator.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperator.java index 3a81b9d98..74fdb02e6 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperator.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperator.java @@ -330,7 +330,7 @@ public class LookupJoinOmniOperator JoinBridgeManager lookupSourceFactoryManager, List probeTypes, List probeOutputChannels, List probeOutputChannelTypes, JoinType joinType, OptionalInt totalOperatorsCount, List probeJoinChannel, OptionalInt probeHashChannel, - HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory) + HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory, Optional filter) { this.operatorId = operatorId; this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); @@ -362,7 +362,7 @@ public class LookupJoinOmniOperator this.omniLookupJoinOperatorFactory = new OmniLookupJoinOperatorFactory(types, Ints.toArray(probeOutputChannels), Ints.toArray(probeJoinChannel), Ints.toArray(buildOutputChannels), buildOutputDataTypes, getOmniJoinType(joinType), - hashBuilderOmniOperatorFactory.getOmniHashBuilderOperatorFactory()); + hashBuilderOmniOperatorFactory.getOmniHashBuilderOperatorFactory(), filter); } private nova.hetu.omniruntime.constants.JoinType getOmniJoinType(JoinType joinType) diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperators.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperators.java index ce39ada9f..358d7c4ad 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperators.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperators.java @@ -73,11 +73,11 @@ public class LookupJoinOmniOperators JoinBridgeManager lookupSourceFactory, List probeTypes, List probeJoinChannel, OptionalInt probeHashChannel, Optional> probeOutputChannels, OptionalInt totalOperatorsCount, - HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory) + HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory, Optional filter) { return createOmniJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), - LookupJoinOperators.JoinType.INNER, totalOperatorsCount, hashBuilderOmniOperatorFactory); + LookupJoinOperators.JoinType.INNER, totalOperatorsCount, hashBuilderOmniOperatorFactory, filter); } /** @@ -102,7 +102,7 @@ public class LookupJoinOmniOperators { return createOmniJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), - LookupJoinOperators.JoinType.PROBE_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory); + LookupJoinOperators.JoinType.PROBE_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory, Optional.empty()); } /** @@ -127,7 +127,7 @@ public class LookupJoinOmniOperators { return createOmniJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), - LookupJoinOperators.JoinType.LOOKUP_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory); + LookupJoinOperators.JoinType.LOOKUP_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory, Optional.empty()); } /** @@ -152,7 +152,7 @@ public class LookupJoinOmniOperators { return createOmniJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), - LookupJoinOperators.JoinType.FULL_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory); + LookupJoinOperators.JoinType.FULL_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory, Optional.empty()); } private static List rangeList(int endExclusive) @@ -164,13 +164,13 @@ public class LookupJoinOmniOperators JoinBridgeManager lookupSourceFactoryManager, List probeTypes, List probeJoinChannel, OptionalInt probeHashChannel, List probeOutputChannels, LookupJoinOperators.JoinType joinType, OptionalInt totalOperatorsCount, - HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory) + HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory, Optional filter) { List probeOutputChannelTypes = probeOutputChannels.stream().map(probeTypes::get) .collect(toImmutableList()); return new LookupJoinOmniOperator.LookupJoinOmniOperatorFactory(operatorId, planNodeId, lookupSourceFactoryManager, probeTypes, probeOutputChannels, probeOutputChannelTypes, joinType, - totalOperatorsCount, probeJoinChannel, probeHashChannel, hashBuilderOmniOperatorFactory); + totalOperatorsCount, probeJoinChannel, probeHashChannel, hashBuilderOmniOperatorFactory, filter); } } diff --git a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LookupJoinOmniOperatorTest.java b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LookupJoinOmniOperatorTest.java index 747dab1c0..9c2c607ab 100644 --- a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LookupJoinOmniOperatorTest.java +++ b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LookupJoinOmniOperatorTest.java @@ -128,7 +128,7 @@ public class LookupJoinOmniOperatorTest case INNER: operatorFactory = innerJoin(operatorId, planNodeId, lookupSourceFactoryManager, probeTypes, probeJoinChannels, empty, Optional.of(probeOutputChannels), totalOperatorsCount, - hashBuilderOmniOperatorFactory); + hashBuilderOmniOperatorFactory, Optional.empty()); break; case PROBE_OUTER: operatorFactory = probeOuterJoin(operatorId, planNodeId, lookupSourceFactoryManager, probeTypes, diff --git a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/benchmark/BenchmarkHashJoinOmniOperators.java b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/benchmark/BenchmarkHashJoinOmniOperators.java index a2fc57dbb..0e8fa4154 100644 --- a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/benchmark/BenchmarkHashJoinOmniOperators.java +++ b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/benchmark/BenchmarkHashJoinOmniOperators.java @@ -201,7 +201,7 @@ public class BenchmarkHashJoinOmniOperators HashBuilderOmniOperatorFactory hashBuilderOperatorFactory = createBuildOperatorFactory(); LookupJoinOmniOperators.innerJoin(HASH_JOIN_OPERATOR_ID, TEST_PLAN_NODE_ID, lookupSourceFactoryManager, getBuildTypes(), buildJoinChannels, buildHashChannel, - Optional.of(buildOutputChannels), OptionalInt.of(1), hashBuilderOperatorFactory); + Optional.of(buildOutputChannels), OptionalInt.of(1), hashBuilderOperatorFactory, Optional.empty()); return hashBuilderOperatorFactory; } @@ -552,7 +552,7 @@ public class BenchmarkHashJoinOmniOperators OperatorFactory operatorFactory = LookupJoinOmniOperators.innerJoin(HASH_JOIN_OPERATOR_ID, TEST_PLAN_NODE_ID, lookupSourceFactoryManager, getProbeTypes(), probeJoinChannels, probeHashChannel, - Optional.of(probeOutputChannels), OptionalInt.of(1), hashBuilderOperatorFactory); + Optional.of(probeOutputChannels), OptionalInt.of(1), hashBuilderOperatorFactory, Optional.empty()); buildDriverContext = super.createTaskContext().addPipelineContext(0, true, true, false) .addDriverContext(); buildOperator = hashBuilderOperatorFactory.createOperator(buildDriverContext); -- Gitee