From 418f2497cbc6fdf3b6217e0512c04bcc4cc534c9 Mon Sep 17 00:00:00 2001 From: liguang1 <751885226@qq.com> Date: Fri, 14 Jul 2023 16:54:56 +0800 Subject: [PATCH 1/2] Validate ORC ACID version based on data file metadata as a fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If the _orc_acid_version file is not present for a table we are not failing read flow immediately. Instead we are validating if ORC ACID version is supported by Trino using ORC data files user metadata. The ORC ACID version is recorded in data file metadata under hive.acid.version by recent Hive versions. author Łukasz Osipiuk on 2021/4/13 at 17:12 6cc154c6 committed on 2021/4/16 at 6:10 --- .../hive/BackgroundHiveSplitLoader.java | 18 +++++-- .../plugin/hive/DeleteDeltaLocations.java | 30 +++++++++-- .../plugin/hive/orc/OrcPageSourceFactory.java | 53 +++++++++++++++++++ .../plugin/hive/DeleteDeltaLocationsTest.java | 2 +- .../hive/orc/IcebergOrcPageSourceTest.java | 2 +- .../OrcSelectivePageSourceFactoryTest.java | 4 +- .../main/java/io/prestosql/orc/OrcWriter.java | 3 +- .../io/prestosql/orc/metadata/Footer.java | 11 +++- .../orc/metadata/OrcMetadataReader.java | 3 +- .../orc/metadata/OrcMetadataWriter.java | 2 +- 10 files changed, 110 insertions(+), 18 deletions(-) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java index e63372293..a1f7847b0 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java @@ -22,6 +22,7 @@ import com.google.common.collect.Streams; import com.google.common.io.CharStreams; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.log.Logger; +import io.prestosql.plugin.hive.DeleteDeltaLocations.Builder; import
io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext; import io.prestosql.plugin.hive.HiveBucketing.BucketingVersion; import io.prestosql.plugin.hive.HiveSplit.BucketConversion; @@ -462,6 +463,8 @@ public class BackgroundHiveSplitLoader Optional deleteDeltaLocations; long min = Long.MAX_VALUE; long max = Long.MIN_VALUE; + // Create a registry of delete_delta directories for the partition + DeleteDeltaLocations.Builder deleteDeltaLocationsBuilder = DeleteDeltaLocations.builder(path); if (AcidUtils.isTransactionalTable(table.getParameters())) { boolean isVacuum = queryType.map(type -> type == QueryType.VACUUM).orElse(false); AcidUtils.Directory directory = hdfsEnvironment.doAs(hdfsContext.getIdentity().getUser(), () -> { @@ -500,8 +503,14 @@ public class BackgroundHiveSplitLoader ? directory.getBaseDirectory() : (directory.getCurrentDirectories().size() > 0 ? directory.getCurrentDirectories().get(0).getPath() : null); - if (baseOrDeltaPath != null && AcidUtils.OrcAcidVersion.getAcidVersionFromMetaFile(baseOrDeltaPath, fs) < 2) { - throw new PrestoException(NOT_SUPPORTED, "Hive transactional tables are supported with Hive 3.0 and only after a major compaction has been run"); + if (baseOrDeltaPath != null && AcidUtils.OrcAcidVersion.getAcidVersionFromMetaFile(baseOrDeltaPath, fs) >= 2) { + // Trino cannot read ORC ACID tables with version < 2 (written by Hive older than 3.0) + // See https://github.com/trinodb/trino/issues/2790#issuecomment-591901728 for more context + + // We perform initial version check based on _orc_acid_version file here. + // If we cannot verify the version (the _orc_acid_version file may not exist), + // we will do extra check based on ORC datafile metadata in OrcPageSourceFactory. 
+ deleteDeltaLocationsBuilder.setOrcAcidVersionValidated(true); } } @@ -538,8 +547,6 @@ public class BackgroundHiveSplitLoader } } - // Create a registry of delete_delta directories for the partition - DeleteDeltaLocations.Builder deleteDeltaLocationsBuilder = DeleteDeltaLocations.builder(path); for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) { //In case of minor compaction, delete_delta directories should not be used for masking. if (delta.isDeleteDelta() && (!isVacuum || isFullVacuum)) { @@ -583,6 +590,7 @@ public class BackgroundHiveSplitLoader } } else { + deleteDeltaLocationsBuilder.setOrcAcidVersionValidated(true); // no ACID; no further validation needed readPaths = ImmutableList.of(path); deleteDeltaLocations = Optional.empty(); } @@ -628,7 +636,7 @@ public class BackgroundHiveSplitLoader if (filteredWriteIds.isEmpty()) { return Optional.empty(); } - return Optional.of(new DeleteDeltaLocations(allLocations.getPartitionLocation(), filteredWriteIds)); + return Optional.of(new DeleteDeltaLocations(allLocations.getPartitionLocation(), filteredWriteIds, allLocations.isOrcAcidVersionValidated())); } private ListenableFuture addSplitsToSource(InputSplit[] targetSplits, InternalHiveSplitFactory splitFactory) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/DeleteDeltaLocations.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/DeleteDeltaLocations.java index 9b1a5ce38..b4cacce65 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/DeleteDeltaLocations.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/DeleteDeltaLocations.java @@ -37,14 +37,17 @@ public class DeleteDeltaLocations private final String partitionLocation; private final List deleteDeltas; + private final boolean orcAcidVersionValidated; @JsonCreator public DeleteDeltaLocations( @JsonProperty("partitionLocation") String partitionLocation, - @JsonProperty("deleteDeltas") List deleteDeltas) + @JsonProperty("deleteDeltas") List deleteDeltas, 
+ @JsonProperty("orcAcidVersionValidated") boolean orcAcidVersionValidated) { this.partitionLocation = requireNonNull(partitionLocation, "partitionLocation is null"); this.deleteDeltas = ImmutableList.copyOf(requireNonNull(deleteDeltas, "deleteDeltas is null")); + this.orcAcidVersionValidated = orcAcidVersionValidated; checkArgument(!deleteDeltas.isEmpty(), "deleteDeltas is empty"); } @@ -60,6 +63,12 @@ public class DeleteDeltaLocations return deleteDeltas; } + @JsonProperty + public boolean isOrcAcidVersionValidated() + { + return orcAcidVersionValidated; + } + @Override public boolean equals(Object o) { @@ -73,13 +82,14 @@ public class DeleteDeltaLocations DeleteDeltaLocations that = (DeleteDeltaLocations) o; return partitionLocation.equals(that.partitionLocation) && - deleteDeltas.equals(that.deleteDeltas); + deleteDeltas.equals(that.deleteDeltas) && + (orcAcidVersionValidated == that.orcAcidVersionValidated); } @Override public int hashCode() { - return Objects.hash(partitionLocation, deleteDeltas); + return Objects.hash(partitionLocation, deleteDeltas, orcAcidVersionValidated); } @Override @@ -88,6 +98,7 @@ public class DeleteDeltaLocations return toStringHelper(this) .add("partitionLocation", partitionLocation) .add("deleteDeltas", deleteDeltas) + .add("orcAcidVersionValidated", orcAcidVersionValidated) .toString(); } @@ -106,6 +117,7 @@ public class DeleteDeltaLocations { private final Path partitionLocation; private final ImmutableList.Builder deleteDeltaInfoBuilder = ImmutableList.builder(); + private boolean orcAcidVersionValidated; private Builder(Path partitionPath) { @@ -126,13 +138,21 @@ public class DeleteDeltaLocations return this; } + public Builder setOrcAcidVersionValidated(boolean orcAcidVersionValidated) + { + this.orcAcidVersionValidated = orcAcidVersionValidated; + return this; + } + public Optional build() { List localDeleteDeltas = deleteDeltaInfoBuilder.build(); - if (localDeleteDeltas.isEmpty()) { + if (localDeleteDeltas.isEmpty() && 
orcAcidVersionValidated) { + // We do not want to bail out with `Optional.empty()` if ORC ACID version was not validated based on _orc_acid_version file. + // If we did so extra validation in OrcPageSourceFactory (based on file metadata) would not be performed. return Optional.empty(); } - return Optional.of(new DeleteDeltaLocations(partitionLocation.toString(), localDeleteDeltas)); + return Optional.of(new DeleteDeltaLocations(partitionLocation.toString(), localDeleteDeltas, orcAcidVersionValidated)); } } } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java index 8e4aff9cc..5f10fcc95 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.prestosql.memory.context.AggregatedMemoryContext; import io.prestosql.orc.DynamicFilterOrcPredicate.DynamicFilterOrcPredicateBuilder; @@ -92,6 +93,7 @@ import static com.google.common.collect.Maps.uniqueIndex; import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.prestosql.orc.OrcReader.INITIAL_BATCH_SIZE; import static io.prestosql.orc.OrcReader.handleCacheLoadException; +import static io.prestosql.orc.metadata.OrcMetadataWriter.PRESTO_WRITER_ID; import static io.prestosql.orc.metadata.OrcType.OrcTypeKind.INT; import static io.prestosql.orc.metadata.OrcType.OrcTypeKind.LONG; import static io.prestosql.orc.metadata.OrcType.OrcTypeKind.STRUCT; @@ -112,9 +114,11 @@ import static io.prestosql.plugin.hive.HiveSessionProperties.isOrcRowDataCacheEn 
import static io.prestosql.plugin.hive.HiveSessionProperties.isOrcRowIndexCacheEnabled; import static io.prestosql.plugin.hive.HiveSessionProperties.isOrcStripeFooterCacheEnabled; import static io.prestosql.plugin.hive.orc.OrcPageSource.handleException; +import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; import static io.prestosql.spi.type.BigintType.BIGINT; import static io.prestosql.spi.type.IntegerType.INTEGER; import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; @@ -322,6 +326,10 @@ public class OrcPageSourceFactory } OrcReader reader = new OrcReader(readerLocalDataSource, fileTail, maxMergeDistance, tinyStripeThreshold, maxReadBlockSize); + if (deleteDeltaLocations.isPresent() && !deleteDeltaLocations.get().isOrcAcidVersionValidated()) { + validateOrcAcidVersion(path, reader); + } + List fileColumns = reader.getRootColumn().getNestedColumns(); List fileReadColumns = isFullAcid ? new ArrayList<>(columns.size() + 5) : new ArrayList<>(columns.size()); List fileReadTypes = isFullAcid ? new ArrayList<>(columns.size() + 5) : new ArrayList<>(columns.size()); @@ -496,6 +504,51 @@ public class OrcPageSourceFactory } } + private static void validateOrcAcidVersion(Path path, OrcReader reader) + { + // Trino cannot read ORC ACID tables with version < 2 (written by Hive older than 3.0) + // See https://github.com/trinodb/trino/issues/2790#issuecomment-591901728 for more context + + // If we did not manage to validate that the ORC ACID version used by the table is a supported one based on the _orc_acid_version metadata file, + // we check the data file footer. + + if (reader.getFooter().getNumberOfRows() == 0) { + // file is empty. assuming we are good.
We do not want to depend on metadata in such case + // as some hadoop distributions do not write ORC ACID metadata for empty ORC files + return; + } + + int writerId = reader.getFooter().getWriterId().orElseThrow(() -> new PrestoException(HIVE_BAD_DATA, "writerId not set in ORC metadata in " + path)); + if (writerId == PRESTO_WRITER_ID) { + // file written by Presto. We are good. + return; + } + + Optional hiveAcidVersion = getHiveAcidVersion(reader); + if (!hiveAcidVersion.isPresent() || hiveAcidVersion.get() < 2) { + throw new PrestoException( + NOT_SUPPORTED, + format("Hive transactional tables are supported since Hive 3.0. Expected `hive.acid.version` in ORC metadata in %s to be >=2 but was %s. " + + "If you have upgraded from an older version of Hive, make sure a major compaction has been run at least once after the upgrade.", + path, + hiveAcidVersion.map(String::valueOf).orElse(""))); + } + } + + private static Optional getHiveAcidVersion(OrcReader reader) + { + Slice slice = reader.getFooter().getUserMetadata().get("hive.acid.version"); + if (slice == null) { + return Optional.empty(); + } + try { + return Optional.of(Integer.valueOf(slice.toString(UTF_8))); + } + catch (RuntimeException ignored) { + return Optional.empty(); + } + } + interface FSDataInputStreamProvider { FSDataInputStream provide() diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/DeleteDeltaLocationsTest.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/DeleteDeltaLocationsTest.java index 4bee5314d..3dbd3aa00 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/DeleteDeltaLocationsTest.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/DeleteDeltaLocationsTest.java @@ -26,7 +26,7 @@ public class DeleteDeltaLocationsTest public void setUp() throws Exception { deleteDeltaLocationsUnderTest = new DeleteDeltaLocations("partitionLocation", - Arrays.asList(new WriteIdInfo(0L, 0L, 0))); + Arrays.asList(new WriteIdInfo(0L, 0L, 0)), true); } @Test diff 
--git a/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/IcebergOrcPageSourceTest.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/IcebergOrcPageSourceTest.java index d794adca4..98ccd5cd7 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/IcebergOrcPageSourceTest.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/IcebergOrcPageSourceTest.java @@ -66,7 +66,7 @@ public class IcebergOrcPageSourceTest icebergOrcPageSourceUnderTest = new IcebergOrcPageSource(mockRecordReader, Arrays.asList(IcebergOrcPageSource.ColumnAdaptation.sourceColumn(0)), mockOrcDataSource, Optional.of(new OrcDeletedRows("sourceFileName", Optional.of( - new DeleteDeltaLocations("partitionLocation", Arrays.asList(new WriteIdInfo(0L, 0L, 0)))), + new DeleteDeltaLocations("partitionLocation", Arrays.asList(new WriteIdInfo(0L, 0L, 0)), true)), new OrcDeleteDeltaPageSourceFactory("sessionUser", new Configuration(false), new HdfsEnvironment(hdfsConfiguration, new HiveConfig(), new NoHdfsAuthentication()), new DataSize(0.0, DataSize.Unit.BYTE), new DataSize(0.0, DataSize.Unit.BYTE), diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/OrcSelectivePageSourceFactoryTest.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/OrcSelectivePageSourceFactoryTest.java index d685d0af4..5b5d26994 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/OrcSelectivePageSourceFactoryTest.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/OrcSelectivePageSourceFactoryTest.java @@ -97,7 +97,7 @@ public class OrcSelectivePageSourceFactoryTest final Optional>> additionPredicates = Optional.of( Arrays.asList(TupleDomain.withColumnDomains(new HashMap<>()))); final Optional deleteDeltaLocations = Optional.of( - new DeleteDeltaLocations("partitionLocation", Arrays.asList(new WriteIdInfo(0L, 0L, 0)))); + new DeleteDeltaLocations("partitionLocation", Arrays.asList(new WriteIdInfo(0L, 0L, 0)), true)); final Optional> indexes = 
Optional.of( Arrays.asList(new IndexMetadata(null, "table", new String[]{"columns"}, "rootUri", "uri", 0L, 0L))); final List columnMappings = Arrays.asList( @@ -156,7 +156,7 @@ public class OrcSelectivePageSourceFactoryTest final DataSize maxReadBlockSize = new DataSize(0.0, DataSize.Unit.BYTE); final FileFormatDataSourceStats stats = new FileFormatDataSourceStats(); final Optional deleteDeltaLocations = Optional.of( - new DeleteDeltaLocations("partitionLocation", Arrays.asList(new WriteIdInfo(0L, 0L, 0)))); + new DeleteDeltaLocations("partitionLocation", Arrays.asList(new WriteIdInfo(0L, 0L, 0)), true)); final Optional> indexes = Optional.of( Arrays.asList(new IndexMetadata(null, "table", new String[]{"columns"}, "rootUri", "uri", 0L, 0L))); final OrcCacheStore orcCacheStore = null; diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java b/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java index da69611c5..97f449e4f 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java +++ b/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java @@ -658,7 +658,8 @@ public final class OrcWriter .collect(toImmutableList()), orcTypes, toFileStats, - localUserMetadata); + localUserMetadata, + Optional.empty()); // writer id will be set by MetadataWriter closedStripes.clear(); closedStripesRetainedBytes = 0; diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/Footer.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/Footer.java index 655bfbff9..dff39e6a9 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/metadata/Footer.java +++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/Footer.java @@ -35,13 +35,15 @@ public class Footer private final ColumnMetadata types; private final Optional> fileStats; private final Map userMetadata; + private final Optional writerId; public Footer(long numberOfRows, int rowsInRowGroup, List stripes, ColumnMetadata types, Optional> fileStats, - Map userMetadata) + Map userMetadata, + Optional 
writerId) { this.numberOfRows = numberOfRows; this.rowsInRowGroup = rowsInRowGroup; @@ -50,6 +52,7 @@ public class Footer this.fileStats = requireNonNull(fileStats, "fileStats is null"); requireNonNull(userMetadata, "userMetadata is null"); this.userMetadata = ImmutableMap.copyOf(transformValues(userMetadata, Slices::copyOf)); + this.writerId = requireNonNull(writerId, "writerId is null"); } public long getNumberOfRows() @@ -82,6 +85,11 @@ public class Footer return ImmutableMap.copyOf(transformValues(userMetadata, Slices::copyOf)); } + public Optional getWriterId() + { + return writerId; + } + @Override public String toString() { @@ -92,6 +100,7 @@ public class Footer .add("types", types) .add("columnStatistics", fileStats) .add("userMetadata", userMetadata.keySet()) + .add("writerId", writerId) .toString(); } } diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java index 10f468330..80b55e8c0 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java @@ -139,7 +139,8 @@ public class OrcMetadataReader toStripeInformation(footer.getStripesList()), toType(footer.getTypesList()), toColumnStatistics(hiveWriterVersion, footer.getStatisticsList(), false), - toUserMetadata(footer.getMetadataList())); + toUserMetadata(footer.getMetadataList()), + Optional.of(footer.getWriter())); } private static List toStripeInformation(List types) diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java index c4c57e8c5..6a3f26cc7 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java +++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java @@ -44,7 +44,7 @@ import static java.util.stream.Collectors.toList; public 
class OrcMetadataWriter implements MetadataWriter { - private static final int PRESTO_WRITER_ID = 2; + public static final int PRESTO_WRITER_ID = 2; // in order to change this value, the master Apache ORC proto file must be updated private static final int PRESTO_WRITER_VERSION = 6; // maximum version readable by Hive 2.x before the ORC-125 fix -- Gitee From a32b96e7cdb45797b2b0c97cc356a00900513e3d Mon Sep 17 00:00:00 2001 From: liguang1 <751885226@qq.com> Date: Fri, 28 Jul 2023 20:33:23 +0800 Subject: [PATCH 2/2] bugfix ui timezone --- .../prestosql/queryeditorui/execution/ClientSessionFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/presto-main/src/main/java/io/prestosql/queryeditorui/execution/ClientSessionFactory.java b/presto-main/src/main/java/io/prestosql/queryeditorui/execution/ClientSessionFactory.java index fce4de569..5da15dad0 100644 --- a/presto-main/src/main/java/io/prestosql/queryeditorui/execution/ClientSessionFactory.java +++ b/presto-main/src/main/java/io/prestosql/queryeditorui/execution/ClientSessionFactory.java @@ -49,7 +49,7 @@ public class ClientSessionFactory this.source = source; this.catalog = catalog; this.defaultSchema = defaultSchema; - this.timeZoneId = TimeZone.getTimeZone("UTC").toZoneId(); + this.timeZoneId = TimeZone.getDefault().toZoneId(); this.locale = Locale.getDefault(); this.clientSessionTimeout = firstNonNull(clientSessionTimeout, succinctDuration(1, MINUTES)); } -- Gitee