# go-zookeeper-demo
**Repository Path**: yemucn/go-zookeeper-demo
## Basic Information
- **Project Name**: go-zookeeper-demo
- **Description**: 基于zookeeper与go实现的分布式系统的尝试。参考源码:
https://gitee.com/ken.yang/xconsumer
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2022-03-30
- **Last Updated**: 2022-03-30
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 1. docker安装zookeeper
- 搜索zookeeper镜像
- `docker search zookeeper`
- 下载官方镜像 zookeeper
- `docker pull zookeeper`
- 容器安装
- `docker run --name myzookeeper -p 2181:2181 -it -d zookeeper`
- -p: host-port:container-port
- ```bash
yemuc@LAPTOP-FIBSOFB0 ~ ❯❯❯ docker run --name myzookeeper -p 2181:2181 -it -d zookeeper
f846fb57b2f2e9c7ad768cd1635785c27896fa2db893e72c5228c9f1ff192399
```
- 进入容器
```bash
yemuc@LAPTOP-FIBSOFB0 ~ ❯❯❯ docker exec -it myzookeeper /bin/bash
root@f846fb57b2f2:/apache-zookeeper-3.8.0-bin#
```
## 2. go mod init
```go
go mod init go-zookeeper-demo
```
## 3. 项目功能定义
- 利用zookeeper使用一个分布式系统
- 基于zk的集群;leader选举;任务分发;事件监听;http管理
- 流程步骤:
- zk集群初始化
- leader选举
- leader集群任务分配
- 分发任务
- 开始工作
- 功能流程
- 只有leader节点操作,并且leader节点和其他follower节点一样,需要进行任务工作处理。
## 4. 项目应用场景
- 分布式场景,自动任务负载
- 服务发现:通过zk实现node节点接入
- 消息队列:根据不同topic使用情况,分配不同的任务工作数量进行处理
- 分布式任务处理
## 5. coding
### 5.1 配置
`app.toml app/common/config.go app/common/log.go`
- 任务
```text
# 任务相关配置
[[task]]
name = "test01" # 任务名称
workName = "test01" # 任务执行的work
workerNum = 3 # 任务数量
```
- ConfigNew()、LogNew()、zookeeper.New()
- zookeeper.New()
```go
type Zookeeper struct {
conn *zk.Conn
connForHttp *zk.Conn
} // zookeeper 结构体
// 初始化
var z *Zookeeper
func New() {
zookeeper := new(Zookeeper)
zookeeper.conn = zookeeper.connect()
//
zookeeper.connForHttp = zookeeper.connectForHttp()
z = zookeeper
}
```
### 5.2 任务work
`/work`
- 定义 work 结构体
```go
type Work struct {
TaskName string
WorkName string
NodeName string // 结点名称
CodeNum int // 编号
Message chan string // 管道:当前运行的的信息管道
}
```
- 为了方便使用反射,约定Run+WorkName的方式,RunTestExample 执行调用方法实例
### 5.3 main.go
- app/cluster/bootstrap.go:初始引导,建立集群
- (1) init():
- 初始化数据:initData(): common.ConfigNew("app");common.LogNew();zookeeper.New()
- 初始化集群:initCluster();
集群定义:集群基于zookeeper结构体
```go
// Cluster 集群
type Cluster struct {
path string //zk路径
data []byte //zk数据
flags int32 //zk标示
acl []zk.ACL //zk权限
}
```
- 根据配置,检查集群是否存在
- 初始化任务:initTask() `app/cluster/task.go`
- 定义`TaskInfo Task` ,其中TaskInfo与配置文件中`[Task]`结点得内容需要匹配
- Task结构体和Cluster结构体类型
- 初始化集群结点:initNodes() : `app/cluster/nodes.go`
- Nodes结构体: :star2:
```go
// Nodes 集群节点
type Nodes struct {
path string //zk路径
data []byte //zk数据
flags int32 //zk标示
acl []zk.ACL //zk权限
children []string //当前拥有的子节点列表
childNum int //当前拥有的子节点数量
}
```
- 实现了leader选举的逻辑
- 根据key值判断
- 初始化节点:initNode(): `app/cluster/node.go`
```go
type Node struct {
name string //节点名字
leader bool //是否是leader
path string //zk路径
data map[string]interface{} //zk数据,保持ip地址等相关信息
flags int32 //zk标示
acl []zk.ACL //zk权限
version int32 //zk版本
}
```
- 初始任务:initNodeTask(): `app/cluster/node_task.go`
```go
type NodeTask struct {
path string //zk路径
data []byte //zk数据
flags int32 //zk标示
acl []zk.ACL //zk权限
}
```
- 设置监听:`zookeeper.Connect().GetW(nt.path)`
- **(2) cluster.Run()**
- 集群开始工作
```go
// Run 集群开始执行
// 必须要在集群成功启动没有问题后,才能启动其他服务
// 1.cluster
// 2.http
func Run() {
// leader选举
nodes.LeaderElection()
// 节点监听
go NodesWatch()
// 任务监听
go TaskWatch()
// 节点任务监听
go NodeTaskWatch()
// 开始工作
nodes.DoWork()
// 定时处理任务,防止监听未收到通知的问题
worker.doWorkTicker()
for {
// 保持程序运行,不退出
}
}
```
- worker.DoWork() :star2:
- var workOnce sync.Once : 单例模式
- init()执行:workOnce.Do(初始化 map[string]chan)
- doWorkTicker(): 根据节点的身份,会分别执行两个方法:`doLeaderWork() doFollowerWork()`
- doLeaderWorker:
- 动态计算nodes任务分配数量
- worker.go: 均分;首先获得任务列表,然后计算计算平均每个结点需要处理多少任务。使用map[string-nodeName]map[string-task.Name]int存储结果
- 分发任务,并更新计算结果到zookeeper : updateNodeTaskForZookeeper worker.go
- node_task.go :UpdateNodeTaskForZookeeper 更新变更的node task信息到zookeeper
- 获取zookeeper现有的tasklist,然后对比,有变化再更新
- ```go
zookeeper.Connect().Set(nt.path, nodeTaskString, info.Version)
```
- 开启web
```go
//startWeb 启动web
func (wer *Worker) startWeb() {
enable := viper.GetBool("http.enable")
if enable == true {
web.New()
// 开启一个协程做hhtp服务
go web.RunHttpServer()
}
}
```
- web 包:`app/web`: 实现web服务器
- doFollowerWork(): work.go
- 等待任务分配,不做任何计算或者服务的工作
### 5.4 此时,服务已经启动,下面实现web包
#### 5.4.1 web.go 总控
- var once sync.Once: Once保证了初始化工作只会进行一次
- 主要流程:
- New(): 只会执行一次
- RunHttpServer(): 暴露public接口,调用Watch()启动服务
- Watch():
- 设置路由
- 从配置文件获取监听的地址+接口
- ListenAndServe,启动Http服务
#### 5.4.2 router.go 路由
```go
func (r *Route) LoadController() {
c := Controller{}
http.HandleFunc("/", c.Index)
http.HandleFunc("/task", c.GetTask)
http.HandleFunc("/task/update", c.UpdateTask)
http.HandleFunc("/nodes", c.GetNodes)
http.HandleFunc("/node", c.GetNode)
http.HandleFunc("/node-task", c.GetNodeTask)
}
```
#### 5.4.3 Output.go 结构体
```go
// Output 输出结构体
type Output struct {
Code int
Message string
Data interface{}
ResponseWriter http.ResponseWriter
}
```
- 定义回复消息
#### 5.4.4 server.go 与节点之间的交互
- func (s *Server) GetTask() ([]map[string]interface{}, error) : 获得节点的任务信息
- 调用zookeeper.ConnectForHttp(): 返回一个可用的zookeeper链接,共http web使用
- func (c *Conn) Get(path string) ([]byte, *Stat, error) 包函数: 根据路径
- taskInfo, _, err := conn.Get(zookeeper.GetTaskPath())
- Get(path) 根据zk路径,查找节点,返回节点的信息
- zookeeper.GetTaskPath(): 返回任务的路径,任务路径的默认构造方法是/集群名字/task `data.go`
#### 5.4.4 controller.go 处理器
- "/" 路由: Index()
- 输出Welcome信息
- "/task"
- GetTask(w,*r)
- 调用server.GetTask():获得节点的任务信息
- 输出
- {"code":200,"data":[{"name":"test01","workName":"test01","workerNum":13},{"name":"test02","workName":"test02","workerNum":12},{"name":"test03","workName":"test03","workerNum":22}],"message":"ok"}
- "/task/update"
- func (c *Controller) UpdateTask(w http.ResponseWriter, r *http.Request) : 更新任务信息
- ```
//func (r *Request) PostFormValue(key string) string
data := r.PostFormValue("data")
```
- 获得传输的data后,调用server模块中的UpdateTask(data)函数,对zk中的任务结点进行更新
- (1)获得zk链接conn *zk.Conn
- (2)conn.Set(path, data, version)
- "/nodes"
- GetNodes 获取集群所有节点
- conn *zk.Conn ; conn.Chindren(path)
- {
"code": 200,
"data": [
"0000000001"
],
"message": "ok"
}
- "/node"
- GetNode 获取单个节点信息
- 需要参数nodeName
- "/node-task"
- 获得节点任务信息
- data, err := s.GetNodeTask()
- {
"code": 200,
"data": {
"0000000001": {
"test01": 13,
"test02": 12,
"test03": 22
}
},
"message": "ok"
}
### 5.5