1 Star 9 Fork 1

tym_hmm/rabbitmq-pool-router-path-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
ProductClient.go 3.17 KB
一键复制 编辑 原始数据 按行查看 历史
天蝎儿 提交于 2022-03-12 20:55 +08:00 . 兼容连接池 增加自定义Virtual Hosts
package RabbitmqRoute
import (
"bytes"
"encoding/json"
"fmt"
kelleyRabbimqPool "gitee.com/tym_hmm/rabbitmq-pool-go"
"strings"
"sync"
)
/**
生产者客户端
*/
// Error codes carried by the *RabbitMqTaskError values this client returns.
const (
// RabbitMQ connection error.
CODE_CLIENT_CONNECTION_ERROR = 60100
// Sending (publishing) the data failed.
CODE_CLIENT_PUBLISH_ERROR = 60101
// Parsing/encoding the JSON data failed.
CODE_JSON_PARSE_ERROR = 60102
)
// ProductClientApi is the producer-side interface returned by
// NewProductClient and NewProductClientVirtualHosts.
type ProductClientApi interface {
// SetMaxConnection sets the maximum connection count used when the
// underlying pool is created.
SetMaxConnection(maxConnection int32)
// SetMaxChannel sets the maximum channel count used when the underlying
// pool is created.
SetMaxChannel(maxChannel int32)
// Publish sends data, tagged with routePath, to the given exchange/queue
// using RouteKey. It returns nil on success or a *RabbitMqTaskError
// describing the failure.
Publish(exchangeName, exChangeType, queueName, RouteKey, routePath, data string) *RabbitMqTaskError
}
// productClient is the ProductClientApi implementation. It holds the
// connection settings and a pool that getPool creates on first use.
type productClient struct {
// Connection settings passed to the pool's Connect / ConnectVirtualHost.
host string
port int
user string
pwd string
// virtualHosts selects the connect call in getPool: when it is non-blank
// ConnectVirtualHost is used, otherwise Connect.
virtualHosts string
// Pool limits; getPool replaces values <= 0 with defaults (5 connections,
// 25 channels).
maxConnection int32
maxChannel int32
/**
Request header information.
*/
header map[string]interface{}
// poolOnce makes the pool initialization in getPool run a single time.
poolOnce sync.Once
// poolInstance is the connected pool, or nil when connecting failed.
poolInstance *kelleyRabbimqPool.RabbitPool
// poolErr is the error returned by the connect call in getPool, or nil.
poolErr error
}
// NewProductClient builds a producer client for the given host, port and
// credentials. The virtual host is set to "/".
func NewProductClient(host string, port int, user string, pwd string) ProductClientApi {
	return &productClient{
		host:         host,
		port:         port,
		user:         user,
		pwd:          pwd,
		virtualHosts: "/",
	}
}
// NewProductClientVirtualHosts builds a producer client like NewProductClient
// but stores the caller-supplied virtualHosts instead of "/".
func NewProductClientVirtualHosts(host string, port int, user string, pwd string, virtualHosts string) ProductClientApi {
	return &productClient{
		host:         host,
		port:         port,
		user:         user,
		pwd:          pwd,
		virtualHosts: virtualHosts,
	}
}
// SetMaxConnection stores the maximum connection count. The value is read
// only inside getPool's one-time initialization, where a value <= 0 is
// replaced by 5 before being passed to the pool.
func (p *productClient) SetMaxConnection(maxConnection int32) {
p.maxConnection = maxConnection
}
// SetMaxChannel stores the maximum channel count. The value is read only
// inside getPool's one-time initialization, where a value <= 0 is replaced
// by 25 before being passed to the pool's SetMaxConsumeChannel.
func (p *productClient) SetMaxChannel(maxChannel int32) {
p.maxChannel = maxChannel
}
// Publish wraps data together with routePath into a request envelope,
// JSON-encodes it and pushes it to RabbitMQ through the lazily created pool.
//
// Parameters:
//   - exchangeName, exChangeType, queueName, RouteKey: copied unchanged into
//     the RabbitMqData handed to the pool.
//   - routePath: route path placed in the envelope.
//   - data: payload string (described by the original author as JSON).
//
// It returns nil on success, otherwise a *RabbitMqTaskError carrying:
//   - CODE_JSON_PARSE_ERROR when the envelope cannot be encoded,
//   - CODE_CLIENT_CONNECTION_ERROR when the pool failed to connect,
//   - the pool's own code and message when Push fails.
func (p *productClient) Publish(exchangeName, exChangeType, queueName, RouteKey, routePath, data string) *RabbitMqTaskError {
	request := NewDataRequestPath(routePath, data)

	// Encoder.Encode ends its output with a newline; the encoder is kept so
	// the bytes sent to the broker stay exactly as they were.
	var encoded bytes.Buffer
	if err := json.NewEncoder(&encoded).Encode(request); err != nil {
		// Include the encoder's error so the cause is not lost.
		return newError(CODE_JSON_PARSE_ERROR, fmt.Sprintf("json数据解析失败: %+v, err: %v", request, err))
	}

	p.getPool()
	if p.poolErr != nil {
		// poolErr comes from the connect call in getPool, so report it as a
		// connection error rather than a publish error.
		return newError(CODE_CLIENT_CONNECTION_ERROR, p.poolErr.Error())
	}

	message := &kelleyRabbimqPool.RabbitMqData{
		ExchangeName: exchangeName,
		ExchangeType: exChangeType,
		QueueName:    queueName,
		Route:        RouteKey,
		Data:         encoded.String(),
	}
	sendErr := p.poolInstance.Push(message)
	if sendErr != nil {
		return newError(sendErr.Code, sendErr.Message)
	}
	return nil
}
// getPool creates and connects the RabbitMQ pool the first time it is called.
// The work runs under poolOnce, so later calls do nothing and the outcome of
// the first attempt (poolInstance or poolErr) is what callers keep seeing.
func (p *productClient) getPool() {
	p.poolOnce.Do(func() {
		pool := kelleyRabbimqPool.NewProductPool()

		// Fall back to the defaults when no positive limit was configured.
		if p.maxChannel <= 0 {
			p.maxChannel = 25
		}
		if p.maxConnection <= 0 {
			p.maxConnection = 5
		}
		pool.SetMaxConnection(p.maxConnection)
		pool.SetMaxConsumeChannel(p.maxChannel)

		// A blank virtual host uses the plain Connect call.
		var connectErr error
		if strings.TrimSpace(p.virtualHosts) == "" {
			connectErr = pool.Connect(p.host, p.port, p.user, p.pwd)
		} else {
			connectErr = pool.ConnectVirtualHost(p.host, p.port, p.user, p.pwd, p.virtualHosts)
		}

		if connectErr != nil {
			p.poolErr, p.poolInstance = connectErr, nil
			return
		}
		p.poolErr, p.poolInstance = nil, pool
	})
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/tym_hmm/rabbitmq-pool-router-path-go.git
git@gitee.com:tym_hmm/rabbitmq-pool-router-path-go.git
tym_hmm
rabbitmq-pool-router-path-go
rabbitmq-pool-router-path-go
master

搜索帮助