Send MCU messages through the session.

This helps with clients disconnecting / resuming while MCU responses
are being sent to them. With the change, the messages are stored in
the session and sent out when clients resume their session.
This commit is contained in:
Joachim Bauch 2020-10-21 14:16:30 +02:00
parent 644d9a1737
commit dd327a841e
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 41 additions and 29 deletions

View file

@ -79,7 +79,7 @@ type ClientSession struct {
publishers map[string]McuPublisher publishers map[string]McuPublisher
subscribers map[string]McuSubscriber subscribers map[string]McuSubscriber
pendingClientMessages []*NatsMessage pendingClientMessages []*ServerMessage
} }
func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) { func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
@ -490,12 +490,25 @@ func (s *ClientSession) sendCandidate(client McuClient, sender string, streamTyp
}, },
} }
if c := s.client; c != nil { s.sendMessageUnlocked(response_message)
c.SendMessage(response_message) }
} else {
// TODO(jojo): Should we store the candidate and send when a client is connected again? func (s *ClientSession) sendMessageUnlocked(message *ServerMessage) bool {
log.Printf("Session %s received candidate %+v while no client was connected", s.PublicId(), candidate) if c := s.getClientUnlocked(); c != nil {
if c.SendMessage(message) {
return true
}
} }
s.storePendingMessage(message)
return true
}
func (s *ClientSession) SendMessage(message *ServerMessage) bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.sendMessageUnlocked(message)
} }
func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) { func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) {
@ -654,26 +667,27 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) {
} }
} }
s.mu.Lock() serverMessage := s.processNatsMessage(&message)
defer s.mu.Unlock() if serverMessage == nil {
client := s.client
if client == nil {
s.pendingClientMessages = append(s.pendingClientMessages, &message)
if len(s.pendingClientMessages) >= warnPendingMessagesCount {
log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages))
}
return return
} }
s.processNatsMessage(client, &message) s.SendMessage(serverMessage)
} }
func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) bool { func (s *ClientSession) storePendingMessage(message *ServerMessage) {
s.pendingClientMessages = append(s.pendingClientMessages, message)
if len(s.pendingClientMessages) >= warnPendingMessagesCount {
log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages))
}
}
func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage {
switch msg.Type { switch msg.Type {
case "message": case "message":
if msg.Message == nil { if msg.Message == nil {
log.Printf("Received NATS message without payload: %+v\n", msg) log.Printf("Received NATS message without payload: %+v\n", msg)
return true return nil
} }
switch msg.Message.Type { switch msg.Message.Type {
@ -682,14 +696,14 @@ func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) boo
msg.Message.Message.Sender != nil && msg.Message.Message.Sender != nil &&
msg.Message.Message.Sender.SessionId == s.PublicId() { msg.Message.Message.Sender.SessionId == s.PublicId() {
// Don't send message back to sender (can happen if sent to user or room) // Don't send message back to sender (can happen if sent to user or room)
return true return nil
} }
case "control": case "control":
if msg.Message.Control != nil && if msg.Message.Control != nil &&
msg.Message.Control.Sender != nil && msg.Message.Control.Sender != nil &&
msg.Message.Control.Sender.SessionId == s.PublicId() { msg.Message.Control.Sender.SessionId == s.PublicId() {
// Don't send message back to sender (can happen if sent to user or room) // Don't send message back to sender (can happen if sent to user or room)
return true return nil
} }
case "event": case "event":
if msg.Message.Event.Target == "participants" && if msg.Message.Event.Target == "participants" &&
@ -711,18 +725,18 @@ func (s *ClientSession) processNatsMessage(client *Client, msg *NatsMessage) boo
} }
} }
return client.writeMessage(msg.Message) return msg.Message
default: default:
log.Printf("Received NATS message with unsupported type %s: %+v\n", msg.Type, msg) log.Printf("Received NATS message with unsupported type %s: %+v\n", msg.Type, msg)
return true return nil
} }
} }
func (s *ClientSession) combinePendingMessages(messages []*NatsMessage) ([]*NatsMessage, error) { func (s *ClientSession) combinePendingMessages(messages []*ServerMessage) ([]*ServerMessage, error) {
var result []*NatsMessage var result []*ServerMessage
has_chat := false has_chat := false
for _, message := range messages { for _, message := range messages {
if message.Type == "message" && message.Message != nil && message.Message.IsChatRefresh() { if message.IsChatRefresh() {
if has_chat { if has_chat {
// Only send a single chat refresh message to the client. // Only send a single chat refresh message to the client.
continue continue
@ -757,12 +771,10 @@ func (s *ClientSession) NotifySessionResumed(client *Client) {
log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId()) log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId())
had_participants_update := false had_participants_update := false
for _, message := range messages { for _, message := range messages {
if !s.processNatsMessage(client, message) { client.writeMessage(message)
break
}
if !had_participants_update { if !had_participants_update {
had_participants_update = message.Type == "message" && message.Message.IsParticipantsUpdate() had_participants_update = message.IsParticipantsUpdate()
} }
} }

View file

@ -1517,7 +1517,7 @@ func (h *Hub) sendMcuMessageResponse(client *Client, session *ClientSession, mes
} }
if response_message != nil { if response_message != nil {
client.SendMessage(response_message) session.SendMessage(response_message)
} }
} }