diff --git a/client.go b/client.go index fce3fdd..c28bf83 100644 --- a/client.go +++ b/client.go @@ -106,7 +106,7 @@ type Client struct { mu sync.Mutex closeChan chan bool - messagesDone sync.WaitGroup + messagesDone chan bool messageChan chan *bytes.Buffer messageProcessing uint32 @@ -125,14 +125,16 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien if agent == "" { agent = "unknown user agent" } + client := &Client{ conn: conn, addr: remoteAddress, agent: agent, logRTT: true, - closeChan: make(chan bool, 1), - messageChan: make(chan *bytes.Buffer, 16), + closeChan: make(chan bool, 1), + messageChan: make(chan *bytes.Buffer, 16), + messagesDone: make(chan bool, 1), OnLookupCountry: func(client *Client) string { return unknownCountry }, OnClosed: func(client *Client) {}, @@ -207,7 +209,7 @@ func (c *Client) Close() { func (c *Client) doClose() { c.closeChan <- true - c.messagesDone.Wait() + <-c.messagesDone c.OnClosed(c) c.SetSession(nil) @@ -334,25 +336,24 @@ func (c *Client) ReadPump() { break } - c.messagesDone.Add(1) c.messageChan <- decodeBuffer } } func (c *Client) processMessages() { + atomic.StoreUint32(&c.messageProcessing, 1) for { buffer := <-c.messageChan if buffer == nil { break } - atomic.StoreUint32(&c.messageProcessing, 1) c.OnMessageReceived(c, buffer.Bytes()) - atomic.StoreUint32(&c.messageProcessing, 0) - c.messagesDone.Done() bufferPool.Put(buffer) } + atomic.StoreUint32(&c.messageProcessing, 0) + c.messagesDone <- true if atomic.LoadUint32(&c.closed) == 2 { c.doClose() }