diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/OmniLocalExecutionPlanner.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/OmniLocalExecutionPlanner.java index b7a4718da9858567222f2e5185eacc5242d8f76d..291e47830fd108bc759a6decae7dbb336fa2b037 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/OmniLocalExecutionPlanner.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/OmniLocalExecutionPlanner.java @@ -1355,13 +1355,10 @@ public class OmniLocalExecutionPlanner } } - public JoinBridgeManager createLookupSourceFactory(JoinNode node, + public JoinBridgeManager createLookupSourceFactory(JoinNode node, LocalExecutionPlanContext buildContext, PhysicalOperation buildSource, PlanNode buildNode, List buildSymbols, Optional buildHashSymbol, PhysicalOperation probeSource, LocalExecutionPlanContext context, boolean spillEnabled) { - LocalExecutionPlanContext buildContext = context.createSubContext(); - PhysicalOperation buildSource = buildNode.accept(this, buildContext); - if (buildSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION) { checkState(probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION, "Build execution is GROUPED_EXECUTION. Probe execution is expected be GROUPED_EXECUTION, but is UNGROUPED_EXECUTION."); @@ -1502,11 +1499,15 @@ public class OmniLocalExecutionPlanner // Plan build boolean spillEnabled = isSpillEnabled(session) && node.isSpillable().orElseThrow(() -> new IllegalArgumentException("spillable not yet set")); - JoinBridgeManager lookupSourceFactory = createLookupSourceFactory(node, + LocalExecutionPlanContext buildContext = context.createSubContext(); + PhysicalOperation buildSource = buildNode.accept(this, buildContext); + JoinBridgeManager lookupSourceFactory = createLookupSourceFactory(node, buildContext, buildSource, buildNode, buildSymbols, buildHashSymbol, probeSource, context, spillEnabled); - + Optional filterFunction = node.getFilter() + .map(filterExpression -> getTranslatedExpression(context, buildSource, probeSource, + filterExpression)); OperatorFactory operator = createLookupJoin(node, probeSource, probeSymbols, probeHashSymbol, - lookupSourceFactory, context, spillEnabled); + lookupSourceFactory, context, spillEnabled, filterFunction); ImmutableMap.Builder outputMappings = ImmutableMap.builder(); List outputSymbols = node.getOutputSymbols(); @@ -1520,7 +1521,7 @@ public class OmniLocalExecutionPlanner public OperatorFactory createLookupJoin(JoinNode node, PhysicalOperation probeSource, List probeSymbols, Optional probeHashSymbol, JoinBridgeManager lookupSourceFactoryManager, - LocalExecutionPlanContext context, boolean spillEnabled) + LocalExecutionPlanContext context, boolean spillEnabled, Optional filter) { List probeTypes = probeSource.getTypes(); List probeOutputSymbols = node.getOutputSymbols().stream() @@ -1539,7 +1540,7 @@ public class OmniLocalExecutionPlanner boolean buildOuter = node.getType() == RIGHT || node.getType() == FULL; if (!buildOuter) { return createOmniLookupJoin(node, lookupSourceFactoryManager, context, probeTypes, probeOutputChannels, - probeJoinChannels, probeHashChannel, totalOperatorsCount); + probeJoinChannels, probeHashChannel, totalOperatorsCount, filter); } return getLookUpJoinOperatorFactory(node, lookupSourceFactoryManager, context, probeTypes, probeOutputChannels, probeJoinChannels, probeHashChannel, totalOperatorsCount); @@ -1561,19 +1562,19 @@ public class OmniLocalExecutionPlanner public OperatorFactory createOmniLookupJoin(JoinNode node, JoinBridgeManager lookupSourceFactoryManager, LocalExecutionPlanContext context, List probeTypes, List probeOutputChannels, - List probeJoinChannels, OptionalInt probeHashChannel, OptionalInt totalOperatorsCount) + List probeJoinChannels, OptionalInt probeHashChannel, OptionalInt totalOperatorsCount, Optional filter) { List driverFactories = context.getDriverFactories(); DriverFactory driverFactory = driverFactories.get(driverFactories.size() - 1); List operatorFactories = driverFactory.getOperatorFactories(); OperatorFactory buildOperatorFactory = operatorFactories.get(operatorFactories.size() - 1); - + System.out.println(node.getType()); switch (node.getType()) { case INNER: return LookupJoinOmniOperators.innerJoin(context.getNextOperatorId(), node.getId(), lookupSourceFactoryManager, probeTypes, probeJoinChannels, probeHashChannel, Optional.of(probeOutputChannels), totalOperatorsCount, - (HashBuilderOmniOperatorFactory) buildOperatorFactory); + (HashBuilderOmniOperatorFactory) buildOperatorFactory, filter); case LEFT: return LookupJoinOmniOperators.probeOuterJoin(context.getNextOperatorId(), node.getId(), lookupSourceFactoryManager, probeTypes, probeJoinChannels, probeHashChannel, diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/block/Int128ArrayOmniBlock.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/block/Int128ArrayOmniBlock.java index c1ee727cffe7e98c9bebdab5a4b5e515f801410c..311f98a73bf6248f8d3397f4d51c819cbf334007 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/block/Int128ArrayOmniBlock.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/block/Int128ArrayOmniBlock.java @@ -84,6 +84,18 @@ public class Int128ArrayOmniBlock public Int128ArrayOmniBlock(int positionOffset, int positionCount, byte[] valueIsNull, long[] values) { + for (int i = positionOffset; i < positionCount; i++) { + int first = 2 * i; + int second = first + 1; + if (values[second] < 0) { + values[first] = ~values[first] + 1; + values[second] = values[second] ^ 0x7FFFFFFFFFFFFFFFL; + if (values[first] == 0) { + values[second] = values[second] + 1; + } + } + } + if (positionOffset < 0) { throw new IllegalArgumentException("positionOffset is negative"); } diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/HashBuilderOmniOperator.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/HashBuilderOmniOperator.java index f1e9195f815d71dda859d3c66b83b29c43e4290f..1e0bf7b76da5756207a78525fe2ff29ecb8f3c6f 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/HashBuilderOmniOperator.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/HashBuilderOmniOperator.java @@ -122,7 +122,7 @@ public class HashBuilderOmniOperator DataType[] omniBuildTypes = OperatorUtils.toDataTypes(buildTypes); String[] omniSearchFunctions = searchFunctions.stream().toArray(String[]::new); this.omniHashBuilderOperatorFactory = new OmniHashBuilderOperatorFactory(omniBuildTypes, - Ints.toArray(hashChannels), filterFunction, sortChannel, omniSearchFunctions, operatorCount); + Ints.toArray(hashChannels), operatorCount); } @Override diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperator.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperator.java index 3a81b9d98d5841548b3a313b368654342f37e6fa..74fdb02e6dbaea1234e5c0c8ef9b23beb134a0dc 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperator.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperator.java @@ -330,7 +330,7 @@ public class LookupJoinOmniOperator JoinBridgeManager lookupSourceFactoryManager, List probeTypes, List probeOutputChannels, List probeOutputChannelTypes, JoinType joinType, OptionalInt totalOperatorsCount, List probeJoinChannel, OptionalInt probeHashChannel, - HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory) + HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory, Optional filter) { this.operatorId = operatorId; this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); @@ -362,7 +362,7 @@ public class LookupJoinOmniOperator this.omniLookupJoinOperatorFactory = new OmniLookupJoinOperatorFactory(types, Ints.toArray(probeOutputChannels), Ints.toArray(probeJoinChannel), Ints.toArray(buildOutputChannels), buildOutputDataTypes, getOmniJoinType(joinType), - hashBuilderOmniOperatorFactory.getOmniHashBuilderOperatorFactory()); + hashBuilderOmniOperatorFactory.getOmniHashBuilderOperatorFactory(), filter); } private nova.hetu.omniruntime.constants.JoinType getOmniJoinType(JoinType joinType) diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperators.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperators.java index ce39ada9f449d4c6f1928db04b3c5d98ebb14f5a..358d7c4ad4aa4234b32fb0b9ceabc3cde73a97fa 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperators.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LookupJoinOmniOperators.java @@ -73,11 +73,11 @@ public class LookupJoinOmniOperators JoinBridgeManager lookupSourceFactory, List probeTypes, List probeJoinChannel, OptionalInt probeHashChannel, Optional> probeOutputChannels, OptionalInt totalOperatorsCount, - HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory) + HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory, Optional filter) { return createOmniJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), - LookupJoinOperators.JoinType.INNER, totalOperatorsCount, hashBuilderOmniOperatorFactory); + LookupJoinOperators.JoinType.INNER, totalOperatorsCount, hashBuilderOmniOperatorFactory, filter); } /** @@ -102,7 +102,7 @@ public class LookupJoinOmniOperators { return createOmniJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), - LookupJoinOperators.JoinType.PROBE_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory); + LookupJoinOperators.JoinType.PROBE_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory, Optional.empty()); } /** @@ -127,7 +127,7 @@ public class LookupJoinOmniOperators { return createOmniJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), - LookupJoinOperators.JoinType.LOOKUP_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory); + LookupJoinOperators.JoinType.LOOKUP_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory, Optional.empty()); } /** @@ -152,7 +152,7 @@ public class LookupJoinOmniOperators { return createOmniJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), - LookupJoinOperators.JoinType.FULL_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory); + LookupJoinOperators.JoinType.FULL_OUTER, totalOperatorsCount, hashBuilderOmniOperatorFactory, Optional.empty()); } private static List rangeList(int endExclusive) @@ -164,13 +164,13 @@ public class LookupJoinOmniOperators JoinBridgeManager lookupSourceFactoryManager, List probeTypes, List probeJoinChannel, OptionalInt probeHashChannel, List probeOutputChannels, LookupJoinOperators.JoinType joinType, OptionalInt totalOperatorsCount, - HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory) + HashBuilderOmniOperator.HashBuilderOmniOperatorFactory hashBuilderOmniOperatorFactory, Optional filter) { List probeOutputChannelTypes = probeOutputChannels.stream().map(probeTypes::get) .collect(toImmutableList()); return new LookupJoinOmniOperator.LookupJoinOmniOperatorFactory(operatorId, planNodeId, lookupSourceFactoryManager, probeTypes, probeOutputChannels, probeOutputChannelTypes, joinType, - totalOperatorsCount, probeJoinChannel, probeHashChannel, hashBuilderOmniOperatorFactory); + totalOperatorsCount, probeJoinChannel, probeHashChannel, hashBuilderOmniOperatorFactory, filter); } } diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/tool/OperatorUtils.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/tool/OperatorUtils.java index a95e2e9d88425d8bc2bca1b3f17d1c467fee37e8..fdb0449a5d41ef7cafbab7fc3a46ae2b938f400e 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/tool/OperatorUtils.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/tool/OperatorUtils.java @@ -916,8 +916,20 @@ public final class OperatorUtils private static Block buildInt128ArrayBlock(Block block, int positionCount) { Decimal128Vec decimal128Vec = (Decimal128Vec) block.getValues(); + long[] values = decimal128Vec.get(0, positionCount); + for (int i = 0; i < positionCount; i++) { + int first = 2 * i; + int second = first + 1; + if (values[second] < 0) { + values[first] = ~values[first] + 1; + values[second] = values[second] ^ 0x7FFFFFFFFFFFFFFFL; + if (values[first] == 0) { + values[second] = values[second] + 1; + } + } + } return new Int128ArrayBlock(positionCount, Optional.of(decimal128Vec.getValuesNulls(0, positionCount)), - decimal128Vec.get(0, positionCount)); + values); } private static Block buildDoubleArrayBLock(Block block, int positionCount) diff --git a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LookupJoinOmniOperatorTest.java b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LookupJoinOmniOperatorTest.java index 747dab1c0b4bfc863df20de7e889e456fef0d377..9c2c607ab3463e98dad52ed5f319317be5e9d57b 100644 --- a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LookupJoinOmniOperatorTest.java +++ b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LookupJoinOmniOperatorTest.java @@ -128,7 +128,7 @@ public class LookupJoinOmniOperatorTest case INNER: operatorFactory = innerJoin(operatorId, planNodeId, lookupSourceFactoryManager, probeTypes, probeJoinChannels, empty, Optional.of(probeOutputChannels), totalOperatorsCount, - hashBuilderOmniOperatorFactory); + hashBuilderOmniOperatorFactory, Optional.empty()); break; case PROBE_OUTER: operatorFactory = probeOuterJoin(operatorId, planNodeId, lookupSourceFactoryManager, probeTypes, diff --git a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/benchmark/BenchmarkHashJoinOmniOperators.java b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/benchmark/BenchmarkHashJoinOmniOperators.java index a2fc57dbbc2afe5998f73d59fda74d03a935a25f..0e8fa4154fab3d1e3c4e6e5d34ae1e521aac3b70 100644 --- a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/benchmark/BenchmarkHashJoinOmniOperators.java +++ b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/benchmark/BenchmarkHashJoinOmniOperators.java @@ -201,7 +201,7 @@ public class BenchmarkHashJoinOmniOperators HashBuilderOmniOperatorFactory hashBuilderOperatorFactory = createBuildOperatorFactory(); LookupJoinOmniOperators.innerJoin(HASH_JOIN_OPERATOR_ID, TEST_PLAN_NODE_ID, lookupSourceFactoryManager, getBuildTypes(), buildJoinChannels, buildHashChannel, - Optional.of(buildOutputChannels), OptionalInt.of(1), hashBuilderOperatorFactory); + Optional.of(buildOutputChannels), OptionalInt.of(1), hashBuilderOperatorFactory, Optional.empty()); return hashBuilderOperatorFactory; } @@ -552,7 +552,7 @@ public class BenchmarkHashJoinOmniOperators OperatorFactory operatorFactory = LookupJoinOmniOperators.innerJoin(HASH_JOIN_OPERATOR_ID, TEST_PLAN_NODE_ID, lookupSourceFactoryManager, getProbeTypes(), probeJoinChannels, probeHashChannel, - Optional.of(probeOutputChannels), OptionalInt.of(1), hashBuilderOperatorFactory); + Optional.of(probeOutputChannels), OptionalInt.of(1), hashBuilderOperatorFactory, Optional.empty()); buildDriverContext = super.createTaskContext().addPipelineContext(0, true, true, false) .addDriverContext(); buildOperator = hashBuilderOperatorFactory.createOperator(buildDriverContext);