From 97f2a1d5f04d916b25038267ed22d161c0a3e205 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 2 Mar 2022 13:56:59 +0100 Subject: [PATCH] Add special events to update "incall" flags of all sessions. --- api_backend.go | 1 + api_signaling.go | 5 + backend_server.go | 18 ++-- docs/standalone-signaling-api-v1.md | 40 ++++++++ hub.go | 19 +++- room.go | 73 ++++++++++++++ room_test.go | 147 ++++++++++++++++++++++++++++ testclient_test.go | 19 ++++ 8 files changed, 313 insertions(+), 9 deletions(-) diff --git a/api_backend.go b/api_backend.go index 0a28776..021d343 100644 --- a/api_backend.go +++ b/api_backend.go @@ -127,6 +127,7 @@ type BackendRoomDeleteRequest struct { type BackendRoomInCallRequest struct { // TODO(jojo): Change "InCall" to "int" when #914 has landed in NC Talk. InCall json.RawMessage `json:"incall,omitempty"` + All bool `json:"all,omitempty"` Changed []map[string]interface{} `json:"changed,omitempty"` Users []map[string]interface{} `json:"users,omitempty"` } diff --git a/api_signaling.go b/api_signaling.go index 5976c33..8f7e959 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -338,6 +338,7 @@ const ( ServerFeatureUpdateSdp = "update-sdp" ServerFeatureAudioVideoPermissions = "audio-video-permissions" ServerFeatureTransientData = "transient-data" + ServerFeatureInCallAll = "incall-all" // Features for internal clients only. ServerFeatureInternalVirtualSessions = "virtual-sessions" @@ -347,10 +348,12 @@ var ( DefaultFeatures = []string{ ServerFeatureAudioVideoPermissions, ServerFeatureTransientData, + ServerFeatureInCallAll, } DefaultFeaturesInternal = []string{ ServerFeatureInternalVirtualSessions, ServerFeatureTransientData, + ServerFeatureInCallAll, } ) @@ -591,6 +594,8 @@ type RoomEventServerMessage struct { InCall *json.RawMessage `json:"incall,omitempty"` Changed []map[string]interface{} `json:"changed,omitempty"` Users []map[string]interface{} `json:"users,omitempty"` + + All bool `json:"all,omitempty"` } const ( diff --git a/backend_server.go b/backend_server.go index be6ee32..0efa5ae 100644 --- a/backend_server.go +++ b/backend_server.go @@ -445,16 +445,18 @@ func (b *BackendServer) fixupUserSessions(cache *ConcurrentStringStringMap, user } func (b *BackendServer) sendRoomIncall(roomid string, backend *Backend, request *BackendServerRoomRequest) error { - timeout := time.Second + if !request.InCall.All { + timeout := time.Second - var cache ConcurrentStringStringMap - // Convert (Nextcloud) session ids to signaling session ids. - request.InCall.Users = b.fixupUserSessions(&cache, request.InCall.Users, timeout) - // Entries in "Changed" are most likely already fetched through the "Users" list. - request.InCall.Changed = b.fixupUserSessions(&cache, request.InCall.Changed, timeout) + var cache ConcurrentStringStringMap + // Convert (Nextcloud) session ids to signaling session ids. + request.InCall.Users = b.fixupUserSessions(&cache, request.InCall.Users, timeout) + // Entries in "Changed" are most likely already fetched through the "Users" list. + request.InCall.Changed = b.fixupUserSessions(&cache, request.InCall.Changed, timeout) - if len(request.InCall.Users) == 0 && len(request.InCall.Changed) == 0 { - return nil + if len(request.InCall.Users) == 0 && len(request.InCall.Changed) == 0 { + return nil + } } return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request) diff --git a/docs/standalone-signaling-api-v1.md b/docs/standalone-signaling-api-v1.md index de4cccc..2d86569 100644 --- a/docs/standalone-signaling-api-v1.md +++ b/docs/standalone-signaling-api-v1.md @@ -555,6 +555,29 @@ for both the signaling session id (`sessionId`) and the Nextcloud session id (`nextcloudSessionId`). +### All participants "incall" changed events + +When the `inCall` flag of all participants is changed from the backend (see +[backend request](#in-call-state-of-all-participants-changed) below), +a dedicated event is sent that doesn't include information on all participants, +but an `all` flag. + +Message format (Server -> Client, incall change): + + { + "type": "event" + "event": { + "target": "participants", + "type": "update", + "update": [ + "roomid": "the-room-id", + "incall": new-incall-state, + "all": true + ] + } + } + + ## Room messages The server can notify clients about events that happened in a room. Currently @@ -884,6 +907,23 @@ Message format (Backend -> Server) } +### In call state of all participants changed + +This can be used to notify when all participants changed their `inCall` flag +to the same new value (available if the server returns the `incall-all` feature +id in the [hello response](#establish-connection)). + +Message format (Backend -> Server) + + { + "type": "incall" + "incall" { + "incall": new-incall-state, + "all": true + } + } + + ### Send an arbitrary room message This can be used to send arbitrary messages to participants in a room. It is diff --git a/hub.go b/hub.go index b1e4acc..706fb85 100644 --- a/hub.go +++ b/hub.go @@ -1936,7 +1936,24 @@ func (h *Hub) processRoomDeleted(message *BackendServerRoomRequest) { func (h *Hub) processRoomInCallChanged(message *BackendServerRoomRequest) { room := message.room - room.PublishUsersInCallChanged(message.InCall.Changed, message.InCall.Users) + if message.InCall.All { + var flags int + if err := json.Unmarshal(message.InCall.InCall, &flags); err != nil { + var incall bool + if err := json.Unmarshal(message.InCall.InCall, &incall); err != nil { + log.Printf("Unsupported InCall flags type: %+v, ignoring", string(message.InCall.InCall)) + return + } + + if incall { + flags = FlagInCall + } + } + + room.PublishUsersInCallChangedAll(flags) + } else { + room.PublishUsersInCallChanged(message.InCall.Changed, message.InCall.Users) + } } func (h *Hub) processRoomParticipants(message *BackendServerRoomRequest) { diff --git a/room.go b/room.go index 57ff6e6..db00b14 100644 --- a/room.go +++ b/room.go @@ -28,6 +28,7 @@ import ( "fmt" "log" "net/url" + "strconv" "sync" "time" @@ -615,6 +616,78 @@ func (r *Room) PublishUsersInCallChanged(changed []map[string]interface{}, users } } +func (r *Room) PublishUsersInCallChangedAll(inCall int) { + r.mu.Lock() + defer r.mu.Unlock() + + if inCall&FlagInCall != 0 { + // All connected sessions join the call. + var joined []string + for _, session := range r.sessions { + if _, ok := session.(*ClientSession); !ok { + continue + } + + if session.ClientType() == HelloClientTypeInternal { + continue + } + + if !r.inCallSessions[session] { + r.inCallSessions[session] = true + joined = append(joined, session.PublicId()) + } + } + + if len(joined) == 0 { + return + } + + log.Printf("Sessions %v joined call %s", joined, r.id) + } else if len(r.inCallSessions) > 0 { + // Perform actual leaving asynchronously. + ch := make(chan *ClientSession, 1) + go func() { + for { + session := <-ch + if session == nil { + break + } + + session.LeaveCall() + } + }() + + for session := range r.inCallSessions { + if clientSession, ok := session.(*ClientSession); ok { + ch <- clientSession + } + } + close(ch) + r.inCallSessions = make(map[Session]bool) + } else { + // All sessions already left the call, no need to notify. + return + } + + inCallMsg := json.RawMessage(strconv.FormatInt(int64(inCall), 10)) + + message := &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "participants", + Type: "update", + Update: &RoomEventServerMessage{ + RoomId: r.id, + InCall: &inCallMsg, + All: true, + }, + }, + } + if err := r.publish(message); err != nil { + log.Printf("Could not publish incall message in room %s: %s", r.Id(), err) + } +} + func (r *Room) PublishUsersChanged(changed []map[string]interface{}, users []map[string]interface{}) { changed = r.filterPermissions(changed) users = r.filterPermissions(users) diff --git a/room_test.go b/room_test.go index 0070f0c..a4a105c 100644 --- a/room_test.go +++ b/room_test.go @@ -27,6 +27,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "strconv" "testing" "time" @@ -421,3 +422,149 @@ func TestRoom_RoomSessionData(t *testing.T) { } wg.Wait() } + +func TestRoom_InCallAll(t *testing.T) { + hub, _, router, server, shutdown := CreateHubForTest(t) + defer shutdown() + + config, err := getTestConfig(server) + if err != nil { + t.Fatal(err) + } + b, err := NewBackendServer(config, hub, "no-version") + if err != nil { + t.Fatal(err) + } + if err := b.Start(router); err != nil { + t.Fatal(err) + } + + client1 := NewTestClient(t, server, hub) + defer client1.CloseWithBye() + + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + client2 := NewTestClient(t, server, hub) + defer client2.CloseWithBye() + + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + // Join room by id. + roomId := "test-room" + if room, err := client1.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil { + t.Error(err) + } + + if room, err := client2.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil { + t.Error(err) + } + + if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil { + t.Error(err) + } + + // Simulate backend request from Nextcloud to update the "inCall" flag of all participants. + msg1 := &BackendServerRoomRequest{ + Type: "incall", + InCall: &BackendRoomInCallRequest{ + All: true, + InCall: json.RawMessage(strconv.FormatInt(FlagInCall, 10)), + }, + } + + data1, err := json.Marshal(msg1) + if err != nil { + t.Fatal(err) + } + res1, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data1) + if err != nil { + t.Fatal(err) + } + defer res1.Body.Close() + body1, err := ioutil.ReadAll(res1.Body) + if err != nil { + t.Error(err) + } + if res1.StatusCode != 200 { + t.Errorf("Expected successful request, got %s: %s", res1.Status, string(body1)) + } + + if msg, err := client1.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else if err := checkMessageInCallAll(msg, roomId, FlagInCall); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else if err := checkMessageInCallAll(msg, roomId, FlagInCall); err != nil { + t.Fatal(err) + } + + // Simulate backend request from Nextcloud to update the "inCall" flag of all participants. + msg2 := &BackendServerRoomRequest{ + Type: "incall", + InCall: &BackendRoomInCallRequest{ + All: true, + InCall: json.RawMessage(strconv.FormatInt(0, 10)), + }, + } + + data2, err := json.Marshal(msg2) + if err != nil { + t.Fatal(err) + } + res2, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data2) + if err != nil { + t.Fatal(err) + } + defer res2.Body.Close() + body2, err := ioutil.ReadAll(res2.Body) + if err != nil { + t.Error(err) + } + if res2.StatusCode != 200 { + t.Errorf("Expected successful request, got %s: %s", res2.Status, string(body2)) + } + + if msg, err := client1.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else if err := checkMessageInCallAll(msg, roomId, 0); err != nil { + t.Fatal(err) + } + + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else if err := checkMessageInCallAll(msg, roomId, 0); err != nil { + t.Fatal(err) + } +} diff --git a/testclient_test.go b/testclient_test.go index ff19fd6..cd54ce5 100644 --- a/testclient_test.go +++ b/testclient_test.go @@ -22,6 +22,7 @@ package signaling import ( + "bytes" "context" "crypto/hmac" "crypto/sha256" @@ -31,6 +32,7 @@ import ( "net" "net/http/httptest" "reflect" + "strconv" "strings" "testing" "time" @@ -841,3 +843,20 @@ func checkMessageTransientInitial(message *ServerMessage, data map[string]interf return nil } + +func checkMessageInCallAll(message *ServerMessage, roomId string, inCall int) error { + if err := checkMessageType(message, "event"); err != nil { + return err + } else if message.Event.Type != "update" { + return fmt.Errorf("Expected update event, got %+v", message.Event) + } else if message.Event.Target != "participants" { + return fmt.Errorf("Expected participants update event, got %+v", message.Event) + } else if message.Event.Update.RoomId != roomId { + return fmt.Errorf("Expected participants update event for room %s, got %+v", roomId, message.Event.Update) + } else if !message.Event.Update.All { + return fmt.Errorf("Expected participants update event for all, got %+v", message.Event.Update) + } else if !bytes.Equal(*message.Event.Update.InCall, []byte(strconv.FormatInt(int64(inCall), 10))) { + return fmt.Errorf("Expected incall flags %d, got %+v", inCall, message.Event.Update) + } + return nil +}