# flink-connector-mqtt
**Repository Path**: davidfantasy/flink-connector-mqtt
## Basic Information
- **Project Name**: flink-connector-mqtt
- **Description**: 基于flink最新的[FLIP-27]架构对MQTT connector的实现
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 4
- **Forks**: 2
- **Created**: 2023-08-21
- **Last Updated**: 2025-01-20
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## flink-connector-mqtt
基于flink最新的[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)架构对MQTT
connector的实现,主要特性如下:
- 兼容最新的flink版本(1.17.1)
- 支持多个topic同时读取数据,并基于topic进行自动分片
- 使用了高性能的MQTT客户端(hivemq-mqtt)
- 支持以flink sql的方式查询
## 依赖说明
- 必须使用JDK 17及以上版本
- 目前暂时只支持MQTT 3协议,后续会支持MQTT 5
- flink版本支持1.17.1及以上版本
## 使用方法
1. 引入依赖
```xml
com.github.davidfantasy
flink-connector-mqtt
1.1.0
```
2. 示例代码:
作为流式数据源使用:
```java
public class MqttSourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
MqttProperties mqttProp = new MqttProperties();
mqttProp.setHost("broker-cn.emqx.io");
mqttProp.setPort(1883);
// mqttProp.setUsername("");
// mqttProp.setPassword("");
List topics = new ArrayList<>();
topics.add(new MqttTopic("/flink-connector/mqtt/source/test", 0));
var source = env.fromSource(new MqttSource(mqttProp, topics), WatermarkStrategy.noWatermarks(), "Mqtt Source");
source.map(v -> {
var msg = (MqttMessage) v;
return msg.getTopic() + ":" + new String(msg.getPayload());
}).print();
env.execute("MQTT Source Test");
}
}
```
在flink sql中创建表:
```sql
CREATE TABLE mqttTest (
id INTEGER,
code STRING
) WITH (
'connector' = 'mqtt',
'server' = 'broker-cn.emqx.io',
'port' = '1883',
'topic' = '/flink-connector/mqtt/source/test'
)
```
**注意**:使用flink sql时,mqtt的消息格式必须为JSON格式,上述表结构对应的json格式为:
```json
{"id":123,"code":"some hello"}
```
目前在table中可以使用的配置为:
- connector: 固定为mqtt
- server: mqtt broker host,必须
- port: mqtt broker port,必须
- username: 认证用户名,可选
- password: 认证密码,可选
- topic: 该表对应的MQTT topic,必须
- qos: 使用什么质量等级进行订阅,可选,默认0