diff --git a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LimitOmniOperator.java b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LimitOmniOperator.java index 6863feae5ffd1e80a7a04008bb5dbb04ddcb345c..1536c1618ef6193804debd9b277cc55f3d8af47b 100644 --- a/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LimitOmniOperator.java +++ b/omnioperator/omniop-openlookeng-extension/src/main/java/nova/hetu/olk/operator/LimitOmniOperator.java @@ -20,23 +20,15 @@ import io.prestosql.operator.Operator; import io.prestosql.operator.OperatorContext; import io.prestosql.operator.OperatorFactory; import io.prestosql.spi.Page; +import io.prestosql.spi.block.Block; import io.prestosql.spi.plan.PlanNodeId; import io.prestosql.spi.type.Type; import nova.hetu.olk.tool.BlockUtils; -import nova.hetu.olk.tool.VecAllocatorHelper; -import nova.hetu.olk.tool.VecBatchToPageIterator; -import nova.hetu.omniruntime.operator.OmniOperator; -import nova.hetu.omniruntime.operator.limit.OmniLimitOperatorFactory; -import nova.hetu.omniruntime.vector.VecAllocator; -import nova.hetu.omniruntime.vector.VecBatch; - -import java.util.Iterator; + import java.util.List; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; -import static nova.hetu.olk.tool.OperatorUtils.buildVecBatch; /** * The type limit omni operator. @@ -48,55 +40,43 @@ public class LimitOmniOperator { private long remainingLimit; - private boolean finishing; - - private boolean finished; - private final OperatorContext operatorContext; - private final OmniOperator omniOperator; - - private Iterator pages; // The Pages + private Page nextPage; // The Pages /** * Instantiates a new Top n omni operator. * * @param operatorContext the operator context - * @param omniOperator the omni operator * @param limit the limit record count */ - public LimitOmniOperator(OperatorContext operatorContext, OmniOperator omniOperator, long limit) + public LimitOmniOperator(OperatorContext operatorContext, long limit) { checkArgument(limit >= 0, "limit must be at least zero"); this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); - this.omniOperator = omniOperator; this.remainingLimit = limit; - this.pages = null; + this.nextPage = null; } @Override public void finish() { - finishing = true; + remainingLimit = 0; } @Override public boolean isFinished() { - return finished; + return remainingLimit == 0 && nextPage == null; } @Override public void close() throws Exception { - // free page if it has next - if (pages != null) { - while (pages.hasNext()) { - Page next = pages.next(); - BlockUtils.freePage(next); - } + // free page if it is not null + if (nextPage != null) { + BlockUtils.freePage(nextPage); } - omniOperator.close(); } @Override @@ -108,48 +88,41 @@ public class LimitOmniOperator @Override public boolean needsInput() { - if (finishing) { - return false; - } - - return true; + return remainingLimit > 0 && nextPage == null; } @Override public void addInput(Page page) { - checkState(!finishing, "Operator is already finishing"); requireNonNull(page, "page is null"); int rowCount = page.getPositionCount(); - if (remainingLimit == 0 || rowCount == 0) { + if (rowCount == 0 || !needsInput()) { BlockUtils.freePage(page); return; } - remainingLimit = (remainingLimit >= rowCount) ? (remainingLimit - rowCount) : 0; - - VecBatch vecBatch = buildVecBatch(omniOperator.getVecAllocator(), page, getClass().getSimpleName()); - omniOperator.addInput(vecBatch); - pages = new VecBatchToPageIterator(omniOperator.getOutput()); + if (rowCount <= remainingLimit) { + remainingLimit -= rowCount; + nextPage = page; + } + else { + Block[] blocks = new Block[page.getChannelCount()]; + for (int channel = 0; channel < page.getChannelCount(); channel++) { + Block block = page.getBlock(channel); + blocks[channel] = block.getRegion(0, (int) remainingLimit); + } + nextPage = new Page((int) remainingLimit, blocks); + remainingLimit = 0; + BlockUtils.freePage(page); + } } @Override public Page getOutput() { - if (finishing) { - finished = true; - } - - if (pages == null) { - return null; - } - - Page page = null; - if (pages.hasNext()) { - page = pages.next(); - } - pages = null; + Page page = nextPage; + nextPage = null; return page; } @@ -163,8 +136,6 @@ public class LimitOmniOperator { private final long limit; - private final OmniLimitOperatorFactory omniLimitOperatorFactory; - /** * Instantiates a new Top n omni operator factory. * @@ -178,23 +149,14 @@ public class LimitOmniOperator this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); this.limit = limit; this.sourceTypes = sourceTypes; - omniLimitOperatorFactory = getOmniLimitOperatorFactory(limit); - } - - private OmniLimitOperatorFactory getOmniLimitOperatorFactory(long limit) - { - return new OmniLimitOperatorFactory(limit); } @Override public Operator createOperator(DriverContext driverContext) { - VecAllocator vecAllocator = VecAllocatorHelper.createOperatorLevelAllocator(driverContext, - VecAllocator.UNLIMIT, LimitOmniOperator.class); OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, LimitOmniOperator.class.getSimpleName()); - OmniOperator omniOperator = omniLimitOperatorFactory.createOperator(vecAllocator); - return new LimitOmniOperator(operatorContext, omniOperator, limit); + return new LimitOmniOperator(operatorContext, limit); } @Override diff --git a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LimitOmniOperatorTest.java b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LimitOmniOperatorTest.java index 7ae33d15f7a5c7a00b838d2b487ad6bfaf70a946..f773a9a41a7755c3039f6ff1232ab320c4746fdb 100644 --- a/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LimitOmniOperatorTest.java +++ b/omnioperator/omniop-openlookeng-extension/src/test/java/nova/hetu/olk/operator/LimitOmniOperatorTest.java @@ -36,7 +36,6 @@ import java.util.Random; import java.util.UUID; import static nova.hetu.olk.mock.MockUtil.mockNewVecWithAnyArguments; -import static nova.hetu.olk.mock.MockUtil.mockOmniOperator; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -70,7 +69,7 @@ public class LimitOmniOperatorTest @Override protected Operator createOperator(Operator originalOperator) { - return new LimitOmniOperator(originalOperator.getOperatorContext(), mockOmniOperator(), limit); + return new LimitOmniOperator(originalOperator.getOperatorContext(), limit); } @Override