2 Star 5 Fork 2

Beta版厨子3.0®/kiteq-client-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
kite_registry_watcher.go 3.75 KB
一键复制 编辑 原始数据 按行查看 历史
package client
import (
"context"
"errors"
"github.com/blackbeans/kiteq-common/protocol"
"github.com/blackbeans/kiteq-common/registry"
log "github.com/blackbeans/log4go"
"github.com/blackbeans/turbo"
"strings"
)
func (self *kite) NodeChange(path string, eventType registry.RegistryEvent, children []string) {
//如果是订阅关系变更则处理
if strings.HasPrefix(path, registry.KITEQ_SERVER) {
//获取topic
split := strings.Split(path, "/")
if len(split) < 4 {
//不合法的订阅璐姐
log.WarnLog("kite", "kite|ChildWatcher|INVALID SERVER PATH |%s|%t", path, children)
return
}
//获取topic
topic := split[3]
log.WarnLog("kite", "kite|ChildWatcher|Change|%s|%v|%+v", path, children, eventType)
//search topic
for _, t := range self.topics {
if t == topic {
self.onQServerChanged(topic, children)
break
}
}
}
}
//当触发QServer地址发生变更
func (self *kite) onQServerChanged(topic string, hosts []string) {
//重建一下topic下的kiteclient
addresses := make([]string, 0, 10)
for _, host := range hosts {
//如果能查到remoteClient 则直接复用
newHost := host
newFutureTask := turbo.NewFutureTask(func(ctx context.Context) (interface{}, error) {
return self.onTClientInit(newHost)
})
_, loaded := self.addressToTClient.LoadOrStore(host, newFutureTask)
//不存在这个任务,那么使用的是创建的这个任务
if !loaded {
//执行运行一下
newFutureTask.Run(self.ctx)
}
addresses = append(addresses, host)
}
log.InfoLog("kite", "kite|onQServerChanged|SUCC|%s|%s", topic, hosts)
//替换掉线的server
_, loaded := self.topicToAddress.LoadOrStore(topic, addresses)
if loaded {
//放入新的地址列表
self.topicToAddress.Store(topic, addresses)
} else {
//说明没有旧的
}
//目前使用的链接地址
usingAddr := make(map[string]interface{}, 10)
self.topicToAddress.Range(func(key, value interface{}) bool {
for _, addr := range value.([]string) {
usingAddr[addr] = nil
}
return true
})
dels := make([]string, 0, 2)
self.addressToTClient.Range(func(key, value interface{}) bool {
//如果所有的topic都不再使用这个kiteio地址,那么则进行移除
if _, ok := usingAddr[key.(string)]; !ok {
dels = append(dels, key.(string))
}
return true
})
//需要删掉已经废弃的连接
if len(dels) > 0 {
for _, del := range dels {
self.addressToTClient.Delete(del)
self.clientManager.DeleteClients(del)
}
log.InfoLog("kite", "kite|onQServerChanged.RemoveUnusedAddr|%s|%s", topic, dels)
}
}
//创建kiteio
func (self *kite) onTClientInit(host string) (*turbo.TClient, error) {
//优先从clientmanager中获取,不存在则创建开启
remoteClient := self.clientManager.FindTClient(host)
if nil == remoteClient {
//这里就新建一个remote客户端连接
conn, err := dial(host)
if nil != err {
log.ErrorLog("kite", "kite|onTClientInit|Create REMOTE CLIENT|FAIL|%s|%s", err, host)
return nil, err
}
remoteClient = turbo.NewTClient(self.ctx, conn, func() turbo.ICodec {
return protocol.KiteQBytesCodec{
MaxFrameLength: turbo.MAX_PACKET_BYTES}
}, self.fire, self.config)
remoteClient.Start()
auth, err := handshake(self.ga, remoteClient)
if !auth || nil != err {
remoteClient.Shutdown()
log.ErrorLog("kite", "kite|onTClientInit|HANDSHAKE|FAIL|%s|%s", err, auth)
return nil, errors.New("onTClientInit FAIL ")
}
//授权
self.clientManager.Auth(self.ga, remoteClient)
}
return remoteClient, nil
}
func (self *kite) DataChange(path string, binds []*registry.Binding) {
//IGNORE
log.InfoLog("kite", "kite|DataChange|%s|%s", path, binds)
}
func (self *kite) OnSessionExpired() {
//推送订阅关系和topics
self.Start()
log.InfoLog("kite", "kite|OnSessionExpired|Restart...")
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/blackbeans/kiteq-client-go.git
git@gitee.com:blackbeans/kiteq-client-go.git
blackbeans
kiteq-client-go
kiteq-client-go
master

搜索帮助