# gomqtt **Repository Path**: xcode-zone/gomqtt ## Basic Information - **Project Name**: gomqtt - **Description**: MQTT Client For Golang - **Primary Language**: Go - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 0 - **Created**: 2020-04-16 - **Last Updated**: 2025-09-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # GoMQTT - 高性能MQTT客户端库 GoMQTT是一个基于Go语言开发的高性能MQTT客户端库,提供了连接池管理、异步发布、批量发布、智能订阅等高级功能,适用于物联网、消息队列等场景。 ## ✨ 特性 - 🔄 **连接池管理** - 支持多连接复用,提高并发性能 - ⚡ **异步发布** - 非阻塞消息发布,提升吞吐量 - 📦 **批量发布** - 批量消息发送,减少网络开销 - 🔔 **智能订阅** - 自动重试和重连订阅机制 - 🔒 **SSL/TLS支持** - 安全的加密通信 - ⏱️ **超时控制** - 完善的超时和错误处理机制 - 📊 **性能优化** - 连接配置优化,支持高并发场景 ## 🚀 快速开始 ### 安装 ```bash go get gitee.com/xcode-zone/gomqtt ``` ### 基本使用示例 ```go package main import ( "fmt" "log" "time" "gitee.com/xcode-zone/gomqtt" mqtt "github.com/eclipse/paho.mqtt.golang" ) func main() { // 配置MQTT连接 config := &gomqtt.Config{ Broker: "tcp://127.0.0.1:1883", Username: "user", Password: "pass", ClientID: "test_client", } // 获取客户端选项 opts, err := gomqtt.GetClientOptions(config) if err != nil { log.Fatal(err) } // 初始化客户端 err = gomqtt.Init(opts) if err != nil { log.Fatal(err) } // 订阅消息 gomqtt.Subscribe(gomqtt.SubscribeType{ Topic: "test/topic", Qos: 1, Callback: func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("收到消息: %s\n", string(msg.Payload())) }, }) // 发布消息 err = gomqtt.Publish("test/topic", "Hello MQTT", 1, false) if err != nil { log.Fatal(err) } // 等待消息处理 time.Sleep(2 * time.Second) } ``` ## 📚 API文档 ### 配置结构体 #### Config ```go type Config struct { Broker string // Broker地址,例如 tcp://127.0.0.1:1883 或 ssl://127.0.0.1:8883 Username string // 用户名,可选 Password string // 密码,可选 CACert string // CA证书路径,SSL连接时使用 ClientCert string // 客户端证书路径,双向认证时使用 ClientKey string // 客户端私钥路径,双向认证时使用 ClientID string // 客户端ID,可选(自动生成) } ``` ### 核心函数 #### GetClientOptions ```go func GetClientOptions(conf *Config) (*mqtt.ClientOptions, error) ``` 根据配置创建MQTT客户端选项,包含性能优化设置。 #### Init ```go func Init(opts *mqtt.ClientOptions) error ``` 初始化MQTT客户端连接。 ### 发布消息接口 #### Publish ```go func Publish(topic string, payload interface{}, qos byte, retained bool) error ``` 同步发布消息,默认5秒超时。 #### PublishWithTimeout ```go func PublishWithTimeout(topic string, payload interface{}, qos byte, retained bool, timeout time.Duration) error ``` 带自定义超时的同步发布消息。 #### PublishAsync ```go func PublishAsync(topic string, payload interface{}, qos byte, retained bool) ``` 异步发布消息,不等待确认。 ### 批量发布器 #### BatchPublisher ```go type BatchPublisher struct { // 内部消息队列 } func NewBatchPublisher() *BatchPublisher func (bp *BatchPublisher) AddMessage(topic string, payload interface{}, qos byte, retained bool) func (bp *BatchPublisher) PublishBatch() error func (bp *BatchPublisher) PublishBatchAsync() ``` 使用示例: ```go bp := gomqtt.NewBatchPublisher() bp.AddMessage("topic1", "message1", 1, false) bp.AddMessage("topic2", "message2", 1, false) err := bp.PublishBatch() // 同步批量发布 // 或 bp.PublishBatchAsync() // 异步批量发布 ``` ### 订阅消息 #### SubscribeType ```go type SubscribeType struct { Topic string Qos byte Callback mqtt.MessageHandler RetryTimes int // 重试次数,0表示无限重试 } ``` #### Subscribe ```go func Subscribe(item SubscribeType) ``` 注册消息订阅。 #### ResubscribeAll ```go func ResubscribeAll() ``` 重新订阅所有已注册的主题。 ### 连接管理 #### GetClient ```go func GetClient(opts *mqtt.ClientOptions) (mqtt.Client, error) ``` 获取MQTT客户端(支持连接池)。 #### CloseClient ```go func CloseClient(clientID string) ``` 关闭指定客户端连接。 #### CloseAllClients ```go func CloseAllClients() ``` 关闭所有客户端连接。 ## 🔧 高级用法 ### SSL/TLS连接示例 ```go config := &gomqtt.Config{ Broker: "ssl://mqtt.example.com:8883", CACert: "/path/to/ca.crt", ClientCert: "/path/to/client.crt", // 可选,双向认证 ClientKey: "/path/to/client.key", // 可选,双向认证 } ``` ### 异步发布示例 ```go // 高性能场景使用异步发布 for i := 0; i < 1000; i++ { gomqtt.PublishAsync("sensor/data", fmt.Sprintf("data-%d", i), 0, false) } ``` ### 批量发布示例 ```go bp := gomqtt.NewBatchPublisher() // 收集一批消息 for i := 0; i < 100; i++ { bp.AddMessage("batch/topic", fmt.Sprintf("message-%d", i), 1, false) } // 一次性批量发布 err := bp.PublishBatch() if err != nil { log.Printf("批量发布失败: %v", err) } ``` ### 连接池使用示例 ```go // 多个goroutine共享连接池 for i := 0; i < 10; i++ { go func(id int) { config := &gomqtt.Config{ Broker: "tcp://127.0.0.1:1883", ClientID: fmt.Sprintf("client_%d", id), } opts, _ := gomqtt.GetClientOptions(config) client, _ := gomqtt.GetClient(opts) // 复用连接 // 使用client进行发布/订阅操作 }(i) } ``` ## 🚨 错误处理 ### 超时错误 ```go err := gomqtt.PublishWithTimeout("topic", "data", 1, false, 10*time.Second) if err != nil { if errors.Is(err, gomqtt.ErrTimeout) { log.Println("发布超时") } else { log.Printf("发布失败: %v", err) } } ``` ### 连接错误处理 ```go opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { log.Printf("连接丢失: %v", err) // 自动重连逻辑 }) ``` ## 📊 性能基准测试 ### 发布性能测试 ```go func BenchmarkPublish(b *testing.B) { // 初始化客户端... b.ResetTimer() for i := 0; i < b.N; i++ { gomqtt.PublishAsync("benchmark", "test", 0, false) } } ``` ### 批量发布性能测试 ```go func BenchmarkBatchPublish(b *testing.B) { // 初始化客户端... b.ResetTimer() for i := 0; i < b.N; i++ { bp := gomqtt.NewBatchPublisher() for j := 0; j < 100; j++ { bp.AddMessage("batch", "test", 0, false) } bp.PublishBatchAsync() } } ``` ## 🔒 安全最佳实践 1. **使用SSL/TLS加密**:生产环境务必使用加密连接 2. **认证信息保护**:避免硬编码用户名密码 3. **客户端ID管理**:使用有意义的客户端ID便于监控 4. **连接超时设置**:合理设置连接和发布超时时间 5. **错误日志记录**:完善的错误处理和日志记录 ## 🤝 贡献 欢迎提交Issue和Pull Request来改进这个项目。 ## 📄 许可证 本项目基于MIT许可证开源,详见LICENSE文件。 ## 📞 支持 如有问题请提交Issue或联系维护者。