Stop using WaitGroup to detect finished message processing.

This causes flaky races if "Wait" and "Add" are being used from different
goroutines.
This commit is contained in:
Joachim Bauch 2023-01-17 12:09:06 +01:00
parent 2582e4ffb4
commit 8de8b39a5c
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02

View file

@ -106,7 +106,7 @@ type Client struct {
mu sync.Mutex
closeChan chan bool
messagesDone sync.WaitGroup
messagesDone chan bool
messageChan chan *bytes.Buffer
messageProcessing uint32
@ -125,14 +125,16 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien
if agent == "" {
agent = "unknown user agent"
}
client := &Client{
conn: conn,
addr: remoteAddress,
agent: agent,
logRTT: true,
closeChan: make(chan bool, 1),
messageChan: make(chan *bytes.Buffer, 16),
closeChan: make(chan bool, 1),
messageChan: make(chan *bytes.Buffer, 16),
messagesDone: make(chan bool, 1),
OnLookupCountry: func(client *Client) string { return unknownCountry },
OnClosed: func(client *Client) {},
@ -207,7 +209,7 @@ func (c *Client) Close() {
func (c *Client) doClose() {
c.closeChan <- true
c.messagesDone.Wait()
<-c.messagesDone
c.OnClosed(c)
c.SetSession(nil)
@ -334,25 +336,24 @@ func (c *Client) ReadPump() {
break
}
c.messagesDone.Add(1)
c.messageChan <- decodeBuffer
}
}
func (c *Client) processMessages() {
atomic.StoreUint32(&c.messageProcessing, 1)
for {
buffer := <-c.messageChan
if buffer == nil {
break
}
atomic.StoreUint32(&c.messageProcessing, 1)
c.OnMessageReceived(c, buffer.Bytes())
atomic.StoreUint32(&c.messageProcessing, 0)
c.messagesDone.Done()
bufferPool.Put(buffer)
}
atomic.StoreUint32(&c.messageProcessing, 0)
c.messagesDone <- true
if atomic.LoadUint32(&c.closed) == 2 {
c.doClose()
}