代码拉取完成,页面将自动刷新
package RabbitmqRoute
import (
"bytes"
"encoding/json"
"fmt"
kelleyRabbimqPool "gitee.com/tym_hmm/rabbitmq-pool-go"
"strings"
"sync"
)
/*
Producer client.
*/
const (
// RabbitMQ connection error.
CODE_CLIENT_CONNECTION_ERROR = 60100
// Sending (publishing) the data failed.
CODE_CLIENT_PUBLISH_ERROR = 60101
// Parsing/encoding the JSON data failed.
CODE_JSON_PARSE_ERROR = 60102
)
// ProductClientApi is the producer-side client interface returned by
// NewProductClient and NewProductClientVirtualHosts.
type ProductClientApi interface {
// SetMaxConnection sets the maximum connection count passed to the pool.
SetMaxConnection(maxConnection int32)
// SetMaxChannel sets the maximum channel count passed to the pool.
SetMaxChannel(maxChannel int32)
// Publish sends data, tagged with routePath, to the given exchange/queue/route
// key. It returns nil on success and a *RabbitMqTaskError otherwise.
Publish(exchangeName, exChangeType, queueName, RouteKey, routePath, data string) *RabbitMqTaskError
}
// productClient is the ProductClientApi implementation. It stores the broker
// address, credentials and pool limits, and lazily owns a single pool.
type productClient struct {
	// Broker address, credentials and virtual host used when connecting.
	host                    string
	port                    int
	user, pwd, virtualHosts string

	// Pool limits; values <= 0 are replaced by defaults when the pool is built.
	maxConnection, maxChannel int32

	// header holds request header information.
	header map[string]interface{}

	// poolOnce guards the one-time pool construction; poolInstance and poolErr
	// hold its outcome (exactly one of them is non-nil afterwards).
	poolOnce     sync.Once
	poolInstance *kelleyRabbimqPool.RabbitPool
	poolErr      error
}
// NewProductClient creates a producer client for the given host, port and
// credentials, using the virtual host "/". No connection is opened here; the
// pool is built on the first Publish call.
func NewProductClient(host string, port int, user string, pwd string) ProductClientApi {
	return &productClient{
		host:         host,
		port:         port,
		user:         user,
		pwd:          pwd,
		virtualHosts: "/",
	}
}
// NewProductClientVirtualHosts creates a producer client like NewProductClient
// but with a caller-supplied virtual host. No connection is opened here; the
// pool is built on the first Publish call.
func NewProductClientVirtualHosts(host string, port int, user string, pwd string, virtualHosts string) ProductClientApi {
	return &productClient{
		host:         host,
		port:         port,
		user:         user,
		pwd:          pwd,
		virtualHosts: virtualHosts,
	}
}
// SetMaxConnection records the maximum connection count to configure on the
// pool. The value is consumed when getPool builds the pool (once, guarded by
// sync.Once), where a value <= 0 is replaced by 5; setting it after the pool
// has been built does not reconfigure the existing pool.
func (p *productClient) SetMaxConnection(maxConnection int32) {
p.maxConnection = maxConnection
}
// SetMaxChannel records the maximum channel count to configure on the pool.
// The value is consumed when getPool builds the pool (once, guarded by
// sync.Once), where a value <= 0 is replaced by 25; setting it after the pool
// has been built does not reconfigure the existing pool.
func (p *productClient) SetMaxChannel(maxChannel int32) {
p.maxChannel = maxChannel
}
// Publish wraps data together with routePath into a request envelope, encodes
// the envelope as JSON and pushes it through the lazily created pool.
//
// Parameters:
//   - exchangeName, exChangeType, queueName, RouteKey: copied unchanged into
//     the pool message (ExchangeName, ExchangeType, QueueName, Route).
//   - routePath: route path carried inside the envelope.
//   - data: payload string, expected to be JSON formatted.
//
// It returns nil on success, otherwise a *RabbitMqTaskError carrying:
//   - CODE_JSON_PARSE_ERROR when the envelope cannot be JSON-encoded;
//   - CODE_CLIENT_CONNECTION_ERROR when the pool could not be connected;
//   - the code and message reported by the pool when the push itself fails.
func (p *productClient) Publish(exchangeName, exChangeType, queueName, RouteKey, routePath, data string) *RabbitMqTaskError {
	// Assemble the envelope that is actually sent.
	dataRequest := NewDataRequestPath(routePath, data)

	// json.Encoder is kept (rather than json.Marshal) so the payload bytes,
	// including the trailing newline Encode appends, stay unchanged.
	var dataBuf bytes.Buffer
	if errJson := json.NewEncoder(&dataBuf).Encode(dataRequest); errJson != nil {
		// Include the underlying cause; it was previously dropped.
		return newError(CODE_JSON_PARSE_ERROR, fmt.Sprintf("json数据解析失败: %+v, error: %v", dataRequest, errJson))
	}

	// The pool is built once; a failed connection attempt is remembered in
	// poolErr and reported as a connection error, not as a publish error.
	p.getPool()
	if p.poolErr != nil {
		return newError(CODE_CLIENT_CONNECTION_ERROR, p.poolErr.Error())
	}

	dataRabbitmq := &kelleyRabbimqPool.RabbitMqData{
		ExchangeName: exchangeName,
		ExchangeType: exChangeType,
		QueueName:    queueName,
		Route:        RouteKey,
		Data:         dataBuf.String(),
	}
	if sendErr := p.poolInstance.Push(dataRabbitmq); sendErr != nil {
		return newError(sendErr.Code, sendErr.Message)
	}
	// Return a literal nil so callers never see a typed-nil error value.
	return nil
}
// getPool builds the producer pool exactly once (guarded by poolOnce) and
// stores the outcome on the client: poolInstance on success, poolErr on
// failure. Non-positive limits are replaced by defaults (25 channels,
// 5 connections) and written back to the client before being applied.
func (p *productClient) getPool() {
	p.poolOnce.Do(func() {
		pool := kelleyRabbimqPool.NewProductPool()

		// Fall back to defaults when no positive limit was configured.
		if p.maxChannel <= 0 {
			p.maxChannel = 25
		}
		if p.maxConnection <= 0 {
			p.maxConnection = 5
		}
		pool.SetMaxConnection(p.maxConnection)
		pool.SetMaxConsumeChannel(p.maxChannel)

		// A blank virtual host means a plain connect; otherwise connect to
		// the configured virtual host.
		var connErr error
		if strings.TrimSpace(p.virtualHosts) == "" {
			connErr = pool.Connect(p.host, p.port, p.user, p.pwd)
		} else {
			connErr = pool.ConnectVirtualHost(p.host, p.port, p.user, p.pwd, p.virtualHosts)
		}

		if connErr != nil {
			p.poolErr, p.poolInstance = connErr, nil
			return
		}
		p.poolErr, p.poolInstance = nil, pool
	})
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。