diff --git a/api_backend.go b/api_backend.go index ad62bc9..67f7a15 100644 --- a/api_backend.go +++ b/api_backend.go @@ -102,6 +102,8 @@ type BackendServerRoomRequest struct { Message *BackendRoomMessageRequest `json:"message,omitempty"` + SwitchTo *BackendRoomSwitchToMessageRequest `json:"switchto,omitempty"` + // Internal properties ReceivedTime int64 `json:"received,omitempty"` } @@ -149,6 +151,25 @@ type BackendRoomMessageRequest struct { Data *json.RawMessage `json:"data,omitempty"` } +type BackendRoomSwitchToSessionsList []string +type BackendRoomSwitchToSessionsMap map[string]json.RawMessage + +type BackendRoomSwitchToMessageRequest struct { + // Target room id + RoomId string `json:"roomid"` + + // Sessions is either a BackendRoomSwitchToSessionsList or a + // BackendRoomSwitchToSessionsMap. + // In the map, the key is the session id, the value additional details + // (or null) for the session. The details will be included in the request + // to the connected client. + Sessions *json.RawMessage `json:"sessions,omitempty"` + + // Internal properties + SessionsList BackendRoomSwitchToSessionsList `json:"sessionslist,omitempty"` + SessionsMap BackendRoomSwitchToSessionsMap `json:"sessionsmap,omitempty"` +} + // Requests from the signaling server to the Nextcloud backend. type BackendClientAuthRequest struct { diff --git a/api_signaling.go b/api_signaling.go index a736476..8411fea 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -432,6 +432,7 @@ const ( ServerFeatureInCallAll = "incall-all" ServerFeatureWelcome = "welcome" ServerFeatureHelloV2 = "hello-v2" + ServerFeatureSwitchTo = "switchto" // Features for internal clients only. ServerFeatureInternalVirtualSessions = "virtual-sessions" @@ -444,6 +445,7 @@ var ( ServerFeatureInCallAll, ServerFeatureWelcome, ServerFeatureHelloV2, + ServerFeatureSwitchTo, } DefaultFeaturesInternal = []string{ ServerFeatureInternalVirtualSessions, @@ -451,6 +453,7 @@ var ( ServerFeatureInCallAll, ServerFeatureWelcome, ServerFeatureHelloV2, + ServerFeatureSwitchTo, } DefaultWelcomeFeatures = []string{ ServerFeatureAudioVideoPermissions, @@ -459,6 +462,7 @@ var ( ServerFeatureInCallAll, ServerFeatureWelcome, ServerFeatureHelloV2, + ServerFeatureSwitchTo, } ) @@ -746,9 +750,10 @@ type EventServerMessage struct { Type string `json:"type"` // Used for target "room" - Join []*EventServerMessageSessionEntry `json:"join,omitempty"` - Leave []string `json:"leave,omitempty"` - Change []*EventServerMessageSessionEntry `json:"change,omitempty"` + Join []*EventServerMessageSessionEntry `json:"join,omitempty"` + Leave []string `json:"leave,omitempty"` + Change []*EventServerMessageSessionEntry `json:"change,omitempty"` + SwitchTo *EventServerMessageSwitchTo `json:"switchto,omitempty"` // Used for target "roomlist" / "participants" Invite *RoomEventServerMessage `json:"invite,omitempty"` @@ -784,6 +789,11 @@ func (e *EventServerMessageSessionEntry) Clone() *EventServerMessageSessionEntry } } +type EventServerMessageSwitchTo struct { + RoomId string `json:"roomid"` + Details json.RawMessage `json:"details,omitempty"` +} + // MCU-related types type AnswerOfferMessage struct { diff --git a/backend_server.go b/backend_server.go index d5072e3..fb0cfb0 100644 --- a/backend_server.go +++ b/backend_server.go @@ -546,6 +546,104 @@ func (b *BackendServer) sendRoomMessage(roomid string, backend *Backend, request return b.events.PublishBackendRoomMessage(roomid, backend, message) } +func (b *BackendServer) sendRoomSwitchTo(roomid string, backend *Backend, request *BackendServerRoomRequest) error { + timeout := time.Second + + // Convert (Nextcloud) session ids to signaling session ids. + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var wg sync.WaitGroup + var mu sync.Mutex + if request.SwitchTo.Sessions != nil { + // We support both a list of sessions or a map with additional details per session. + if (*request.SwitchTo.Sessions)[0] == '[' { + var sessionsList BackendRoomSwitchToSessionsList + if err := json.Unmarshal(*request.SwitchTo.Sessions, &sessionsList); err != nil { + return err + } + + if len(sessionsList) == 0 { + return nil + } + + var internalSessionsList BackendRoomSwitchToSessionsList + for _, roomSessionId := range sessionsList { + if roomSessionId == sessionIdNotInMeeting { + continue + } + + wg.Add(1) + go func(roomSessionId string) { + defer wg.Done() + if sessionId, err := b.lookupByRoomSessionId(ctx, roomSessionId, nil); err != nil { + log.Printf("Could not lookup by room session %s: %s", roomSessionId, err) + } else if sessionId != "" { + mu.Lock() + defer mu.Unlock() + internalSessionsList = append(internalSessionsList, sessionId) + } + }(roomSessionId) + } + wg.Wait() + + mu.Lock() + defer mu.Unlock() + if len(internalSessionsList) == 0 { + return nil + } + + request.SwitchTo.SessionsList = internalSessionsList + request.SwitchTo.SessionsMap = nil + } else { + var sessionsMap BackendRoomSwitchToSessionsMap + if err := json.Unmarshal(*request.SwitchTo.Sessions, &sessionsMap); err != nil { + return err + } + + if len(sessionsMap) == 0 { + return nil + } + + internalSessionsMap := make(BackendRoomSwitchToSessionsMap) + for roomSessionId, details := range sessionsMap { + if roomSessionId == sessionIdNotInMeeting { + continue + } + + wg.Add(1) + go func(roomSessionId string, details json.RawMessage) { + defer wg.Done() + if sessionId, err := b.lookupByRoomSessionId(ctx, roomSessionId, nil); err != nil { + log.Printf("Could not lookup by room session %s: %s", roomSessionId, err) + } else if sessionId != "" { + mu.Lock() + defer mu.Unlock() + internalSessionsMap[sessionId] = details + } + }(roomSessionId, details) + } + wg.Wait() + + mu.Lock() + defer mu.Unlock() + if len(internalSessionsMap) == 0 { + return nil + } + + request.SwitchTo.SessionsList = nil + request.SwitchTo.SessionsMap = internalSessionsMap + } + } + request.SwitchTo.Sessions = nil + + message := &AsyncMessage{ + Type: "room", + Room: request, + } + return b.events.PublishBackendRoomMessage(roomid, backend, message) +} + func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body []byte) { v := mux.Vars(r) roomid := v["roomid"] @@ -627,6 +725,8 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body err = b.sendRoomParticipantsUpdate(roomid, backend, &request) case "message": err = b.sendRoomMessage(roomid, backend, &request) + case "switchto": + err = b.sendRoomSwitchTo(roomid, backend, &request) default: http.Error(w, "Unsupported request type: "+request.Type, http.StatusBadRequest) return diff --git a/hub_test.go b/hub_test.go index 9b53939..5ae07f6 100644 --- a/hub_test.go +++ b/hub_test.go @@ -4985,3 +4985,286 @@ func TestVirtualClientSessions(t *testing.T) { }) } } + +func DoTestSwitchToOne(t *testing.T, details map[string]interface{}) { + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server + + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) + + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + roomSessionId1 := "roomsession1" + roomId1 := "test-room" + if room, err := client1.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId1); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId1 { + t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId) + } + + roomSessionId2 := "roomsession2" + if room, err := client2.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId2); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId1 { + t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId) + } + + if err := client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil { + t.Error(err) + } + if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil { + t.Error(err) + } + + roomId2 := "test-room-2" + var sessions json.RawMessage + if details != nil { + if sessions, err = json.Marshal(map[string]interface{}{ + roomSessionId1: details, + }); err != nil { + t.Fatal(err) + } + } else { + if sessions, err = json.Marshal([]string{ + roomSessionId1, + }); err != nil { + t.Fatal(err) + } + } + + // Notify first client to switch to different room. + msg := &BackendServerRoomRequest{ + Type: "switchto", + SwitchTo: &BackendRoomSwitchToMessageRequest{ + RoomId: roomId2, + Sessions: &sessions, + }, + } + + data, err := json.Marshal(msg) + if err != nil { + t.Fatal(err) + } + res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId1, data) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + if err != nil { + t.Error(err) + } + if res.StatusCode != 200 { + t.Errorf("Expected successful request, got %s: %s", res.Status, string(body)) + } + + var detailsData json.RawMessage + if details != nil { + if detailsData, err = json.Marshal(details); err != nil { + t.Fatal(err) + } + } + if _, err := client1.RunUntilSwitchTo(ctx, roomId2, detailsData); err != nil { + t.Error(err) + } + + // The other client will not receive a message. + ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel2() + + if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded { + t.Error(err) + } else if message != nil { + t.Errorf("Expected no message, got %+v", message) + } + }) + } +} + +func TestSwitchToOneMap(t *testing.T) { + DoTestSwitchToOne(t, map[string]interface{}{ + "foo": "bar", + }) +} + +func TestSwitchToOneList(t *testing.T) { + DoTestSwitchToOne(t, nil) +} + +func DoTestSwitchToMultiple(t *testing.T, details1 map[string]interface{}, details2 map[string]interface{}) { + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server + + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) + + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + roomSessionId1 := "roomsession1" + roomId1 := "test-room" + if room, err := client1.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId1); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId1 { + t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId) + } + + roomSessionId2 := "roomsession2" + if room, err := client2.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId2); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId1 { + t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId) + } + + if err := client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil { + t.Error(err) + } + if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil { + t.Error(err) + } + + roomId2 := "test-room-2" + var sessions json.RawMessage + if details1 != nil || details2 != nil { + if sessions, err = json.Marshal(map[string]interface{}{ + roomSessionId1: details1, + roomSessionId2: details2, + }); err != nil { + t.Fatal(err) + } + } else { + if sessions, err = json.Marshal([]string{ + roomSessionId1, + roomSessionId2, + }); err != nil { + t.Fatal(err) + } + } + + msg := &BackendServerRoomRequest{ + Type: "switchto", + SwitchTo: &BackendRoomSwitchToMessageRequest{ + RoomId: roomId2, + Sessions: &sessions, + }, + } + + data, err := json.Marshal(msg) + if err != nil { + t.Fatal(err) + } + res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId1, data) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + if err != nil { + t.Error(err) + } + if res.StatusCode != 200 { + t.Errorf("Expected successful request, got %s: %s", res.Status, string(body)) + } + + var detailsData1 json.RawMessage + if details1 != nil { + if detailsData1, err = json.Marshal(details1); err != nil { + t.Fatal(err) + } + } + if _, err := client1.RunUntilSwitchTo(ctx, roomId2, detailsData1); err != nil { + t.Error(err) + } + + var detailsData2 json.RawMessage + if details2 != nil { + if detailsData2, err = json.Marshal(details2); err != nil { + t.Fatal(err) + } + } + if _, err := client2.RunUntilSwitchTo(ctx, roomId2, detailsData2); err != nil { + t.Error(err) + } + }) + } +} + +func TestSwitchToMultipleMap(t *testing.T) { + DoTestSwitchToMultiple(t, map[string]interface{}{ + "foo": "bar", + }, map[string]interface{}{ + "bar": "baz", + }) +} + +func TestSwitchToMultipleList(t *testing.T) { + DoTestSwitchToMultiple(t, nil, nil) +} + +func TestSwitchToMultipleMixed(t *testing.T) { + DoTestSwitchToMultiple(t, map[string]interface{}{ + "foo": "bar", + }, nil) +} diff --git a/room.go b/room.go index 80429a2..94db3f3 100644 --- a/room.go +++ b/room.go @@ -238,6 +238,8 @@ func (r *Room) processBackendRoomRequestRoom(message *BackendServerRoomRequest) r.hub.roomParticipants <- message case "message": r.publishRoomMessage(message.Message) + case "switchto": + r.publishSwitchTo(message.SwitchTo) default: log.Printf("Unsupported backend room request with type %s in %s: %+v", message.Type, r.Id(), message) } @@ -941,6 +943,65 @@ func (r *Room) publishRoomMessage(message *BackendRoomMessageRequest) { } } +func (r *Room) publishSwitchTo(message *BackendRoomSwitchToMessageRequest) { + var wg sync.WaitGroup + if len(message.SessionsList) > 0 { + msg := &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "room", + Type: "switchto", + SwitchTo: &EventServerMessageSwitchTo{ + RoomId: message.RoomId, + }, + }, + } + + for _, sessionId := range message.SessionsList { + wg.Add(1) + go func(sessionId string) { + defer wg.Done() + + if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{ + Type: "message", + Message: msg, + }); err != nil { + log.Printf("Error publishing switchto event to session %s: %s", sessionId, err) + } + }(sessionId) + } + } + + if len(message.SessionsMap) > 0 { + for sessionId, details := range message.SessionsMap { + wg.Add(1) + go func(sessionId string, details json.RawMessage) { + defer wg.Done() + + msg := &ServerMessage{ + Type: "event", + Event: &EventServerMessage{ + Target: "room", + Type: "switchto", + SwitchTo: &EventServerMessageSwitchTo{ + RoomId: message.RoomId, + Details: details, + }, + }, + } + + if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{ + Type: "message", + Message: msg, + }); err != nil { + log.Printf("Error publishing switchto event to session %s: %s", sessionId, err) + } + }(sessionId, details) + } + } + wg.Wait() +} + func (r *Room) notifyInternalRoomDeleted() { msg := &ServerMessage{ Type: "event", diff --git a/testclient_test.go b/testclient_test.go index ad14172..6cb90c1 100644 --- a/testclient_test.go +++ b/testclient_test.go @@ -1055,3 +1055,32 @@ func checkMessageInCallAll(message *ServerMessage, roomId string, inCall int) er } return nil } + +func checkMessageSwitchTo(message *ServerMessage, roomId string, details json.RawMessage) (*EventServerMessageSwitchTo, error) { + if err := checkMessageType(message, "event"); err != nil { + return nil, err + } else if message.Event.Type != "switchto" { + return nil, fmt.Errorf("Expected switchto event, got %+v", message.Event) + } else if message.Event.Target != "room" { + return nil, fmt.Errorf("Expected room switchto event, got %+v", message.Event) + } else if message.Event.SwitchTo.RoomId != roomId { + return nil, fmt.Errorf("Expected room switchto event for room %s, got %+v", roomId, message.Event) + } + if details != nil { + if message.Event.SwitchTo.Details == nil || !bytes.Equal(details, message.Event.SwitchTo.Details) { + return nil, fmt.Errorf("Expected details %s, got %+v", string(details), message.Event) + } + } else if message.Event.SwitchTo.Details != nil { + return nil, fmt.Errorf("Expected no details, got %+v", message.Event) + } + return message.Event.SwitchTo, nil +} + +func (c *TestClient) RunUntilSwitchTo(ctx context.Context, roomId string, details json.RawMessage) (*EventServerMessageSwitchTo, error) { + message, err := c.RunUntilMessage(ctx) + if err != nil { + return nil, err + } + + return checkMessageSwitchTo(message, roomId, details) +}