Merge pull request #394 from strukturag/messages-done-wg

Stop using WaitGroup to detect finished message processing.
This commit is contained in:
Joachim Bauch 2023-01-17 12:24:08 +01:00 committed by GitHub
commit 7bd6fdd93f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -106,7 +106,7 @@ type Client struct {
mu sync.Mutex
closeChan chan bool
messagesDone sync.WaitGroup
messagesDone chan bool
messageChan chan *bytes.Buffer
messageProcessing uint32
@ -125,14 +125,16 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien
if agent == "" {
agent = "unknown user agent"
}
client := &Client{
conn: conn,
addr: remoteAddress,
agent: agent,
logRTT: true,
closeChan: make(chan bool, 1),
messageChan: make(chan *bytes.Buffer, 16),
closeChan: make(chan bool, 1),
messageChan: make(chan *bytes.Buffer, 16),
messagesDone: make(chan bool, 1),
OnLookupCountry: func(client *Client) string { return unknownCountry },
OnClosed: func(client *Client) {},
@ -207,7 +209,7 @@ func (c *Client) Close() {
func (c *Client) doClose() {
c.closeChan <- true
c.messagesDone.Wait()
<-c.messagesDone
c.OnClosed(c)
c.SetSession(nil)
@ -334,25 +336,24 @@ func (c *Client) ReadPump() {
break
}
c.messagesDone.Add(1)
c.messageChan <- decodeBuffer
}
}
func (c *Client) processMessages() {
atomic.StoreUint32(&c.messageProcessing, 1)
for {
buffer := <-c.messageChan
if buffer == nil {
break
}
atomic.StoreUint32(&c.messageProcessing, 1)
c.OnMessageReceived(c, buffer.Bytes())
atomic.StoreUint32(&c.messageProcessing, 0)
c.messagesDone.Done()
bufferPool.Put(buffer)
}
atomic.StoreUint32(&c.messageProcessing, 0)
c.messagesDone <- true
if atomic.LoadUint32(&c.closed) == 2 {
c.doClose()
}