Add special events to update "incall" flags of all sessions.

This commit is contained in:
Joachim Bauch 2022-03-02 13:56:59 +01:00
parent 008879aefa
commit 97f2a1d5f0
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
8 changed files with 313 additions and 9 deletions

View file

@ -127,6 +127,7 @@ type BackendRoomDeleteRequest struct {
type BackendRoomInCallRequest struct {
// TODO(jojo): Change "InCall" to "int" when #914 has landed in NC Talk.
InCall json.RawMessage `json:"incall,omitempty"`
All bool `json:"all,omitempty"`
Changed []map[string]interface{} `json:"changed,omitempty"`
Users []map[string]interface{} `json:"users,omitempty"`
}

View file

@ -338,6 +338,7 @@ const (
ServerFeatureUpdateSdp = "update-sdp"
ServerFeatureAudioVideoPermissions = "audio-video-permissions"
ServerFeatureTransientData = "transient-data"
ServerFeatureInCallAll = "incall-all"
// Features for internal clients only.
ServerFeatureInternalVirtualSessions = "virtual-sessions"
@ -347,10 +348,12 @@ var (
DefaultFeatures = []string{
ServerFeatureAudioVideoPermissions,
ServerFeatureTransientData,
ServerFeatureInCallAll,
}
DefaultFeaturesInternal = []string{
ServerFeatureInternalVirtualSessions,
ServerFeatureTransientData,
ServerFeatureInCallAll,
}
)
@ -591,6 +594,8 @@ type RoomEventServerMessage struct {
InCall *json.RawMessage `json:"incall,omitempty"`
Changed []map[string]interface{} `json:"changed,omitempty"`
Users []map[string]interface{} `json:"users,omitempty"`
All bool `json:"all,omitempty"`
}
const (

View file

@ -445,16 +445,18 @@ func (b *BackendServer) fixupUserSessions(cache *ConcurrentStringStringMap, user
}
func (b *BackendServer) sendRoomIncall(roomid string, backend *Backend, request *BackendServerRoomRequest) error {
timeout := time.Second
if !request.InCall.All {
timeout := time.Second
var cache ConcurrentStringStringMap
// Convert (Nextcloud) session ids to signaling session ids.
request.InCall.Users = b.fixupUserSessions(&cache, request.InCall.Users, timeout)
// Entries in "Changed" are most likely already fetched through the "Users" list.
request.InCall.Changed = b.fixupUserSessions(&cache, request.InCall.Changed, timeout)
var cache ConcurrentStringStringMap
// Convert (Nextcloud) session ids to signaling session ids.
request.InCall.Users = b.fixupUserSessions(&cache, request.InCall.Users, timeout)
// Entries in "Changed" are most likely already fetched through the "Users" list.
request.InCall.Changed = b.fixupUserSessions(&cache, request.InCall.Changed, timeout)
if len(request.InCall.Users) == 0 && len(request.InCall.Changed) == 0 {
return nil
if len(request.InCall.Users) == 0 && len(request.InCall.Changed) == 0 {
return nil
}
}
return b.nats.PublishBackendServerRoomRequest(GetSubjectForBackendRoomId(roomid, backend), request)

View file

@ -555,6 +555,29 @@ for both the signaling session id (`sessionId`) and the Nextcloud session id
(`nextcloudSessionId`).
### All participants "incall" changed events
When the `inCall` flag of all participants is changed from the backend (see
[backend request](#in-call-state-of-all-participants-changed) below),
a dedicated event is sent that doesn't include information on all participants,
but an `all` flag.
Message format (Server -> Client, incall change):
{
"type": "event"
"event": {
"target": "participants",
"type": "update",
"update": [
"roomid": "the-room-id",
"incall": new-incall-state,
"all": true
]
}
}
## Room messages
The server can notify clients about events that happened in a room. Currently
@ -884,6 +907,23 @@ Message format (Backend -> Server)
}
### In call state of all participants changed
This can be used to notify when all participants changed their `inCall` flag
to the same new value (available if the server returns the `incall-all` feature
id in the [hello response](#establish-connection)).
Message format (Backend -> Server)
{
"type": "incall"
"incall" {
"incall": new-incall-state,
"all": true
}
}
### Send an arbitrary room message
This can be used to send arbitrary messages to participants in a room. It is

19
hub.go
View file

@ -1936,7 +1936,24 @@ func (h *Hub) processRoomDeleted(message *BackendServerRoomRequest) {
func (h *Hub) processRoomInCallChanged(message *BackendServerRoomRequest) {
room := message.room
room.PublishUsersInCallChanged(message.InCall.Changed, message.InCall.Users)
if message.InCall.All {
var flags int
if err := json.Unmarshal(message.InCall.InCall, &flags); err != nil {
var incall bool
if err := json.Unmarshal(message.InCall.InCall, &incall); err != nil {
log.Printf("Unsupported InCall flags type: %+v, ignoring", string(message.InCall.InCall))
return
}
if incall {
flags = FlagInCall
}
}
room.PublishUsersInCallChangedAll(flags)
} else {
room.PublishUsersInCallChanged(message.InCall.Changed, message.InCall.Users)
}
}
func (h *Hub) processRoomParticipants(message *BackendServerRoomRequest) {

73
room.go
View file

@ -28,6 +28,7 @@ import (
"fmt"
"log"
"net/url"
"strconv"
"sync"
"time"
@ -615,6 +616,78 @@ func (r *Room) PublishUsersInCallChanged(changed []map[string]interface{}, users
}
}
func (r *Room) PublishUsersInCallChangedAll(inCall int) {
r.mu.Lock()
defer r.mu.Unlock()
if inCall&FlagInCall != 0 {
// All connected sessions join the call.
var joined []string
for _, session := range r.sessions {
if _, ok := session.(*ClientSession); !ok {
continue
}
if session.ClientType() == HelloClientTypeInternal {
continue
}
if !r.inCallSessions[session] {
r.inCallSessions[session] = true
joined = append(joined, session.PublicId())
}
}
if len(joined) == 0 {
return
}
log.Printf("Sessions %v joined call %s", joined, r.id)
} else if len(r.inCallSessions) > 0 {
// Perform actual leaving asynchronously.
ch := make(chan *ClientSession, 1)
go func() {
for {
session := <-ch
if session == nil {
break
}
session.LeaveCall()
}
}()
for session := range r.inCallSessions {
if clientSession, ok := session.(*ClientSession); ok {
ch <- clientSession
}
}
close(ch)
r.inCallSessions = make(map[Session]bool)
} else {
// All sessions already left the call, no need to notify.
return
}
inCallMsg := json.RawMessage(strconv.FormatInt(int64(inCall), 10))
message := &ServerMessage{
Type: "event",
Event: &EventServerMessage{
Target: "participants",
Type: "update",
Update: &RoomEventServerMessage{
RoomId: r.id,
InCall: &inCallMsg,
All: true,
},
},
}
if err := r.publish(message); err != nil {
log.Printf("Could not publish incall message in room %s: %s", r.Id(), err)
}
}
func (r *Room) PublishUsersChanged(changed []map[string]interface{}, users []map[string]interface{}) {
changed = r.filterPermissions(changed)
users = r.filterPermissions(users)

View file

@ -27,6 +27,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"strconv"
"testing"
"time"
@ -421,3 +422,149 @@ func TestRoom_RoomSessionData(t *testing.T) {
}
wg.Wait()
}
func TestRoom_InCallAll(t *testing.T) {
hub, _, router, server, shutdown := CreateHubForTest(t)
defer shutdown()
config, err := getTestConfig(server)
if err != nil {
t.Fatal(err)
}
b, err := NewBackendServer(config, hub, "no-version")
if err != nil {
t.Fatal(err)
}
if err := b.Start(router); err != nil {
t.Fatal(err)
}
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
t.Error(err)
}
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
// Simulate backend request from Nextcloud to update the "inCall" flag of all participants.
msg1 := &BackendServerRoomRequest{
Type: "incall",
InCall: &BackendRoomInCallRequest{
All: true,
InCall: json.RawMessage(strconv.FormatInt(FlagInCall, 10)),
},
}
data1, err := json.Marshal(msg1)
if err != nil {
t.Fatal(err)
}
res1, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data1)
if err != nil {
t.Fatal(err)
}
defer res1.Body.Close()
body1, err := ioutil.ReadAll(res1.Body)
if err != nil {
t.Error(err)
}
if res1.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res1.Status, string(body1))
}
if msg, err := client1.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else if err := checkMessageInCallAll(msg, roomId, FlagInCall); err != nil {
t.Fatal(err)
}
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else if err := checkMessageInCallAll(msg, roomId, FlagInCall); err != nil {
t.Fatal(err)
}
// Simulate backend request from Nextcloud to update the "inCall" flag of all participants.
msg2 := &BackendServerRoomRequest{
Type: "incall",
InCall: &BackendRoomInCallRequest{
All: true,
InCall: json.RawMessage(strconv.FormatInt(0, 10)),
},
}
data2, err := json.Marshal(msg2)
if err != nil {
t.Fatal(err)
}
res2, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data2)
if err != nil {
t.Fatal(err)
}
defer res2.Body.Close()
body2, err := ioutil.ReadAll(res2.Body)
if err != nil {
t.Error(err)
}
if res2.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res2.Status, string(body2))
}
if msg, err := client1.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else if err := checkMessageInCallAll(msg, roomId, 0); err != nil {
t.Fatal(err)
}
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else if err := checkMessageInCallAll(msg, roomId, 0); err != nil {
t.Fatal(err)
}
}

View file

@ -22,6 +22,7 @@
package signaling
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
@ -31,6 +32,7 @@ import (
"net"
"net/http/httptest"
"reflect"
"strconv"
"strings"
"testing"
"time"
@ -841,3 +843,20 @@ func checkMessageTransientInitial(message *ServerMessage, data map[string]interf
return nil
}
func checkMessageInCallAll(message *ServerMessage, roomId string, inCall int) error {
if err := checkMessageType(message, "event"); err != nil {
return err
} else if message.Event.Type != "update" {
return fmt.Errorf("Expected update event, got %+v", message.Event)
} else if message.Event.Target != "participants" {
return fmt.Errorf("Expected participants update event, got %+v", message.Event)
} else if message.Event.Update.RoomId != roomId {
return fmt.Errorf("Expected participants update event for room %s, got %+v", roomId, message.Event.Update)
} else if !message.Event.Update.All {
return fmt.Errorf("Expected participants update event for all, got %+v", message.Event.Update)
} else if !bytes.Equal(*message.Event.Update.InCall, []byte(strconv.FormatInt(int64(inCall), 10))) {
return fmt.Errorf("Expected incall flags %d, got %+v", inCall, message.Event.Update)
}
return nil
}