Trigger "joined" events through async messages.

This commit is contained in:
Joachim Bauch 2022-06-15 15:09:15 +02:00
parent 0115c97946
commit ece2903413
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
5 changed files with 154 additions and 77 deletions

View file

@ -34,5 +34,13 @@ type AsyncMessage struct {
Permissions []Permission `json:"permissions,omitempty"`
AsyncRoom *AsyncRoomMessage `json:"asyncroom,omitempty"`
Id string `json:"id"`
}
type AsyncRoomMessage struct {
Type string `json:"type"`
SessionId string `json:"sessionid,omitempty"`
}

View file

@ -24,7 +24,7 @@ package signaling
import "sync"
type AsyncBackendRoomEventListener interface {
ProcessBackendRoomRequest(request *BackendServerRoomRequest)
ProcessBackendRoomRequest(message *AsyncMessage)
}
type AsyncRoomEventListener interface {
@ -75,7 +75,7 @@ type asyncBackendRoomSubscriber struct {
listeners map[AsyncBackendRoomEventListener]bool
}
func (s *asyncBackendRoomSubscriber) processBackendRoomRequest(message *BackendServerRoomRequest) {
func (s *asyncBackendRoomSubscriber) processBackendRoomRequest(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()

View file

@ -136,12 +136,7 @@ func (s *asyncBackendRoomSubscriberNats) doProcessMessage(msg *nats.Msg) {
return
}
switch message.Type {
case "room":
s.processBackendRoomRequest(message.Room)
default:
log.Printf("Unsupported NATS room request with type %s: %+v", message.Type, message)
}
s.processBackendRoomRequest(&message)
}
type asyncRoomSubscriberNats struct {
@ -256,22 +251,31 @@ func NewAsyncEventsNats(client NatsClient) (AsyncEvents, error) {
func (e *asyncEventsNats) Close() {
e.mu.Lock()
defer e.mu.Unlock()
var wg sync.WaitGroup
wg.Add(1)
go func(subscriptions map[string]*asyncBackendRoomSubscriberNats) {
defer wg.Done()
for _, sub := range subscriptions {
sub.close()
}
}(e.backendRoomSubscriptions)
wg.Add(1)
go func(subscriptions map[string]*asyncRoomSubscriberNats) {
defer wg.Done()
for _, sub := range subscriptions {
sub.close()
}
}(e.roomSubscriptions)
wg.Add(1)
go func(subscriptions map[string]*asyncUserSubscriberNats) {
defer wg.Done()
for _, sub := range subscriptions {
sub.close()
}
}(e.userSubscriptions)
wg.Add(1)
go func(subscriptions map[string]*asyncSessionSubscriberNats) {
defer wg.Done()
for _, sub := range subscriptions {
sub.close()
}
@ -280,6 +284,7 @@ func (e *asyncEventsNats) Close() {
e.roomSubscriptions = make(map[string]*asyncRoomSubscriberNats)
e.userSubscriptions = make(map[string]*asyncUserSubscriberNats)
e.sessionSubscriptions = make(map[string]*asyncSessionSubscriberNats)
wg.Wait()
e.client.Close()
}

60
hub.go
View file

@ -1199,65 +1199,7 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
session.SetPermissions(*room.Room.Permissions)
}
h.sendRoom(session, message, r)
h.notifyUserJoinedRoom(r, session, room.Room.Session)
}
func (h *Hub) notifyUserJoinedRoom(room *Room, session *ClientSession, sessionData *json.RawMessage) {
// Register session with the room
if sessions := room.AddSession(session, sessionData); len(sessions) > 0 {
events := make([]*EventServerMessageSessionEntry, 0, len(sessions))
for _, s := range sessions {
entry := &EventServerMessageSessionEntry{
SessionId: s.PublicId(),
UserId: s.UserId(),
User: s.UserData(),
}
if s, ok := s.(*ClientSession); ok {
entry.RoomSessionId = s.RoomSessionId()
}
events = append(events, entry)
}
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "room",
Type: "join",
Join: events,
},
}
// No need to send through asynchronous events, the session is connected locally.
session.SendMessage(msg)
// Notify about initial flags of virtual sessions.
for _, s := range sessions {
vsess, ok := s.(*VirtualSession)
if !ok {
continue
}
flags := vsess.Flags()
if flags == 0 {
continue
}
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "participants",
Type: "flags",
Flags: &RoomFlagsServerMessage{
RoomId: room.Id(),
SessionId: vsess.PublicId(),
Flags: vsess.Flags(),
},
},
}
// No need to send through asynchronous events, the session is connected locally.
session.SendMessage(msg)
}
}
r.AddSession(session, room.Room.Session)
}
func (h *Hub) processMessageMsg(client *Client, message *ClientMessage) {

142
room.go
View file

@ -202,7 +202,18 @@ func (r *Room) Close() []Session {
return result
}
func (r *Room) ProcessBackendRoomRequest(message *BackendServerRoomRequest) {
func (r *Room) ProcessBackendRoomRequest(message *AsyncMessage) {
switch message.Type {
case "room":
r.processBackendRoomRequestRoom(message.Room)
case "asyncroom":
r.processBackendRoomRequestAsyncRoom(message.AsyncRoom)
default:
log.Printf("Unsupported backend room request with type %s in %s: %+v", message.Type, r.id, message)
}
}
func (r *Room) processBackendRoomRequestRoom(message *BackendServerRoomRequest) {
received := message.ReceivedTime
if last, found := r.lastRoomRequests[message.Type]; found && last > received {
if msg, err := json.Marshal(message); err == nil {
@ -232,7 +243,16 @@ func (r *Room) ProcessBackendRoomRequest(message *BackendServerRoomRequest) {
}
}
func (r *Room) AddSession(session Session, sessionData *json.RawMessage) []Session {
func (r *Room) processBackendRoomRequestAsyncRoom(message *AsyncRoomMessage) {
switch message.Type {
case "sessionjoined":
r.notifySessionJoined(message.SessionId)
default:
log.Printf("Unsupported async room request with type %s in %s: %+v", message.Type, r.Id(), message)
}
}
func (r *Room) AddSession(session Session, sessionData *json.RawMessage) {
var roomSessionData *RoomSessionData
if sessionData != nil && len(*sessionData) > 0 {
roomSessionData = &RoomSessionData{}
@ -245,13 +265,6 @@ func (r *Room) AddSession(session Session, sessionData *json.RawMessage) []Sessi
sid := session.PublicId()
r.mu.Lock()
_, found := r.sessions[sid]
// Return list of sessions already in the room.
result := make([]Session, 0, len(r.sessions))
for _, s := range r.sessions {
if s != session {
result = append(result, s)
}
}
r.sessions[sid] = session
if !found {
r.statsRoomSessionsCurrent.With(prometheus.Labels{"clienttype": session.ClientType()}).Inc()
@ -287,7 +300,116 @@ func (r *Room) AddSession(session Session, sessionData *json.RawMessage) []Sessi
r.transientData.AddListener(clientSession)
}
}
return result
// Trigger notifications that the session joined.
if err := r.events.PublishBackendRoomMessage(r.id, r.backend, &AsyncMessage{
Type: "asyncroom",
AsyncRoom: &AsyncRoomMessage{
Type: "sessionjoined",
SessionId: sid,
},
}); err != nil {
log.Printf("Error publishing joined event for session %s: %s", sid, err)
}
}
func (r *Room) getOtherSessions(ignoreSessionId string) (Session, []Session) {
r.mu.Lock()
defer r.mu.Unlock()
sessions := make([]Session, 0, len(r.sessions))
for _, s := range r.sessions {
if s.PublicId() == ignoreSessionId {
continue
}
sessions = append(sessions, s)
}
return r.sessions[ignoreSessionId], sessions
}
func (r *Room) notifySessionJoined(sessionId string) {
session, sessions := r.getOtherSessions(sessionId)
if len(sessions) == 0 {
return
}
if session != nil && session.ClientType() != HelloClientTypeClient {
session = nil
}
events := make([]*EventServerMessageSessionEntry, 0, len(sessions))
for _, s := range sessions {
entry := &EventServerMessageSessionEntry{
SessionId: s.PublicId(),
UserId: s.UserId(),
User: s.UserData(),
}
if s, ok := s.(*ClientSession); ok {
entry.RoomSessionId = s.RoomSessionId()
}
events = append(events, entry)
}
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "room",
Type: "join",
Join: events,
},
}
if session != nil {
// No need to send through asynchronous events, the session is connected locally.
session.(*ClientSession).SendMessage(msg)
} else {
if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{
Type: "message",
Message: msg,
}); err != nil {
log.Printf("Error publishing joined events to session %s: %s", sessionId, err)
}
}
// Notify about initial flags of virtual sessions.
for _, s := range sessions {
vsess, ok := s.(*VirtualSession)
if !ok {
continue
}
flags := vsess.Flags()
if flags == 0 {
continue
}
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "participants",
Type: "flags",
Flags: &RoomFlagsServerMessage{
RoomId: r.id,
SessionId: vsess.PublicId(),
Flags: vsess.Flags(),
},
},
}
if session != nil {
// No need to send through asynchronous events, the session is connected locally.
session.(*ClientSession).SendMessage(msg)
} else {
if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{
Type: "message",
Message: msg,
}); err != nil {
log.Printf("Error publishing initial flags to session %s: %s", sessionId, err)
}
}
}
}
func (r *Room) HasSession(session Session) bool {