From 8de8b39a5cf9fbbd2a94e284c1b8323eb666fb40 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 17 Jan 2023 12:09:06 +0100 Subject: [PATCH] Stop using WaitGroup to detect finished message processing. This causes flaky races if "Wait" and "Add" are being used from different goroutines. --- client.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/client.go b/client.go index fce3fdd..c28bf83 100644 --- a/client.go +++ b/client.go @@ -106,7 +106,7 @@ type Client struct { mu sync.Mutex closeChan chan bool - messagesDone sync.WaitGroup + messagesDone chan bool messageChan chan *bytes.Buffer messageProcessing uint32 @@ -125,14 +125,16 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien if agent == "" { agent = "unknown user agent" } + client := &Client{ conn: conn, addr: remoteAddress, agent: agent, logRTT: true, - closeChan: make(chan bool, 1), - messageChan: make(chan *bytes.Buffer, 16), + closeChan: make(chan bool, 1), + messageChan: make(chan *bytes.Buffer, 16), + messagesDone: make(chan bool, 1), OnLookupCountry: func(client *Client) string { return unknownCountry }, OnClosed: func(client *Client) {}, @@ -207,7 +209,7 @@ func (c *Client) Close() { func (c *Client) doClose() { c.closeChan <- true - c.messagesDone.Wait() + <-c.messagesDone c.OnClosed(c) c.SetSession(nil) @@ -334,25 +336,24 @@ func (c *Client) ReadPump() { break } - c.messagesDone.Add(1) c.messageChan <- decodeBuffer } } func (c *Client) processMessages() { + atomic.StoreUint32(&c.messageProcessing, 1) for { buffer := <-c.messageChan if buffer == nil { break } - atomic.StoreUint32(&c.messageProcessing, 1) c.OnMessageReceived(c, buffer.Bytes()) - atomic.StoreUint32(&c.messageProcessing, 0) - c.messagesDone.Done() bufferPool.Put(buffer) } + atomic.StoreUint32(&c.messageProcessing, 0) + c.messagesDone <- true if atomic.LoadUint32(&c.closed) == 2 { c.doClose() }