diff --git a/hetu-carbondata/src/main/java/io/hetu/core/plugin/carbondata/CarbondataConnectorFactory.java b/hetu-carbondata/src/main/java/io/hetu/core/plugin/carbondata/CarbondataConnectorFactory.java
index 98cc6deb3b18d4d3f6fcb14c3b4babe28f90257f..dae2f8069ae2363fffe92baf827a4c9a10eb9b0b 100755
--- a/hetu-carbondata/src/main/java/io/hetu/core/plugin/carbondata/CarbondataConnectorFactory.java
+++ b/hetu-carbondata/src/main/java/io/hetu/core/plugin/carbondata/CarbondataConnectorFactory.java
@@ -23,6 +23,10 @@ import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
import io.airlift.units.DataSize;
import io.hetu.core.plugin.carbondata.impl.CarbondataTableConfig;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.plugin.base.jmx.MBeanServerModule;
import io.prestosql.plugin.hive.ConnectorObjectNameGeneratorModule;
import io.prestosql.plugin.hive.HiveAnalyzeProperties;
@@ -55,10 +59,6 @@ import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorPageSourceProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.spi.procedure.Procedure;
import io.prestosql.spi.type.TypeManager;
import org.apache.carbondata.hive.CarbonHiveSerDe;
diff --git a/hetu-hazelcast/pom.xml b/hetu-hazelcast/pom.xml
index ee93729f7799d5b203ac3ddaa13c62165e77c84b..ce85ab9fe0b045277f2eb5391e62b5ac0b67cc47 100644
--- a/hetu-hazelcast/pom.xml
+++ b/hetu-hazelcast/pom.xml
@@ -15,7 +15,6 @@
${project.parent.basedir}
4.6.3
3.21.0
- 5.1
1.9.4
5.3.27
1.1.1
diff --git a/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/IcebergConnector.java b/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/IcebergConnector.java
index 844b1a80273ef8ec4528e65cc304da986df4ab83..b55b76031979900cde7a76a00108a26fc0d29310 100644
--- a/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/IcebergConnector.java
+++ b/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/IcebergConnector.java
@@ -16,6 +16,7 @@ package io.hetu.core.plugin.iceberg;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.bootstrap.LifeCycleManager;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
import io.prestosql.plugin.base.session.SessionPropertiesProvider;
import io.prestosql.plugin.hive.HiveTransactionHandle;
import io.prestosql.spi.connector.Connector;
@@ -29,7 +30,6 @@ import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.TableProcedureMetadata;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import io.prestosql.spi.procedure.Procedure;
import io.prestosql.spi.security.ConnectorIdentity;
import io.prestosql.spi.session.PropertyMetadata;
diff --git a/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/InternalIcebergConnectorFactory.java b/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/InternalIcebergConnectorFactory.java
index 5e9b26bbeb82aa755ef200db45ff83f48fe06756..3c54d842c3b166c0cdd0f2295af0932464a94be6 100644
--- a/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/InternalIcebergConnectorFactory.java
+++ b/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/InternalIcebergConnectorFactory.java
@@ -22,6 +22,10 @@ import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
import io.hetu.core.plugin.iceberg.catalog.IcebergCatalogModule;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.plugin.base.jmx.MBeanServerModule;
import io.prestosql.plugin.base.session.SessionPropertiesProvider;
import io.prestosql.plugin.hive.ConnectorObjectNameGeneratorModule;
@@ -45,10 +49,6 @@ import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorPageSourceProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.TableProcedureMetadata;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.spi.procedure.Procedure;
import io.prestosql.spi.type.TypeManager;
import org.weakref.jmx.guice.MBeanModule;
diff --git a/hetu-mpp/src/main/java/io/hetu/core/plugin/mpp/MppConnectorFactory.java b/hetu-mpp/src/main/java/io/hetu/core/plugin/mpp/MppConnectorFactory.java
index 6d143578543272a101a68619adc971f5b7c9c5e7..fc3fd2d4d73ca079a5e0aaa174198e913d32a0cb 100644
--- a/hetu-mpp/src/main/java/io/hetu/core/plugin/mpp/MppConnectorFactory.java
+++ b/hetu-mpp/src/main/java/io/hetu/core/plugin/mpp/MppConnectorFactory.java
@@ -21,6 +21,10 @@ import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.plugin.base.jmx.MBeanServerModule;
import io.prestosql.plugin.hive.ConnectorObjectNameGeneratorModule;
import io.prestosql.plugin.hive.HiveAnalyzeProperties;
@@ -53,10 +57,6 @@ import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorPageSourceProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.spi.heuristicindex.IndexClient;
import io.prestosql.spi.procedure.Procedure;
import io.prestosql.spi.type.TypeManager;
diff --git a/pom.xml b/pom.xml
index 84168c5d552f2290cf957beebb736bb6554eb270..0a4b073efd91c6e3e868a38c3af94b8ddf74aeb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,8 @@
1.66
0.7.0
5.0.3
+ 2.4.1
+ 5.1
io.hetu.core
@@ -176,6 +161,11 @@
+
+ org.assertj
+ assertj-core
+
+
io.airlift
log-manager
@@ -194,31 +184,6 @@
-
- com.101tec
- zkclient
- 0.10
- runtime
-
-
- log4j
- log4j
-
-
- org.slf4j
- slf4j-log4j12
-
-
- zookeeper
- org.apache.zookeeper
-
-
- netty
- io.netty
-
-
-
-
org.testng
diff --git a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaAdminFactory.java
similarity index 39%
rename from presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java
rename to presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaAdminFactory.java
index 7807c7ff740750313e9a7c65121e6e62fabd5510..98554486c661fb0b274a1b3a53f2111e65528e29 100644
--- a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaAdminFactory.java
@@ -11,41 +11,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.prestosql.plugin.kafka;
-import io.prestosql.plugin.kafka.util.EmbeddedKafka;
-import io.prestosql.tests.AbstractTestIntegrationSmokeTest;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.Test;
+import io.prestosql.spi.HostAddress;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+
+import javax.inject.Inject;
-import java.io.IOException;
+import java.util.Properties;
+import java.util.Set;
-import static io.airlift.tpch.TpchTable.ORDERS;
-import static io.prestosql.plugin.kafka.KafkaQueryRunner.createKafkaQueryRunner;
-import static io.prestosql.plugin.kafka.util.EmbeddedKafka.createEmbeddedKafka;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.joining;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
-@Test
-public class TestKafkaIntegrationSmokeTest
- extends AbstractTestIntegrationSmokeTest
+public class KafkaAdminFactory
{
- private final EmbeddedKafka embeddedKafka;
+ private final Set<HostAddress> nodes;
- public TestKafkaIntegrationSmokeTest()
- throws Exception
+ @Inject
+ public KafkaAdminFactory(KafkaConfig kafkaConfig)
{
- this(createEmbeddedKafka());
+ requireNonNull(kafkaConfig, "kafkaConfig is null");
+ nodes = kafkaConfig.getNodes();
}
- public TestKafkaIntegrationSmokeTest(EmbeddedKafka embeddedKafka)
+ public AdminClient create()
{
- super(() -> createKafkaQueryRunner(embeddedKafka, ORDERS));
- this.embeddedKafka = embeddedKafka;
+ return KafkaAdminClient.create(configure());
}
- @AfterClass(alwaysRun = true)
- public void destroy()
- throws IOException
+ public Properties configure()
{
- embeddedKafka.close();
+ Properties properties = new Properties();
+ properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream()
+ .map(HostAddress::toString)
+ .collect(joining(",")));
+ return properties;
}
}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaColumnHandle.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaColumnHandle.java
index 64a53c8bff143ac6a62510374063dab45e6f8173..66846d65250ccdea12a5eb42874e5ff6828ffa23 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaColumnHandle.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaColumnHandle.java
@@ -16,6 +16,7 @@ package io.prestosql.plugin.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.plugin.kafka.encoder.EncoderColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.type.Type;
@@ -24,14 +25,9 @@ import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
-/**
- * Kafka specific connector column handle.
- */
public final class KafkaColumnHandle
- implements DecoderColumnHandle, Comparable<KafkaColumnHandle>
+ implements EncoderColumnHandle, DecoderColumnHandle
{
- private final int ordinalPosition;
-
/**
* Column Name
*/
@@ -43,24 +39,24 @@ public final class KafkaColumnHandle
private final Type type;
/**
- * Mapping hint for the decoder. Can be null.
+ * Mapping hint for the codec. Can be null.
*/
private final String mapping;
/**
- * Data format to use (selects the decoder). Can be null.
+ * Data format to use (selects the codec). Can be null.
*/
private final String dataFormat;
/**
- * Additional format hint for the selected decoder. Selects a decoder subtype (e.g. which timestamp decoder).
+ * Additional format hint for the selected codec. Selects a codec subtype (e.g. which timestamp codec).
*/
private final String formatHint;
/**
- * True if the key decoder should be used, false if the message decoder should be used.
+ * True if the key codec should be used, false if the message codec should be used.
*/
- private final boolean keyDecoder;
+ private final boolean keyCodec;
/**
* True if the column should be hidden.
@@ -74,33 +70,25 @@ public final class KafkaColumnHandle
@JsonCreator
public KafkaColumnHandle(
- @JsonProperty("ordinalPosition") int ordinalPosition,
@JsonProperty("name") String name,
@JsonProperty("type") Type type,
@JsonProperty("mapping") String mapping,
@JsonProperty("dataFormat") String dataFormat,
@JsonProperty("formatHint") String formatHint,
- @JsonProperty("keyDecoder") boolean keyDecoder,
+ @JsonProperty("keyCodec") boolean keyCodec,
@JsonProperty("hidden") boolean hidden,
@JsonProperty("internal") boolean internal)
{
- this.ordinalPosition = ordinalPosition;
this.name = requireNonNull(name, "name is null");
this.type = requireNonNull(type, "type is null");
this.mapping = mapping;
this.dataFormat = dataFormat;
this.formatHint = formatHint;
- this.keyDecoder = keyDecoder;
+ this.keyCodec = keyCodec;
this.hidden = hidden;
this.internal = internal;
}
- @JsonProperty
- public int getOrdinalPosition()
- {
- return ordinalPosition;
- }
-
@Override
@JsonProperty
public String getName()
@@ -137,9 +125,9 @@ public final class KafkaColumnHandle
}
@JsonProperty
- public boolean isKeyDecoder()
+ public boolean isKeyCodec()
{
- return keyDecoder;
+ return keyCodec;
}
@JsonProperty
@@ -163,7 +151,7 @@ public final class KafkaColumnHandle
@Override
public int hashCode()
{
- return Objects.hash(ordinalPosition, name, type, mapping, dataFormat, formatHint, keyDecoder, hidden, internal);
+ return Objects.hash(name, type, mapping, dataFormat, formatHint, keyCodec, hidden, internal);
}
@Override
@@ -177,34 +165,26 @@ public final class KafkaColumnHandle
}
KafkaColumnHandle other = (KafkaColumnHandle) obj;
- return Objects.equals(this.ordinalPosition, other.ordinalPosition) &&
- Objects.equals(this.name, other.name) &&
+ return Objects.equals(this.name, other.name) &&
Objects.equals(this.type, other.type) &&
Objects.equals(this.mapping, other.mapping) &&
Objects.equals(this.dataFormat, other.dataFormat) &&
Objects.equals(this.formatHint, other.formatHint) &&
- Objects.equals(this.keyDecoder, other.keyDecoder) &&
+ Objects.equals(this.keyCodec, other.keyCodec) &&
Objects.equals(this.hidden, other.hidden) &&
Objects.equals(this.internal, other.internal);
}
- @Override
- public int compareTo(KafkaColumnHandle otherHandle)
- {
- return Integer.compare(this.getOrdinalPosition(), otherHandle.getOrdinalPosition());
- }
-
@Override
public String toString()
{
return toStringHelper(this)
- .add("ordinalPosition", ordinalPosition)
.add("name", name)
.add("type", type)
.add("mapping", mapping)
.add("dataFormat", dataFormat)
.add("formatHint", formatHint)
- .add("keyDecoder", keyDecoder)
+ .add("keyCodec", keyCodec)
.add("hidden", hidden)
.add("internal", internal)
.toString();
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConfig.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..f4c18d58f136a83b7ca3143cd4888d3a8658738b
--- /dev/null
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConfig.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.kafka;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.DefunctConfig;
+import io.airlift.units.DataSize;
+import io.airlift.units.DataSize.Unit;
+import io.prestosql.spi.HostAddress;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import java.io.File;
+import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+@DefunctConfig("kafka.connect-timeout")
+public class KafkaConfig
+{
+ private static final int KAFKA_DEFAULT_PORT = 9092;
+
+ private Set<HostAddress> nodes = ImmutableSet.of();
+ private DataSize kafkaBufferSize = new DataSize(64, Unit.KILOBYTE);
+ private String defaultSchema = "default";
+ private Set<String> tableNames = ImmutableSet.of();
+ private File tableDescriptionDir = new File("etc/kafka/");
+ private boolean hideInternalColumns = true;
+ private int messagesPerSplit = 100_000;
+ private boolean timestampUpperBoundPushDownEnabled;
+
+ @Size(min = 1)
+ public Set<HostAddress> getNodes()
+ {
+ return nodes;
+ }
+
+ @Config("kafka.nodes")
+ @ConfigDescription("Seed nodes for Kafka cluster. At least one must exist")
+ public KafkaConfig setNodes(String nodes)
+ {
+ this.nodes = (nodes == null) ? null : parseNodes(nodes);
+ return this;
+ }
+
+ public DataSize getKafkaBufferSize()
+ {
+ return kafkaBufferSize;
+ }
+
+ @Config("kafka.buffer-size")
+ @ConfigDescription("Kafka message consumer buffer size")
+ public KafkaConfig setKafkaBufferSize(String kafkaBufferSize)
+ {
+ this.kafkaBufferSize = DataSize.valueOf(kafkaBufferSize);
+ return this;
+ }
+
+ @NotNull
+ public String getDefaultSchema()
+ {
+ return defaultSchema;
+ }
+
+ @Config("kafka.default-schema")
+ @ConfigDescription("Schema name to use in the connector")
+ public KafkaConfig setDefaultSchema(String defaultSchema)
+ {
+ this.defaultSchema = defaultSchema;
+ return this;
+ }
+
+ @NotNull
+ public Set<String> getTableNames()
+ {
+ return tableNames;
+ }
+
+ @Config("kafka.table-names")
+ @ConfigDescription("Set of tables known to this connector")
+ public KafkaConfig setTableNames(String tableNames)
+ {
+ this.tableNames = ImmutableSet.copyOf(Splitter.on(',').omitEmptyStrings().trimResults().split(tableNames));
+ return this;
+ }
+
+ public boolean isHideInternalColumns()
+ {
+ return hideInternalColumns;
+ }
+
+ @Config("kafka.hide-internal-columns")
+ @ConfigDescription("Whether internal columns are shown in table metadata or not. Default is no")
+ public KafkaConfig setHideInternalColumns(boolean hideInternalColumns)
+ {
+ this.hideInternalColumns = hideInternalColumns;
+ return this;
+ }
+
+ @NotNull
+ public File getTableDescriptionDir()
+ {
+ return tableDescriptionDir;
+ }
+
+ @Config("kafka.table-description-dir")
+ @ConfigDescription("Folder holding JSON description files for Kafka topics")
+ public KafkaConfig setTableDescriptionDir(File tableDescriptionDir)
+ {
+ this.tableDescriptionDir = tableDescriptionDir;
+ return this;
+ }
+
+ private static ImmutableSet<HostAddress> parseNodes(String nodes)
+ {
+ Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
+ return StreamSupport.stream(splitter.split(nodes).spliterator(), false)
+ .map(KafkaConfig::toHostAddress)
+ .collect(toImmutableSet());
+ }
+
+ private static HostAddress toHostAddress(String value)
+ {
+ return HostAddress.fromString(value).withDefaultPort(KAFKA_DEFAULT_PORT);
+ }
+
+ @Min(1)
+ public int getMessagesPerSplit()
+ {
+ return messagesPerSplit;
+ }
+
+ @Config("kafka.messages-per-split")
+ @ConfigDescription("Count of Kafka messages to be processed by single Presto Kafka connector split")
+ public KafkaConfig setMessagesPerSplit(int messagesPerSplit)
+ {
+ this.messagesPerSplit = messagesPerSplit;
+ return this;
+ }
+
+ public boolean isTimestampUpperBoundPushDownEnabled()
+ {
+ return timestampUpperBoundPushDownEnabled;
+ }
+
+ @Config("kafka.timestamp-upper-bound-force-push-down-enabled")
+ @ConfigDescription("timestamp upper bound force pushing down enabled")
+ public KafkaConfig setTimestampUpperBoundPushDownEnabled(boolean timestampUpperBoundPushDownEnabled)
+ {
+ this.timestampUpperBoundPushDownEnabled = timestampUpperBoundPushDownEnabled;
+ return this;
+ }
+}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnector.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnector.java
index 610cdf0f40fad63e5369b3cdd81d05ea424c78fd..6241416f6e50f14324b45ba5c3dfa0795eceace1 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnector.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnector.java
@@ -14,44 +14,48 @@
package io.prestosql.plugin.kafka;
import io.airlift.bootstrap.LifeCycleManager;
-import io.airlift.log.Logger;
import io.prestosql.spi.connector.Connector;
import io.prestosql.spi.connector.ConnectorMetadata;
+import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorRecordSetProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.session.PropertyMetadata;
import io.prestosql.spi.transaction.IsolationLevel;
import javax.inject.Inject;
+import java.util.List;
+
import static io.prestosql.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.prestosql.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
-/**
- * Kafka specific implementation of the Presto Connector SPI. This is a read only connector.
- */
public class KafkaConnector
implements Connector
{
- private static final Logger log = Logger.get(KafkaConnector.class);
-
private final LifeCycleManager lifeCycleManager;
- private final KafkaMetadata metadata;
- private final KafkaSplitManager splitManager;
- private final KafkaRecordSetProvider recordSetProvider;
+ private final ConnectorMetadata metadata;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorRecordSetProvider recordSetProvider;
+ private final ConnectorPageSinkProvider pageSinkProvider;
+ private final KafkaSessionProperties sessionProperties;
@Inject
public KafkaConnector(
LifeCycleManager lifeCycleManager,
- KafkaMetadata metadata,
- KafkaSplitManager splitManager,
- KafkaRecordSetProvider recordSetProvider)
+ ConnectorMetadata metadata,
+ ConnectorSplitManager splitManager,
+ ConnectorRecordSetProvider recordSetProvider,
+ ConnectorPageSinkProvider pageSinkProvider,
+ KafkaSessionProperties sessionProperties)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+ this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
+ this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null");
}
@Override
@@ -79,14 +83,21 @@ public class KafkaConnector
return recordSetProvider;
}
+ @Override
+ public ConnectorPageSinkProvider getPageSinkProvider()
+ {
+ return pageSinkProvider;
+ }
+
+ @Override
+ public List<PropertyMetadata<?>> getSessionProperties()
+ {
+ return sessionProperties.getSessionProperties();
+ }
+
@Override
public final void shutdown()
{
- try {
- lifeCycleManager.stop();
- }
- catch (Exception e) {
- log.error(e, "Error shutting down connector");
- }
+ lifeCycleManager.stop();
}
}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorFactory.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorFactory.java
index 2668963add837524a139350e32cf613e988dc7ea..82072a7f65150e1ea6e162914a2c618855f4eb0b 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorFactory.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorFactory.java
@@ -14,8 +14,7 @@
package io.prestosql.plugin.kafka;
import com.google.inject.Injector;
-import com.google.inject.Scopes;
-import com.google.inject.TypeLiteral;
+import com.google.inject.Module;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import io.prestosql.spi.NodeManager;
@@ -23,27 +22,20 @@ import io.prestosql.spi.connector.Connector;
import io.prestosql.spi.connector.ConnectorContext;
import io.prestosql.spi.connector.ConnectorFactory;
import io.prestosql.spi.connector.ConnectorHandleResolver;
-import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.type.TypeManager;
import java.util.Map;
-import java.util.Optional;
-import java.util.function.Supplier;
-import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.util.Objects.requireNonNull;
-/**
- * Creates Kafka Connectors based off catalogName and specific configuration.
- */
public class KafkaConnectorFactory
implements ConnectorFactory
{
- private final Optional<Supplier<Map<SchemaTableName, KafkaTopicDescription>>> tableDescriptionSupplier;
+ private final Module extension;
- KafkaConnectorFactory(Optional<Supplier<Map<SchemaTableName, KafkaTopicDescription>>> tableDescriptionSupplier)
+ KafkaConnectorFactory(Module extension)
{
- this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
+ this.extension = requireNonNull(extension, "extension is null");
}
@Override
@@ -64,32 +56,22 @@ public class KafkaConnectorFactory
requireNonNull(catalogName, "catalogName is null");
requireNonNull(config, "config is null");
- try {
- Bootstrap app = new Bootstrap(
- new JsonModule(),
- new KafkaConnectorModule(),
- binder -> {
- binder.bind(TypeManager.class).toInstance(context.getTypeManager());
- binder.bind(NodeManager.class).toInstance(context.getNodeManager());
-
- if (tableDescriptionSupplier.isPresent()) {
- binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KafkaTopicDescription>>>() {}).toInstance(tableDescriptionSupplier.get());
- }
- else {
- binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KafkaTopicDescription>>>() {}).to(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON);
- }
- });
+ Bootstrap app = new Bootstrap(
+ new JsonModule(),
+ new KafkaConnectorModule(),
+ extension,
+ binder -> {
+ binder.bind(ClassLoader.class).toInstance(KafkaConnectorFactory.class.getClassLoader());
+ binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+ });
- Injector injector = app.strictConfig()
- .doNotInitializeLogging()
- .setRequiredConfigurationProperties(config)
- .initialize();
+ Injector injector = app
+ .strictConfig()
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(config)
+ .initialize();
- return injector.getInstance(KafkaConnector.class);
- }
- catch (Exception e) {
- throwIfUnchecked(e);
- throw new RuntimeException(e);
- }
+ return injector.getInstance(KafkaConnector.class);
}
}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java
index 20e0b30128d8a251185d4d2e027d0f04a5a150e9..806401279db50493e8f1b0fe38587fcf6efd56f2 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java
@@ -19,40 +19,57 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.prestosql.decoder.DecoderModule;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorRecordSetProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.prestosql.plugin.base.classloader.ForClassLoaderSafe;
+import io.prestosql.plugin.kafka.encoder.EncoderModule;
+import io.prestosql.spi.connector.ConnectorMetadata;
+import io.prestosql.spi.connector.ConnectorPageSinkProvider;
+import io.prestosql.spi.connector.ConnectorRecordSetProvider;
+import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeManager;
import javax.inject.Inject;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonBinder.jsonBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static io.prestosql.spi.type.TypeSignature.parseTypeSignature;
import static java.util.Objects.requireNonNull;
-/**
- * Guice module for the Apache Kafka connector.
- */
public class KafkaConnectorModule
implements Module
{
@Override
public void configure(Binder binder)
{
+ binder.bind(ConnectorMetadata.class).to(KafkaMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(KafkaSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorRecordSetProvider.class).annotatedWith(ForClassLoaderSafe.class).to(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorRecordSetProvider.class).to(ClassLoaderSafeConnectorRecordSetProvider.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorPageSinkProvider.class).annotatedWith(ForClassLoaderSafe.class).to(KafkaPageSinkProvider.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorPageSinkProvider.class).to(ClassLoaderSafeConnectorPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(KafkaConnector.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaInternalFieldManager.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaSessionProperties.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaAdminFactory.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaFilterManager.class).in(Scopes.SINGLETON);
- binder.bind(KafkaMetadata.class).in(Scopes.SINGLETON);
- binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON);
- binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);
-
- binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON);
-
- configBinder(binder).bindConfig(KafkaConnectorConfig.class);
+ //binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON);
+ //configBinder(binder).bindConfig(KafkaConnectorConfig.class);
+ configBinder(binder).bindConfig(KafkaConfig.class);
+ newSetBinder(binder, TableDescriptionSupplier.class).addBinding().toProvider(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON);
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);
binder.install(new DecoderModule());
+ binder.install(new EncoderModule());
+ binder.install(new KafkaProducerModule());
}
public static final class TypeDeserializer
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerFactory.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..9872e7809459b0293f35188bd11f20f4d9344fc3
--- /dev/null
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.prestosql.plugin.kafka;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.util.Properties;
+
+public interface KafkaConsumerFactory
+{
+ default KafkaConsumer create()
+ {
+ return new KafkaConsumer<>(configure());
+ }
+
+ Properties configure();
+}
diff --git a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/NumberEncoder.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerModule.java
similarity index 52%
rename from presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/NumberEncoder.java
rename to presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerModule.java
index c7086a26434de719fbf5138deccebd417c262499..0759a716e065c9d0d197679e87d992f1b621d4b0 100644
--- a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/NumberEncoder.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerModule.java
@@ -11,27 +11,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.plugin.kafka.util;
+package io.prestosql.plugin.kafka;
-import kafka.serializer.Encoder;
-import kafka.utils.VerifiableProperties;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
-import java.nio.ByteBuffer;
-
-public class NumberEncoder
- implements Encoder
+public class KafkaConsumerModule
+ implements Module
{
- @SuppressWarnings("UnusedParameters")
- public NumberEncoder(VerifiableProperties properties)
- {
- // constructor required by Kafka
- }
-
@Override
- public byte[] toBytes(Number value)
+ public void configure(Binder binder)
{
- ByteBuffer buf = ByteBuffer.allocate(8);
- buf.putLong(value == null ? 0L : value.longValue());
- return buf.array();
+ binder.bind(KafkaConsumerFactory.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON);
}
}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaErrorCode.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaErrorCode.java
index 006a70a52ac1974e898040889ca3e41662f7caf4..44a883464dee744658250e508a8a92a9f5ced003 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaErrorCode.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaErrorCode.java
@@ -18,6 +18,7 @@ import io.prestosql.spi.ErrorCodeSupplier;
import io.prestosql.spi.ErrorType;
import static io.prestosql.spi.ErrorType.EXTERNAL;
+import static io.prestosql.spi.ErrorType.INTERNAL_ERROR;
/**
* Kafka connector specific error codes.
@@ -25,7 +26,10 @@ import static io.prestosql.spi.ErrorType.EXTERNAL;
public enum KafkaErrorCode
implements ErrorCodeSupplier
{
- KAFKA_SPLIT_ERROR(0, EXTERNAL);
+ KAFKA_SPLIT_ERROR(0, EXTERNAL),
+ KAFKA_SCHEMA_ERROR(1, EXTERNAL),
+ KAFKA_PRODUCER_ERROR(2, INTERNAL_ERROR)
+ /**/;
private final ErrorCode errorCode;
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaFilterManager.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaFilterManager.java
new file mode 100644
index 0000000000000000000000000000000000000000..a2c7e962d82cd3c6554d3b7b2d48781517706ec0
--- /dev/null
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaFilterManager.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.predicate.Domain;
+import io.prestosql.spi.predicate.Marker;
+import io.prestosql.spi.predicate.Ranges;
+import io.prestosql.spi.predicate.SortedRangeSet;
+import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.spi.predicate.ValueSet;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+
+import javax.inject.Inject;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static io.prestosql.plugin.kafka.KafkaErrorCode.KAFKA_SPLIT_ERROR;
+import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.OFFSET_TIMESTAMP_FIELD;
+import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.PARTITION_ID_FIELD;
+import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.PARTITION_OFFSET_FIELD;
+import static java.lang.Math.floorDiv;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class KafkaFilterManager
+{
+ public static final int MICROSECONDS_PER_MILLISECOND = 1_000;
+ private static final long INVALID_KAFKA_RANGE_INDEX = -1;
+ private static final String TOPIC_CONFIG_TIMESTAMP_KEY = "message.timestamp.type";
+ private static final String TOPIC_CONFIG_TIMESTAMP_VALUE_LOG_APPEND_TIME = "LogAppendTime";
+
+ private final KafkaConsumerFactory consumerFactory;
+ private final KafkaAdminFactory adminFactory;
+
+ @Inject
+ public KafkaFilterManager(KafkaConsumerFactory consumerFactory, KafkaAdminFactory adminFactory)
+ {
+ this.consumerFactory = requireNonNull(consumerFactory, "consumerManager is null");
+ this.adminFactory = requireNonNull(adminFactory, "adminFactory is null");
+ }
+
+ public KafkaFilteringResult getKafkaFilterResult(
+ ConnectorSession session,
+ KafkaTableHandle kafkaTableHandle,
+ List partitionInfos,
+ Map partitionBeginOffsets,
+ Map partitionEndOffsets)
+ {
+ requireNonNull(session, "session is null");
+ requireNonNull(kafkaTableHandle, "kafkaTableHandle is null");
+ requireNonNull(partitionInfos, "partitionInfos is null");
+ requireNonNull(partitionBeginOffsets, "partitionBeginOffsets is null");
+ requireNonNull(partitionEndOffsets, "partitionEndOffsets is null");
+
+ TupleDomain constraint = kafkaTableHandle.getConstraint();
+ verify(!constraint.isNone(), "constraint is none");
+
+ if (!constraint.isAll()) {
+ Set partitionIds = partitionInfos.stream().map(partitionInfo -> (long) partitionInfo.partition()).collect(toImmutableSet());
+ Optional offsetRanged = Optional.empty();
+ Optional offsetTimestampRanged = Optional.empty();
+ Set partitionIdsFiltered = partitionIds;
+ Optional