From 265d2271f9ae160a645082c0a30451f86861ff61 Mon Sep 17 00:00:00 2001 From: Vijay Kumar Date: Wed, 1 Mar 2023 15:00:29 +0530 Subject: [PATCH] delete the sourceHandlesFiles for the completed stage. --- .../exchange/FileSystemExchange.java | 5 ++ .../HetuFileSystemExchangeStorage.java | 5 ++ .../execution/SqlStageExecution.java | 48 +++++++++++++++++++ .../FaultTolerantStageScheduler.java | 19 ++++++-- .../scheduler/SqlQueryScheduler.java | 3 +- .../scheduler/TaskDescriptorStorage.java | 10 ++-- 6 files changed, 82 insertions(+), 8 deletions(-) diff --git a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchange.java b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchange.java index 109f622da..2130f46c9 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchange.java +++ b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchange.java @@ -309,4 +309,9 @@ public class FileSystemExchange "." + exchangeContext.getExchangeId() + "." + taskPartitionId + PATH_SEPARATOR); } + + public FileSystemExchangeStorage getExchangeStorage() + { + return exchangeStorage; + } } diff --git a/presto-main/src/main/java/io/prestosql/exchange/storage/HetuFileSystemExchangeStorage.java b/presto-main/src/main/java/io/prestosql/exchange/storage/HetuFileSystemExchangeStorage.java index 5c1bce39a..30919e08c 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/storage/HetuFileSystemExchangeStorage.java +++ b/presto-main/src/main/java/io/prestosql/exchange/storage/HetuFileSystemExchangeStorage.java @@ -148,4 +148,9 @@ public class HetuFileSystemExchangeStorage { return new HetuFileSystemExchangeWriter(file, fileSystemClient, secretKey, exchangeCompressionEnabled, algorithmParameterSpec, directSerialisationType, directSerialisationBufferSize); } + + public HetuFileSystemClient getFileSystemClient() + { + return fileSystemClient; + } } diff --git a/presto-main/src/main/java/io/prestosql/execution/SqlStageExecution.java b/presto-main/src/main/java/io/prestosql/execution/SqlStageExecution.java index ad847703e..47147db57 100644 --- a/presto-main/src/main/java/io/prestosql/execution/SqlStageExecution.java +++ b/presto-main/src/main/java/io/prestosql/execution/SqlStageExecution.java @@ -29,10 +29,17 @@ import io.prestosql.dynamicfilter.DynamicFilterService; import io.prestosql.exchange.Exchange; import io.prestosql.exchange.ExchangeSinkHandle; import io.prestosql.exchange.ExchangeSinkInstanceHandle; +import io.prestosql.exchange.ExchangeSourceHandle; +import io.prestosql.exchange.FileStatus; +import io.prestosql.exchange.FileSystemExchange; +import io.prestosql.exchange.FileSystemExchangeSourceHandle; +import io.prestosql.exchange.storage.HetuFileSystemExchangeStorage; import io.prestosql.execution.StateMachine.StateChangeListener; import io.prestosql.execution.buffer.OutputBuffers; +import io.prestosql.execution.scheduler.FaultTolerantStageScheduler; import io.prestosql.execution.scheduler.PartitionIdAllocator; import io.prestosql.execution.scheduler.SplitSchedulerStats; +import io.prestosql.execution.scheduler.TaskDescriptor; import io.prestosql.failuredetector.FailureDetector; import io.prestosql.metadata.InternalNode; import io.prestosql.metadata.Split; @@ -42,6 +49,7 @@ import io.prestosql.snapshot.QueryRecoveryManager; import io.prestosql.snapshot.QuerySnapshotManager; import io.prestosql.spi.PrestoException; import io.prestosql.spi.QueryId; +import io.prestosql.spi.filesystem.HetuFileSystemClient; import io.prestosql.spi.plan.JoinNode; import io.prestosql.spi.plan.PlanNode; import io.prestosql.spi.plan.PlanNodeId; @@ -55,7 +63,9 @@ import io.prestosql.sql.planner.plan.SemiJoinNode; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import java.io.IOException; import java.net.URI; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -75,6 +85,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -146,6 +157,8 @@ public final class SqlStageExecution private long captureSnapshotId; private final Optional sinkExchange; + private FaultTolerantStageScheduler stageScheduler; + public static SqlStageExecution createSqlStageExecution( StageId stageId, URI location, @@ -875,6 +888,36 @@ public final class SqlStageExecution // Once stage is done, consider restore is complete stateMachine.setAllTasksFinal(finalTaskInfos, false, captureSnapshotId); } + + if (stateMachine.getState() == StageState.PENDING && stageScheduler.isFinished()) { + FileSystemExchange fileSystemExchange = (FileSystemExchange) sinkExchange.get(); + HetuFileSystemExchangeStorage exchangeStorage = (HetuFileSystemExchangeStorage) fileSystemExchange.getExchangeStorage(); + HetuFileSystemClient fileSystemClient = exchangeStorage.getFileSystemClient(); + + Map> descriptors = stageScheduler.getDescriptors(); + for (Map.Entry> entry : descriptors.entrySet()) { + if (entry.getKey().equals(stateMachine.getStageId())) { + for (TaskDescriptor descriptor : entry.getValue()) { + List> exchangeSourceHandles = descriptor.getExchangeSourceHandles().asMap().values().stream().collect(Collectors.toList()); + for (Collection sourceHandles : exchangeSourceHandles) { + List exchangeSourceHandleList = (List) sourceHandles; + for (ExchangeSourceHandle exchangeSourceHandle : exchangeSourceHandleList) { + FileSystemExchangeSourceHandle fileSystemExchangeSourceHandle = (FileSystemExchangeSourceHandle) exchangeSourceHandle; + List files = fileSystemExchangeSourceHandle.getFiles(); + files.forEach(file -> { + try { + fileSystemClient.deleteIfExists(Paths.get(URI.create(file.getFilePath()))); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } + } + } + } + } } private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo executionFailureInfo) @@ -1029,4 +1072,9 @@ public final class SqlStageExecution } } } + + public void setStageScheduler(FaultTolerantStageScheduler stageScheduler) + { + this.stageScheduler = stageScheduler; + } } diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/FaultTolerantStageScheduler.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/FaultTolerantStageScheduler.java index 421fe6dbf..04fbfe5b3 100644 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/FaultTolerantStageScheduler.java +++ b/presto-main/src/main/java/io/prestosql/execution/scheduler/FaultTolerantStageScheduler.java @@ -137,7 +137,7 @@ public class FaultTolerantStageScheduler private final Duration minRetryDelay; private final Duration maxRetryDelay; private final double retryDelayScaleFactor; - + private Map> descriptors = new HashMap<>(); @GuardedBy("this") private Optional delaySchedulingDuration = Optional.empty(); @GuardedBy("this") @@ -629,8 +629,8 @@ public class FaultTolerantStageScheduler // Remove taskDescriptor for finished partition to conserve memory // We may revisit the approach when we support volatile exchanges, for which // it may be needed to restart already finished task to recreate output it produced. - taskDescriptorStorage.remove(stage.getStageId(), partitionId); - + TaskDescriptor descriptor = taskDescriptorStorage.remove(stage.getStageId(), partitionId); + addFinishedDescriptors(descriptor); break; case CANCELED: log.debug("Task cancelled: %s", taskId); @@ -730,6 +730,14 @@ public class FaultTolerantStageScheduler } } + private void addFinishedDescriptors(TaskDescriptor descriptor) + { + if (!descriptors.containsKey(stage.getStageId())) { + descriptors.put(stage.getStageId(), new ArrayList<>()); + } + descriptors.get(stage.getStageId()).add(descriptor); + } + private boolean shouldDelayScheduling(ErrorCode errorCode) { return errorCode.getType() == INTERNAL_ERROR || errorCode.getType() == EXTERNAL; @@ -779,4 +787,9 @@ public class FaultTolerantStageScheduler { void completeFuture(SettableFuture future, Duration delay); } + + public Map> getDescriptors() + { + return descriptors; + } } diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java index 621438ed9..a6083eebd 100644 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java +++ b/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java @@ -2341,7 +2341,7 @@ public class SqlQueryScheduler verify(sourceExchange != null, "exchange not found for fragment: %s", childFragmentId); sourceExchanges.put(childFragmentId, sourceExchange); } - + SqlStageExecution stageExecution = stageManager.get(fragment.getId()); BucketToPartition inputBucketToPartition = bucketToPartitionCache.apply(fragment.getPartitioning()); FaultTolerantStageScheduler scheduler = new FaultTolerantStageScheduler( querySession, @@ -2365,6 +2365,7 @@ public class SqlQueryScheduler maxTasksWaitingForNodePerStage); sqlStageSchedulers.add(scheduler); + stageExecution.setStageScheduler(scheduler); } Set coordinatorConsumedFragments = coordinatorConsumedFragmentsBuilder.build(); diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/TaskDescriptorStorage.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/TaskDescriptorStorage.java index 287275193..69f75cf6d 100644 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/TaskDescriptorStorage.java +++ b/presto-main/src/main/java/io/prestosql/execution/scheduler/TaskDescriptorStorage.java @@ -115,18 +115,19 @@ public class TaskDescriptorStorage * * @throws java.util.NoSuchElementException if {@link TaskDescriptor} for a given task does not exist */ - public synchronized void remove(StageId stageId, int partitionId) + public synchronized TaskDescriptor remove(StageId stageId, int partitionId) { TaskDescriptors storage = storages.get(stageId.getQueryId()); if (storage == null) { // query has been terminated - return; + return null; } long previousReservedBytes = storage.getReservedBytes(); - storage.remove(stageId, partitionId); + TaskDescriptor descriptor = storage.remove(stageId, partitionId); long currentReservedBytes = storage.getReservedBytes(); long delta = currentReservedBytes - previousReservedBytes; updateMemoryReservation(delta); + return descriptor; } /** @@ -198,7 +199,7 @@ public class TaskDescriptorStorage return descriptor; } - public void remove(StageId stageId, int partitionId) + public TaskDescriptor remove(StageId stageId, int partitionId) { throwIfFailed(); TaskDescriptorKey key = new TaskDescriptorKey(stageId, partitionId); @@ -207,6 +208,7 @@ public class TaskDescriptorStorage throw new NoSuchElementException(format("descriptor not found for key %s", key)); } reservedBytes -= descriptor.getRetainedSizeInBytes(); + return descriptor; } public long getReservedBytes() -- Gitee