From 5188c159c5b0eb4b617c0d848f358da912b1751c Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 25 Nov 2020 11:24:35 +0100 Subject: [PATCH] Update virtual sessions for SIP support in Talk. --- src/signaling/api_backend.go | 5 ++ src/signaling/api_signaling.go | 16 +++++ src/signaling/hub.go | 90 +++++++++++++++++++++------- src/signaling/room.go | 46 ++++++++++++-- src/signaling/testclient_test.go | 12 ++++ src/signaling/virtualsession.go | 6 ++ src/signaling/virtualsession_test.go | 78 +++++++++++++++++++++--- 7 files changed, 221 insertions(+), 32 deletions(-) diff --git a/src/signaling/api_backend.go b/src/signaling/api_backend.go index a58b38a..426948f 100644 --- a/src/signaling/api_backend.go +++ b/src/signaling/api_backend.go @@ -195,6 +195,11 @@ type BackendClientRoomRequest struct { Action string `json:"action,omitempty"` UserId string `json:"userid"` SessionId string `json:"sessionid"` + + // For Nextcloud Talk with SIP support. + ActorId string `json:"actorid,omitempty"` + ActorType string `json:"actortype,omitempty"` + InCall int `json:"incall,omitempty"` } func NewBackendClientRoomRequest(roomid string, userid string, sessionid string) *BackendClientRequest { diff --git a/src/signaling/api_signaling.go b/src/signaling/api_signaling.go index 2a67b89..c69de5c 100644 --- a/src/signaling/api_signaling.go +++ b/src/signaling/api_signaling.go @@ -453,12 +453,19 @@ func (m *CommonSessionInternalClientMessage) CheckValid() error { return nil } +type AddSessionOptions struct { + ActorId string `json:"actorId,omitempty"` + ActorType string `json:"actorType,omitempty"` +} + type AddSessionInternalClientMessage struct { CommonSessionInternalClientMessage UserId string `json:"userid,omitempty"` User *json.RawMessage `json:"user,omitempty"` Flags uint32 `json:"flags,omitempty"` + + Options *AddSessionOptions `json:"options,omitempy"` } func (m *AddSessionInternalClientMessage) CheckValid() error { @@ -483,6 +490,8 @@ func (m *UpdateSessionInternalClientMessage) CheckValid() error { type RemoveSessionInternalClientMessage struct { CommonSessionInternalClientMessage + + UserId string `json:"userid,omitempty"` } func (m *RemoveSessionInternalClientMessage) CheckValid() error { @@ -553,6 +562,12 @@ type RoomEventMessage struct { Data *json.RawMessage `json:"data,omitempty"` } +type RoomFlagsServerMessage struct { + RoomId string `json:"roomid"` + SessionId string `json:"sessionid"` + Flags uint32 `json:"flags"` +} + type EventServerMessage struct { Target string `json:"target"` Type string `json:"type"` @@ -566,6 +581,7 @@ type EventServerMessage struct { Invite *RoomEventServerMessage `json:"invite,omitempty"` Disinvite *RoomDisinviteEventServerMessage `json:"disinvite,omitempty"` Update *RoomEventServerMessage `json:"update,omitempty"` + Flags *RoomFlagsServerMessage `json:"flags,omitempty"` // Used for target "message" Message *RoomEventMessage `json:"message,omitempty"` diff --git a/src/signaling/hub.go b/src/signaling/hub.go index 742c048..5c7f795 100644 --- a/src/signaling/hub.go +++ b/src/signaling/hub.go @@ -1437,7 +1437,7 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) { } func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { - msg := message.Control + msg := message.Internal session := client.GetSession() if session == nil { // Client is not connected yet. @@ -1447,9 +1447,9 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { return } - switch message.Internal.Type { + switch msg.Type { case "addsession": - msg := message.Internal.AddSession + msg := msg.AddSession room := h.getRoom(msg.RoomId) if room == nil { log.Printf("Ignore add session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId()) @@ -1473,13 +1473,35 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { virtualSessionId := GetVirtualSessionId(session, msg.SessionId) - request := NewBackendClientSessionRequest(room.Id(), "add", publicSessionId, msg) - var response BackendClientSessionResponse - if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { - log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) - reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session.")) - client.SendMessage(reply) - return + if msg.Options != nil { + request := NewBackendClientRoomRequest(room.Id(), msg.UserId, publicSessionId) + request.Room.ActorId = msg.Options.ActorId + request.Room.ActorType = msg.Options.ActorType + request.Room.InCall = FlagInCall | FlagWithPhone + + var response BackendClientResponse + if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { + log.Printf("Could not join virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) + reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session.")) + client.SendMessage(reply) + return + } + + if response.Type == "error" { + log.Printf("Could not join virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error) + reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error())) + client.SendMessage(reply) + return + } + } else { + request := NewBackendClientSessionRequest(room.Id(), "add", publicSessionId, msg) + var response BackendClientSessionResponse + if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { + log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) + reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session.")) + client.SendMessage(reply) + return + } } sess := NewVirtualSession(session, privateSessionId, publicSessionId, sessionIdData, msg) @@ -1492,7 +1514,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { sess.SetRoom(room) room.AddSession(sess, nil) case "updatesession": - msg := message.Internal.UpdateSession + msg := msg.UpdateSession room := h.getRoom(msg.RoomId) if room == nil { log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId()) @@ -1525,7 +1547,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { } } case "removesession": - msg := message.Internal.RemoveSession + msg := msg.RemoveSession room := h.getRoom(msg.RoomId) if room == nil { log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId()) @@ -1551,18 +1573,46 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) { ctx, cancel := context.WithTimeout(context.Background(), h.backendTimeout) defer cancel() - request := NewBackendClientSessionRequest(room.Id(), "remove", sess.PublicId(), nil) - var response BackendClientSessionResponse - err := h.backend.PerformJSONRequest(ctx, sess.ParsedBackendUrl(), request, &response) - if err != nil { - log.Printf("Could not remove virtual session %s from backend %s: %s", sess.PublicId(), sess.BackendUrl(), err) - reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend.")) - client.SendMessage(reply) + var options *AddSessionOptions + if vsess, ok := sess.(*VirtualSession); ok { + options = vsess.Options() + } + if options != nil { + request := NewBackendClientRoomRequest(room.Id(), sess.UserId(), sess.PublicId()) + request.Room.Action = "leave" + if options != nil { + request.Room.ActorId = options.ActorId + request.Room.ActorType = options.ActorType + } + + var response BackendClientResponse + if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil { + log.Printf("Could not leave virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err) + reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session.")) + client.SendMessage(reply) + return + } + + if response.Type == "error" { + log.Printf("Could not leave virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error) + reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error())) + client.SendMessage(reply) + return + } + } else { + request := NewBackendClientSessionRequest(room.Id(), "remove", sess.PublicId(), nil) + var response BackendClientSessionResponse + err := h.backend.PerformJSONRequest(ctx, sess.ParsedBackendUrl(), request, &response) + if err != nil { + log.Printf("Could not remove virtual session %s from backend %s: %s", sess.PublicId(), sess.BackendUrl(), err) + reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend.")) + client.SendMessage(reply) + } } }() } default: - log.Printf("Ignore unsupported internal message %+v from %s", message.Internal, session.PublicId()) + log.Printf("Ignore unsupported internal message %+v from %s", msg, session.PublicId()) return } } diff --git a/src/signaling/room.go b/src/signaling/room.go index d639b7b..ea170e2 100644 --- a/src/signaling/room.go +++ b/src/signaling/room.go @@ -40,6 +40,7 @@ const ( FlagInCall = 1 FlagWithAudio = 2 FlagWithVideo = 4 + FlagWithPhone = 8 ) var ( @@ -228,6 +229,7 @@ func (r *Room) processBackendRoomRequest(message *BackendServerRoomRequest) { case "update": r.hub.roomUpdated <- message case "delete": + r.notifyInternalRoomDeleted() r.hub.roomDeleted <- message case "incall": r.hub.roomInCall <- message @@ -284,6 +286,9 @@ func (r *Room) AddSession(session Session, sessionData *json.RawMessage) []Sessi r.PublishSessionJoined(session, roomSessionData) if publishUsersChanged { r.publishUsersChangedWithInternal() + if session, ok := session.(*VirtualSession); ok && session.Flags() != 0 { + r.publishSessionFlagsChanged(session) + } } } return result @@ -421,7 +426,7 @@ func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string] } for session := range r.internalSessions { users = append(users, map[string]interface{}{ - "inCall": true, + "inCall": FlagInCall | FlagWithAudio, "sessionId": session.PublicId(), "lastPing": now, "internal": true, @@ -429,11 +434,10 @@ func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string] } for session := range r.virtualSessions { users = append(users, map[string]interface{}{ - "inCall": true, + "inCall": FlagInCall | FlagWithPhone, "sessionId": session.PublicId(), "lastPing": now, "virtual": true, - "flags": session.Flags(), }) } r.mu.Unlock() @@ -583,7 +587,12 @@ func (r *Room) NotifySessionChanged(session Session) { return } - r.publishUsersChangedWithInternal() + virtual, ok := session.(*VirtualSession) + if !ok { + return + } + + r.publishSessionFlagsChanged(virtual) } func (r *Room) publishUsersChangedWithInternal() { @@ -591,6 +600,22 @@ func (r *Room) publishUsersChangedWithInternal() { r.publish(message) } +func (r *Room) publishSessionFlagsChanged(session *VirtualSession) { + message := &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "participants", + Type: "flags", + Flags: &RoomFlagsServerMessage{ + RoomId: r.id, + SessionId: session.PublicId(), + Flags: session.Flags(), + }, + }, + } + r.publish(message) +} + func (r *Room) publishActiveSessions() { r.mu.Lock() defer r.mu.Unlock() @@ -667,3 +692,16 @@ func (r *Room) publishRoomMessage(message *BackendRoomMessageRequest) { } r.publish(msg) } + +func (r *Room) notifyInternalRoomDeleted() { + msg := &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "room", + Type: "delete", + }, + } + for s := range r.internalSessions { + s.(*ClientSession).SendMessage(msg) + } +} diff --git a/src/signaling/testclient_test.go b/src/signaling/testclient_test.go index 28198d2..4de1812 100644 --- a/src/signaling/testclient_test.go +++ b/src/signaling/testclient_test.go @@ -640,6 +640,18 @@ func checkMessageParticipantsInCall(message *ServerMessage) (*RoomEventServerMes return message.Event.Update, nil } +func checkMessageParticipantFlags(message *ServerMessage) (*RoomFlagsServerMessage, error) { + if err := checkMessageType(message, "event"); err != nil { + return nil, err + } else if message.Event.Target != "participants" { + return nil, fmt.Errorf("Expected event target room, got %+v", message.Event) + } else if message.Event.Type != "flags" || message.Event.Flags == nil { + return nil, fmt.Errorf("Expected event type flags, got %+v", message.Event) + } + + return message.Event.Flags, nil +} + func checkMessageRoomMessage(message *ServerMessage) (*RoomEventMessage, error) { if err := checkMessageType(message, "event"); err != nil { return nil, err diff --git a/src/signaling/virtualsession.go b/src/signaling/virtualsession.go index c6e6d2f..1bb91c6 100644 --- a/src/signaling/virtualsession.go +++ b/src/signaling/virtualsession.go @@ -48,6 +48,7 @@ type VirtualSession struct { userId string userData *json.RawMessage flags uint32 + options *AddSessionOptions } func GetVirtualSessionId(session *ClientSession, sessionId string) string { @@ -65,6 +66,7 @@ func NewVirtualSession(session *ClientSession, privateId string, publicId string userId: msg.UserId, userData: msg.User, flags: msg.Flags, + options: msg.Options, } } @@ -193,3 +195,7 @@ func (s *VirtualSession) SetFlags(flags uint32) bool { func (s *VirtualSession) Flags() uint32 { return atomic.LoadUint32(&s.flags) } + +func (s *VirtualSession) Options() *AddSessionOptions { + return s.options +} diff --git a/src/signaling/virtualsession_test.go b/src/signaling/virtualsession_test.go index ffc8b62..11041b8 100644 --- a/src/signaling/virtualsession_test.go +++ b/src/signaling/virtualsession_test.go @@ -144,10 +144,58 @@ func TestVirtualSession(t *testing.T) { t.Errorf("Expected session id %s, got %+v", sessionId, updateMsg.Users[0]) } else if virtual, ok := updateMsg.Users[0]["virtual"].(bool); !ok || !virtual { t.Errorf("Expected virtual user, got %+v", updateMsg.Users[0]) - } else if inCall, ok := updateMsg.Users[0]["inCall"].(bool); !ok || !inCall { + } else if inCall, ok := updateMsg.Users[0]["inCall"].(float64); !ok || inCall == 0 { t.Errorf("Expected user in call, got %+v", updateMsg.Users[0]) - } else if flags, ok := updateMsg.Users[0]["flags"].(float64); !ok || flags != FLAG_MUTED_SPEAKING { - t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, updateMsg.Users[0]) + } + + msg3, err := client.RunUntilMessage(ctx) + if err != nil { + t.Fatal(err) + } + + flagsMsg, err := checkMessageParticipantFlags(msg3) + if err != nil { + t.Error(err) + } else if flagsMsg.RoomId != roomId { + t.Errorf("Expected room %s, got %s", roomId, flagsMsg.RoomId) + } else if flagsMsg.SessionId != sessionId { + t.Errorf("Expected session id %s, got %s", sessionId, flagsMsg.SessionId) + } else if flagsMsg.Flags != FLAG_MUTED_SPEAKING { + t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, flagsMsg.Flags) + } + + newFlags := uint32(FLAG_TALKING) + msgFlags := &ClientMessage{ + Type: "internal", + Internal: &InternalClientMessage{ + Type: "updatesession", + UpdateSession: &UpdateSessionInternalClientMessage{ + CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{ + SessionId: internalSessionId, + RoomId: roomId, + }, + Flags: &newFlags, + }, + }, + } + if err := clientInternal.WriteJSON(msgFlags); err != nil { + t.Fatal(err) + } + + msg4, err := client.RunUntilMessage(ctx) + if err != nil { + t.Fatal(err) + } + + flagsMsg, err = checkMessageParticipantFlags(msg4) + if err != nil { + t.Error(err) + } else if flagsMsg.RoomId != roomId { + t.Errorf("Expected room %s, got %s", roomId, flagsMsg.RoomId) + } else if flagsMsg.SessionId != sessionId { + t.Errorf("Expected session id %s, got %s", sessionId, flagsMsg.SessionId) + } else if flagsMsg.Flags != newFlags { + t.Errorf("Expected flags %d, got %+v", newFlags, flagsMsg.Flags) } // When sending to a virtual session, the message is sent to the actual @@ -200,11 +248,11 @@ func TestVirtualSession(t *testing.T) { t.Fatal(err) } - msg3, err := client.RunUntilMessage(ctx) + msg5, err := client.RunUntilMessage(ctx) if err != nil { t.Fatal(err) } - if err := client.checkMessageRoomLeaveSession(msg3, sessionId); err != nil { + if err := client.checkMessageRoomLeaveSession(msg5, sessionId); err != nil { t.Error(err) } } @@ -324,10 +372,24 @@ func TestVirtualSessionCleanup(t *testing.T) { t.Errorf("Expected session id %s, got %+v", sessionId, updateMsg.Users[0]) } else if virtual, ok := updateMsg.Users[0]["virtual"].(bool); !ok || !virtual { t.Errorf("Expected virtual user, got %+v", updateMsg.Users[0]) - } else if inCall, ok := updateMsg.Users[0]["inCall"].(bool); !ok || !inCall { + } else if inCall, ok := updateMsg.Users[0]["inCall"].(float64); !ok || inCall == 0 { t.Errorf("Expected user in call, got %+v", updateMsg.Users[0]) - } else if flags, ok := updateMsg.Users[0]["flags"].(float64); !ok || flags != FLAG_MUTED_SPEAKING { - t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, updateMsg.Users[0]) + } + + msg3, err := client.RunUntilMessage(ctx) + if err != nil { + t.Fatal(err) + } + + flagsMsg, err := checkMessageParticipantFlags(msg3) + if err != nil { + t.Error(err) + } else if flagsMsg.RoomId != roomId { + t.Errorf("Expected room %s, got %s", roomId, flagsMsg.RoomId) + } else if flagsMsg.SessionId != sessionId { + t.Errorf("Expected session id %s, got %s", sessionId, flagsMsg.SessionId) + } else if flagsMsg.Flags != FLAG_MUTED_SPEAKING { + t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, flagsMsg.Flags) } // The virtual sessions are closed when the parent session is deleted.