From 2b02a2f92960918aa516ddd5c55dd1645ee0ced7 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 31 May 2021 16:12:48 +0200 Subject: [PATCH] Make client message processing asynchronous. This helps in detecting clients that disconnect while a backend request for them is still active (e.g. joining a room). --- client.go | 44 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/client.go b/client.go index 3b17d39..a41ee39 100644 --- a/client.go +++ b/client.go @@ -100,7 +100,9 @@ type Client struct { mu sync.Mutex - closeChan chan bool + closeChan chan bool + messagesDone sync.WaitGroup + messageChan chan *bytes.Buffer OnLookupCountry func(*Client) string OnClosed func(*Client) @@ -118,10 +120,12 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien agent = "unknown user agent" } client := &Client{ - conn: conn, - addr: remoteAddress, - agent: agent, - closeChan: make(chan bool, 1), + conn: conn, + addr: remoteAddress, + agent: agent, + + closeChan: make(chan bool, 1), + messageChan: make(chan *bytes.Buffer, 16), OnLookupCountry: func(client *Client) string { return unknownCountry }, OnClosed: func(client *Client) {}, @@ -179,6 +183,8 @@ func (c *Client) Close() { } c.closeChan <- true + c.messagesDone.Wait() + close(c.messageChan) c.OnClosed(c) c.SetSession(nil) @@ -255,8 +261,8 @@ func (c *Client) ReadPump() { return nil }) - decodeBuffer := bufferPool.Get().(*bytes.Buffer) - defer bufferPool.Put(decodeBuffer) + go c.processMessages() + for { conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint messageType, reader, err := conn.NextReader() @@ -284,8 +290,10 @@ func (c *Client) ReadPump() { continue } + decodeBuffer := bufferPool.Get().(*bytes.Buffer) decodeBuffer.Reset() if _, err := decodeBuffer.ReadFrom(reader); err != nil { + bufferPool.Put(decodeBuffer) if session := c.GetSession(); session != nil { log.Printf("Error reading message from client %s: %v", session.PublicId(), err) } else { @@ -294,7 +302,27 @@ func (c *Client) ReadPump() { break } - c.OnMessageReceived(c, decodeBuffer.Bytes()) + // Stop processing if the client was closed. + if atomic.LoadUint32(&c.closed) == 1 { + bufferPool.Put(decodeBuffer) + break + } + + c.messagesDone.Add(1) + c.messageChan <- decodeBuffer + } +} + +func (c *Client) processMessages() { + for { + buffer := <-c.messageChan + if buffer == nil { + break + } + + c.OnMessageReceived(c, buffer.Bytes()) + c.messagesDone.Done() + bufferPool.Put(buffer) } }