From dd327a841efbd52c070eefbeba7368d835d770e6 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 21 Oct 2020 14:16:30 +0200 Subject: [PATCH 1/2] Send MCU messages through the session. This helps with clients disconnecting / resuming while MCU responses are being sent to them. With the change, the messages are stored in the session and sent out when clients resume their session. --- src/signaling/clientsession.go | 68 ++++++++++++++++++++-------------- src/signaling/hub.go | 2 +- 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/src/signaling/clientsession.go b/src/signaling/clientsession.go index 85cfe5a..ed4c001 100644 --- a/src/signaling/clientsession.go +++ b/src/signaling/clientsession.go @@ -79,7 +79,7 @@ type ClientSession struct { publishers map[string]McuPublisher subscribers map[string]McuSubscriber - pendingClientMessages []*NatsMessage + pendingClientMessages []*ServerMessage } func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) { @@ -490,12 +490,25 @@ func (s *ClientSession) sendCandidate(client McuClient, sender string, streamTyp }, } - if c := s.client; c != nil { - c.SendMessage(response_message) - } else { - // TODO(jojo): Should we store the candidate and send when a client is connected again? - log.Printf("Session %s received candidate %+v while no client was connected", s.PublicId(), candidate) + s.sendMessageUnlocked(response_message) +} + +func (s *ClientSession) sendMessageUnlocked(message *ServerMessage) bool { + if c := s.getClientUnlocked(); c != nil { + if c.SendMessage(message) { + return true + } } + + s.storePendingMessage(message) + return true +} + +func (s *ClientSession) SendMessage(message *ServerMessage) bool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.sendMessageUnlocked(message) } func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) { @@ -654,26 +667,27 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) { } } - s.mu.Lock() - defer s.mu.Unlock() - client := s.client - if client == nil { - s.pendingClientMessages = append(s.pendingClientMessages, &message) - if len(s.pendingClientMessages) >= warnPendingMessagesCount { - log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages)) - } + serverMessage := s.processNatsMessage(&message) + if serverMessage == nil { return } - s.processNatsMessage(client, &message) + s.SendMessage(serverMessage) } -func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) bool { +func (s *ClientSession) storePendingMessage(message *ServerMessage) { + s.pendingClientMessages = append(s.pendingClientMessages, message) + if len(s.pendingClientMessages) >= warnPendingMessagesCount { + log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages)) + } +} + +func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { switch msg.Type { case "message": if msg.Message == nil { log.Printf("Received NATS message without payload: %+v\n", msg) - return true + return nil } switch msg.Message.Type { @@ -682,14 +696,14 @@ func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) boo msg.Message.Message.Sender != nil && msg.Message.Message.Sender.SessionId == s.PublicId() { // Don't send message back to sender (can happen if sent to user or room) - return true + return nil } case "control": if msg.Message.Control != nil && msg.Message.Control.Sender != nil && msg.Message.Control.Sender.SessionId == s.PublicId() { // Don't send message back to sender (can happen if sent to user or room) - return true + return nil } case "event": if msg.Message.Event.Target == "participants" && @@ -711,18 +725,18 @@ func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) boo } } - return client.writeMessage(msg.Message) + return msg.Message default: log.Printf("Received NATS message with unsupported type %s: %+v\n", msg.Type, msg) - return true + return nil } } -func (s *ClientSession) combinePendingMessages(messages []*NatsMessage) ([]*NatsMessage, error) { - var result []*NatsMessage +func (s *ClientSession) combinePendingMessages(messages []*ServerMessage) ([]*ServerMessage, error) { + var result []*ServerMessage has_chat := false for _, message := range messages { - if message.Type == "message" && message.Message != nil && message.Message.IsChatRefresh() { + if message.IsChatRefresh() { if has_chat { // Only send a single chat refresh message to the client. continue @@ -757,12 +771,10 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId()) had_participants_update := false for _, message := range messages { - if !s.processNatsMessage(client, message) { - break - } + client.writeMessage(message) if !had_participants_update { - had_participants_update = message.Type == "message" && message.Message.IsParticipantsUpdate() + had_participants_update = message.IsParticipantsUpdate() } } diff --git a/src/signaling/hub.go b/src/signaling/hub.go index 9c960da..eb46c41 100644 --- a/src/signaling/hub.go +++ b/src/signaling/hub.go @@ -1517,7 +1517,7 @@ func (h *Hub) sendMcuMessageResponse(client *Client, session *ClientSession, mes } if response_message != nil { - client.SendMessage(response_message) + session.SendMessage(response_message) } } From e6f6f0a8464d603228460f75742b4c3ef581cd36 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 21 Oct 2020 14:34:53 +0200 Subject: [PATCH 2/2] Check combining while building pending messages instead of while sending out. --- src/signaling/clientsession.go | 64 ++++++++++++++++------------------ 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/src/signaling/clientsession.go b/src/signaling/clientsession.go index ed4c001..5c3cdf4 100644 --- a/src/signaling/clientsession.go +++ b/src/signaling/clientsession.go @@ -79,7 +79,9 @@ type ClientSession struct { publishers map[string]McuPublisher subscribers map[string]McuSubscriber - pendingClientMessages []*ServerMessage + pendingClientMessages []*ServerMessage + hasPendingChat bool + hasPendingParticipantsUpdate bool } func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) { @@ -511,6 +513,16 @@ func (s *ClientSession) SendMessage(message *ServerMessage) bool { return s.sendMessageUnlocked(message) } +func (s *ClientSession) SendMessages(messages []*ServerMessage) bool { + s.mu.Lock() + defer s.mu.Unlock() + + for _, message := range messages { + s.sendMessageUnlocked(message) + } + return true +} + func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) { s.mu.Lock() defer s.mu.Unlock() @@ -676,6 +688,17 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) { } func (s *ClientSession) storePendingMessage(message *ServerMessage) { + if message.IsChatRefresh() { + if s.hasPendingChat { + // Only send a single "chat-refresh" message on resume. + return + } + + s.hasPendingChat = true + } + if !s.hasPendingParticipantsUpdate && message.IsParticipantsUpdate() { + s.hasPendingParticipantsUpdate = true + } s.pendingClientMessages = append(s.pendingClientMessages, message) if len(s.pendingClientMessages) >= warnPendingMessagesCount { log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages)) @@ -732,24 +755,6 @@ func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { } } -func (s *ClientSession) combinePendingMessages(messages []*ServerMessage) ([]*ServerMessage, error) { - var result []*ServerMessage - has_chat := false - for _, message := range messages { - if message.IsChatRefresh() { - if has_chat { - // Only send a single chat refresh message to the client. - continue - } - - has_chat = true - } - - result = append(result, message) - } - return result, nil -} - func (s *ClientSession) NotifySessionResumed(client *Client) { s.mu.Lock() if len(s.pendingClientMessages) == 0 { @@ -760,25 +765,18 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { return } - messages, err := s.combinePendingMessages(s.pendingClientMessages) + messages := s.pendingClientMessages + hasPendingParticipantsUpdate := s.hasPendingParticipantsUpdate s.pendingClientMessages = nil + s.hasPendingChat = false + s.hasPendingParticipantsUpdate = false s.mu.Unlock() - if err != nil { - client.writeError(err) - return - } log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId()) - had_participants_update := false - for _, message := range messages { - client.writeMessage(message) + // Send through session to handle connection interruptions. + s.SendMessages(messages) - if !had_participants_update { - had_participants_update = message.IsParticipantsUpdate() - } - } - - if !had_participants_update { + if !hasPendingParticipantsUpdate { // Only need to send initial participants list update if none was part of the pending messages. if room := s.GetRoom(); room != nil { room.NotifySessionResumed(client)