Update virtual sessions for SIP support in Talk.

This commit is contained in:
Joachim Bauch 2020-11-25 11:24:35 +01:00
parent b6ce723a8b
commit 5188c159c5
Failed to extract signature
7 changed files with 221 additions and 32 deletions

View file

@ -195,6 +195,11 @@ type BackendClientRoomRequest struct {
Action string `json:"action,omitempty"`
UserId string `json:"userid"`
SessionId string `json:"sessionid"`
// For Nextcloud Talk with SIP support.
ActorId string `json:"actorid,omitempty"`
ActorType string `json:"actortype,omitempty"`
InCall int `json:"incall,omitempty"`
}
func NewBackendClientRoomRequest(roomid string, userid string, sessionid string) *BackendClientRequest {

View file

@ -453,12 +453,19 @@ func (m *CommonSessionInternalClientMessage) CheckValid() error {
return nil
}
type AddSessionOptions struct {
ActorId string `json:"actorId,omitempty"`
ActorType string `json:"actorType,omitempty"`
}
type AddSessionInternalClientMessage struct {
CommonSessionInternalClientMessage
UserId string `json:"userid,omitempty"`
User *json.RawMessage `json:"user,omitempty"`
Flags uint32 `json:"flags,omitempty"`
Options *AddSessionOptions `json:"options,omitempy"`
}
func (m *AddSessionInternalClientMessage) CheckValid() error {
@ -483,6 +490,8 @@ func (m *UpdateSessionInternalClientMessage) CheckValid() error {
type RemoveSessionInternalClientMessage struct {
CommonSessionInternalClientMessage
UserId string `json:"userid,omitempty"`
}
func (m *RemoveSessionInternalClientMessage) CheckValid() error {
@ -553,6 +562,12 @@ type RoomEventMessage struct {
Data *json.RawMessage `json:"data,omitempty"`
}
type RoomFlagsServerMessage struct {
RoomId string `json:"roomid"`
SessionId string `json:"sessionid"`
Flags uint32 `json:"flags"`
}
type EventServerMessage struct {
Target string `json:"target"`
Type string `json:"type"`
@ -566,6 +581,7 @@ type EventServerMessage struct {
Invite *RoomEventServerMessage `json:"invite,omitempty"`
Disinvite *RoomDisinviteEventServerMessage `json:"disinvite,omitempty"`
Update *RoomEventServerMessage `json:"update,omitempty"`
Flags *RoomFlagsServerMessage `json:"flags,omitempty"`
// Used for target "message"
Message *RoomEventMessage `json:"message,omitempty"`

View file

@ -1437,7 +1437,7 @@ func (h *Hub) processControlMsg(client *Client, message *ClientMessage) {
}
func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
msg := message.Control
msg := message.Internal
session := client.GetSession()
if session == nil {
// Client is not connected yet.
@ -1447,9 +1447,9 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
return
}
switch message.Internal.Type {
switch msg.Type {
case "addsession":
msg := message.Internal.AddSession
msg := msg.AddSession
room := h.getRoom(msg.RoomId)
if room == nil {
log.Printf("Ignore add session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId())
@ -1473,13 +1473,35 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
virtualSessionId := GetVirtualSessionId(session, msg.SessionId)
request := NewBackendClientSessionRequest(room.Id(), "add", publicSessionId, msg)
var response BackendClientSessionResponse
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session."))
client.SendMessage(reply)
return
if msg.Options != nil {
request := NewBackendClientRoomRequest(room.Id(), msg.UserId, publicSessionId)
request.Room.ActorId = msg.Options.ActorId
request.Room.ActorType = msg.Options.ActorType
request.Room.InCall = FlagInCall | FlagWithPhone
var response BackendClientResponse
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
log.Printf("Could not join virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session."))
client.SendMessage(reply)
return
}
if response.Type == "error" {
log.Printf("Could not join virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error)
reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error()))
client.SendMessage(reply)
return
}
} else {
request := NewBackendClientSessionRequest(room.Id(), "add", publicSessionId, msg)
var response BackendClientSessionResponse
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
log.Printf("Could not add virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not add virtual session."))
client.SendMessage(reply)
return
}
}
sess := NewVirtualSession(session, privateSessionId, publicSessionId, sessionIdData, msg)
@ -1492,7 +1514,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
sess.SetRoom(room)
room.AddSession(sess, nil)
case "updatesession":
msg := message.Internal.UpdateSession
msg := msg.UpdateSession
room := h.getRoom(msg.RoomId)
if room == nil {
log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId())
@ -1525,7 +1547,7 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
}
}
case "removesession":
msg := message.Internal.RemoveSession
msg := msg.RemoveSession
room := h.getRoom(msg.RoomId)
if room == nil {
log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId())
@ -1551,18 +1573,46 @@ func (h *Hub) processInternalMsg(client *Client, message *ClientMessage) {
ctx, cancel := context.WithTimeout(context.Background(), h.backendTimeout)
defer cancel()
request := NewBackendClientSessionRequest(room.Id(), "remove", sess.PublicId(), nil)
var response BackendClientSessionResponse
err := h.backend.PerformJSONRequest(ctx, sess.ParsedBackendUrl(), request, &response)
if err != nil {
log.Printf("Could not remove virtual session %s from backend %s: %s", sess.PublicId(), sess.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend."))
client.SendMessage(reply)
var options *AddSessionOptions
if vsess, ok := sess.(*VirtualSession); ok {
options = vsess.Options()
}
if options != nil {
request := NewBackendClientRoomRequest(room.Id(), sess.UserId(), sess.PublicId())
request.Room.Action = "leave"
if options != nil {
request.Room.ActorId = options.ActorId
request.Room.ActorType = options.ActorType
}
var response BackendClientResponse
if err := h.backend.PerformJSONRequest(ctx, session.ParsedBackendUrl(), request, &response); err != nil {
log.Printf("Could not leave virtual session %s at backend %s: %s", virtualSessionId, session.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("add_failed", "Could not join virtual session."))
client.SendMessage(reply)
return
}
if response.Type == "error" {
log.Printf("Could not leave virtual session %s at backend %s: %+v", virtualSessionId, session.BackendUrl(), response.Error)
reply := message.NewErrorServerMessage(NewError("add_failed", response.Error.Error()))
client.SendMessage(reply)
return
}
} else {
request := NewBackendClientSessionRequest(room.Id(), "remove", sess.PublicId(), nil)
var response BackendClientSessionResponse
err := h.backend.PerformJSONRequest(ctx, sess.ParsedBackendUrl(), request, &response)
if err != nil {
log.Printf("Could not remove virtual session %s from backend %s: %s", sess.PublicId(), sess.BackendUrl(), err)
reply := message.NewErrorServerMessage(NewError("remove_failed", "Could not remove virtual session from backend."))
client.SendMessage(reply)
}
}
}()
}
default:
log.Printf("Ignore unsupported internal message %+v from %s", message.Internal, session.PublicId())
log.Printf("Ignore unsupported internal message %+v from %s", msg, session.PublicId())
return
}
}

View file

@ -40,6 +40,7 @@ const (
FlagInCall = 1
FlagWithAudio = 2
FlagWithVideo = 4
FlagWithPhone = 8
)
var (
@ -228,6 +229,7 @@ func (r *Room) processBackendRoomRequest(message *BackendServerRoomRequest) {
case "update":
r.hub.roomUpdated <- message
case "delete":
r.notifyInternalRoomDeleted()
r.hub.roomDeleted <- message
case "incall":
r.hub.roomInCall <- message
@ -284,6 +286,9 @@ func (r *Room) AddSession(session Session, sessionData *json.RawMessage) []Sessi
r.PublishSessionJoined(session, roomSessionData)
if publishUsersChanged {
r.publishUsersChangedWithInternal()
if session, ok := session.(*VirtualSession); ok && session.Flags() != 0 {
r.publishSessionFlagsChanged(session)
}
}
}
return result
@ -421,7 +426,7 @@ func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string]
}
for session := range r.internalSessions {
users = append(users, map[string]interface{}{
"inCall": true,
"inCall": FlagInCall | FlagWithAudio,
"sessionId": session.PublicId(),
"lastPing": now,
"internal": true,
@ -429,11 +434,10 @@ func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string]
}
for session := range r.virtualSessions {
users = append(users, map[string]interface{}{
"inCall": true,
"inCall": FlagInCall | FlagWithPhone,
"sessionId": session.PublicId(),
"lastPing": now,
"virtual": true,
"flags": session.Flags(),
})
}
r.mu.Unlock()
@ -583,7 +587,12 @@ func (r *Room) NotifySessionChanged(session Session) {
return
}
r.publishUsersChangedWithInternal()
virtual, ok := session.(*VirtualSession)
if !ok {
return
}
r.publishSessionFlagsChanged(virtual)
}
func (r *Room) publishUsersChangedWithInternal() {
@ -591,6 +600,22 @@ func (r *Room) publishUsersChangedWithInternal() {
r.publish(message)
}
func (r *Room) publishSessionFlagsChanged(session *VirtualSession) {
message := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "participants",
Type: "flags",
Flags: &RoomFlagsServerMessage{
RoomId: r.id,
SessionId: session.PublicId(),
Flags: session.Flags(),
},
},
}
r.publish(message)
}
func (r *Room) publishActiveSessions() {
r.mu.Lock()
defer r.mu.Unlock()
@ -667,3 +692,16 @@ func (r *Room) publishRoomMessage(message *BackendRoomMessageRequest) {
}
r.publish(msg)
}
func (r *Room) notifyInternalRoomDeleted() {
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "room",
Type: "delete",
},
}
for s := range r.internalSessions {
s.(*ClientSession).SendMessage(msg)
}
}

View file

@ -640,6 +640,18 @@ func checkMessageParticipantsInCall(message *ServerMessage) (*RoomEventServerMes
return message.Event.Update, nil
}
func checkMessageParticipantFlags(message *ServerMessage) (*RoomFlagsServerMessage, error) {
if err := checkMessageType(message, "event"); err != nil {
return nil, err
} else if message.Event.Target != "participants" {
return nil, fmt.Errorf("Expected event target room, got %+v", message.Event)
} else if message.Event.Type != "flags" || message.Event.Flags == nil {
return nil, fmt.Errorf("Expected event type flags, got %+v", message.Event)
}
return message.Event.Flags, nil
}
func checkMessageRoomMessage(message *ServerMessage) (*RoomEventMessage, error) {
if err := checkMessageType(message, "event"); err != nil {
return nil, err

View file

@ -48,6 +48,7 @@ type VirtualSession struct {
userId string
userData *json.RawMessage
flags uint32
options *AddSessionOptions
}
func GetVirtualSessionId(session *ClientSession, sessionId string) string {
@ -65,6 +66,7 @@ func NewVirtualSession(session *ClientSession, privateId string, publicId string
userId: msg.UserId,
userData: msg.User,
flags: msg.Flags,
options: msg.Options,
}
}
@ -193,3 +195,7 @@ func (s *VirtualSession) SetFlags(flags uint32) bool {
func (s *VirtualSession) Flags() uint32 {
return atomic.LoadUint32(&s.flags)
}
func (s *VirtualSession) Options() *AddSessionOptions {
return s.options
}

View file

@ -144,10 +144,58 @@ func TestVirtualSession(t *testing.T) {
t.Errorf("Expected session id %s, got %+v", sessionId, updateMsg.Users[0])
} else if virtual, ok := updateMsg.Users[0]["virtual"].(bool); !ok || !virtual {
t.Errorf("Expected virtual user, got %+v", updateMsg.Users[0])
} else if inCall, ok := updateMsg.Users[0]["inCall"].(bool); !ok || !inCall {
} else if inCall, ok := updateMsg.Users[0]["inCall"].(float64); !ok || inCall == 0 {
t.Errorf("Expected user in call, got %+v", updateMsg.Users[0])
} else if flags, ok := updateMsg.Users[0]["flags"].(float64); !ok || flags != FLAG_MUTED_SPEAKING {
t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, updateMsg.Users[0])
}
msg3, err := client.RunUntilMessage(ctx)
if err != nil {
t.Fatal(err)
}
flagsMsg, err := checkMessageParticipantFlags(msg3)
if err != nil {
t.Error(err)
} else if flagsMsg.RoomId != roomId {
t.Errorf("Expected room %s, got %s", roomId, flagsMsg.RoomId)
} else if flagsMsg.SessionId != sessionId {
t.Errorf("Expected session id %s, got %s", sessionId, flagsMsg.SessionId)
} else if flagsMsg.Flags != FLAG_MUTED_SPEAKING {
t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, flagsMsg.Flags)
}
newFlags := uint32(FLAG_TALKING)
msgFlags := &ClientMessage{
Type: "internal",
Internal: &InternalClientMessage{
Type: "updatesession",
UpdateSession: &UpdateSessionInternalClientMessage{
CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{
SessionId: internalSessionId,
RoomId: roomId,
},
Flags: &newFlags,
},
},
}
if err := clientInternal.WriteJSON(msgFlags); err != nil {
t.Fatal(err)
}
msg4, err := client.RunUntilMessage(ctx)
if err != nil {
t.Fatal(err)
}
flagsMsg, err = checkMessageParticipantFlags(msg4)
if err != nil {
t.Error(err)
} else if flagsMsg.RoomId != roomId {
t.Errorf("Expected room %s, got %s", roomId, flagsMsg.RoomId)
} else if flagsMsg.SessionId != sessionId {
t.Errorf("Expected session id %s, got %s", sessionId, flagsMsg.SessionId)
} else if flagsMsg.Flags != newFlags {
t.Errorf("Expected flags %d, got %+v", newFlags, flagsMsg.Flags)
}
// When sending to a virtual session, the message is sent to the actual
@ -200,11 +248,11 @@ func TestVirtualSession(t *testing.T) {
t.Fatal(err)
}
msg3, err := client.RunUntilMessage(ctx)
msg5, err := client.RunUntilMessage(ctx)
if err != nil {
t.Fatal(err)
}
if err := client.checkMessageRoomLeaveSession(msg3, sessionId); err != nil {
if err := client.checkMessageRoomLeaveSession(msg5, sessionId); err != nil {
t.Error(err)
}
}
@ -324,10 +372,24 @@ func TestVirtualSessionCleanup(t *testing.T) {
t.Errorf("Expected session id %s, got %+v", sessionId, updateMsg.Users[0])
} else if virtual, ok := updateMsg.Users[0]["virtual"].(bool); !ok || !virtual {
t.Errorf("Expected virtual user, got %+v", updateMsg.Users[0])
} else if inCall, ok := updateMsg.Users[0]["inCall"].(bool); !ok || !inCall {
} else if inCall, ok := updateMsg.Users[0]["inCall"].(float64); !ok || inCall == 0 {
t.Errorf("Expected user in call, got %+v", updateMsg.Users[0])
} else if flags, ok := updateMsg.Users[0]["flags"].(float64); !ok || flags != FLAG_MUTED_SPEAKING {
t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, updateMsg.Users[0])
}
msg3, err := client.RunUntilMessage(ctx)
if err != nil {
t.Fatal(err)
}
flagsMsg, err := checkMessageParticipantFlags(msg3)
if err != nil {
t.Error(err)
} else if flagsMsg.RoomId != roomId {
t.Errorf("Expected room %s, got %s", roomId, flagsMsg.RoomId)
} else if flagsMsg.SessionId != sessionId {
t.Errorf("Expected session id %s, got %s", sessionId, flagsMsg.SessionId)
} else if flagsMsg.Flags != FLAG_MUTED_SPEAKING {
t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, flagsMsg.Flags)
}
// The virtual sessions are closed when the parent session is deleted.