Implement switchto messages.

This commit is contained in:
Joachim Bauch 2023-01-30 10:08:35 +01:00
parent 69dfb0686f
commit bb24bf5f0d
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
6 changed files with 507 additions and 3 deletions

View File

@ -102,6 +102,8 @@ type BackendServerRoomRequest struct {
Message *BackendRoomMessageRequest `json:"message,omitempty"`
SwitchTo *BackendRoomSwitchToMessageRequest `json:"switchto,omitempty"`
// Internal properties
ReceivedTime int64 `json:"received,omitempty"`
}
@ -149,6 +151,25 @@ type BackendRoomMessageRequest struct {
Data *json.RawMessage `json:"data,omitempty"`
}
type BackendRoomSwitchToSessionsList []string
type BackendRoomSwitchToSessionsMap map[string]json.RawMessage
type BackendRoomSwitchToMessageRequest struct {
// Target room id
RoomId string `json:"roomid"`
// Sessions is either a BackendRoomSwitchToSessionsList or a
// BackendRoomSwitchToSessionsMap.
// In the map, the key is the session id, the value additional details
// (or null) for the session. The details will be included in the request
// to the connected client.
Sessions *json.RawMessage `json:"sessions,omitempty"`
// Internal properties
SessionsList BackendRoomSwitchToSessionsList `json:"sessionslist,omitempty"`
SessionsMap BackendRoomSwitchToSessionsMap `json:"sessionsmap,omitempty"`
}
// Requests from the signaling server to the Nextcloud backend.
type BackendClientAuthRequest struct {

View File

@ -432,6 +432,7 @@ const (
ServerFeatureInCallAll = "incall-all"
ServerFeatureWelcome = "welcome"
ServerFeatureHelloV2 = "hello-v2"
ServerFeatureSwitchTo = "switchto"
// Features for internal clients only.
ServerFeatureInternalVirtualSessions = "virtual-sessions"
@ -444,6 +445,7 @@ var (
ServerFeatureInCallAll,
ServerFeatureWelcome,
ServerFeatureHelloV2,
ServerFeatureSwitchTo,
}
DefaultFeaturesInternal = []string{
ServerFeatureInternalVirtualSessions,
@ -451,6 +453,7 @@ var (
ServerFeatureInCallAll,
ServerFeatureWelcome,
ServerFeatureHelloV2,
ServerFeatureSwitchTo,
}
DefaultWelcomeFeatures = []string{
ServerFeatureAudioVideoPermissions,
@ -459,6 +462,7 @@ var (
ServerFeatureInCallAll,
ServerFeatureWelcome,
ServerFeatureHelloV2,
ServerFeatureSwitchTo,
}
)
@ -746,9 +750,10 @@ type EventServerMessage struct {
Type string `json:"type"`
// Used for target "room"
Join []*EventServerMessageSessionEntry `json:"join,omitempty"`
Leave []string `json:"leave,omitempty"`
Change []*EventServerMessageSessionEntry `json:"change,omitempty"`
Join []*EventServerMessageSessionEntry `json:"join,omitempty"`
Leave []string `json:"leave,omitempty"`
Change []*EventServerMessageSessionEntry `json:"change,omitempty"`
SwitchTo *EventServerMessageSwitchTo `json:"switchto,omitempty"`
// Used for target "roomlist" / "participants"
Invite *RoomEventServerMessage `json:"invite,omitempty"`
@ -784,6 +789,11 @@ func (e *EventServerMessageSessionEntry) Clone() *EventServerMessageSessionEntry
}
}
type EventServerMessageSwitchTo struct {
RoomId string `json:"roomid"`
Details json.RawMessage `json:"details,omitempty"`
}
// MCU-related types
type AnswerOfferMessage struct {

View File

@ -546,6 +546,104 @@ func (b *BackendServer) sendRoomMessage(roomid string, backend *Backend, request
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
func (b *BackendServer) sendRoomSwitchTo(roomid string, backend *Backend, request *BackendServerRoomRequest) error {
timeout := time.Second
// Convert (Nextcloud) session ids to signaling session ids.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
var wg sync.WaitGroup
var mu sync.Mutex
if request.SwitchTo.Sessions != nil {
// We support both a list of sessions or a map with additional details per session.
if (*request.SwitchTo.Sessions)[0] == '[' {
var sessionsList BackendRoomSwitchToSessionsList
if err := json.Unmarshal(*request.SwitchTo.Sessions, &sessionsList); err != nil {
return err
}
if len(sessionsList) == 0 {
return nil
}
var internalSessionsList BackendRoomSwitchToSessionsList
for _, roomSessionId := range sessionsList {
if roomSessionId == sessionIdNotInMeeting {
continue
}
wg.Add(1)
go func(roomSessionId string) {
defer wg.Done()
if sessionId, err := b.lookupByRoomSessionId(ctx, roomSessionId, nil); err != nil {
log.Printf("Could not lookup by room session %s: %s", roomSessionId, err)
} else if sessionId != "" {
mu.Lock()
defer mu.Unlock()
internalSessionsList = append(internalSessionsList, sessionId)
}
}(roomSessionId)
}
wg.Wait()
mu.Lock()
defer mu.Unlock()
if len(internalSessionsList) == 0 {
return nil
}
request.SwitchTo.SessionsList = internalSessionsList
request.SwitchTo.SessionsMap = nil
} else {
var sessionsMap BackendRoomSwitchToSessionsMap
if err := json.Unmarshal(*request.SwitchTo.Sessions, &sessionsMap); err != nil {
return err
}
if len(sessionsMap) == 0 {
return nil
}
internalSessionsMap := make(BackendRoomSwitchToSessionsMap)
for roomSessionId, details := range sessionsMap {
if roomSessionId == sessionIdNotInMeeting {
continue
}
wg.Add(1)
go func(roomSessionId string, details json.RawMessage) {
defer wg.Done()
if sessionId, err := b.lookupByRoomSessionId(ctx, roomSessionId, nil); err != nil {
log.Printf("Could not lookup by room session %s: %s", roomSessionId, err)
} else if sessionId != "" {
mu.Lock()
defer mu.Unlock()
internalSessionsMap[sessionId] = details
}
}(roomSessionId, details)
}
wg.Wait()
mu.Lock()
defer mu.Unlock()
if len(internalSessionsMap) == 0 {
return nil
}
request.SwitchTo.SessionsList = nil
request.SwitchTo.SessionsMap = internalSessionsMap
}
}
request.SwitchTo.Sessions = nil
message := &AsyncMessage{
Type: "room",
Room: request,
}
return b.events.PublishBackendRoomMessage(roomid, backend, message)
}
func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body []byte) {
v := mux.Vars(r)
roomid := v["roomid"]
@ -627,6 +725,8 @@ func (b *BackendServer) roomHandler(w http.ResponseWriter, r *http.Request, body
err = b.sendRoomParticipantsUpdate(roomid, backend, &request)
case "message":
err = b.sendRoomMessage(roomid, backend, &request)
case "switchto":
err = b.sendRoomSwitchTo(roomid, backend, &request)
default:
http.Error(w, "Unsupported request type: "+request.Type, http.StatusBadRequest)
return

View File

@ -4985,3 +4985,286 @@ func TestVirtualClientSessions(t *testing.T) {
})
}
}
func DoTestSwitchToOne(t *testing.T, details map[string]interface{}) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomSessionId1 := "roomsession1"
roomId1 := "test-room"
if room, err := client1.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId1); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId1 {
t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId)
}
roomSessionId2 := "roomsession2"
if room, err := client2.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId2); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId1 {
t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
roomId2 := "test-room-2"
var sessions json.RawMessage
if details != nil {
if sessions, err = json.Marshal(map[string]interface{}{
roomSessionId1: details,
}); err != nil {
t.Fatal(err)
}
} else {
if sessions, err = json.Marshal([]string{
roomSessionId1,
}); err != nil {
t.Fatal(err)
}
}
// Notify first client to switch to different room.
msg := &BackendServerRoomRequest{
Type: "switchto",
SwitchTo: &BackendRoomSwitchToMessageRequest{
RoomId: roomId2,
Sessions: &sessions,
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId1, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
var detailsData json.RawMessage
if details != nil {
if detailsData, err = json.Marshal(details); err != nil {
t.Fatal(err)
}
}
if _, err := client1.RunUntilSwitchTo(ctx, roomId2, detailsData); err != nil {
t.Error(err)
}
// The other client will not receive a message.
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
})
}
}
func TestSwitchToOneMap(t *testing.T) {
DoTestSwitchToOne(t, map[string]interface{}{
"foo": "bar",
})
}
func TestSwitchToOneList(t *testing.T) {
DoTestSwitchToOne(t, nil)
}
func DoTestSwitchToMultiple(t *testing.T, details1 map[string]interface{}, details2 map[string]interface{}) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomSessionId1 := "roomsession1"
roomId1 := "test-room"
if room, err := client1.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId1); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId1 {
t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId)
}
roomSessionId2 := "roomsession2"
if room, err := client2.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId2); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId1 {
t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
roomId2 := "test-room-2"
var sessions json.RawMessage
if details1 != nil || details2 != nil {
if sessions, err = json.Marshal(map[string]interface{}{
roomSessionId1: details1,
roomSessionId2: details2,
}); err != nil {
t.Fatal(err)
}
} else {
if sessions, err = json.Marshal([]string{
roomSessionId1,
roomSessionId2,
}); err != nil {
t.Fatal(err)
}
}
msg := &BackendServerRoomRequest{
Type: "switchto",
SwitchTo: &BackendRoomSwitchToMessageRequest{
RoomId: roomId2,
Sessions: &sessions,
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId1, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
var detailsData1 json.RawMessage
if details1 != nil {
if detailsData1, err = json.Marshal(details1); err != nil {
t.Fatal(err)
}
}
if _, err := client1.RunUntilSwitchTo(ctx, roomId2, detailsData1); err != nil {
t.Error(err)
}
var detailsData2 json.RawMessage
if details2 != nil {
if detailsData2, err = json.Marshal(details2); err != nil {
t.Fatal(err)
}
}
if _, err := client2.RunUntilSwitchTo(ctx, roomId2, detailsData2); err != nil {
t.Error(err)
}
})
}
}
func TestSwitchToMultipleMap(t *testing.T) {
DoTestSwitchToMultiple(t, map[string]interface{}{
"foo": "bar",
}, map[string]interface{}{
"bar": "baz",
})
}
func TestSwitchToMultipleList(t *testing.T) {
DoTestSwitchToMultiple(t, nil, nil)
}
func TestSwitchToMultipleMixed(t *testing.T) {
DoTestSwitchToMultiple(t, map[string]interface{}{
"foo": "bar",
}, nil)
}

61
room.go
View File

@ -238,6 +238,8 @@ func (r *Room) processBackendRoomRequestRoom(message *BackendServerRoomRequest)
r.hub.roomParticipants <- message
case "message":
r.publishRoomMessage(message.Message)
case "switchto":
r.publishSwitchTo(message.SwitchTo)
default:
log.Printf("Unsupported backend room request with type %s in %s: %+v", message.Type, r.Id(), message)
}
@ -941,6 +943,65 @@ func (r *Room) publishRoomMessage(message *BackendRoomMessageRequest) {
}
}
func (r *Room) publishSwitchTo(message *BackendRoomSwitchToMessageRequest) {
var wg sync.WaitGroup
if len(message.SessionsList) > 0 {
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "room",
Type: "switchto",
SwitchTo: &EventServerMessageSwitchTo{
RoomId: message.RoomId,
},
},
}
for _, sessionId := range message.SessionsList {
wg.Add(1)
go func(sessionId string) {
defer wg.Done()
if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{
Type: "message",
Message: msg,
}); err != nil {
log.Printf("Error publishing switchto event to session %s: %s", sessionId, err)
}
}(sessionId)
}
}
if len(message.SessionsMap) > 0 {
for sessionId, details := range message.SessionsMap {
wg.Add(1)
go func(sessionId string, details json.RawMessage) {
defer wg.Done()
msg := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "room",
Type: "switchto",
SwitchTo: &EventServerMessageSwitchTo{
RoomId: message.RoomId,
Details: details,
},
},
}
if err := r.events.PublishSessionMessage(sessionId, r.backend, &AsyncMessage{
Type: "message",
Message: msg,
}); err != nil {
log.Printf("Error publishing switchto event to session %s: %s", sessionId, err)
}
}(sessionId, details)
}
}
wg.Wait()
}
func (r *Room) notifyInternalRoomDeleted() {
msg := &ServerMessage{
Type: "event",

View File

@ -1055,3 +1055,32 @@ func checkMessageInCallAll(message *ServerMessage, roomId string, inCall int) er
}
return nil
}
func checkMessageSwitchTo(message *ServerMessage, roomId string, details json.RawMessage) (*EventServerMessageSwitchTo, error) {
if err := checkMessageType(message, "event"); err != nil {
return nil, err
} else if message.Event.Type != "switchto" {
return nil, fmt.Errorf("Expected switchto event, got %+v", message.Event)
} else if message.Event.Target != "room" {
return nil, fmt.Errorf("Expected room switchto event, got %+v", message.Event)
} else if message.Event.SwitchTo.RoomId != roomId {
return nil, fmt.Errorf("Expected room switchto event for room %s, got %+v", roomId, message.Event)
}
if details != nil {
if message.Event.SwitchTo.Details == nil || !bytes.Equal(details, message.Event.SwitchTo.Details) {
return nil, fmt.Errorf("Expected details %s, got %+v", string(details), message.Event)
}
} else if message.Event.SwitchTo.Details != nil {
return nil, fmt.Errorf("Expected no details, got %+v", message.Event)
}
return message.Event.SwitchTo, nil
}
func (c *TestClient) RunUntilSwitchTo(ctx context.Context, roomId string, details json.RawMessage) (*EventServerMessageSwitchTo, error) {
message, err := c.RunUntilMessage(ctx)
if err != nil {
return nil, err
}
return checkMessageSwitchTo(message, roomId, details)
}