代码拉取完成,页面将自动刷新
package messageservice
import (
"encoding/json"
"fmt"
"log"
clientclass "messageservice/client-class"
interfaceDir "messageservice/interface-dir"
messageclass "messageservice/message-class"
"net/http"
"net/url"
"os"
"os/signal"
"time"
"github.com/gorilla/websocket"
)
// 节点ID
var svrid = ""
var rpcPort = ""
// 该节点用户列表
var userList = make(map[string]interface{}, 0)
// 是否结束此用户
var userListCloseChannel = make(map[string]chan bool, 0)
var redisClient = new(RedisClient)
// --------------------------------------------------------------------------------
// websocket 客户端支持
var upgrader = websocket.Upgrader{
ReadBufferSize: 5160,
WriteBufferSize: 5160,
CheckOrigin: func(r *http.Request) bool { return true },
}
// 发送信息到某个位置,转发
func SendToUser(sendid string, recvid string, msgid string, msgtype string, Msg interface{}) bool {
if recvClient, ok := userList[recvid]; ok {
//在当前节点用户列表中
//发送信息
recvClient.(interfaceDir.Client).Send(messageclass.Message{
Id: msgid,
SendId: sendid,
Type: msgtype,
Msg: Msg,
})
} else {
//不在当前节点
var ipaddr = redisClient.Get(fmt.Sprintf(RD_USER_RPCNODE, recvid))
if ipaddr == "" {
log.Println("用户" + recvid + "信息已不存在")
return false
}
var kvmap = url.Values{}
kvmap.Add("id", msgid)
kvmap.Add("type", msgtype)
if msgtype == messageclass.MSG_SENDGROUP {
data := messageclass.GroupMsg{}
MapToStruct(Msg, data)
kvmap.Add("group", data.Group)
}
kvmap.Add("send_id", sendid)
jsonbytes, err := json.Marshal(Msg)
if err != nil {
log.Println("JSON解析错误2", err)
return false
}
kvmap.Add("msg", string(jsonbytes))
resp, err := http.PostForm("http://"+ipaddr+"/rpc", kvmap)
if err != nil {
respbody := make([]byte, 1024)
l, err1 := resp.Body.Read(respbody)
if err1 != nil {
log.Println("解析rpc的body错误", err1)
}
log.Println("http://"+ipaddr+"/rpc", kvmap, "请求RPC结果", string(respbody[:l]), "错误为", err)
}
//接收结果
// resp,err :=http.PostForm("http://"+ipaddr+"/rpc",kvmap)
// var buff = make([]byte,100)
// buff_n,err :=resp.Body.Read(buff)
// map1 := make(map[string]string,0)
// err = json.Unmarshal(buff[0:buff_n],&map1)
// if err != nil {
// log.Println("JSON解析错误3", err)
// continue
// }
// if v,ok:=map1["status"];ok{
// if v=="1" {
// }
// }
}
return true
}
func wsMsgLoop(client interfaceDir.Client, userid string) {
if wsclient, ok := client.(*clientclass.WebSocket); ok {
wsclient.Send(messageclass.Message{
Id: CreateUUID32(),
SendId: "0",
Type: messageclass.MSG_IDNOTIFY,
Msg: wsclient.Id,
})
var timeoutMax int64 = 10 * 1000
var lastTime = time.Now().Local().UnixMilli()
IPADDRPORT := GetLocalIp()[0] + ":" + rpcPort
log.Println("用户", userid, "已连接 IP->", IPADDRPORT)
redisClient.Set(fmt.Sprintf(RD_USER_RPCNODE, userid), IPADDRPORT, "")
if onConnect != nil {
if next := onConnect(client, userid); !next {
return
}
}
for {
time.Sleep(time.Millisecond)
//循环读取信息
msgtype, buff, err := wsclient.Conn.ReadMessage()
if err != nil {
// log.Println("读取信息错误", err)
if time.Now().Local().UnixMilli()-lastTime >= timeoutMax {
break
}
continue
}
lastTime = time.Now().Local().UnixMilli()
if msgtype == websocket.TextMessage {
var msg = new(messageclass.Message)
err = json.Unmarshal(buff, msg)
buff = nil //清空buff
if err != nil {
log.Println("JSON解析错误", err)
continue
}
if onMessage != nil {
if next := onMessage(client, userid, msg.Id, msg.Type, buff); !next {
continue
}
}
if msg.Type == messageclass.MSG_LEFTGROUP {
//离开组
data := messageclass.LeftGroup{}
MapToStruct(msg.Msg, &data)
if data.Group != "" {
ok := redisClient.GroupDel(fmt.Sprintf(RD_GROUP_MEMBERS, data.Group), userid)
if ok {
delete(wsclient.Group, data.Group)
}
wsclient.Send(messageclass.Message{
Id: CreateUUID(),
SendId: "0",
Type: messageclass.MSG_LEFTGROUPRESULT,
Msg: messageclass.GroupResult{
Group: data.Group,
Status: ok,
},
})
}
continue
}
if msg.Type == messageclass.MSG_JOINGROUP {
//加入组
data := messageclass.JoinGroup{}
MapToStruct(msg.Msg, &data)
if data.Group != "" {
ok := redisClient.GroupSet(fmt.Sprintf(RD_GROUP_MEMBERS, data.Group), userid)
if ok {
wsclient.Group[data.Group] = data.Group
}
wsclient.Send(messageclass.Message{
Id: CreateUUID(),
SendId: "0",
Type: messageclass.MSG_JOINGROUPRESULT,
Msg: messageclass.GroupResult{
Group: data.Group,
Status: ok,
},
})
}
continue
}
if msg.Type == messageclass.MSG_SENDGROUP {
//处理群发
data := messageclass.GroupMsg{}
MapToStruct(msg.Msg, &data)
// data.Group
list := redisClient.GroupList(fmt.Sprintf(RD_GROUP_MEMBERS, data.Group))
log.Println(data.Group, "群成员数量", len(list))
for _, recvid := range list {
SendToUser(userid, recvid, msg.Id, messageclass.MSG_RECEIVE, msg.Msg)
}
continue
}
if msg.Type == messageclass.MSG_SEND {
//处理转发
data := messageclass.RelayMsg{}
MapToStruct(msg.Msg, &data)
SendToUser(userid, data.RecvId, msg.Id, messageclass.MSG_RECEIVE, data)
continue
}
if msg.Type == messageclass.MSG_ACKNOTIFY {
data := messageclass.AckMsg{}
MapToStruct(msg.Msg, &data)
SendToUser(userid, data.RecvId, msg.Id, messageclass.MSG_ACKNOTIFY, data)
continue
}
}
}
redisClient.Del(fmt.Sprintf(RD_USER_RPCNODE, userid))
log.Println("用户", userid, "断开连接")
//结束掉当前用户连接
userListCloseChannel[userid] <- true
} else {
redisClient.Del(fmt.Sprintf(RD_USER_RPCNODE, userid))
log.Println("用户", userid, "断开连接")
//结束掉当前用户连接
userListCloseChannel[userid] <- true
}
}
var onAuth func(http.ResponseWriter, *http.Request, string) bool = nil
// 设置连入ws时验证请求是否继续
func SetOnAuth(hook func(http.ResponseWriter, *http.Request, string) bool) {
onAuth = hook
}
func ws(w http.ResponseWriter, r *http.Request) {
if stop != nil {
w.WriteHeader(404)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
id := r.URL.Query().Get("id")
if id == "" {
id = svrid + CreateUUID()
} else {
ip := redisClient.Get(fmt.Sprintf(RD_USER_RPCNODE, id))
if ip != "" {
w.Write([]byte("id不可用"))
return
}
}
if onAuth != nil {
if next := onAuth(w, r, id); !next {
log.Println("onAuth阻止用户连入ws")
return
}
}
var client = new(clientclass.WebSocket)
client.Id = id
client.Conn = conn
client.Group = make(map[string]string, 0)
userList[id] = client
userListCloseChannel[id] = make(chan bool)
conn.SetCloseHandler(func(code int, text string) error {
log.Println(id, "closehandle")
return nil
})
go wsMsgLoop(client, id)
<-userListCloseChannel[id]
if onClose != nil {
onClose(client, id)
}
if cl, ok := userList[id]; ok {
wscl := cl.(*clientclass.WebSocket)
for _, groupname := range wscl.Group {
redisClient.GroupDel(fmt.Sprintf(RD_GROUP_MEMBERS, groupname), id)
}
}
//删除已结束连接
delete(userListCloseChannel, id)
delete(userList, id)
redisClient.Del(fmt.Sprintf(RD_USER_RPCNODE, id))
log.Println("用户", id, "结束请求")
}
func server_ws(macid string, ipaddr string) {
//用户连接后生成一个唯一id返回给用户并存储一些必要信息到redis和本节点对应的一些信息
http.HandleFunc("/ws", ws)
http.ListenAndServe(ipaddr, nil)
}
// --------------------------------------------------------------------------------
// RPC转发服务
func rpc(w http.ResponseWriter, r *http.Request) {
if stop != nil {
w.WriteHeader(404)
return
}
if r.Method != "POST" {
w.WriteHeader(404)
w.Write([]byte("不支持POST以外的请求"))
return
}
err := r.ParseForm()
if err != nil {
w.WriteHeader(404)
w.Write([]byte("解析表单失败"))
return
}
id := r.PostForm.Get("id")
sendId := r.PostForm.Get("send_id")
group := r.PostForm.Get("group")
type1 := r.PostForm.Get("type")
msg := r.PostForm.Get("msg")
// log.Println("RPC收到请求", r.PostForm)
if onMessage != nil {
msgtest := messageclass.RecvIdMsgTest{}
err := json.Unmarshal([]byte(msg), &msgtest)
if err == nil {
if client, ok := userList[msgtest.RecvId]; ok {
if next := onMessage(client.(interfaceDir.Client), sendId, id, type1, []byte(msg)); !next {
return
}
} else {
//解释失败则调用onmessage时client为空
if next := onMessage(nil, sendId, id, type1, []byte(msg)); !next {
return
}
}
}
}
if type1 == messageclass.MSG_RECEIVE {
var msg1 interface{} = nil
var err error = nil
if group != "" {
//群转发,此处RPC不再执行RPC转发,否则节点互转死循环
msg1 = messageclass.GroupMsg{}
err = json.Unmarshal([]byte(msg), &msg1)
if err == nil {
list := redisClient.GroupList(msg1.(messageclass.GroupMsg).Group)
var recvid1 = ""
for _, recvid := range list {
recvid1 = redisClient.Get(recvid)
if recvid1 != "" {
if _, ok := userList[recvid1]; ok {
//只处理当前userlist中存在的user
SendToUser(sendId, recvid1, id, type1, msg1)
}
}
}
} else {
log.Println("JSON解析失败,群转发失败", id, type1, msg)
}
} else {
//单转发
var msg1 = messageclass.RelayMsg{}
err = json.Unmarshal([]byte(msg), &msg1)
if err == nil {
if client, ok := userList[msg1.RecvId]; ok {
client.(interfaceDir.Client).Send(messageclass.Message{
Id: id,
SendId: sendId,
Type: type1,
Msg: msg1,
})
w.Header().Add("Content-type", "application/json")
w.Write([]byte("{\"status\":\"1\"}"))
return
}
log.Println("用户", msg1.RecvId, "不在线")
w.Header().Add("Content-type", "application/json")
w.Write([]byte("{\"status\":\"0\"}"))
return
} else {
log.Println("JSON解析失败,转发失败", id, type1, msg)
}
}
}
if type1 == messageclass.MSG_ACKNOTIFY {
var msg1 = messageclass.AckMsg{}
err := json.Unmarshal([]byte(msg), &msg1)
if err == nil {
if client, ok := userList[msg1.RecvId]; ok {
client.(interfaceDir.Client).Send(messageclass.Message{
Id: id,
SendId: sendId,
Type: type1,
Msg: msg1,
})
w.Header().Add("Content-type", "application/json")
w.Write([]byte("{\"status\":\"1\"}"))
return
}
log.Println("用户", msg1.RecvId, "不在线")
w.Header().Add("Content-type", "application/json")
w.Write([]byte("{\"status\":\"0\"}"))
return
} else {
log.Println("JSON解析失败,转发失败", id, type1, msg)
}
}
w.Header().Add("Content-type", "application/json")
w.Write([]byte("{\"status\":\"0\"}"))
}
func server_rpc(rpcaddr string) {
http.HandleFunc("/rpc", rpc)
http.ListenAndServe(rpcaddr, nil)
}
// --------------------------------------------------------------------------------
// 收到信息,参数分别是 client userid msgid msgtype message_buff
var onMessage func(interfaceDir.Client, string, string, string, []byte) bool = nil
// 用户断开连接,还未进行key的删除,参数是 userid
var onClose func(interfaceDir.Client, string) = nil
// 用户连入,参数是 client userid
var onConnect func(interfaceDir.Client, string) bool = nil
/** 设置收到信息call,该函数执行时可能是通过RPC执行的,可阻断自带信息处理 参数分别是
* client 若是自定义信息处理 该对象可能为nil,情况:1:msgbuff中RecvId获取不到,2:当前节点已不存在该客户端
* userid 发送者
* msgid 消息id
* msgtype 消息类型
* message_buff 消息内容(仅支持字符串)
**/
func SetOnMessage(hook func(interfaceDir.Client, string, string, string, []byte) bool) {
onMessage = hook
}
// 设置用户断开call,未进行用户移出列表 参数是 client,userid
func SetOnClose(hook func(interfaceDir.Client, string)) {
onClose = hook
}
// 设置用户连入call,可阻止用户连入 参数是 client userid
func SetOnConnect(hook func(interfaceDir.Client, string) bool) {
onConnect = hook
}
// 当stop存在表示即将关闭,在处理关闭的操作
var stop os.Signal = nil
/**
* macid 节点ID
* recordaddr 记录在redis上的本节点可访问地址(不包端口)
* wsport websocket客户端访问端口
* rpcport 本节与其他节点通讯的端口(需要每个节点都是一致的并且是开放端口)
* redisaddr 127.0.0.1:6379 redis地址
*/
func Run(macid string, recordaddr string, wsport string, rpcport string, redisaddr string) {
c := make(chan os.Signal, 1)
signal.Notify(c)
// log.Println(GetLocalIp(), len(GetLocalIp()))
log.Println("监听地址:", GetLocalIp()[0]) //strings.Join(GetLocalIp(), "|")
log.Println("设备ID:", macid)
log.Println("RPC端口:", rpcport)
log.Println("WS端口:", wsport)
log.Println("recordaddr:", recordaddr)
log.Println("redisaddr:", redisaddr)
svrid = macid
rpcPort = rpcport
if recordaddr != "" {
RecordAddr = recordaddr
}
if redisaddr != "" {
RedisAddress = redisaddr
}
go server_ws(macid, "0.0.0.0:"+wsport)
go server_rpc("0.0.0.0:" + rpcport)
stop := <-c
log.Println("即将关闭服务,正在清理redis且关闭程序,请勿关机", stop)
for _, ws := range userList {
wscl := ws.(*clientclass.WebSocket)
for _, v := range wscl.Group {
redisClient.GroupDel(fmt.Sprintf(RD_GROUP_MEMBERS, v), wscl.Id)
}
redisClient.Del(fmt.Sprintf(RD_USER_RPCNODE, wscl.Id))
}
log.Println("执行完毕,等待10秒后结束程序")
time.Sleep(10 * time.Second)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。