From cd343e30748ad0e85232b7dd0f91c7c93acc3f24 Mon Sep 17 00:00:00 2001 From: xiongyu Date: Thu, 23 Nov 2023 14:32:16 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=8E=BB=E6=8E=89=E5=86=99=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AF=BB=E5=86=99=E9=94=81=202.=E7=94=A8CAS=E4=BF=9D=E6=8A=A4?= =?UTF-8?q?=E5=85=B3=E9=97=AD=E6=97=B6=EF=BC=8Cfinalizer=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E5=AE=89=E5=85=A8=203.=E4=BF=AE=E5=A4=8D=E5=8E=9F=E6=9D=A5Go?= =?UTF-8?q?=20StartWriter=E6=97=B6=E5=AD=98=E5=9C=A8=E7=9A=84=E5=A4=9A?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E7=AB=9E=E4=BA=89=E9=97=AE=E9=A2=98=EF=BC=88?= =?UTF-8?q?=E5=8F=AF=E8=83=BD=E5=AF=BC=E8=87=B4=E5=BC=80=E5=90=AF=E5=A4=9A?= =?UTF-8?q?=E4=B8=AA=E5=86=99=E5=8D=8F=E7=A8=8B=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- znet/connection.go | 89 ++++++++++++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 39 deletions(-) diff --git a/znet/connection.go b/znet/connection.go index bd0d805..c02c372 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -6,6 +6,7 @@ import ( "errors" "net" "sync" + "sync/atomic" "time" "github.com/aceld/zinx/zconf" @@ -49,9 +50,9 @@ type Connection struct { // (有缓冲管道,用于读、写两个goroutine之间的消息通信) msgBuffChan chan []byte - // Lock for user message reception and transmission - // (用户收发消息的Lock) - msgLock sync.RWMutex + // Go StartWriter Flag + // (开始初始化写协程标志) + startWriterFlag int32 // Connection properties // (链接属性) @@ -63,7 +64,7 @@ type Connection struct { // The current connection's close state // (当前连接的关闭状态) - isClosed bool + closed int32 // Which Connection Manager the current connection belongs to // (当前链接是属于哪个Connection Manager的) @@ -112,14 +113,15 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint64) ziface.I // Initialize Conn properties c := &Connection{ - conn: conn, - connID: connID, - isClosed: false, - msgBuffChan: nil, - property: nil, - name: server.ServerName(), - localAddr: conn.LocalAddr().String(), - remoteAddr: conn.RemoteAddr().String(), + conn: conn, + connID: connID, + closed: 0, + startWriterFlag: 0, + msgBuffChan: nil, + property: nil, + name: server.ServerName(), + localAddr: conn.LocalAddr().String(), + remoteAddr: conn.RemoteAddr().String(), } lengthField := server.GetLengthField() @@ -150,7 +152,7 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection { c := &Connection{ conn: conn, connID: 0, // client ignore - isClosed: false, + closed: 0, msgBuffChan: nil, property: nil, name: client.GetName(), @@ -333,9 +335,7 @@ func (c *Connection) LocalAddr() net.Addr { } func (c *Connection) Send(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.isClosed == true { + if c.isClosed() == true { return errors.New("connection closed when send msg") } @@ -349,10 +349,8 @@ func (c *Connection) Send(data []byte) error { } func (c *Connection) SendToQueue(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.msgBuffChan == nil { + if c.msgBuffChan == nil && c.setStartWriterFlag() { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) // Start a Goroutine to write data back to the client // This method only reads data from the MsgBuffChan without allocating memory or starting a Goroutine @@ -364,7 +362,7 @@ func (c *Connection) SendToQueue(data []byte) error { idleTimeout := time.NewTimer(5 * time.Millisecond) defer idleTimeout.Stop() - if c.isClosed == true { + if c.isClosed() == true { return errors.New("Connection closed when send buff msg") } @@ -385,9 +383,7 @@ func (c *Connection) SendToQueue(data []byte) error { // SendMsg directly sends Message data to the remote TCP client. // (直接将Message数据发送数据给远程的TCP客户端) func (c *Connection) SendMsg(msgID uint32, data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.isClosed == true { + if c.isClosed() == true { return errors.New("connection closed when send msg") } @@ -408,10 +404,7 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error { } func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - - if c.msgBuffChan == nil { + if c.msgBuffChan == nil && c.setStartWriterFlag() { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) // Start a Goroutine to write data back to the client // This method only reads data from the MsgBuffChan without allocating memory or starting a Goroutine @@ -423,7 +416,7 @@ func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { idleTimeout := time.NewTimer(5 * time.Millisecond) defer idleTimeout.Stop() - if c.isClosed == true { + if c.isClosed() == true { return errors.New("Connection closed when send buff msg") } @@ -475,18 +468,20 @@ func (c *Connection) Context() context.Context { } func (c *Connection) finalizer() { - // Call the callback function registered by the user when closing the connection if it exists - // (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用) - c.callOnConnStop() - - c.msgLock.Lock() - defer c.msgLock.Unlock() - // If the connection has already been closed - if c.isClosed == true { + if c.isClosed() == true { + return + } + + //set closed + if !c.setClose() { return } + // Call the callback function registered by the user when closing the connection if it exists + // (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用) + c.callOnConnStop() + // Stop the heartbeat detector associated with the connection if c.hc != nil { c.hc.Stop() @@ -505,8 +500,6 @@ func (c *Connection) finalizer() { close(c.msgBuffChan) } - c.isClosed = true - zlog.Ins().InfoF("Conn Stop()...ConnID = %d", c.connID) } @@ -525,7 +518,7 @@ func (c *Connection) callOnConnStop() { } func (c *Connection) IsAlive() bool { - if c.isClosed { + if c.isClosed() { return false } // Check the last activity time of the connection. If it's beyond the heartbeat interval, @@ -557,3 +550,21 @@ func (c *Connection) GetName() string { func (c *Connection) GetMsgHandler() ziface.IMsgHandle { return c.msgHandler } + +func (c *Connection) isClosed() bool { + return atomic.LoadInt32(&c.closed) != 0 +} + +func (c *Connection) setClose() bool { + if atomic.CompareAndSwapInt32(&c.closed, 0, 1) { + return true + } + return false +} + +func (c *Connection) setStartWriterFlag() bool { + if atomic.CompareAndSwapInt32(&c.startWriterFlag, 0, 1) { + return true + } + return false +} -- Gitee