# go-zookeeper-demo **Repository Path**: yemucn/go-zookeeper-demo ## Basic Information - **Project Name**: go-zookeeper-demo - **Description**: 基于zookeeper与go实现的分布式系统的尝试。参考源码: https://gitee.com/ken.yang/xconsumer - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2022-03-30 - **Last Updated**: 2022-03-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 1. docker安装zookeeper - 搜索zookeeper镜像 - `docker search zookeeper` - 下载官方镜像 zookeeper - `docker pull zookeeper` - 容器安装 - `docker run --name myzookeeper -p 2181:2181 -it -d zookeeper` - -p: host-port:container-port - ```bash yemuc@LAPTOP-FIBSOFB0 ~ ❯❯❯ docker run --name myzookeeper -p 2181:2181 -it -d zookeeper f846fb57b2f2e9c7ad768cd1635785c27896fa2db893e72c5228c9f1ff192399 ``` - 进入容器 ```bash yemuc@LAPTOP-FIBSOFB0 ~ ❯❯❯ docker exec -it myzookeeper /bin/bash root@f846fb57b2f2:/apache-zookeeper-3.8.0-bin# ``` ## 2. go mod init ```go go mod init go-zookeeper-demo ``` ## 3. 项目功能定义 - 利用zookeeper使用一个分布式系统 - 基于zk的集群;leader选举;任务分发;事件监听;http管理 - 流程步骤: - zk集群初始化 - leader选举 - leader集群任务分配 - 分发任务 - 开始工作 img.png - 功能流程 img.png - 只有leader节点操作,并且leader节点和其他follower节点一样,需要进行任务工作处理。 ## 4. 项目应用场景 - 分布式场景,自动任务负载 - 服务发现:通过zk实现node节点接入 - 消息队列:根据不同topic使用情况,分配不同的任务工作数量进行处理 - 分布式任务处理 ## 5. coding ### 5.1 配置 `app.toml app/common/config.go app/common/log.go` - 任务 ```text # 任务相关配置 [[task]] name = "test01" # 任务名称 workName = "test01" # 任务执行的work workerNum = 3 # 任务数量 ``` - ConfigNew()、LogNew()、zookeeper.New() - zookeeper.New() ```go type Zookeeper struct { conn *zk.Conn connForHttp *zk.Conn } // zookeeper 结构体 // 初始化 var z *Zookeeper func New() { zookeeper := new(Zookeeper) zookeeper.conn = zookeeper.connect() // zookeeper.connForHttp = zookeeper.connectForHttp() z = zookeeper } ``` ### 5.2 任务work `/work` - 定义 work 结构体 ```go type Work struct { TaskName string WorkName string NodeName string // 结点名称 CodeNum int // 编号 Message chan string // 管道:当前运行的的信息管道 } ``` - 为了方便使用反射,约定Run+WorkName的方式,RunTestExample 执行调用方法实例 ### 5.3 main.go - app/cluster/bootstrap.go:初始引导,建立集群 - (1) init(): - 初始化数据:initData(): common.ConfigNew("app");common.LogNew();zookeeper.New() - 初始化集群:initCluster(); 集群定义:集群基于zookeeper结构体 ```go // Cluster 集群 type Cluster struct { path string //zk路径 data []byte //zk数据 flags int32 //zk标示 acl []zk.ACL //zk权限 } ``` - 根据配置,检查集群是否存在 - 初始化任务:initTask() `app/cluster/task.go` - 定义`TaskInfo Task` ,其中TaskInfo与配置文件中`[Task]`结点得内容需要匹配 - Task结构体和Cluster结构体类型 - 初始化集群结点:initNodes() : `app/cluster/nodes.go` - Nodes结构体: :star2: ```go // Nodes 集群节点 type Nodes struct { path string //zk路径 data []byte //zk数据 flags int32 //zk标示 acl []zk.ACL //zk权限 children []string //当前拥有的子节点列表 childNum int //当前拥有的子节点数量 } ``` - 实现了leader选举的逻辑 - 根据key值判断 - 初始化节点:initNode(): `app/cluster/node.go` ```go type Node struct { name string //节点名字 leader bool //是否是leader path string //zk路径 data map[string]interface{} //zk数据,保持ip地址等相关信息 flags int32 //zk标示 acl []zk.ACL //zk权限 version int32 //zk版本 } ``` - 初始任务:initNodeTask(): `app/cluster/node_task.go` ```go type NodeTask struct { path string //zk路径 data []byte //zk数据 flags int32 //zk标示 acl []zk.ACL //zk权限 } ``` - 设置监听:`zookeeper.Connect().GetW(nt.path)` - **(2) cluster.Run()** - 集群开始工作 ```go // Run 集群开始执行 // 必须要在集群成功启动没有问题后,才能启动其他服务 // 1.cluster // 2.http func Run() { // leader选举 nodes.LeaderElection() // 节点监听 go NodesWatch() // 任务监听 go TaskWatch() // 节点任务监听 go NodeTaskWatch() // 开始工作 nodes.DoWork() // 定时处理任务,防止监听未收到通知的问题 worker.doWorkTicker() for { // 保持程序运行,不退出 } } ``` - worker.DoWork() :star2: - var workOnce sync.Once : 单例模式 - init()执行:workOnce.Do(初始化 map[string]chan) - doWorkTicker(): 根据节点的身份,会分别执行两个方法:`doLeaderWork() doFollowerWork()` - doLeaderWorker: - 动态计算nodes任务分配数量 - worker.go: 均分;首先获得任务列表,然后计算计算平均每个结点需要处理多少任务。使用map[string-nodeName]map[string-task.Name]int存储结果 - 分发任务,并更新计算结果到zookeeper : updateNodeTaskForZookeeper worker.go - node_task.go :UpdateNodeTaskForZookeeper 更新变更的node task信息到zookeeper - 获取zookeeper现有的tasklist,然后对比,有变化再更新 - ```go zookeeper.Connect().Set(nt.path, nodeTaskString, info.Version) ``` - 开启web ```go //startWeb 启动web func (wer *Worker) startWeb() { enable := viper.GetBool("http.enable") if enable == true { web.New() // 开启一个协程做hhtp服务 go web.RunHttpServer() } } ``` - web 包:`app/web`: 实现web服务器 - doFollowerWork(): work.go - 等待任务分配,不做任何计算或者服务的工作 ### 5.4 此时,服务已经启动,下面实现web包 #### 5.4.1 web.go 总控 - var once sync.Once: Once保证了初始化工作只会进行一次 - 主要流程: - New(): 只会执行一次 - RunHttpServer(): 暴露public接口,调用Watch()启动服务 - Watch(): - 设置路由 - 从配置文件获取监听的地址+接口 - ListenAndServe,启动Http服务 #### 5.4.2 router.go 路由 ```go func (r *Route) LoadController() { c := Controller{} http.HandleFunc("/", c.Index) http.HandleFunc("/task", c.GetTask) http.HandleFunc("/task/update", c.UpdateTask) http.HandleFunc("/nodes", c.GetNodes) http.HandleFunc("/node", c.GetNode) http.HandleFunc("/node-task", c.GetNodeTask) } ``` #### 5.4.3 Output.go 结构体 ```go // Output 输出结构体 type Output struct { Code int Message string Data interface{} ResponseWriter http.ResponseWriter } ``` - 定义回复消息 #### 5.4.4 server.go 与节点之间的交互 - func (s *Server) GetTask() ([]map[string]interface{}, error) : 获得节点的任务信息 - 调用zookeeper.ConnectForHttp(): 返回一个可用的zookeeper链接,共http web使用 - func (c *Conn) Get(path string) ([]byte, *Stat, error) 包函数: 根据路径 - taskInfo, _, err := conn.Get(zookeeper.GetTaskPath()) - Get(path) 根据zk路径,查找节点,返回节点的信息 - zookeeper.GetTaskPath(): 返回任务的路径,任务路径的默认构造方法是/集群名字/task `data.go` #### 5.4.4 controller.go 处理器 - "/" 路由: Index() - 输出Welcome信息 - "/task" - GetTask(w,*r) - 调用server.GetTask():获得节点的任务信息 - 输出 - {"code":200,"data":[{"name":"test01","workName":"test01","workerNum":13},{"name":"test02","workName":"test02","workerNum":12},{"name":"test03","workName":"test03","workerNum":22}],"message":"ok"} - "/task/update" - func (c *Controller) UpdateTask(w http.ResponseWriter, r *http.Request) : 更新任务信息 - ``` //func (r *Request) PostFormValue(key string) string data := r.PostFormValue("data") ``` - 获得传输的data后,调用server模块中的UpdateTask(data)函数,对zk中的任务结点进行更新 - (1)获得zk链接conn *zk.Conn - (2)conn.Set(path, data, version) - "/nodes" - GetNodes 获取集群所有节点 - conn *zk.Conn ; conn.Chindren(path) - { "code": 200, "data": [ ​ "0000000001" ], "message": "ok" } - "/node" - GetNode 获取单个节点信息 - 需要参数nodeName - "/node-task" - 获得节点任务信息 - data, err := s.GetNodeTask() - { "code": 200, "data": { ​ "0000000001": { ​ "test01": 13, ​ "test02": 12, ​ "test03": 22 ​ } }, "message": "ok" } ### 5.5