mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2026-03-14 14:35:44 +01:00
Fix transient data for clustered setups.
This commit is contained in:
parent
90723676a0
commit
2881ca98dc
2 changed files with 176 additions and 121 deletions
45
hub.go
45
hub.go
|
|
@ -2676,10 +2676,36 @@ func (h *Hub) processTransientMsg(session Session, message *ClientMessage) {
|
|||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
if msg.Value == nil {
|
||||
room.SetTransientDataTTL(msg.Key, nil, msg.TTL)
|
||||
err = h.events.PublishBackendRoomMessage(room.Id(), room.Backend(), &AsyncMessage{
|
||||
Type: "room",
|
||||
Room: &BackendServerRoomRequest{
|
||||
Type: "transient",
|
||||
Transient: &BackendRoomTransientRequest{
|
||||
Action: TransientActionDelete,
|
||||
Key: msg.Key,
|
||||
},
|
||||
},
|
||||
})
|
||||
} else {
|
||||
room.SetTransientDataTTL(msg.Key, msg.Value, msg.TTL)
|
||||
err = h.events.PublishBackendRoomMessage(room.Id(), room.Backend(), &AsyncMessage{
|
||||
Type: "room",
|
||||
Room: &BackendServerRoomRequest{
|
||||
Type: "transient",
|
||||
Transient: &BackendRoomTransientRequest{
|
||||
Action: TransientActionSet,
|
||||
Key: msg.Key,
|
||||
Value: msg.Value,
|
||||
TTL: msg.TTL,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
response := message.NewWrappedErrorServerMessage(err)
|
||||
session.SendMessage(response)
|
||||
return
|
||||
}
|
||||
case "remove":
|
||||
if !isAllowedToUpdateTransientData(session) {
|
||||
|
|
@ -2687,7 +2713,20 @@ func (h *Hub) processTransientMsg(session Session, message *ClientMessage) {
|
|||
return
|
||||
}
|
||||
|
||||
room.RemoveTransientData(msg.Key)
|
||||
if err := h.events.PublishBackendRoomMessage(room.Id(), room.Backend(), &AsyncMessage{
|
||||
Type: "room",
|
||||
Room: &BackendServerRoomRequest{
|
||||
Type: "transient",
|
||||
Transient: &BackendRoomTransientRequest{
|
||||
Action: TransientActionDelete,
|
||||
Key: msg.Key,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
response := message.NewWrappedErrorServerMessage(err)
|
||||
session.SendMessage(response)
|
||||
return
|
||||
}
|
||||
default:
|
||||
response := message.NewErrorServerMessage(NewError("ignored", "Unsupported message type."))
|
||||
session.SendMessage(response)
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ package signaling
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -138,123 +139,138 @@ func Test_TransientDataDeadlock(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_TransientMessages(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
hub, _, _, server := CreateHubForTest(t)
|
||||
for _, subtest := range clusteredTests {
|
||||
t.Run(subtest, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
var hub1 *Hub
|
||||
var hub2 *Hub
|
||||
var server1 *httptest.Server
|
||||
var server2 *httptest.Server
|
||||
if isLocalTest(t) {
|
||||
hub1, _, _, server1 = CreateHubForTest(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||
defer cancel()
|
||||
hub2 = hub1
|
||||
server2 = server1
|
||||
} else {
|
||||
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
|
||||
}
|
||||
|
||||
client1, hello1 := NewTestClientWithHello(ctx, t, server, hub, testDefaultUserId+"1")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||
defer cancel()
|
||||
|
||||
require.NoError(client1.SetTransientData("foo", "bar", 0))
|
||||
if msg, ok := client1.RunUntilMessage(ctx); ok {
|
||||
checkMessageError(t, msg, "not_in_room")
|
||||
}
|
||||
client1, hello1 := NewTestClientWithHello(ctx, t, server1, hub1, testDefaultUserId+"1")
|
||||
|
||||
client2, hello2 := NewTestClientWithHello(ctx, t, server, hub, testDefaultUserId+"2")
|
||||
require.NoError(client1.SetTransientData("foo", "bar", 0))
|
||||
if msg, ok := client1.RunUntilMessage(ctx); ok {
|
||||
checkMessageError(t, msg, "not_in_room")
|
||||
}
|
||||
|
||||
// Join room by id.
|
||||
roomId := "test-room"
|
||||
roomMsg := MustSucceed2(t, client1.JoinRoom, ctx, roomId)
|
||||
require.Equal(roomId, roomMsg.Room.RoomId)
|
||||
client2, hello2 := NewTestClientWithHello(ctx, t, server2, hub2, testDefaultUserId+"2")
|
||||
|
||||
// Give message processing some time.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
// Join room by id.
|
||||
roomId := "test-room"
|
||||
roomMsg := MustSucceed2(t, client1.JoinRoom, ctx, roomId)
|
||||
require.Equal(roomId, roomMsg.Room.RoomId)
|
||||
|
||||
roomMsg = MustSucceed2(t, client2.JoinRoom, ctx, roomId)
|
||||
require.Equal(roomId, roomMsg.Room.RoomId)
|
||||
// Give message processing some time.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
|
||||
roomMsg = MustSucceed2(t, client2.JoinRoom, ctx, roomId)
|
||||
require.Equal(roomId, roomMsg.Room.RoomId)
|
||||
|
||||
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
|
||||
require.NotNil(session1, "Session %s does not exist", hello1.Hello.SessionId)
|
||||
session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
|
||||
require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId)
|
||||
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
|
||||
|
||||
// Client 1 may modify transient data.
|
||||
session1.SetPermissions([]Permission{PERMISSION_TRANSIENT_DATA})
|
||||
// Client 2 may not modify transient data.
|
||||
session2.SetPermissions([]Permission{})
|
||||
session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
|
||||
require.NotNil(session1, "Session %s does not exist", hello1.Hello.SessionId)
|
||||
session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
|
||||
require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId)
|
||||
|
||||
require.NoError(client2.SetTransientData("foo", "bar", 0))
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageError(t, msg, "not_allowed")
|
||||
}
|
||||
// Client 1 may modify transient data.
|
||||
session1.SetPermissions([]Permission{PERMISSION_TRANSIENT_DATA})
|
||||
// Client 2 may not modify transient data.
|
||||
session2.SetPermissions([]Permission{})
|
||||
|
||||
require.NoError(client1.SetTransientData("foo", "bar", 0))
|
||||
require.NoError(client2.SetTransientData("foo", "bar", 0))
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageError(t, msg, "not_allowed")
|
||||
}
|
||||
|
||||
if msg, ok := client1.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientSet(t, msg, "foo", "bar", nil)
|
||||
}
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientSet(t, msg, "foo", "bar", nil)
|
||||
}
|
||||
require.NoError(client1.SetTransientData("foo", "bar", 0))
|
||||
|
||||
require.NoError(client2.RemoveTransientData("foo"))
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageError(t, msg, "not_allowed")
|
||||
}
|
||||
if msg, ok := client1.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientSet(t, msg, "foo", "bar", nil)
|
||||
}
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientSet(t, msg, "foo", "bar", nil)
|
||||
}
|
||||
|
||||
// Setting the same value is ignored by the server.
|
||||
require.NoError(client1.SetTransientData("foo", "bar", 0))
|
||||
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel2()
|
||||
require.NoError(client2.RemoveTransientData("foo"))
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageError(t, msg, "not_allowed")
|
||||
}
|
||||
|
||||
client1.RunUntilErrorIs(ctx2, context.DeadlineExceeded)
|
||||
// Setting the same value is ignored by the server.
|
||||
require.NoError(client1.SetTransientData("foo", "bar", 0))
|
||||
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel2()
|
||||
|
||||
data := map[string]any{
|
||||
"hello": "world",
|
||||
}
|
||||
require.NoError(client1.SetTransientData("foo", data, 0))
|
||||
client1.RunUntilErrorIs(ctx2, context.DeadlineExceeded)
|
||||
|
||||
if msg, ok := client1.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientSet(t, msg, "foo", data, "bar")
|
||||
}
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientSet(t, msg, "foo", data, "bar")
|
||||
}
|
||||
data := map[string]any{
|
||||
"hello": "world",
|
||||
}
|
||||
require.NoError(client1.SetTransientData("foo", data, 0))
|
||||
|
||||
require.NoError(client1.RemoveTransientData("foo"))
|
||||
if msg, ok := client1.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientSet(t, msg, "foo", data, "bar")
|
||||
}
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientSet(t, msg, "foo", data, "bar")
|
||||
}
|
||||
|
||||
if msg, ok := client1.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientRemove(t, msg, "foo", data)
|
||||
}
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientRemove(t, msg, "foo", data)
|
||||
}
|
||||
require.NoError(client1.RemoveTransientData("foo"))
|
||||
|
||||
// Removing a non-existing key is ignored by the server.
|
||||
require.NoError(client1.RemoveTransientData("foo"))
|
||||
ctx3, cancel3 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel3()
|
||||
if msg, ok := client1.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientRemove(t, msg, "foo", data)
|
||||
}
|
||||
if msg, ok := client2.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientRemove(t, msg, "foo", data)
|
||||
}
|
||||
|
||||
client1.RunUntilErrorIs(ctx3, context.DeadlineExceeded)
|
||||
// Removing a non-existing key is ignored by the server.
|
||||
require.NoError(client1.RemoveTransientData("foo"))
|
||||
ctx3, cancel3 := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel3()
|
||||
|
||||
require.NoError(client1.SetTransientData("abc", data, 10*time.Millisecond))
|
||||
client1.RunUntilErrorIs(ctx3, context.DeadlineExceeded)
|
||||
|
||||
client3, hello3 := NewTestClientWithHello(ctx, t, server, hub, testDefaultUserId+"3")
|
||||
roomMsg = MustSucceed2(t, client3.JoinRoom, ctx, roomId)
|
||||
require.Equal(roomId, roomMsg.Room.RoomId)
|
||||
require.NoError(client1.SetTransientData("abc", data, 10*time.Millisecond))
|
||||
|
||||
_, ignored, ok := client3.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello, hello3.Hello)
|
||||
require.True(ok)
|
||||
client3, hello3 := NewTestClientWithHello(ctx, t, server1, hub1, testDefaultUserId+"3")
|
||||
roomMsg = MustSucceed2(t, client3.JoinRoom, ctx, roomId)
|
||||
require.Equal(roomId, roomMsg.Room.RoomId)
|
||||
|
||||
var msg *ServerMessage
|
||||
if len(ignored) == 0 {
|
||||
msg = MustSucceed1(t, client3.RunUntilMessage, ctx)
|
||||
} else if len(ignored) == 1 {
|
||||
msg = ignored[0]
|
||||
} else {
|
||||
require.LessOrEqual(len(ignored), 1, "Received too many messages: %+v", ignored)
|
||||
}
|
||||
_, ignored, ok := client3.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello, hello3.Hello)
|
||||
require.True(ok)
|
||||
|
||||
checkMessageTransientInitial(t, msg, api.StringMap{
|
||||
"abc": data,
|
||||
})
|
||||
var msg *ServerMessage
|
||||
if len(ignored) == 0 {
|
||||
msg = MustSucceed1(t, client3.RunUntilMessage, ctx)
|
||||
} else if len(ignored) == 1 {
|
||||
msg = ignored[0]
|
||||
} else {
|
||||
require.LessOrEqual(len(ignored), 1, "Received too many messages: %+v", ignored)
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if msg, ok = client3.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientRemove(t, msg, "abc", data)
|
||||
checkMessageTransientInitial(t, msg, api.StringMap{
|
||||
"abc": data,
|
||||
})
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if msg, ok = client3.RunUntilMessage(ctx); ok {
|
||||
checkMessageTransientRemove(t, msg, "abc", data)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue