diff --git a/src/signaling/clientsession.go b/src/signaling/clientsession.go index 85cfe5a..ed4c001 100644 --- a/src/signaling/clientsession.go +++ b/src/signaling/clientsession.go @@ -79,7 +79,7 @@ type ClientSession struct { publishers map[string]McuPublisher subscribers map[string]McuSubscriber - pendingClientMessages []*NatsMessage + pendingClientMessages []*ServerMessage } func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) { @@ -490,12 +490,25 @@ func (s *ClientSession) sendCandidate(client McuClient, sender string, streamTyp }, } - if c := s.client; c != nil { - c.SendMessage(response_message) - } else { - // TODO(jojo): Should we store the candidate and send when a client is connected again? - log.Printf("Session %s received candidate %+v while no client was connected", s.PublicId(), candidate) + s.sendMessageUnlocked(response_message) +} + +func (s *ClientSession) sendMessageUnlocked(message *ServerMessage) bool { + if c := s.getClientUnlocked(); c != nil { + if c.SendMessage(message) { + return true + } } + + s.storePendingMessage(message) + return true +} + +func (s *ClientSession) SendMessage(message *ServerMessage) bool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.sendMessageUnlocked(message) } func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) { @@ -654,26 +667,27 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) { } } - s.mu.Lock() - defer s.mu.Unlock() - client := s.client - if client == nil { - s.pendingClientMessages = append(s.pendingClientMessages, &message) - if len(s.pendingClientMessages) >= warnPendingMessagesCount { - log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages)) - } + serverMessage := s.processNatsMessage(&message) + if serverMessage == nil { return } - s.processNatsMessage(client, &message) + s.SendMessage(serverMessage) } -func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) bool { +func (s *ClientSession) storePendingMessage(message *ServerMessage) { + s.pendingClientMessages = append(s.pendingClientMessages, message) + if len(s.pendingClientMessages) >= warnPendingMessagesCount { + log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages)) + } +} + +func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage { switch msg.Type { case "message": if msg.Message == nil { log.Printf("Received NATS message without payload: %+v\n", msg) - return true + return nil } switch msg.Message.Type { @@ -682,14 +696,14 @@ func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) boo msg.Message.Message.Sender != nil && msg.Message.Message.Sender.SessionId == s.PublicId() { // Don't send message back to sender (can happen if sent to user or room) - return true + return nil } case "control": if msg.Message.Control != nil && msg.Message.Control.Sender != nil && msg.Message.Control.Sender.SessionId == s.PublicId() { // Don't send message back to sender (can happen if sent to user or room) - return true + return nil } case "event": if msg.Message.Event.Target == "participants" && @@ -711,18 +725,18 @@ func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) boo } } - return client.writeMessage(msg.Message) + return msg.Message default: log.Printf("Received NATS message with unsupported type %s: %+v\n", msg.Type, msg) - return true + return nil } } -func (s *ClientSession) combinePendingMessages(messages []*NatsMessage) ([]*NatsMessage, error) { - var result []*NatsMessage +func (s *ClientSession) combinePendingMessages(messages []*ServerMessage) ([]*ServerMessage, error) { + var result []*ServerMessage has_chat := false for _, message := range messages { - if message.Type == "message" && message.Message != nil && message.Message.IsChatRefresh() { + if message.IsChatRefresh() { if has_chat { // Only send a single chat refresh message to the client. continue @@ -757,12 +771,10 @@ func (s *ClientSession) NotifySessionResumed(client *Client) { log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId()) had_participants_update := false for _, message := range messages { - if !s.processNatsMessage(client, message) { - break - } + client.writeMessage(message) if !had_participants_update { - had_participants_update = message.Type == "message" && message.Message.IsParticipantsUpdate() + had_participants_update = message.IsParticipantsUpdate() } } diff --git a/src/signaling/hub.go b/src/signaling/hub.go index 9c960da..eb46c41 100644 --- a/src/signaling/hub.go +++ b/src/signaling/hub.go @@ -1517,7 +1517,7 @@ func (h *Hub) sendMcuMessageResponse(client *Client, session *ClientSession, mes } if response_message != nil { - client.SendMessage(response_message) + session.SendMessage(response_message) } }