From e6f6f0a8464d603228460f75742b4c3ef581cd36 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 21 Oct 2020 14:34:53 +0200 Subject: [PATCH] Check combining while building pending messages instead of while sending out. --- src/signaling/clientsession.go | 64 ++++++++++++++++------------------ 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/src/signaling/clientsession.go b/src/signaling/clientsession.go index ed4c001..5c3cdf4 100644 --- a/src/signaling/clientsession.go +++ b/src/signaling/clientsession.go @@ -79,7 +79,9 @@ type ClientSession struct { publishers map[string]McuPublisher subscribers map[string]McuSubscriber - pendingClientMessages []*ServerMessage + pendingClientMessages []*ServerMessage + hasPendingChat bool + hasPendingParticipantsUpdate bool } func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) { @@ -511,6 +513,16 @@ func (s *ClientSession) SendMessage(message *ServerMessage) bool { return s.sendMessageUnlocked(message) } +func (s *ClientSession) SendMessages(messages []*ServerMessage) bool { + s.mu.Lock() + defer s.mu.Unlock() + + for _, message := range messages { + s.sendMessageUnlocked(message) + } + return true +} + func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) { s.mu.Lock() defer s.mu.Unlock() @@ -676,6 +688,17 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) { } func (s *ClientSession) storePendingMessage(message *ServerMessage) { + if message.IsChatRefresh() { + if s.hasPendingChat { + // Only send a single "chat-refresh" message on resume. + return + } + + s.hasPendingChat = true + } + if !s.hasPendingParticipantsUpdate && message.IsParticipantsUpdate() { + s.hasPendingParticipantsUpdate = true + } s.pendingClientMessages = append(s.pendingClientMessages, message) if len(s.pendingClientMessages) >= warnPendingMessagesCount { log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages)) @@ -732,24 +755,6 @@ func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { } } -func (s *ClientSession) combinePendingMessages(messages []*ServerMessage) ([]*ServerMessage, error) { - var result []*ServerMessage - has_chat := false - for _, message := range messages { - if message.IsChatRefresh() { - if has_chat { - // Only send a single chat refresh message to the client. - continue - } - - has_chat = true - } - - result = append(result, message) - } - return result, nil -} - func (s *ClientSession) NotifySessionResumed(client *Client) { s.mu.Lock() if len(s.pendingClientMessages) == 0 { @@ -760,25 +765,18 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { return } - messages, err := s.combinePendingMessages(s.pendingClientMessages) + messages := s.pendingClientMessages + hasPendingParticipantsUpdate := s.hasPendingParticipantsUpdate s.pendingClientMessages = nil + s.hasPendingChat = false + s.hasPendingParticipantsUpdate = false s.mu.Unlock() - if err != nil { - client.writeError(err) - return - } log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId()) - had_participants_update := false - for _, message := range messages { - client.writeMessage(message) + // Send through session to handle connection interruptions. + s.SendMessages(messages) - if !had_participants_update { - had_participants_update = message.IsParticipantsUpdate() - } - } - - if !had_participants_update { + if !hasPendingParticipantsUpdate { // Only need to send initial participants list update if none was part of the pending messages. if room := s.GetRoom(); room != nil { room.NotifySessionResumed(client)