diff --git a/src/signaling/clientsession.go b/src/signaling/clientsession.go index 85cfe5a..5c3cdf4 100644 --- a/src/signaling/clientsession.go +++ b/src/signaling/clientsession.go @@ -79,7 +79,9 @@ type ClientSession struct { publishers map[string]McuPublisher subscribers map[string]McuSubscriber - pendingClientMessages []*NatsMessage + pendingClientMessages []*ServerMessage + hasPendingChat bool + hasPendingParticipantsUpdate bool } func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) { @@ -490,12 +492,35 @@ func (s *ClientSession) sendCandidate(client McuClient, sender string, streamTyp }, } - if c := s.client; c != nil { - c.SendMessage(response_message) - } else { - // TODO(jojo): Should we store the candidate and send when a client is connected again? - log.Printf("Session %s received candidate %+v while no client was connected", s.PublicId(), candidate) + s.sendMessageUnlocked(response_message) +} + +func (s *ClientSession) sendMessageUnlocked(message *ServerMessage) bool { + if c := s.getClientUnlocked(); c != nil { + if c.SendMessage(message) { + return true + } } + + s.storePendingMessage(message) + return true +} + +func (s *ClientSession) SendMessage(message *ServerMessage) bool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.sendMessageUnlocked(message) +} + +func (s *ClientSession) SendMessages(messages []*ServerMessage) bool { + s.mu.Lock() + defer s.mu.Unlock() + + for _, message := range messages { + s.sendMessageUnlocked(message) + } + return true } func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) { @@ -654,26 +679,38 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) { } } - s.mu.Lock() - defer s.mu.Unlock() - client := s.client - if client == nil { - s.pendingClientMessages = append(s.pendingClientMessages, &message) - if len(s.pendingClientMessages) >= warnPendingMessagesCount { - log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages)) - } + serverMessage := s.processNatsMessage(&message) + if serverMessage == nil { return } - s.processNatsMessage(client, &message) + s.SendMessage(serverMessage) } -func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) bool { +func (s *ClientSession) storePendingMessage(message *ServerMessage) { + if message.IsChatRefresh() { + if s.hasPendingChat { + // Only send a single "chat-refresh" message on resume. + return + } + + s.hasPendingChat = true + } + if !s.hasPendingParticipantsUpdate && message.IsParticipantsUpdate() { + s.hasPendingParticipantsUpdate = true + } + s.pendingClientMessages = append(s.pendingClientMessages, message) + if len(s.pendingClientMessages) >= warnPendingMessagesCount { + log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages)) + } +} + +func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { switch msg.Type { case "message": if msg.Message == nil { log.Printf("Received NATS message without payload: %+v\n", msg) - return true + return nil } switch msg.Message.Type { @@ -682,14 +719,14 @@ func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) boo msg.Message.Message.Sender != nil && msg.Message.Message.Sender.SessionId == s.PublicId() { // Don't send message back to sender (can happen if sent to user or room) - return true + return nil } case "control": if msg.Message.Control != nil && msg.Message.Control.Sender != nil && msg.Message.Control.Sender.SessionId == s.PublicId() { // Don't send message back to sender (can happen if sent to user or room) - return true + return nil } case "event": if msg.Message.Event.Target == "participants" && @@ -711,31 +748,13 @@ func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) boo } } - return client.writeMessage(msg.Message) + return msg.Message default: log.Printf("Received NATS message with unsupported type %s: %+v\n", msg.Type, msg) - return true + return nil } } -func (s *ClientSession) combinePendingMessages(messages []*NatsMessage) ([]*NatsMessage, error) { - var result []*NatsMessage - has_chat := false - for _, message := range messages { - if message.Type == "message" && message.Message != nil && message.Message.IsChatRefresh() { - if has_chat { - // Only send a single chat refresh message to the client. - continue - } - - has_chat = true - } - - result = append(result, message) - } - return result, nil -} - func (s *ClientSession) NotifySessionResumed(client *Client) { s.mu.Lock() if len(s.pendingClientMessages) == 0 { @@ -746,27 +765,18 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { return } - messages, err := s.combinePendingMessages(s.pendingClientMessages) + messages := s.pendingClientMessages + hasPendingParticipantsUpdate := s.hasPendingParticipantsUpdate s.pendingClientMessages = nil + s.hasPendingChat = false + s.hasPendingParticipantsUpdate = false s.mu.Unlock() - if err != nil { - client.writeError(err) - return - } log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId()) - had_participants_update := false - for _, message := range messages { - if !s.processNatsMessage(client, message) { - break - } + // Send through session to handle connection interruptions. + s.SendMessages(messages) - if !had_participants_update { - had_participants_update = message.Type == "message" && message.Message.IsParticipantsUpdate() - } - } - - if !had_participants_update { + if !hasPendingParticipantsUpdate { // Only need to send initial participants list update if none was part of the pending messages. if room := s.GetRoom(); room != nil { room.NotifySessionResumed(client) diff --git a/src/signaling/hub.go b/src/signaling/hub.go index 9c960da..eb46c41 100644 --- a/src/signaling/hub.go +++ b/src/signaling/hub.go @@ -1517,7 +1517,7 @@ func (h *Hub) sendMcuMessageResponse(client *Client, session *ClientSession, mes } if response_message != nil { - client.SendMessage(response_message) + session.SendMessage(response_message) } }