Make client message processing asynchronous.

This helps in detecting clients that disconnect while a backend request for
them is still active (e.g. joining a room).
This commit is contained in:
Joachim Bauch 2021-05-31 16:12:48 +02:00
parent 8f3db6da80
commit 2b02a2f929
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
1 changed files with 36 additions and 8 deletions

View File

@ -100,7 +100,9 @@ type Client struct {
mu sync.Mutex
closeChan chan bool
closeChan chan bool
messagesDone sync.WaitGroup
messageChan chan *bytes.Buffer
OnLookupCountry func(*Client) string
OnClosed func(*Client)
@ -118,10 +120,12 @@ func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Clien
agent = "unknown user agent"
}
client := &Client{
conn: conn,
addr: remoteAddress,
agent: agent,
closeChan: make(chan bool, 1),
conn: conn,
addr: remoteAddress,
agent: agent,
closeChan: make(chan bool, 1),
messageChan: make(chan *bytes.Buffer, 16),
OnLookupCountry: func(client *Client) string { return unknownCountry },
OnClosed: func(client *Client) {},
@ -179,6 +183,8 @@ func (c *Client) Close() {
}
c.closeChan <- true
c.messagesDone.Wait()
close(c.messageChan)
c.OnClosed(c)
c.SetSession(nil)
@ -255,8 +261,8 @@ func (c *Client) ReadPump() {
return nil
})
decodeBuffer := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(decodeBuffer)
go c.processMessages()
for {
conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint
messageType, reader, err := conn.NextReader()
@ -284,8 +290,10 @@ func (c *Client) ReadPump() {
continue
}
decodeBuffer := bufferPool.Get().(*bytes.Buffer)
decodeBuffer.Reset()
if _, err := decodeBuffer.ReadFrom(reader); err != nil {
bufferPool.Put(decodeBuffer)
if session := c.GetSession(); session != nil {
log.Printf("Error reading message from client %s: %v", session.PublicId(), err)
} else {
@ -294,7 +302,27 @@ func (c *Client) ReadPump() {
break
}
c.OnMessageReceived(c, decodeBuffer.Bytes())
// Stop processing if the client was closed.
if atomic.LoadUint32(&c.closed) == 1 {
bufferPool.Put(decodeBuffer)
break
}
c.messagesDone.Add(1)
c.messageChan <- decodeBuffer
}
}
func (c *Client) processMessages() {
for {
buffer := <-c.messageChan
if buffer == nil {
break
}
c.OnMessageReceived(c, buffer.Bytes())
c.messagesDone.Done()
bufferPool.Put(buffer)
}
}