Added tests for clustered behaviour.

This commit is contained in:
Joachim Bauch 2022-06-28 17:18:35 +02:00
parent dcb5be956c
commit 0e144906a4
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
5 changed files with 1111 additions and 329 deletions

View File

@ -112,6 +112,133 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil
return config, b, events, hub, r, server
}
func CreateBackendServerWithClusteringForTest(t *testing.T) (*BackendServer, *BackendServer, *Hub, *Hub, *httptest.Server, *httptest.Server) {
return CreateBackendServerWithClusteringForTestFromConfig(t, nil, nil)
}
func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *goconf.ConfigFile, config2 *goconf.ConfigFile) (*BackendServer, *BackendServer, *Hub, *Hub, *httptest.Server, *httptest.Server) {
r1 := mux.NewRouter()
registerBackendHandler(t, r1)
server1 := httptest.NewServer(r1)
t.Cleanup(func() {
server1.Close()
})
r2 := mux.NewRouter()
registerBackendHandler(t, r2)
server2 := httptest.NewServer(r2)
t.Cleanup(func() {
server2.Close()
})
nats := startLocalNatsServer(t)
grpcServer1, addr1 := NewGrpcServerForTest(t)
grpcServer2, addr2 := NewGrpcServerForTest(t)
if config1 == nil {
config1 = goconf.NewConfigFile()
}
u1, err := url.Parse(server1.URL)
if err != nil {
t.Fatal(err)
}
config1.AddOption("backend", "allowed", u1.Host)
if u1.Scheme == "http" {
config1.AddOption("backend", "allowhttp", "true")
}
config1.AddOption("backend", "secret", string(testBackendSecret))
config1.AddOption("sessions", "hashkey", "12345678901234567890123456789012")
config1.AddOption("sessions", "blockkey", "09876543210987654321098765432109")
config1.AddOption("clients", "internalsecret", string(testInternalSecret))
config1.AddOption("geoip", "url", "none")
events1, err := NewAsyncEvents(nats)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
events1.Close()
})
client1 := NewGrpcClientsForTest(t, addr2)
hub1, err := NewHub(config1, events1, client1, r1, "no-version")
if err != nil {
t.Fatal(err)
}
if config2 == nil {
config2 = goconf.NewConfigFile()
}
u2, err := url.Parse(server2.URL)
if err != nil {
t.Fatal(err)
}
config2.AddOption("backend", "allowed", u2.Host)
if u2.Scheme == "http" {
config2.AddOption("backend", "allowhttp", "true")
}
config2.AddOption("backend", "secret", string(testBackendSecret))
config2.AddOption("sessions", "hashkey", "12345678901234567890123456789012")
config2.AddOption("sessions", "blockkey", "09876543210987654321098765432109")
config2.AddOption("clients", "internalsecret", string(testInternalSecret))
config2.AddOption("geoip", "url", "none")
events2, err := NewAsyncEvents(nats)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
events2.Close()
})
client2 := NewGrpcClientsForTest(t, addr1)
hub2, err := NewHub(config2, events2, client2, r2, "no-version")
if err != nil {
t.Fatal(err)
}
b1, err := NewBackendServer(config1, hub1, "no-version")
if err != nil {
t.Fatal(err)
}
if err := b1.Start(r1); err != nil {
t.Fatal(err)
}
b2, err := NewBackendServer(config2, hub2, "no-version")
if err != nil {
t.Fatal(err)
}
if err := b2.Start(r2); err != nil {
t.Fatal(err)
}
grpcServer1.hub = hub1
grpcServer2.hub = hub2
go func() {
if err := grpcServer1.Run(); err != nil {
t.Errorf("Could not start RPC server on %s: %s", addr1, err)
}
}()
go func() {
if err := grpcServer2.Run(); err != nil {
t.Errorf("Could not start RPC server on %s: %s", addr2, err)
}
}()
go hub1.Run()
go hub2.Run()
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
WaitForHub(ctx, t, hub1)
WaitForHub(ctx, t, hub2)
})
return b1, b2, hub1, hub2, server1, server2
}
func performBackendRequest(url string, body []byte) (*http.Response, error) {
request, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
@ -791,117 +918,134 @@ func RunTestBackendServer_RoomDelete(t *testing.T) {
}
func TestBackendServer_ParticipantsUpdatePermissions(t *testing.T) {
_, _, _, hub, _, server := CreateBackendServerForTest(t)
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
if isLocalTest(t) {
_, _, _, hub1, _, server1 = CreateBackendServerForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hub2 = hub1
server2 = server1
} else {
_, _, hub1, hub2, server1, server2 = CreateBackendServerWithClusteringForTest(t)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId)
if session1 == nil {
t.Fatalf("Session %s does not exist", hello1.Hello.SessionId)
}
session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId)
if session2 == nil {
t.Fatalf("Session %s does not exist", hello2.Hello.SessionId)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
// Sessions have all permissions initially (fallback for old-style sessions).
assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA)
assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_SCREEN)
assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_MEDIA)
assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_SCREEN)
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId)
if session1 == nil {
t.Fatalf("Session %s does not exist", hello1.Hello.SessionId)
}
session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId)
if session2 == nil {
t.Fatalf("Session %s does not exist", hello2.Hello.SessionId)
}
// Ignore "join" events.
if err := client1.DrainMessages(ctx); err != nil {
t.Error(err)
}
if err := client2.DrainMessages(ctx); err != nil {
t.Error(err)
}
// Sessions have all permissions initially (fallback for old-style sessions).
assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA)
assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_SCREEN)
assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_MEDIA)
assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_SCREEN)
msg := &BackendServerRoomRequest{
Type: "participants",
Participants: &BackendRoomParticipantsRequest{
Changed: []map[string]interface{}{
{
"sessionId": roomId + "-" + hello1.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA},
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// Ignore "join" events.
if err := client1.DrainMessages(ctx); err != nil {
t.Error(err)
}
if err := client2.DrainMessages(ctx); err != nil {
t.Error(err)
}
msg := &BackendServerRoomRequest{
Type: "participants",
Participants: &BackendRoomParticipantsRequest{
Changed: []map[string]interface{}{
{
"sessionId": roomId + "-" + hello1.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA},
},
{
"sessionId": roomId + "-" + hello2.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_SCREEN},
},
},
Users: []map[string]interface{}{
{
"sessionId": roomId + "-" + hello1.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA},
},
{
"sessionId": roomId + "-" + hello2.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_SCREEN},
},
},
},
{
"sessionId": roomId + "-" + hello2.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_SCREEN},
},
},
Users: []map[string]interface{}{
{
"sessionId": roomId + "-" + hello1.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA},
},
{
"sessionId": roomId + "-" + hello2.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_SCREEN},
},
},
},
}
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
// The request could be sent to any of the backend servers.
res, err := performBackendRequest(server1.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
// TODO: Use event to wait for asynchronous messages.
time.Sleep(10 * time.Millisecond)
// TODO: Use event to wait for asynchronous messages.
time.Sleep(10 * time.Millisecond)
assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA)
assertSessionHasNotPermission(t, session1, PERMISSION_MAY_PUBLISH_SCREEN)
assertSessionHasNotPermission(t, session2, PERMISSION_MAY_PUBLISH_MEDIA)
assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_SCREEN)
assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA)
assertSessionHasNotPermission(t, session1, PERMISSION_MAY_PUBLISH_SCREEN)
assertSessionHasNotPermission(t, session2, PERMISSION_MAY_PUBLISH_MEDIA)
assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_SCREEN)
})
}
}
func TestBackendServer_ParticipantsUpdateEmptyPermissions(t *testing.T) {

View File

@ -34,6 +34,21 @@ const (
GrpcSelfTargetForTesting = "testing.grpc.target"
)
func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients {
config := goconf.NewConfigFile()
config.AddOption("grpc", "targets", addr)
client, err := NewGrpcClients(config, nil)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
client.Close()
})
return client
}
func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) *GrpcClients {
config := goconf.NewConfigFile()
config.AddOption("etcd", "endpoints", etcd.Config().LCUrls[0].String())

File diff suppressed because it is too large Load Diff

View File

@ -124,14 +124,14 @@ func checkMessageType(message *ServerMessage, expectedType string) error {
return nil
}
func checkMessageSender(hub *Hub, message *MessageServerMessage, senderType string, hello *HelloServerMessage) error {
if message.Sender.Type != senderType {
return fmt.Errorf("Expected sender type %s, got %s", senderType, message.Sender.SessionId)
} else if message.Sender.SessionId != hello.SessionId {
func checkMessageSender(hub *Hub, sender *MessageServerMessageSender, senderType string, hello *HelloServerMessage) error {
if sender.Type != senderType {
return fmt.Errorf("Expected sender type %s, got %s", senderType, sender.SessionId)
} else if sender.SessionId != hello.SessionId {
return fmt.Errorf("Expected session id %+v, got %+v",
getPubliceSessionIdData(hub, hello.SessionId), getPubliceSessionIdData(hub, message.Sender.SessionId))
} else if message.Sender.UserId != hello.UserId {
return fmt.Errorf("Expected user id %s, got %s", hello.UserId, message.Sender.UserId)
getPubliceSessionIdData(hub, hello.SessionId), getPubliceSessionIdData(hub, sender.SessionId))
} else if sender.UserId != hello.UserId {
return fmt.Errorf("Expected user id %s, got %s", hello.UserId, sender.UserId)
}
return nil
@ -143,7 +143,7 @@ func checkReceiveClientMessageWithSender(ctx context.Context, client *TestClient
return err
} else if err := checkMessageType(message, "message"); err != nil {
return err
} else if err := checkMessageSender(client.hub, message.Message, senderType, hello); err != nil {
} else if err := checkMessageSender(client.hub, message.Message.Sender, senderType, hello); err != nil {
return err
} else {
if err := json.Unmarshal(*message.Message.Data, payload); err != nil {
@ -160,6 +160,29 @@ func checkReceiveClientMessage(ctx context.Context, client *TestClient, senderTy
return checkReceiveClientMessageWithSender(ctx, client, senderType, hello, payload, nil)
}
func checkReceiveClientControlWithSender(ctx context.Context, client *TestClient, senderType string, hello *HelloServerMessage, payload interface{}, sender **MessageServerMessageSender) error {
message, err := client.RunUntilMessage(ctx)
if err := checkUnexpectedClose(err); err != nil {
return err
} else if err := checkMessageType(message, "control"); err != nil {
return err
} else if err := checkMessageSender(client.hub, message.Control.Sender, senderType, hello); err != nil {
return err
} else {
if err := json.Unmarshal(*message.Control.Data, payload); err != nil {
return err
}
}
if sender != nil {
*sender = message.Message.Sender
}
return nil
}
func checkReceiveClientControl(ctx context.Context, client *TestClient, senderType string, hello *HelloServerMessage, payload interface{}) error {
return checkReceiveClientControlWithSender(ctx, client, senderType, hello, payload, nil)
}
func checkReceiveClientEvent(ctx context.Context, client *TestClient, eventType string, msg **EventServerMessage) error {
message, err := client.RunUntilMessage(ctx)
if err := checkUnexpectedClose(err); err != nil {
@ -414,6 +437,25 @@ func (c *TestClient) SendMessage(recipient MessageClientMessageRecipient, data i
return c.WriteJSON(message)
}
func (c *TestClient) SendControl(recipient MessageClientMessageRecipient, data interface{}) error {
payload, err := json.Marshal(data)
if err != nil {
c.t.Fatal(err)
}
message := &ClientMessage{
Id: "abcd",
Type: "control",
Control: &ControlClientMessage{
MessageClientMessage: MessageClientMessage{
Recipient: recipient,
Data: (*json.RawMessage)(&payload),
},
},
}
return c.WriteJSON(message)
}
func (c *TestClient) SetTransientData(key string, value interface{}) error {
payload, err := json.Marshal(value)
if err != nil {

View File

@ -265,7 +265,7 @@ func TestVirtualSession(t *testing.T) {
t.Fatal(err)
} else if err := checkMessageType(msg2, "message"); err != nil {
t.Fatal(err)
} else if err := checkMessageSender(hub, msg2.Message, "session", hello.Hello); err != nil {
} else if err := checkMessageSender(hub, msg2.Message.Sender, "session", hello.Hello); err != nil {
t.Error(err)
}