mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-04-25 19:00:31 +02:00
Check combining while building pending messages instead of while sending out.
This commit is contained in:
parent
dd327a841e
commit
e6f6f0a846
|
@ -79,7 +79,9 @@ type ClientSession struct {
|
|||
publishers map[string]McuPublisher
|
||||
subscribers map[string]McuSubscriber
|
||||
|
||||
pendingClientMessages []*ServerMessage
|
||||
pendingClientMessages []*ServerMessage
|
||||
hasPendingChat bool
|
||||
hasPendingParticipantsUpdate bool
|
||||
}
|
||||
|
||||
func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
|
||||
|
@ -511,6 +513,16 @@ func (s *ClientSession) SendMessage(message *ServerMessage) bool {
|
|||
return s.sendMessageUnlocked(message)
|
||||
}
|
||||
|
||||
func (s *ClientSession) SendMessages(messages []*ServerMessage) bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
for _, message := range messages {
|
||||
s.sendMessageUnlocked(message)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
@ -676,6 +688,17 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) {
|
|||
}
|
||||
|
||||
func (s *ClientSession) storePendingMessage(message *ServerMessage) {
|
||||
if message.IsChatRefresh() {
|
||||
if s.hasPendingChat {
|
||||
// Only send a single "chat-refresh" message on resume.
|
||||
return
|
||||
}
|
||||
|
||||
s.hasPendingChat = true
|
||||
}
|
||||
if !s.hasPendingParticipantsUpdate && message.IsParticipantsUpdate() {
|
||||
s.hasPendingParticipantsUpdate = true
|
||||
}
|
||||
s.pendingClientMessages = append(s.pendingClientMessages, message)
|
||||
if len(s.pendingClientMessages) >= warnPendingMessagesCount {
|
||||
log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages))
|
||||
|
@ -732,24 +755,6 @@ func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *ClientSession) combinePendingMessages(messages []*ServerMessage) ([]*ServerMessage, error) {
|
||||
var result []*ServerMessage
|
||||
has_chat := false
|
||||
for _, message := range messages {
|
||||
if message.IsChatRefresh() {
|
||||
if has_chat {
|
||||
// Only send a single chat refresh message to the client.
|
||||
continue
|
||||
}
|
||||
|
||||
has_chat = true
|
||||
}
|
||||
|
||||
result = append(result, message)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *ClientSession) NotifySessionResumed(client *Client) {
|
||||
s.mu.Lock()
|
||||
if len(s.pendingClientMessages) == 0 {
|
||||
|
@ -760,25 +765,18 @@ func (s *ClientSession) NotifySessionResumed(client *Client) {
|
|||
return
|
||||
}
|
||||
|
||||
messages, err := s.combinePendingMessages(s.pendingClientMessages)
|
||||
messages := s.pendingClientMessages
|
||||
hasPendingParticipantsUpdate := s.hasPendingParticipantsUpdate
|
||||
s.pendingClientMessages = nil
|
||||
s.hasPendingChat = false
|
||||
s.hasPendingParticipantsUpdate = false
|
||||
s.mu.Unlock()
|
||||
if err != nil {
|
||||
client.writeError(err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId())
|
||||
had_participants_update := false
|
||||
for _, message := range messages {
|
||||
client.writeMessage(message)
|
||||
// Send through session to handle connection interruptions.
|
||||
s.SendMessages(messages)
|
||||
|
||||
if !had_participants_update {
|
||||
had_participants_update = message.IsParticipantsUpdate()
|
||||
}
|
||||
}
|
||||
|
||||
if !had_participants_update {
|
||||
if !hasPendingParticipantsUpdate {
|
||||
// Only need to send initial participants list update if none was part of the pending messages.
|
||||
if room := s.GetRoom(); room != nil {
|
||||
room.NotifySessionResumed(client)
|
||||
|
|
Loading…
Reference in a new issue