diff --git a/flink-connector-pulsar/README.md b/flink-connector-pulsar/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..7180d0c9e47f2ee1f012904676a2d5dc84e7f0e4
--- /dev/null
+++ b/flink-connector-pulsar/README.md
@@ -0,0 +1,205 @@
+## flink-connector-pulsar
+
+> 概要说明:
+> 实现依附:https://gitee.com/apache/flink/tree/release-1.14/flink-connectors
+* Flink 官方自1.14版本支持 Flink-pulsar-connector(目前未支持 Flink-sql)
+* 在此版本前,自主实现了Flink-pulsar-connector,本次Flink-sql的实现向官方Flink-connector-pulsar对齐,更好的兼容使用,实现性能最优!
+* 就生产经验,避坑处理
+* 本次Pulsar版本使用版本:2.8.2 Flink版本:1.14.3
+
+## ★详情介绍 Pulsar-SQL Connector
+### Dependencies
+In order to use the Pulsar connector the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
+
+* Maven dependency
+
+```
+
+ org.apache.flink
+ flink-connector-Pulsar_2.11
+ 1.14.3
+
+```
+### How to create a Pulsar table
+```
+CREATE TABLE source_pulsar_n(
+ requestId VARCHAR,
+ `timestamp` BIGINT,
+ `date` VARCHAR,
+ appId VARCHAR,
+ appName VARCHAR,
+ forwardTimeMs VARCHAR,
+ processingTimeMs INT,
+ errCode VARCHAR,
+ userIp VARCHAR,
+ b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
+) WITH (
+ 'connector.type' = 'pulsar',
+ 'connector.version' = 'universal',
+ 'connector.topic' = 'persistent://streamx/dev/context.pulsar',
+ 'connector.service-url' = 'pulsar://pulsar-streamx-n.stream.com:6650',
+ 'connector.subscription-name' = 'tmp_print_detail',
+ 'connector.subscription-type' = 'Shared',
+ 'connector.subscription-initial-position' = 'Latest',
+ 'update-mode' = 'append',
+ 'format.type' = 'json',
+ 'format.derive-schema' = 'true'
+);
+```
+### Data Type Mapping
+Pulsar stores message keys and values as bytes, so Pulsar doesn’t have schema or data types. The Pulsar messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to Formats pages for more details.
+
+### Connector Options
+| Option | Required | Default | Type | Description |
+| --------------------------------------- | ----------------- | ------- | ------ | ------------------------------------------------------------ |
+| connector.type | required | (none) | String | Specify what connector to use, for pulsar use `'pulsar'`. |
+| connector.version | required | (none) | String | universal |
+| connector.topic | required for sink | (none) | String | Topic name(s) to read data from when the table is used as source |
+| connector.service-url | optional | (none) | String | The address of the pulsar |
+| connector.subscription-name | required | (none) | String | The subscription name of the Pulsar |
+| connector.subscription-type | required | (none) | String | A subscription model of the Pulsar【Shared、Exclusive、Key_Shared、Failover】 |
+| connector.subscription-initial-position | required | (none) | String | initial-position[EARLIEST、LATEST、TIMESTAMP] |
+| update-mode | optional | (none) | String | append or upsert |
+| format.type | optional | (none) | String | json、csv...... |
+| format.derive-schema | optional | (none) | String | ture or false |
+| | | | | |
+
+
+
+
+
+## 🚀 快速上手
+```shell
+git clone https://github.com/streamxhub/streamx-connector.git
+cd streamx-connector/flink-connector-pulsar
+mvn clean install -DskipTests -Dflink.version=$version
+```
+
+## 🎉 Features
+
+* Key and Value Formats
+
+Both the key and value part of a Pulsar record can be serialized to and deserialized from raw bytes using one of the given
+
+* Value Format
+
+Since a key is optional in Pulsar records, the following statement reads and writes records with a configured value format but without a key format. The 'format' option is a synonym for 'value.format'. All format options are prefixed with the format identifier.
+
+## 👻 使用
+
+```sql
+-- Pulsar多集群形式,
+-- 此处分 n、b 两个集群
+
+--声明数据源
+CREATE TABLE source_pulsar_n(
+ requestId VARCHAR,
+ `timestamp` BIGINT,
+ `date` VARCHAR,
+ appId VARCHAR,
+ appName VARCHAR,
+ forwardTimeMs VARCHAR,
+ processingTimeMs INT,
+ errCode VARCHAR,
+ userIp VARCHAR,
+ createTime BIGINT,
+ b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
+) WITH (
+ 'connector.type' = 'pulsar',
+ 'connector.version' = 'universal',
+ 'connector.topic' = 'persistent://streamx/dev/context.pulsar',
+ 'connector.service-url' = 'pulsar://pulsar-streamx-n.stream.com:6650',
+ 'connector.subscription-name' = 'tmp_print_detail',
+ 'connector.subscription-type' = 'Shared',
+ 'connector.subscription-initial-position' = 'Latest',
+ 'update-mode' = 'append',
+ 'format.type' = 'json',
+ 'format.derive-schema' = 'true'
+);
+
+
+CREATE TABLE source_pulsar_b(
+ requestId VARCHAR,
+ `timestamp` BIGINT,
+ `date` VARCHAR,
+ appId VARCHAR,
+ appName VARCHAR,
+ forwardTimeMs VARCHAR,
+ processingTimeMs INT,
+ errCode VARCHAR,
+ userIp VARCHAR,
+ createTime BIGINT,
+ b_create_im_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')
+) WITH (
+ 'connector.type' = 'pulsar',
+ 'connector.version' = 'universal',
+ 'connector.topic' = 'persistent://streamx/dev/context.pulsar',
+ 'connector.service-url' = 'pulsar://pulsar-streamx-b.stream.com:6650',
+ 'connector.subscription-name' = 'tmp_print_detail',
+ 'connector.subscription-type' = 'Shared',
+ 'connector.subscription-initial-position' = 'Latest',
+ 'update-mode' = 'append',
+ 'format.type' = 'json',
+ 'format.derive-schema' = 'true'
+);
+
+-- 合并数据源
+create view pulsar_source_all AS
+select
+ requestId ,
+ `timestamp`,
+ `date`,
+ appId,
+ appName,
+ forwardTimeMs,
+ processingTim,
+ errCode,
+ userIp,
+ b_create_time
+from source_pulsar_n
+union all
+select
+ requestId ,
+ `timestamp`,
+ `date`,
+ appId,
+ appName,
+ forwardTimeMs,
+ processingTim,
+ errCode,
+ userIp,
+ b_create_time
+from source_pulsar_b;
+
+-- 创建 sink
+create table sink_pulsar_result(
+ requestId VARCHAR,
+ `timestamp` BIGINT,
+ `date` VARCHAR,
+ appId VARCHAR,
+ appName VARCHAR,
+ forwardTimeMs VARCHAR,
+ processingTimeMs INT,
+ errCode VARCHAR,
+ userIp VARCHAR
+) with (
+ 'connector' = 'print'
+);
+
+-- 执行逻辑
+-- 查看 pulsar主题明细数据
+insert into sink_pulsar_result
+select
+ requestId ,
+ `timestamp`,
+ `date`,
+ appId,
+ appName,
+ forwardTimeMs,
+ processingTim,
+ errCode,
+ userIp,
+ b_create_time
+from pulsar_source_all;
+
+```
diff --git a/flink-connector-pulsar/pom.xml b/flink-connector-pulsar/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..57b7bd6c23a86bf16b2dd6ba43e361fd80fa9154
--- /dev/null
+++ b/flink-connector-pulsar/pom.xml
@@ -0,0 +1,330 @@
+
+
+
+ streamx-connector
+ com.streamxhub.streamx
+ 1.1.0
+
+ 4.0.0
+
+ flink-connector-pulsar
+
+
+ 2.8.2
+
+ 2.11
+ 1.4.3
+ 2.4.1
+
+ 1.2.7
+ 1.14.3
+
+
+
+
+ org.apache.pulsar
+ pulsar-client
+ ${pulsar.version}
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
+
+ org.apache.flink
+ flink-connector-pulsar_2.11
+ 1.14.3
+
+
+ org.apache.flink
+ flink-json
+ 1.14.3
+
+
+
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-table-api-java-bridge_2.11
+ ${flink.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-table-planner_2.11
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-streaming-scala_2.11
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ provided
+
+
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+ provided
+
+
+
+
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+ provided
+
+
+
+
+
+ org.apache.flink
+ flink-scala_${scala.binary.version}
+ ${flink.version}
+ provided
+
+
+
+
+
+ org.apache.flink
+ flink-table-api-java-bridge_${scala.binary.version}
+ ${flink.version}
+ provided
+ true
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+
+
+
+ io.netty
+ *
+
+
+ provided
+
+
+
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${flink.version}
+ test
+
+
+ org.apache.hadoop
+ hadoop-core
+
+
+
+
+
+ org.apache.flink
+ flink-hadoop-compatibility_${scala.binary.version}
+ ${flink.version}
+ test
+
+
+ org.apache.flink
+ flink-shaded-include-yarn_${scala.binary.version}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-minicluster
+ ${hadoop.version}
+ test
+
+
+
+ jdk.tools
+ jdk.tools
+
+
+
+ io.netty
+ *
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ ${hadoop.version}
+ test-jar
+ test
+
+
+
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-test-utils_${scala.binary.version}
+ ${flink.version}
+ test
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ test-jar
+ test
+
+
+ org.apache.flink
+ flink-table-planner_${scala.binary.version}
+ ${flink.version}
+ test-jar
+ test
+
+
+
+
+
+
+
+
+
+
+
+
+
+ com.alibaba
+ fastjson
+ ${fastjson.version}
+ provided
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.0
+
+ 1.8
+ 1.8
+ UTF-8
+
+
+
+
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.1
+
+ false
+
+
+
+ package
+
+ shade
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+ maven-antrun-plugin
+ 1.2
+
+
+ copy-resources
+ package
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarConnectorOptions.java b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarConnectorOptions.java
new file mode 100644
index 0000000000000000000000000000000000000000..b83915959256aee8765658a42e178308b5491979
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarConnectorOptions.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.streamxhub.streamx.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * @author DarrenDa
+ * * @version 1.0
+ * * @Desc:
+ **/
+
+/** Options for the Pulsar connector. */
+@PublicEvolving
+public class PulsarConnectorOptions {
+
+ // --------------------------------------------------------------------------------------------
+ // Format options
+ // --------------------------------------------------------------------------------------------
+ public static final ConfigOption SERVICE_URL =
+ ConfigOptions.key("connector.service-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines pulsar service url. ");
+
+ public static final ConfigOption ADMIN_URL =
+ ConfigOptions.key("connector.admin-url")
+ .stringType()
+ .defaultValue("http://pulsar-streamx-qa.streamx.com:8080")
+ .withDescription(
+ "Defines pulsar admin url. ");
+
+ public static final ConfigOption TOPIC =
+ ConfigOptions.key("connector.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines pulsar topic. ");
+
+ public static final ConfigOption SUBSCRIPTION_NAME =
+ ConfigOptions.key("connector.subscription-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines pulsar subscription name. ");
+
+ public static final ConfigOption SUBSCRIPTION_TYPE =
+ ConfigOptions.key("connector.subscription-type")
+ .enumType(SubscriptionType.class)
+ .defaultValue(SubscriptionType.Shared)
+ .withDescription(
+ "Defines pulsar subscription type. ");
+
+ public static final ConfigOption SUBSCRIPTION_INITIAL_POSITION =
+ ConfigOptions.key("connector.subscription-initial-position")
+ .enumType(ScanStartupMode.class)
+ .defaultValue(ScanStartupMode.LATEST)
+ .withDescription("Startup mode for Pulsar consumer.");
+
+ public static final ConfigOption SUBSCRIPTION_INITIAL_POSITION_TIMESTAMP =
+ ConfigOptions.key("connector.subscription-initial-position.timestamp")
+ .longType()
+ .noDefaultValue()
+ .withDescription("Start from the specified message time by Message.getPublishTime().");
+
+ public static final ConfigOption UPDATE_MODE =
+ ConfigOptions.key("update-mode")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines pulsar update mode. ");
+ public static final ConfigOption SOURCE_PARALLELISM =
+ ConfigOptions.key("source-parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines pulsar sink parallelism. ");
+ public static final ConfigOption SINK_PARALLELISM =
+ ConfigOptions.key("sink-parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines pulsar sink parallelism. ");
+
+ //与老平台 1.14.3之前版本的sql进行兼容,但是并未使用的参数
+ public static final ConfigOption VERSION =
+ ConfigOptions.key("connector.version")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines pulsar version. ");
+
+ //与老平台 1.14.3之前版本的sql进行兼容,但是并未使用的参数
+ public static final ConfigOption DERIVE_SCHEMA =
+ ConfigOptions.key("format.derive-schema")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines pulsar derive schema. ");
+
+// public static final ConfigOption PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE =
+// ConfigOptions.key("pulsar.source.enableAutoAcknowledgeMessage")
+// .booleanType()
+// .noDefaultValue()
+// .withDescription(
+// "Defines pulsar enable auto acknowledge message. ");
+
+ // --------------------------------------------------------------------------------------------
+ // Enums
+ // --------------------------------------------------------------------------------------------
+
+ /** Startup mode for the Pulsar consumer, see {@link #SUBSCRIPTION_INITIAL_POSITION}. */
+ public enum ScanStartupMode implements DescribedEnum {
+ EARLIEST("Earliest", text("Start from the earliest available message in the topic..")),
+ LATEST("Latest", text("Start from the latest available message in the topic.")),
+ TIMESTAMP("Timestamp", text("Start from the specified message time by Message.getPublishTime()."));
+
+ private final String value;
+ private final InlineElement description;
+
+ ScanStartupMode(String value, InlineElement description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return description;
+ }
+ }
+
+ private PulsarConnectorOptions() {}
+}
diff --git a/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarConnectorOptionsUtil.java b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarConnectorOptionsUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..fd761ae3b22d1e9291f867f98fc801ce8037a871
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarConnectorOptionsUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.streamxhub.streamx.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author DarrenDa
+ * * @version 1.0
+ * * @Desc:
+ **/
+
+/** Utilities for {@link PulsarConnectorOptions}. */
+@PublicEvolving
+public class PulsarConnectorOptionsUtil {
+
+ // Prefix for Pulsar specific properties.
+ public static final String PROPERTIES_PREFIX = "properties.";
+ public static final String PROPERTIES_CLIENT_PREFIX = "properties_client.";
+
+ public static Properties getPulsarProperties(Map tableOptions, String prefix) {
+ final Properties pulsarProperties = new Properties();
+
+ if (hasPulsarClientProperties(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(prefix))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ final String subKey = key.substring((prefix).length());
+ pulsarProperties.put(subKey, value);
+ });
+ }
+ return pulsarProperties;
+ }
+
+
+ /**
+ * Decides if the table options contains Pulsar client properties that start with prefix
+ * 'properties'.
+ */
+ private static boolean hasPulsarClientProperties(Map tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+ }
+
+ private PulsarConnectorOptionsUtil() {}
+}
diff --git a/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarDynamicSink.java b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarDynamicSink.java
new file mode 100644
index 0000000000000000000000000000000000000000..093d95a3f4ef594dadee7a35df38abe399b5e12a
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarDynamicSink.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.streamxhub.streamx.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * @author DarrenDa
+ * * @version 1.0
+ * * @Desc:
+ **/
+
+/** A version-agnostic Pulsar {@link DynamicTableSink}. */
+@Internal
+public class PulsarDynamicSink implements DynamicTableSink {
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Metadata that is appended at the end of a physical sink row. */
+ protected List metadataKeys;
+
+ // --------------------------------------------------------------------------------------------
+ // Format attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Data type of consumed data type. */
+ protected DataType consumedDataType;
+
+ /** Data type to configure the formats. */
+ protected final DataType physicalDataType;
+
+ /** Optional format for encoding to Pulsar. */
+ protected final @Nullable
+ EncodingFormat> encodingFormat;
+
+ // --------------------------------------------------------------------------------------------
+ // Pulsar-specific attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** The Pulsar topic to write to. */
+ protected final String topic;
+
+ /** The Pulsar service url config. */
+ protected final String serviceUrl;
+
+ /** The Pulsar update mode to. */
+ protected final String updateMode;
+
+
+ /** Properties for the Pulsar producer. */
+ protected final Properties pulsarProducerProperties;
+
+ /** Properties for the Pulsar producer. */
+ protected final Properties pulsarClientProperties;
+
+ /** Properties for the Pulsar producer parallelism. */
+ protected final Integer sinkParallelism;
+
+ public PulsarDynamicSink(
+ DataType physicalDataType,
+ @Nullable EncodingFormat> encodingFormat,
+ String topic,
+ String service_url,
+ String update_mode,
+ Properties pulsarProducerProperties,
+ Properties pulsarClientProperties,
+ Integer sinkParallelism) {
+ // Format attributes
+ this.physicalDataType =
+ checkNotNull(physicalDataType, "Physical data type must not be null.");
+ this.encodingFormat = encodingFormat;
+ // Mutable attributes
+ this.metadataKeys = Collections.emptyList();
+ // Pulsar-specific attributes
+ this.topic = checkNotNull(topic, "Topic must not be null.");
+ this.serviceUrl = checkNotNull(service_url, "Service url must not be null.");
+ this.updateMode = checkNotNull(update_mode, "Update mode must not be null.");
+ this.pulsarProducerProperties = checkNotNull(pulsarProducerProperties, "pulsarProducerProperties must not be null.");
+ this.pulsarClientProperties = checkNotNull(pulsarClientProperties, "pulsarClientProperties must not be null.");
+ this.sinkParallelism = sinkParallelism;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ if(updateMode.equals("append")){
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .build();
+ }else {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+// .addContainedKind(RowKind.UPDATE_BEFORE)
+// .addContainedKind(RowKind.DELETE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .build();
+ }
+// return encodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ SerializationSchema runtimeEncoder = encodingFormat.createRuntimeEncoder(context, physicalDataType);
+
+ PulsarSinkFunction sinkFunction =
+ new PulsarSinkFunction<>(
+ topic,
+ serviceUrl,
+ pulsarProducerProperties,
+ pulsarClientProperties,
+ runtimeEncoder);
+ //sink的并行度设置
+ if(sinkParallelism != null){
+ return SinkFunctionProvider.of(sinkFunction,sinkParallelism);
+ }else{
+ return SinkFunctionProvider.of(sinkFunction);
+ }
+
+ }
+
+
+ @Override
+ public DynamicTableSink copy() {
+ final PulsarDynamicSink copy =
+ new PulsarDynamicSink(
+ physicalDataType,
+ encodingFormat,
+ topic,
+ serviceUrl,
+ updateMode,
+ pulsarProducerProperties,
+ pulsarClientProperties,
+ sinkParallelism);
+ copy.metadataKeys = metadataKeys;
+ return copy;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Pulsar table sink";
+ }
+
+
+}
diff --git a/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarDynamicSource.java b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarDynamicSource.java
new file mode 100644
index 0000000000000000000000000000000000000000..38c370967fd998789d28fdd2ea09ba66f30be17d
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarDynamicSource.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.streamxhub.streamx.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * @author DarrenDa
+ * * @version 1.0
+ * * @Desc:
+ **/
+
+/** A version-agnostic Pulsar {@link ScanTableSource}. */
+@Internal
+public class PulsarDynamicSource
+ implements ScanTableSource, SupportsWatermarkPushDown {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarDynamicSource.class);
+ private final String serviceUrl;
+ private final String adminUrl;
+ private final String subscriptionName;
+ private final SubscriptionType subscriptionType;
+ private final PulsarConnectorOptions.ScanStartupMode startupMode;
+ private final String topic;
+ private final DecodingFormat> decodingFormat;
+ private final DataType producedDataType;
+ private final String tableIdentifier;
+ private final Properties properties;
+ private final Long timestamp;
+ private final Integer sourceParallelism;
+
+ /** Watermark strategy that is used to generate per-partition watermark. */
+ protected WatermarkStrategy watermarkStrategy;
+
+
+ public PulsarDynamicSource(
+ String serviceUrl,
+ String adminUrl,
+ String subscriptionName,
+ SubscriptionType subscriptionType,
+ PulsarConnectorOptions.ScanStartupMode startupMode,
+ Long timestamp,
+ String topic,
+ DecodingFormat> decodingFormat,
+ DataType producedDataType,
+ String tableIdentifier,
+ Properties properties,
+ Integer sourceParallelism) {
+ this.serviceUrl = serviceUrl;
+ this.adminUrl = adminUrl;
+ this.subscriptionName = subscriptionName;
+ this.subscriptionType = subscriptionType;
+ this.startupMode = startupMode;
+ this.timestamp = timestamp;
+ this.topic = topic;
+ this.decodingFormat = decodingFormat;
+ this.producedDataType = producedDataType;
+ this.tableIdentifier = tableIdentifier;
+ this.properties = properties;
+ this.sourceParallelism = sourceParallelism;
+ this.watermarkStrategy = null;
+
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ // in our example the format decides about the changelog mode
+ // but it could also be the source itself
+ return decodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+
+ // create runtime classes that are shipped to the cluster
+
+ final DeserializationSchema deserializer =
+ decodingFormat.createRuntimeDecoder(runtimeProviderContext, producedDataType);
+
+ final PulsarSource pulsarSource =
+ createPulsarSource( deserializer);
+
+
+ return new DataStreamScanProvider() {
+ @Override
+ public DataStream produceDataStream(StreamExecutionEnvironment execEnv) {
+ if (watermarkStrategy == null) {
+ LOG.info("WatermarkStrategy 为空");
+ watermarkStrategy = WatermarkStrategy.noWatermarks();
+ }else{
+ LOG.info("WatermarkStrategy 不为空");
+ }
+
+ DataStreamSource rowDataDataStreamSource = execEnv.fromSource(
+ pulsarSource, watermarkStrategy, "PulsarSource-" + tableIdentifier);
+
+ //设置source并行度
+ if(sourceParallelism != null){
+ rowDataDataStreamSource.setParallelism(sourceParallelism);
+ }
+ return rowDataDataStreamSource;
+ }
+
+ @Override
+ public boolean isBounded() {
+ return pulsarSource.getBoundedness() == Boundedness.BOUNDED;
+ }
+ };
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new PulsarDynamicSource(
+ serviceUrl,
+ adminUrl,
+ subscriptionName,
+ subscriptionType,
+ startupMode,
+ timestamp,
+ topic,
+ decodingFormat,
+ producedDataType,
+ tableIdentifier,
+ properties,
+ sourceParallelism
+ );
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Pulsar Table Source";
+ }
+
+
+ //---------------------------------------------------------------------------------------------
+ protected PulsarSource createPulsarSource(
+ DeserializationSchema deserializer) {
+
+ final PulsarSourceBuilder pulsarSourceBuilder = PulsarSource.builder();
+
+
+ pulsarSourceBuilder
+ .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics(topic)
+ .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(deserializer))
+ .setConfig(Configuration.fromMap((Map)properties))
+ .setSubscriptionName(subscriptionName);
+
+ switch (subscriptionType) {
+ case Shared:
+ pulsarSourceBuilder.setSubscriptionType(SubscriptionType.Shared);
+ break;
+ case Exclusive:
+ pulsarSourceBuilder.setSubscriptionType(SubscriptionType.Exclusive);
+ break;
+ case Key_Shared:
+ pulsarSourceBuilder.setSubscriptionType(SubscriptionType.Key_Shared);
+ break;
+ case Failover:
+ pulsarSourceBuilder.setSubscriptionType(SubscriptionType.Failover);
+ break;
+ default:
+ throw new TableException(
+ "Unsupported subscriptionType. Validator should have checked that.");
+ }
+
+ switch (startupMode) {
+ case EARLIEST:
+ pulsarSourceBuilder.setStartCursor(StartCursor.earliest());
+ break;
+ case LATEST:
+ pulsarSourceBuilder.setStartCursor(StartCursor.latest());
+ break;
+ case TIMESTAMP:
+ checkNotNull(timestamp, "No timestamp supplied.");
+ pulsarSourceBuilder.setStartCursor(StartCursor.fromMessageTime(timestamp));
+ break;
+ default:
+ throw new TableException(
+ "Unsupported startup mode. Validator should have checked that.");
+ }
+
+ return pulsarSourceBuilder.build();
+
+ }
+
+ @Override
+ public void applyWatermark(WatermarkStrategy watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
+ }
+
+
+}
diff --git a/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarDynamicTableFactory.java b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarDynamicTableFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..6e9381b4698582233368fb921707587cd76212eb
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarDynamicTableFactory.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.streamxhub.streamx.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.*;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.types.DataType;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static com.streamxhub.streamx.flink.connector.pulsar.table.PulsarConnectorOptions.*;
+import static com.streamxhub.streamx.flink.connector.pulsar.table.PulsarConnectorOptionsUtil.*;
+
+
+/**
+ * Factory for creating configured instances of {@link PulsarDynamicSource} and {
+ * @link PulsarDynamicSink}.
+ *
+ * @author DarrenDa
+ * * @version 1.0
+ * * @Desc:
+ */
+
+@Internal
+public class PulsarDynamicTableFactory
+ implements DynamicTableSourceFactory,DynamicTableSinkFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarDynamicTableFactory.class);
+
+ public static final String IDENTIFIER = "pulsar";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ final Set> options = new HashSet<>();
+ options.add(SERVICE_URL);
+ return options;
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ final Set> options = new HashSet<>();
+ options.add(ADMIN_URL);
+ options.add(SUBSCRIPTION_NAME);
+ options.add(SUBSCRIPTION_TYPE);
+ options.add(SUBSCRIPTION_INITIAL_POSITION);
+ options.add(SUBSCRIPTION_INITIAL_POSITION_TIMESTAMP);
+ options.add(FactoryUtil.FORMAT);
+ options.add(TOPIC);
+ options.add(UPDATE_MODE);
+ options.add(SOURCE_PARALLELISM);
+ options.add(SINK_PARALLELISM);
+ options.add(VERSION);
+ options.add(DERIVE_SCHEMA);
+
+ return options;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ // either implement your custom validation logic here ...
+ // or use the provided helper utility
+ final TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+
+ // discover a suitable decoding format
+ final DecodingFormat> decodingFormat =
+ helper.discoverDecodingFormat(
+ DeserializationFormatFactory.class, FactoryUtil.FORMAT);
+
+ // validate all options
+// helper.validate();
+ helper.validateExcept(PROPERTIES_PREFIX, PROPERTIES_CLIENT_PREFIX);
+
+ // get the validated options
+ final ReadableConfig tableOptions = helper.getOptions();
+ final String serviceUrl = tableOptions.get(SERVICE_URL);
+ final String adminUrl = tableOptions.get(ADMIN_URL);
+ final String subscriptionName = tableOptions.get(SUBSCRIPTION_NAME);
+ final SubscriptionType subscriptionType = tableOptions.get(SUBSCRIPTION_TYPE);
+ final ScanStartupMode startupMode = tableOptions.get(SUBSCRIPTION_INITIAL_POSITION);
+ final Long timestamp = tableOptions.get(SUBSCRIPTION_INITIAL_POSITION_TIMESTAMP);
+ final String topic = tableOptions.get(TOPIC);
+ final Integer sourceParallelism = tableOptions.get(SOURCE_PARALLELISM);
+
+ // derive the produced data type (excluding computed columns) from the catalog table
+ final DataType producedDataType =
+ context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+ // create and return dynamic table source
+ return new PulsarDynamicSource(
+ serviceUrl,
+ adminUrl,
+ subscriptionName,
+ subscriptionType,
+ startupMode,
+ timestamp,
+ topic,
+ decodingFormat,
+ producedDataType,
+ context.getObjectIdentifier().asSummaryString(),
+ getPulsarProperties(context.getCatalogTable().getOptions(), PROPERTIES_PREFIX),
+ sourceParallelism
+ );
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(
+ this, context);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+ final String update_mode = tableOptions.get(UPDATE_MODE);
+ final Integer sinkParallelism = tableOptions.get(SINK_PARALLELISM);
+
+
+ helper.validateExcept(PROPERTIES_PREFIX, PROPERTIES_CLIENT_PREFIX);
+
+ final EncodingFormat> encodingFormat =
+ helper.discoverEncodingFormat(
+ SerializationFormatFactory.class, FactoryUtil.FORMAT);
+
+ //校验sql建表时是否指定主键约束
+ //我们一般使用flink自动推导出来的主键,不显式设置主键约束,所以这个校验方法暂时不使用
+// validatePKConstraints(
+// update_mode, context.getObjectIdentifier(), context.getCatalogTable(), encodingFormat);
+
+ final DataType physicalDataType =
+ context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+ return createPulsarTableSink(
+ physicalDataType,
+ encodingFormat,
+ tableOptions.get(TOPIC),
+ tableOptions.get(SERVICE_URL),
+ update_mode,
+ getPulsarProperties(context.getCatalogTable().getOptions(), PROPERTIES_PREFIX),
+ getPulsarProperties(context.getCatalogTable().getOptions(), PROPERTIES_CLIENT_PREFIX),
+ sinkParallelism
+ );
+ }
+
+
+ //校验sql建表时是否指定主键约束
+ private static void validatePKConstraints(
+ @Nullable String update_mode, ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
+
+ if(!update_mode.equals("append") && !update_mode.equals("upsert")){
+ throw new ValidationException(
+ String.format(
+ "The Pulsar table '%s' with update-mode should be 'append' or 'upsert'",
+ tableName.asSummaryString()));
+ }else if (catalogTable.getSchema().getPrimaryKey().isPresent()
+ && update_mode.equals("append")) {
+ throw new ValidationException(
+ String.format(
+ "The Pulsar table '%s' with append update-mode doesn't support defining PRIMARY KEY constraint"
+ + " on the table, because it can't guarantee the semantic of primary key.",
+ tableName.asSummaryString()));
+ }else if(!catalogTable.getSchema().getPrimaryKey().isPresent()
+ && update_mode.equals("upsert")){
+ throw new ValidationException(
+ "'upsert' tables require to define a PRIMARY KEY constraint. "
+ + "The PRIMARY KEY specifies which columns should be read from or write to the Pulsar message key. "
+ + "The PRIMARY KEY also defines records in the 'upsert' table should update or delete on which keys.");
+ }
+ }
+
+ protected PulsarDynamicSink createPulsarTableSink(
+ DataType physicalDataType,
+ @Nullable EncodingFormat> encodingFormat,
+ String topic,
+ String service_url,
+ String update_mode,
+ Properties pulsarProducerProperties,
+ Properties pulsarClientProperties,
+ Integer sinkParallelism) {
+ return new PulsarDynamicSink(
+ physicalDataType,
+ encodingFormat,
+ topic,
+ service_url,
+ update_mode,
+ pulsarProducerProperties,
+ pulsarClientProperties,
+ sinkParallelism);
+ }
+
+}
diff --git a/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarSinkFunction.java b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarSinkFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..19eed941fa2fc80417ece803de20156c8662ccc2
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarSinkFunction.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.streamxhub.streamx.flink.connector.pulsar.table;
+
+import com.alibaba.fastjson.JSONObject;
+import com.streamxhub.streamx.flink.connector.pulsar.table.util.PulsarConnectionHolder;
+import com.streamxhub.streamx.flink.connector.pulsar.table.util.PulsarProducerHolder;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializableObject;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The sink function for Pulsar.
+ * @author DarrenDa
+ * * @version 1.0
+ * * @Desc:
+ */
+@Internal
+public class PulsarSinkFunction extends RichSinkFunction
+ implements CheckpointedFunction{
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkFunction.class);
+
+ private final String topic;
+ private final String serviceUrl;
+ private final Properties pulsarProducerProperties;
+ private final Properties pulsarClientProperties;
+ SerializationSchema runtimeEncoder;
+// private transient PulsarClient pulsarClient;
+ private transient Producer producer;
+ private transient volatile boolean closed = false;
+
+ /** Flag indicating whether to accept failures (and log them), or to fail on failures. Default is False.*/
+ protected boolean logFailuresOnly;
+
+ /**
+ * If true, the producer will wait until all outstanding records have been send to the broker. Default is True.
+ */
+ protected boolean flushOnCheckpoint = true;
+
+ /** The callback than handles error propagation or logging callbacks. */
+ protected transient BiConsumer sendCallback;
+
+ /** Errors encountered in the async producer are stored here. */
+ protected transient volatile Exception asyncException;
+
+ /** Lock for accessing the pending records. */
+ protected final SerializableObject pendingRecordsLock = new SerializableObject();
+
+ /** Number of unacknowledged records. */
+ protected long pendingRecords;
+
+
+ public PulsarSinkFunction(
+ String topic,
+ String serviceUrl,
+ Properties pulsarProducerProperties,
+ Properties pulsarClientProperties,
+ SerializationSchema runtimeEncoder
+ ) {
+ this.topic = topic;
+ this.serviceUrl=serviceUrl;
+ this.pulsarProducerProperties = pulsarProducerProperties;
+ this.pulsarClientProperties = pulsarClientProperties;
+ this.runtimeEncoder = runtimeEncoder;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ LOG.info("start open ...");
+ try {
+ RuntimeContext ctx = getRuntimeContext();
+
+ LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into (※) pulsar topic {}",
+ ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), topic);
+
+// this.producer = createProducer();
+ this.producer = createReusedProducer();
+ LOG.info("Pulsar producer has been created.");
+
+ } catch (IOException ioe) {
+ LOG.error("Exception while creating connection to Pulsar.", ioe);
+ throw new RuntimeException("Cannot create connection to Pulsar.", ioe);
+ }catch (Exception ex){
+ LOG.error("Exception while creating connection to Pulsar.", ex);
+ throw new RuntimeException("Cannot create connection to Pulsar.", ex);
+ }
+
+ if (flushOnCheckpoint
+ && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
+ LOG.warn(
+ "Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
+ flushOnCheckpoint = false;
+ }
+
+ if (logFailuresOnly) {
+ this.sendCallback =
+ (t, u) -> {
+ if (u != null) {
+ LOG.error(
+ "Error while sending message to Pulsar: {}",
+ ExceptionUtils.stringifyException(u));
+ }
+ acknowledgeMessage();
+ };
+ } else {
+ this.sendCallback =
+ (t, u) -> {
+ if (asyncException == null && u != null) {
+ asyncException = new Exception(u);
+ }
+ acknowledgeMessage();
+ };
+ }
+ LOG.info("end open.");
+ }
+
+
+ @Override
+ public void invoke(T value, Context context) throws Exception {
+ LOG.info("start to invoke, send pular message.");
+
+ // propagate asynchronous errors
+ checkErroneous();
+
+ byte[] serializeValue = runtimeEncoder.serialize(value);
+ String strValue = new String(serializeValue);
+ TypedMessageBuilder typedMessageBuilder = producer.newMessage();
+ typedMessageBuilder.value(serializeValue);
+ typedMessageBuilder.key(getKey(strValue));
+
+ if (flushOnCheckpoint) {
+ synchronized (pendingRecordsLock) {
+ pendingRecords++;
+ }
+ }
+
+ //异步发送
+ CompletableFuture messageIdCompletableFuture = typedMessageBuilder.sendAsync();
+ messageIdCompletableFuture.whenComplete(sendCallback);
+
+ }
+
+
+ @Override
+ public void close() throws Exception {
+
+ //采用pulsar producer复用的方式,close方法不要具体实现,否则producer会被关闭
+ LOG.error("PulsarProducerBase Class close function called");
+// closed = true;
+//
+// if (producer != null) {
+// try {
+// producer.close();
+// } catch (IOException e) {
+// LOG.warn("Exception occurs while closing Pulsar producer.", e);
+// }
+// this.producer = null;
+// }
+ checkErroneous();
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+ if(flushOnCheckpoint){
+ synchronized (pendingRecordsLock) {
+ if (pendingRecords != 0) {
+ try {
+ LOG.info("等待notify");
+ pendingRecordsLock.wait();
+ checkErroneous();
+ flush();
+ LOG.info("等待waite之后");
+ } catch (InterruptedException e) {
+ // this can be interrupted when the Task has been cancelled.
+ // by throwing an exception, we ensure that this checkpoint doesn't get
+ // confirmed
+ throw new IllegalStateException(
+ "Flushing got interrupted while checkpointing", e);
+ }
+ }
+ }
+
+ }
+
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ // nothing to do.
+ }
+
+
+ public String getKey(String strValue){
+ JSONObject jsonObject = JSONObject.parseObject(strValue);
+ String key = jsonObject.getString("key");
+ return key == null ? "" : key;
+ }
+
+ //获取Pulsar Producer
+ public Producer createProducer() throws Exception{
+ LOG.info("current pulsar version is " + PulsarVersion.getVersion());
+
+ ClientBuilder builder = PulsarClient.builder();
+ ProducerBuilder producerBuilder = builder.serviceUrl(serviceUrl)
+ .maxNumberOfRejectedRequestPerConnection(50)
+ .loadConf((Map)pulsarClientProperties)
+ .build()
+ .newProducer()
+ .topic(topic)
+ .blockIfQueueFull(Boolean.TRUE)
+ .compressionType(CompressionType.LZ4)
+ .hashingScheme(HashingScheme.JavaStringHash)
+// .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+ .loadConf((Map) pulsarProducerProperties);//实现配置透传功能
+ Producer producer = producerBuilder.create();
+ return producer;
+
+// return PulsarClient.builder()
+// .serviceUrl(serviceUrl)
+// .build()
+// .newProducer()
+// .loadConf((Map)properties)//实现配置透传功能
+// .topic(topic)
+// .blockIfQueueFull(Boolean.TRUE)
+// .compressionType(CompressionType.LZ4)
+// .hashingScheme(HashingScheme.JavaStringHash)
+// .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+// .create();
+ }
+
+ //获取复用的Pulsar Producer
+ public Producer createReusedProducer() throws Exception{
+ LOG.info("now create client, serviceUrl is :" + serviceUrl);
+ PulsarClientImpl client = PulsarConnectionHolder.getProducerClient(serviceUrl, pulsarClientProperties);
+
+ LOG.info("current pulsar version is " + PulsarVersion.getVersion());
+
+ LOG.info("now create producer, topic is :" + topic);
+// ProducerConfigurationData configuration = new ProducerConfigurationData();
+// configuration.setHashingScheme(HashingScheme.JavaStringHash);
+ return PulsarProducerHolder.getProducer(topic, pulsarProducerProperties, client);
+
+ }
+
+ /**
+ * Defines whether the producer should fail on errors, or only log them. If this is set to true,
+ * then exceptions will be only logged, if set to false, exceptions will be eventually thrown
+ * and cause the streaming program to fail (and enter recovery).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+ /**
+ * If set to true, the Flink producer will wait for all outstanding messages in the Pulsar
+ * buffers to be acknowledged by the Pulsar producer on a checkpoint. This way, the producer can
+ * guarantee that messages in the Pulsar buffers are part of the checkpoint.
+ *
+ * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+ */
+ public void setFlushOnCheckpoint(boolean flush) {
+ this.flushOnCheckpoint = flush;
+ }
+
+
+ protected void checkErroneous() throws Exception {
+ Exception e = asyncException;
+ if (e != null) {
+ // prevent double throwing
+ asyncException = null;
+ throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
+ }
+ }
+
+ private void acknowledgeMessage() {
+ if (flushOnCheckpoint) {
+ synchronized (pendingRecordsLock) {
+ LOG.info("pendingRecords:" + pendingRecords);
+ pendingRecords--;
+ if (pendingRecords == 0) {
+ pendingRecordsLock.notifyAll();
+ LOG.info("notify完成");
+ }
+ }
+ }
+ }
+
+
+ /** Flush pending records. */
+ protected void flush() throws Exception{
+ producer.flush();
+ };
+
+
+}
diff --git a/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarSinkFunction_bak.java b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarSinkFunction_bak.java
new file mode 100644
index 0000000000000000000000000000000000000000..956f4ed0b655c35299d08d1b49d42abd8db8adbd
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/PulsarSinkFunction_bak.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.streamxhub.streamx.flink.connector.pulsar.table;
+
+import com.alibaba.fastjson.JSONObject;
+import com.streamxhub.streamx.flink.connector.pulsar.table.util.PulsarConnectionHolder;
+import com.streamxhub.streamx.flink.connector.pulsar.table.util.PulsarProducerHolder;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * The sink function for Pulsar.
+ @author DarrenDa
+ * @version 1.0
+ * @Desc:
+ */
+@Internal
+public class PulsarSinkFunction_bak extends RichSinkFunction
+ implements CheckpointedFunction{
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkFunction_bak.class);
+
+ private final String topic;
+ private final String serviceUrl;
+ private final Properties pulsarProducerProperties;
+ private final Properties pulsarClientProperties;
+ SerializationSchema runtimeEncoder;
+// private transient PulsarClient pulsarClient;
+ private transient Producer producer;
+ private transient volatile boolean closed = false;
+
+
+ public PulsarSinkFunction_bak(
+ String topic,
+ String serviceUrl,
+ Properties pulsarProducerProperties,
+ Properties pulsarClientProperties,
+ SerializationSchema runtimeEncoder
+ ) {
+ this.topic = topic;
+ this.serviceUrl=serviceUrl;
+ this.pulsarProducerProperties = pulsarProducerProperties;
+ this.pulsarClientProperties = pulsarClientProperties;
+ this.runtimeEncoder = runtimeEncoder;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ LOG.info("start open ...");
+ try {
+ RuntimeContext ctx = getRuntimeContext();
+
+ LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into (※) pulsar topic {}",
+ ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), topic);
+
+// this.producer = createProducer();
+ this.producer = createReusedProducer();
+ LOG.info("Pulsar producer has been created.");
+
+ } catch (IOException ioe) {
+ LOG.error("Exception while creating connection to Pulsar.", ioe);
+ throw new RuntimeException("Cannot create connection to Pulsar.", ioe);
+ }catch (Exception ex){
+ LOG.error("Exception while creating connection to Pulsar.", ex);
+ throw new RuntimeException("Cannot create connection to Pulsar.", ex);
+ }
+ LOG.info("end open.");
+ }
+
+
+ @Override
+ public void invoke(T value, Context context) throws Exception {
+ LOG.info("start to invoke, send pular message.");
+
+ byte[] serializeValue = runtimeEncoder.serialize(value);
+ String strValue = new String(serializeValue);
+ TypedMessageBuilder typedMessageBuilder = producer.newMessage();
+ typedMessageBuilder.value(serializeValue);
+ typedMessageBuilder.key(getKey(strValue));
+ typedMessageBuilder.send();
+ }
+
+
+ @Override
+ public void close() throws Exception {
+
+ //采用pulsar producer复用的方式,close方法不要具体实现,否则producer会被关闭
+ LOG.error("PulsarProducerBase Class close function called");
+// closed = true;
+//
+// if (producer != null) {
+// try {
+// producer.close();
+// } catch (IOException e) {
+// LOG.warn("Exception occurs while closing Pulsar producer.", e);
+// }
+// this.producer = null;
+// }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ //
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ // nothing to do.
+ }
+
+
+ public String getKey(String strValue){
+ JSONObject jsonObject = JSONObject.parseObject(strValue);
+ String key = jsonObject.getString("key");
+ return key == null ? "" : key;
+ }
+
+ //获取Pulsar Producer
+ public Producer createProducer() throws Exception{
+ LOG.info("current pulsar version is " + PulsarVersion.getVersion());
+
+ ClientBuilder builder = PulsarClient.builder();
+ ProducerBuilder producerBuilder = builder.serviceUrl(serviceUrl)
+ .maxNumberOfRejectedRequestPerConnection(50)
+ .loadConf((Map)pulsarClientProperties)
+ .build()
+ .newProducer()
+ .topic(topic)
+ .blockIfQueueFull(Boolean.TRUE)
+ .compressionType(CompressionType.LZ4)
+ .hashingScheme(HashingScheme.JavaStringHash)
+// .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+ .loadConf((Map) pulsarProducerProperties);//实现配置透传功能
+ Producer producer = producerBuilder.create();
+ return producer;
+
+// return PulsarClient.builder()
+// .serviceUrl(serviceUrl)
+// .build()
+// .newProducer()
+// .loadConf((Map)properties)//实现配置透传功能
+// .topic(topic)
+// .blockIfQueueFull(Boolean.TRUE)
+// .compressionType(CompressionType.LZ4)
+// .hashingScheme(HashingScheme.JavaStringHash)
+// .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+// .create();
+ }
+
+ //获取复用的Pulsar Producer
+ public Producer createReusedProducer() throws Exception{
+ LOG.info("now create client, serviceUrl is :" + serviceUrl);
+ PulsarClientImpl client = PulsarConnectionHolder.getProducerClient(serviceUrl, pulsarClientProperties);
+
+ LOG.info("current pulsar version is " + PulsarVersion.getVersion());
+
+ LOG.info("now create producer, topic is :" + topic);
+// ProducerConfigurationData configuration = new ProducerConfigurationData();
+// configuration.setHashingScheme(HashingScheme.JavaStringHash);
+ return PulsarProducerHolder.getProducer(topic, pulsarProducerProperties, client);
+
+ }
+
+}
diff --git a/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/util/PulsarConnectionHolder.java b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/util/PulsarConnectionHolder.java
new file mode 100644
index 0000000000000000000000000000000000000000..74cf276fe86e0e62cc198cb6ac234ea7cbf4e725
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/util/PulsarConnectionHolder.java
@@ -0,0 +1,67 @@
+package com.streamxhub.streamx.flink.connector.pulsar.table.util;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author DarrenDa
+ * * @version 1.0
+ * * @Desc:
+ */
+public class PulsarConnectionHolder {
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarConnectionHolder.class);
+ private static final Map PULSAR_CLIENT_MAP = new ConcurrentHashMap<>();
+
+ public static PulsarClientImpl getConsumerClient(String serviceUrl, Properties properties) throws Exception {
+ return get(serviceUrl, true, properties);
+ }
+
+ public static PulsarClientImpl getProducerClient(String serviceUrl, Properties properties) throws Exception {
+ return get(serviceUrl, false, properties);
+ }
+
+ private static PulsarClientImpl get(String serviceUrl, boolean consumer, Properties properties) throws Exception {
+ synchronized (PulsarConnectionHolder.class) {
+ String pulsarClientCacheKey = getPulsarClientCacheKey(serviceUrl, consumer);
+ PulsarClientImpl pulsarClient = PULSAR_CLIENT_MAP.get(pulsarClientCacheKey);
+ if (null != pulsarClient) {
+ return pulsarClient;
+ }
+
+ // return PULSAR_CLIENT_MAP.computeIfAbsent(pulsarClientCacheKey, serviceUrlTag -> createPulsarClient(serviceUrl));
+ PulsarClientImpl pulsarClientImpl = createPulsarClient(serviceUrl, properties);
+ PulsarClientImpl newPulsarClientImpl = PULSAR_CLIENT_MAP.putIfAbsent(pulsarClientCacheKey, pulsarClientImpl);
+ if (newPulsarClientImpl == null) {
+ return pulsarClientImpl;
+ }
+ return newPulsarClientImpl;
+ }
+ }
+
+ private static String getPulsarClientCacheKey(String serviceUrl, boolean consumer) {
+ return serviceUrl + consumer;
+ }
+
+ private static PulsarClientImpl createPulsarClient(String serviceUrl, Properties properties) {
+ try {
+ LOG.info("create client, and ID is " + UUID.randomUUID() + ", and cache map size is " + PULSAR_CLIENT_MAP.size());
+
+ return (PulsarClientImpl) PulsarClient
+ .builder()
+ .serviceUrl(serviceUrl)
+ .maxNumberOfRejectedRequestPerConnection(50)
+ .loadConf((Map) properties)
+ .build();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("创建PulsarClient失败", e);
+ }
+ }
+}
diff --git a/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/util/PulsarProducerHolder.java b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/util/PulsarProducerHolder.java
new file mode 100644
index 0000000000000000000000000000000000000000..a74d166b71921ced5b6bbe3d445125e2d3de51fe
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/com/streamxhub/streamx/flink/connector/pulsar/table/util/PulsarProducerHolder.java
@@ -0,0 +1,72 @@
+package com.streamxhub.streamx.flink.connector.pulsar.table.util;
+
+import org.apache.pulsar.client.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author DarrenDa
+ * * @version 1.0
+ * * @Desc:
+ */
+public class PulsarProducerHolder {
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarProducerHolder.class);
+ private static final Map PULSAR_PRODUCER_MAP = new ConcurrentHashMap<>();
+
+ public static Producer getProducer(String defaultTopicName, Properties properties, PulsarClient client) throws Exception {
+ return get(defaultTopicName, properties, client);
+ }
+
+ private static Producer get(String defaultTopicName, Properties properties, PulsarClient client) throws Exception {
+ synchronized (PulsarProducerHolder.class) {
+ String pulsarProducerCacheKey = defaultTopicName;
+ Producer pulsarProducer = PULSAR_PRODUCER_MAP.get(pulsarProducerCacheKey);
+ LOG.info("get pulsarProducer from map result is " + pulsarProducer);
+ if (null != pulsarProducer) {
+ return pulsarProducer;
+ }
+
+ Producer producer = createPulsarProducer(defaultTopicName, properties, client);
+ Producer newPulsarProducer = PULSAR_PRODUCER_MAP.putIfAbsent(pulsarProducerCacheKey, producer);
+ if(newPulsarProducer == null) {
+ return producer;
+ }
+ return newPulsarProducer;
+ }
+ }
+
+ private static Producer createPulsarProducer(String defaultTopicName, Properties properties, PulsarClient client) {
+ try {
+ LOG.info("create producer, and ID is " + UUID.randomUUID() + ", and cache map size is " + PULSAR_PRODUCER_MAP.size());
+ LOG.info("now defaultTopicName is " + defaultTopicName + ", and map content is " + PULSAR_PRODUCER_MAP.get(defaultTopicName));
+
+ ProducerBuilder producerBuilder = client.newProducer();
+ producerBuilder.
+ blockIfQueueFull(Boolean.TRUE).
+ compressionType(CompressionType.LZ4).
+ topic(defaultTopicName).
+ hashingScheme(HashingScheme.JavaStringHash).
+// batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).
+ loadConf((Map)properties);
+ Producer producer = producerBuilder.create();
+ return producer;
+
+// return client.newProducer().
+// blockIfQueueFull(Boolean.TRUE).
+// compressionType(CompressionType.LZ4).
+// topic(defaultTopicName).
+// hashingScheme(HashingScheme.JavaStringHash).
+//// batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).
+// loadConf((Map)properties).
+// create();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("创建Producer失败", e);
+ }
+ }
+}
diff --git a/flink-connector-pulsar/src/main/resources/services/org.apache.flink.table.factories.Factory b/flink-connector-pulsar/src/main/resources/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000000000000000000000000000000000..5ef2eaa1dcaeb4b3837ed45ed89069d8523cd3eb
--- /dev/null
+++ b/flink-connector-pulsar/src/main/resources/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+com.streamxhub.streamx.flink.connector.pulsar.table.PulsarDynamicTableFactory
\ No newline at end of file
diff --git a/flink-connector-pulsar/src/test/java/com/streamxhub/streamx/flink/connector/pulsar/PulsarSqlCase.java b/flink-connector-pulsar/src/test/java/com/streamxhub/streamx/flink/connector/pulsar/PulsarSqlCase.java
new file mode 100644
index 0000000000000000000000000000000000000000..321bbc2f7eb48d19e80a9e477f263742278dfc5d
--- /dev/null
+++ b/flink-connector-pulsar/src/test/java/com/streamxhub/streamx/flink/connector/pulsar/PulsarSqlCase.java
@@ -0,0 +1,83 @@
+package com.streamxhub.streamx.flink.connector.pulsar;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.junit.Test;
+
+/**
+ * @author DarrenDa
+ * @version 1.0
+ * @Desc: Test case
+ */
+public class PulsarSqlCase {
+
+ @Test
+ public void mytest() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance()
+ .inStreamingMode()
+ .build();
+ TableEnvironment tableEnvironment = TableEnvironment.create(settings);
+
+ tableEnvironment.executeSql("" +
+ "CREATE TABLE source_pulsar_n(\n" +
+ " requestId VARCHAR,\n" +
+ " `timestamp` BIGINT,\n" +
+ " `date` VARCHAR,\n" +
+ " appId VARCHAR,\n" +
+ " appName VARCHAR,\n" +
+ " forwardTimeMs VARCHAR,\n" +
+ " processingTimeMs INT,\n" +
+ " errCode VARCHAR,\n" +
+ " userIp VARCHAR,\n" +
+ " createTime bigint,\n" +
+ " b_create_time as TO_TIMESTAMP(FROM_UNIXTIME(createTime/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss')\n" +
+ ") WITH (\n" +
+ " 'connector.type' = 'pulsar',\n" +
+ " 'connector.version' = 'universal',\n" +
+ " 'connector.topic' = 'persistent://streamx/dev/context.pulsar',\n" +
+ " 'connector.service-url' = 'pulsar://pulsar-streamx-n.stream.com:6650',\n" +
+ " 'connector.subscription-name' = 'tmp_print_detail',\n" +
+ " 'connector.subscription-type' = 'Shared',\n" +
+ " 'connector.subscription-initial-position' = 'Latest',\n" +
+ " 'update-mode' = 'append',\n" +
+ " 'format.type' = 'json',\n" +
+ " 'format.derive-schema' = 'true'\n" +
+ ")" )
+ ;
+
+ tableEnvironment.executeSql("" +
+ "create table sink_pulsar_result(\n" +
+ " requestId VARCHAR,\n" +
+ " `timestamp` BIGINT,\n" +
+ " `date` VARCHAR,\n" +
+ " appId VARCHAR,\n" +
+ " appName VARCHAR,\n" +
+ " forwardTimeMs VARCHAR,\n" +
+ " processingTimeMs INT,\n" +
+ " errCode VARCHAR,\n" +
+ " userIp VARCHAR\n" +
+ ") with (\n" +
+ " 'connector' = 'print'\n" +
+ ")");
+
+ tableEnvironment.executeSql("" +
+ "insert into sink_pulsar_result\n" +
+ "select \n" +
+ " requestId ,\n" +
+ " `timestamp`,\n" +
+ " `date`,\n" +
+ " appId,\n" +
+ " appName,\n" +
+ " forwardTimeMs,\n" +
+ " processingTimeMs,\n" +
+ " errCode,\n" +
+ " userIp,\n" +
+ " b_create_time\n" +
+ "from source_pulsar_n");
+
+ Table tb = tableEnvironment.sqlQuery("select * from sink_pulsar_result");
+
+ tb.execute().print();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 0d1ec05891f2e22f642b09b4e51e3eb4a751a9b2..1b0ef1be56f521ced97924714bea1ec6ca32c054 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,6 +14,7 @@
flink-connector-clickhouse
flink-connector-native-clickhouse
+ flink-connector-pulsar