From ece2903413913a62e5b5cc8b07022aa7e73aea2c Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 15 Jun 2022 15:09:15 +0200 Subject: [PATCH] Trigger "joined" events through async messages. --- api_async.go | 8 +++ async_events.go | 4 +- async_events_nats.go | 17 ++++-- hub.go | 60 +----------------- room.go | 142 ++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 154 insertions(+), 77 deletions(-) diff --git a/api_async.go b/api_async.go index 2b5e954..f831341 100644 --- a/api_async.go +++ b/api_async.go @@ -34,5 +34,13 @@ type AsyncMessage struct { Permissions []Permission `json:"permissions,omitempty"` + AsyncRoom *AsyncRoomMessage `json:"asyncroom,omitempty"` + Id string `json:"id"` } + +type AsyncRoomMessage struct { + Type string `json:"type"` + + SessionId string `json:"sessionid,omitempty"` +} diff --git a/async_events.go b/async_events.go index 5f7938b..4fb34d8 100644 --- a/async_events.go +++ b/async_events.go @@ -24,7 +24,7 @@ package signaling import "sync" type AsyncBackendRoomEventListener interface { - ProcessBackendRoomRequest(request *BackendServerRoomRequest) + ProcessBackendRoomRequest(message *AsyncMessage) } type AsyncRoomEventListener interface { @@ -75,7 +75,7 @@ type asyncBackendRoomSubscriber struct { listeners map[AsyncBackendRoomEventListener]bool } -func (s *asyncBackendRoomSubscriber) processBackendRoomRequest(message *BackendServerRoomRequest) { +func (s *asyncBackendRoomSubscriber) processBackendRoomRequest(message *AsyncMessage) { s.mu.Lock() defer s.mu.Unlock() diff --git a/async_events_nats.go b/async_events_nats.go index 00af5ed..04699c6 100644 --- a/async_events_nats.go +++ b/async_events_nats.go @@ -136,12 +136,7 @@ func (s *asyncBackendRoomSubscriberNats) doProcessMessage(msg *nats.Msg) { return } - switch message.Type { - case "room": - s.processBackendRoomRequest(message.Room) - default: - log.Printf("Unsupported NATS room request with type %s: %+v", message.Type, message) - } + s.processBackendRoomRequest(&message) } type asyncRoomSubscriberNats struct { @@ -256,22 +251,31 @@ func NewAsyncEventsNats(client NatsClient) (AsyncEvents, error) { func (e *asyncEventsNats) Close() { e.mu.Lock() defer e.mu.Unlock() + var wg sync.WaitGroup + wg.Add(1) go func(subscriptions map[string]*asyncBackendRoomSubscriberNats) { + defer wg.Done() for _, sub := range subscriptions { sub.close() } }(e.backendRoomSubscriptions) + wg.Add(1) go func(subscriptions map[string]*asyncRoomSubscriberNats) { + defer wg.Done() for _, sub := range subscriptions { sub.close() } }(e.roomSubscriptions) + wg.Add(1) go func(subscriptions map[string]*asyncUserSubscriberNats) { + defer wg.Done() for _, sub := range subscriptions { sub.close() } }(e.userSubscriptions) + wg.Add(1) go func(subscriptions map[string]*asyncSessionSubscriberNats) { + defer wg.Done() for _, sub := range subscriptions { sub.close() } @@ -280,6 +284,7 @@ func (e *asyncEventsNats) Close() { e.roomSubscriptions = make(map[string]*asyncRoomSubscriberNats) e.userSubscriptions = make(map[string]*asyncUserSubscriberNats) e.sessionSubscriptions = make(map[string]*asyncSessionSubscriberNats) + wg.Wait() e.client.Close() } diff --git a/hub.go b/hub.go index 9675dae..403e38e 100644 --- a/hub.go +++ b/hub.go @@ -1199,65 +1199,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro session.SetPermissions(*room.Room.Permissions) } h.sendRoom(session, message, r) - h.notifyUserJoinedRoom(r, session, room.Room.Session) -} - -func (h *Hub) notifyUserJoinedRoom(room *Room, session *ClientSession, sessionData *json.RawMessage) { - // Register session with the room - if sessions := room.AddSession(session, sessionData); len(sessions) > 0 { - events := make([]*EventServerMessageSessionEntry, 0, len(sessions)) - for _, s := range sessions { - entry := &EventServerMessageSessionEntry{ - SessionId: s.PublicId(), - UserId: s.UserId(), - User: s.UserData(), - } - if s, ok := s.(*ClientSession); ok { - entry.RoomSessionId = s.RoomSessionId() - } - events = append(events, entry) - } - msg := &ServerMessage{ - Type: "event", - Event: &EventServerMessage{ - Target: "room", - Type: "join", - Join: events, - }, - } - - // No need to send through asynchronous events, the session is connected locally. - session.SendMessage(msg) - - // Notify about initial flags of virtual sessions. - for _, s := range sessions { - vsess, ok := s.(*VirtualSession) - if !ok { - continue - } - - flags := vsess.Flags() - if flags == 0 { - continue - } - - msg := &ServerMessage{ - Type: "event", - Event: &EventServerMessage{ - Target: "participants", - Type: "flags", - Flags: &RoomFlagsServerMessage{ - RoomId: room.Id(), - SessionId: vsess.PublicId(), - Flags: vsess.Flags(), - }, - }, - } - - // No need to send through asynchronous events, the session is connected locally. - session.SendMessage(msg) - } - } + r.AddSession(session, room.Room.Session) } func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) { diff --git a/room.go b/room.go index 0cd5b22..5c8a789 100644 --- a/room.go +++ b/room.go @@ -202,7 +202,18 @@ func (r *Room) Close() []Session { return result } -func (r *Room) ProcessBackendRoomRequest(message *BackendServerRoomRequest) { +func (r *Room) ProcessBackendRoomRequest(message *AsyncMessage) { + switch message.Type { + case "room": + r.processBackendRoomRequestRoom(message.Room) + case "asyncroom": + r.processBackendRoomRequestAsyncRoom(message.AsyncRoom) + default: + log.Printf("Unsupported backend room request with type %s in %s: %+v", message.Type, r.id, message) + } +} + +func (r *Room) processBackendRoomRequestRoom(message *BackendServerRoomRequest) { received := message.ReceivedTime if last, found := r.lastRoomRequests[message.Type]; found && last > received { if msg, err := json.Marshal(message); err == nil { @@ -232,7 +243,16 @@ func (r *Room) ProcessBackendRoomRequest(message *BackendServerRoomRequest) { } } -func (r *Room) AddSession(session Session, sessionData *json.RawMessage) []Session { +func (r *Room) processBackendRoomRequestAsyncRoom(message *AsyncRoomMessage) { + switch message.Type { + case "sessionjoined": + r.notifySessionJoined(message.SessionId) + default: + log.Printf("Unsupported async room request with type %s in %s: %+v", message.Type, r.Id(), message) + } +} + +func (r *Room) AddSession(session Session, sessionData *json.RawMessage) { var roomSessionData *RoomSessionData if sessionData != nil && len(*sessionData) > 0 { roomSessionData = &RoomSessionData{} @@ -245,13 +265,6 @@ func (r *Room) AddSession(session Session, sessionData *json.RawMessage) []Sessi sid := session.PublicId() r.mu.Lock() _, found := r.sessions[sid] - // Return list of sessions already in the room. - result := make([]Session, 0, len(r.sessions)) - for _, s := range r.sessions { - if s != session { - result = append(result, s) - } - } r.sessions[sid] = session if !found { r.statsRoomSessionsCurrent.With(prometheus.Labels{"clienttype": session.ClientType()}).Inc() @@ -287,7 +300,116 @@ func (r *Room) AddSession(session Session, sessionData *json.RawMessage) []Sessi r.transientData.AddListener(clientSession) } } - return result + + // Trigger notifications that the session joined. + if err := r.events.PublishBackendRoomMessage(r.id, r.backend, &AsyncMessage{ + Type: "asyncroom", + AsyncRoom: &AsyncRoomMessage{ + Type: "sessionjoined", + SessionId: sid, + }, + }); err != nil { + log.Printf("Error publishing joined event for session %s: %s", sid, err) + } +} + +func (r *Room) getOtherSessions(ignoreSessionId string) (Session, []Session) { + r.mu.Lock() + defer r.mu.Unlock() + + sessions := make([]Session, 0, len(r.sessions)) + for _, s := range r.sessions { + if s.PublicId() == ignoreSessionId { + continue + } + + sessions = append(sessions, s) + } + + return r.sessions[ignoreSessionId], sessions +} + +func (r *Room) notifySessionJoined(sessionId string) { + session, sessions := r.getOtherSessions(sessionId) + if len(sessions) == 0 { + return + } + + if session != nil && session.ClientType() != HelloClientTypeClient { + session = nil + } + + events := make([]*EventServerMessageSessionEntry, 0, len(sessions)) + for _, s := range sessions { + entry := &EventServerMessageSessionEntry{ + SessionId: s.PublicId(), + UserId: s.UserId(), + User: s.UserData(), + } + if s, ok := s.(*ClientSession); ok { + entry.RoomSessionId = s.RoomSessionId() + } + events = append(events, entry) + } + + msg := &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "room", + Type: "join", + Join: events, + }, + } + + if session != nil { + // No need to send through asynchronous events, the session is connected locally. + session.(*ClientSession).SendMessage(msg) + } else { + if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{ + Type: "message", + Message: msg, + }); err != nil { + log.Printf("Error publishing joined events to session %s: %s", sessionId, err) + } + } + + // Notify about initial flags of virtual sessions. + for _, s := range sessions { + vsess, ok := s.(*VirtualSession) + if !ok { + continue + } + + flags := vsess.Flags() + if flags == 0 { + continue + } + + msg := &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "participants", + Type: "flags", + Flags: &RoomFlagsServerMessage{ + RoomId: r.id, + SessionId: vsess.PublicId(), + Flags: vsess.Flags(), + }, + }, + } + + if session != nil { + // No need to send through asynchronous events, the session is connected locally. + session.(*ClientSession).SendMessage(msg) + } else { + if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{ + Type: "message", + Message: msg, + }); err != nil { + log.Printf("Error publishing initial flags to session %s: %s", sessionId, err) + } + } + } } func (r *Room) HasSession(session Session) bool {