diff --git a/znet/connection.go b/znet/connection.go index bd0d8058c9cd565794bb257be1ecdcd3e85d2286..c02c372ec27dbd063936fc2bdbcce30d360853e0 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -6,6 +6,7 @@ import ( "errors" "net" "sync" + "sync/atomic" "time" "github.com/aceld/zinx/zconf" @@ -49,9 +50,9 @@ type Connection struct { // (有缓冲管道,用于读、写两个goroutine之间的消息通信) msgBuffChan chan []byte - // Lock for user message reception and transmission - // (用户收发消息的Lock) - msgLock sync.RWMutex + // Go StartWriter Flag + // (开始初始化写协程标志) + startWriterFlag int32 // Connection properties // (链接属性) @@ -63,7 +64,7 @@ type Connection struct { // The current connection's close state // (当前连接的关闭状态) - isClosed bool + closed int32 // Which Connection Manager the current connection belongs to // (当前链接是属于哪个Connection Manager的) @@ -112,14 +113,15 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint64) ziface.I // Initialize Conn properties c := &Connection{ - conn: conn, - connID: connID, - isClosed: false, - msgBuffChan: nil, - property: nil, - name: server.ServerName(), - localAddr: conn.LocalAddr().String(), - remoteAddr: conn.RemoteAddr().String(), + conn: conn, + connID: connID, + closed: 0, + startWriterFlag: 0, + msgBuffChan: nil, + property: nil, + name: server.ServerName(), + localAddr: conn.LocalAddr().String(), + remoteAddr: conn.RemoteAddr().String(), } lengthField := server.GetLengthField() @@ -150,7 +152,7 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection { c := &Connection{ conn: conn, connID: 0, // client ignore - isClosed: false, + closed: 0, msgBuffChan: nil, property: nil, name: client.GetName(), @@ -333,9 +335,7 @@ func (c *Connection) LocalAddr() net.Addr { } func (c *Connection) Send(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.isClosed == true { + if c.isClosed() == true { return errors.New("connection closed when send msg") } @@ -349,10 +349,8 @@ func (c *Connection) Send(data []byte) error { } func (c *Connection) SendToQueue(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.msgBuffChan == nil { + if c.msgBuffChan == nil && c.setStartWriterFlag() { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) // Start a Goroutine to write data back to the client // This method only reads data from the MsgBuffChan without allocating memory or starting a Goroutine @@ -364,7 +362,7 @@ func (c *Connection) SendToQueue(data []byte) error { idleTimeout := time.NewTimer(5 * time.Millisecond) defer idleTimeout.Stop() - if c.isClosed == true { + if c.isClosed() == true { return errors.New("Connection closed when send buff msg") } @@ -385,9 +383,7 @@ func (c *Connection) SendToQueue(data []byte) error { // SendMsg directly sends Message data to the remote TCP client. // (直接将Message数据发送数据给远程的TCP客户端) func (c *Connection) SendMsg(msgID uint32, data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.isClosed == true { + if c.isClosed() == true { return errors.New("connection closed when send msg") } @@ -408,10 +404,7 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error { } func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - - if c.msgBuffChan == nil { + if c.msgBuffChan == nil && c.setStartWriterFlag() { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) // Start a Goroutine to write data back to the client // This method only reads data from the MsgBuffChan without allocating memory or starting a Goroutine @@ -423,7 +416,7 @@ func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { idleTimeout := time.NewTimer(5 * time.Millisecond) defer idleTimeout.Stop() - if c.isClosed == true { + if c.isClosed() == true { return errors.New("Connection closed when send buff msg") } @@ -475,18 +468,20 @@ func (c *Connection) Context() context.Context { } func (c *Connection) finalizer() { - // Call the callback function registered by the user when closing the connection if it exists - // (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用) - c.callOnConnStop() - - c.msgLock.Lock() - defer c.msgLock.Unlock() - // If the connection has already been closed - if c.isClosed == true { + if c.isClosed() == true { + return + } + + //set closed + if !c.setClose() { return } + // Call the callback function registered by the user when closing the connection if it exists + // (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用) + c.callOnConnStop() + // Stop the heartbeat detector associated with the connection if c.hc != nil { c.hc.Stop() @@ -505,8 +500,6 @@ func (c *Connection) finalizer() { close(c.msgBuffChan) } - c.isClosed = true - zlog.Ins().InfoF("Conn Stop()...ConnID = %d", c.connID) } @@ -525,7 +518,7 @@ func (c *Connection) callOnConnStop() { } func (c *Connection) IsAlive() bool { - if c.isClosed { + if c.isClosed() { return false } // Check the last activity time of the connection. If it's beyond the heartbeat interval, @@ -557,3 +550,21 @@ func (c *Connection) GetName() string { func (c *Connection) GetMsgHandler() ziface.IMsgHandle { return c.msgHandler } + +func (c *Connection) isClosed() bool { + return atomic.LoadInt32(&c.closed) != 0 +} + +func (c *Connection) setClose() bool { + if atomic.CompareAndSwapInt32(&c.closed, 0, 1) { + return true + } + return false +} + +func (c *Connection) setStartWriterFlag() bool { + if atomic.CompareAndSwapInt32(&c.startWriterFlag, 0, 1) { + return true + } + return false +}