From 0e144906a49dee10f59ee986afb178f4723649b6 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 28 Jun 2022 17:18:35 +0200 Subject: [PATCH] Added tests for clustered behaviour. --- backend_server_test.go | 342 ++++++++++---- grpc_client_test.go | 15 + hub_test.go | 1023 +++++++++++++++++++++++++++++++--------- testclient_test.go | 58 ++- virtualsession_test.go | 2 +- 5 files changed, 1111 insertions(+), 329 deletions(-) diff --git a/backend_server_test.go b/backend_server_test.go index bc0acfa..ff74b71 100644 --- a/backend_server_test.go +++ b/backend_server_test.go @@ -112,6 +112,133 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil return config, b, events, hub, r, server } +func CreateBackendServerWithClusteringForTest(t *testing.T) (*BackendServer, *BackendServer, *Hub, *Hub, *httptest.Server, *httptest.Server) { + return CreateBackendServerWithClusteringForTestFromConfig(t, nil, nil) +} + +func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *goconf.ConfigFile, config2 *goconf.ConfigFile) (*BackendServer, *BackendServer, *Hub, *Hub, *httptest.Server, *httptest.Server) { + r1 := mux.NewRouter() + registerBackendHandler(t, r1) + + server1 := httptest.NewServer(r1) + t.Cleanup(func() { + server1.Close() + }) + + r2 := mux.NewRouter() + registerBackendHandler(t, r2) + + server2 := httptest.NewServer(r2) + t.Cleanup(func() { + server2.Close() + }) + + nats := startLocalNatsServer(t) + grpcServer1, addr1 := NewGrpcServerForTest(t) + grpcServer2, addr2 := NewGrpcServerForTest(t) + + if config1 == nil { + config1 = goconf.NewConfigFile() + } + u1, err := url.Parse(server1.URL) + if err != nil { + t.Fatal(err) + } + config1.AddOption("backend", "allowed", u1.Host) + if u1.Scheme == "http" { + config1.AddOption("backend", "allowhttp", "true") + } + config1.AddOption("backend", "secret", string(testBackendSecret)) + config1.AddOption("sessions", "hashkey", "12345678901234567890123456789012") + config1.AddOption("sessions", "blockkey", "09876543210987654321098765432109") + config1.AddOption("clients", "internalsecret", string(testInternalSecret)) + config1.AddOption("geoip", "url", "none") + + events1, err := NewAsyncEvents(nats) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + events1.Close() + }) + client1 := NewGrpcClientsForTest(t, addr2) + hub1, err := NewHub(config1, events1, client1, r1, "no-version") + if err != nil { + t.Fatal(err) + } + + if config2 == nil { + config2 = goconf.NewConfigFile() + } + u2, err := url.Parse(server2.URL) + if err != nil { + t.Fatal(err) + } + config2.AddOption("backend", "allowed", u2.Host) + if u2.Scheme == "http" { + config2.AddOption("backend", "allowhttp", "true") + } + config2.AddOption("backend", "secret", string(testBackendSecret)) + config2.AddOption("sessions", "hashkey", "12345678901234567890123456789012") + config2.AddOption("sessions", "blockkey", "09876543210987654321098765432109") + config2.AddOption("clients", "internalsecret", string(testInternalSecret)) + config2.AddOption("geoip", "url", "none") + events2, err := NewAsyncEvents(nats) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + events2.Close() + }) + client2 := NewGrpcClientsForTest(t, addr1) + hub2, err := NewHub(config2, events2, client2, r2, "no-version") + if err != nil { + t.Fatal(err) + } + + b1, err := NewBackendServer(config1, hub1, "no-version") + if err != nil { + t.Fatal(err) + } + if err := b1.Start(r1); err != nil { + t.Fatal(err) + } + b2, err := NewBackendServer(config2, hub2, "no-version") + if err != nil { + t.Fatal(err) + } + if err := b2.Start(r2); err != nil { + t.Fatal(err) + } + + grpcServer1.hub = hub1 + grpcServer2.hub = hub2 + + go func() { + if err := grpcServer1.Run(); err != nil { + t.Errorf("Could not start RPC server on %s: %s", addr1, err) + } + }() + go func() { + if err := grpcServer2.Run(); err != nil { + t.Errorf("Could not start RPC server on %s: %s", addr2, err) + } + }() + + go hub1.Run() + go hub2.Run() + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + WaitForHub(ctx, t, hub1) + WaitForHub(ctx, t, hub2) + }) + + return b1, b2, hub1, hub2, server1, server2 +} + func performBackendRequest(url string, body []byte) (*http.Response, error) { request, err := http.NewRequest("POST", url, bytes.NewReader(body)) if err != nil { @@ -791,117 +918,134 @@ func RunTestBackendServer_RoomDelete(t *testing.T) { } func TestBackendServer_ParticipantsUpdatePermissions(t *testing.T) { - _, _, _, hub, _, server := CreateBackendServerForTest(t) + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server - client1 := NewTestClient(t, server, hub) - defer client1.CloseWithBye() - if err := client1.SendHello(testDefaultUserId + "1"); err != nil { - t.Fatal(err) - } - client2 := NewTestClient(t, server, hub) - defer client2.CloseWithBye() - if err := client2.SendHello(testDefaultUserId + "2"); err != nil { - t.Fatal(err) - } + if isLocalTest(t) { + _, _, _, hub1, _, server1 = CreateBackendServerForTest(t) - ctx, cancel := context.WithTimeout(context.Background(), testTimeout) - defer cancel() + hub2 = hub1 + server2 = server1 + } else { + _, _, hub1, hub2, server1, server2 = CreateBackendServerWithClusteringForTest(t) + } - hello1, err := client1.RunUntilHello(ctx) - if err != nil { - t.Fatal(err) - } - hello2, err := client2.RunUntilHello(ctx) - if err != nil { - t.Fatal(err) - } + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } - session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId) - if session1 == nil { - t.Fatalf("Session %s does not exist", hello1.Hello.SessionId) - } - session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId) - if session2 == nil { - t.Fatalf("Session %s does not exist", hello2.Hello.SessionId) - } + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() - // Sessions have all permissions initially (fallback for old-style sessions). - assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA) - assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_SCREEN) - assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_MEDIA) - assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_SCREEN) + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } - // Join room by id. - roomId := "test-room" - if room, err := client1.JoinRoom(ctx, roomId); err != nil { - t.Fatal(err) - } else if room.Room.RoomId != roomId { - t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) - } - if room, err := client2.JoinRoom(ctx, roomId); err != nil { - t.Fatal(err) - } else if room.Room.RoomId != roomId { - t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) - } + session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId) + if session1 == nil { + t.Fatalf("Session %s does not exist", hello1.Hello.SessionId) + } + session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId) + if session2 == nil { + t.Fatalf("Session %s does not exist", hello2.Hello.SessionId) + } - // Ignore "join" events. - if err := client1.DrainMessages(ctx); err != nil { - t.Error(err) - } - if err := client2.DrainMessages(ctx); err != nil { - t.Error(err) - } + // Sessions have all permissions initially (fallback for old-style sessions). + assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA) + assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_SCREEN) + assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_MEDIA) + assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_SCREEN) - msg := &BackendServerRoomRequest{ - Type: "participants", - Participants: &BackendRoomParticipantsRequest{ - Changed: []map[string]interface{}{ - { - "sessionId": roomId + "-" + hello1.Hello.SessionId, - "permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA}, + // Join room by id. + roomId := "test-room" + if room, err := client1.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + if room, err := client2.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // Ignore "join" events. + if err := client1.DrainMessages(ctx); err != nil { + t.Error(err) + } + if err := client2.DrainMessages(ctx); err != nil { + t.Error(err) + } + + msg := &BackendServerRoomRequest{ + Type: "participants", + Participants: &BackendRoomParticipantsRequest{ + Changed: []map[string]interface{}{ + { + "sessionId": roomId + "-" + hello1.Hello.SessionId, + "permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA}, + }, + { + "sessionId": roomId + "-" + hello2.Hello.SessionId, + "permissions": []Permission{PERMISSION_MAY_PUBLISH_SCREEN}, + }, + }, + Users: []map[string]interface{}{ + { + "sessionId": roomId + "-" + hello1.Hello.SessionId, + "permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA}, + }, + { + "sessionId": roomId + "-" + hello2.Hello.SessionId, + "permissions": []Permission{PERMISSION_MAY_PUBLISH_SCREEN}, + }, + }, }, - { - "sessionId": roomId + "-" + hello2.Hello.SessionId, - "permissions": []Permission{PERMISSION_MAY_PUBLISH_SCREEN}, - }, - }, - Users: []map[string]interface{}{ - { - "sessionId": roomId + "-" + hello1.Hello.SessionId, - "permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA}, - }, - { - "sessionId": roomId + "-" + hello2.Hello.SessionId, - "permissions": []Permission{PERMISSION_MAY_PUBLISH_SCREEN}, - }, - }, - }, - } + } - data, err := json.Marshal(msg) - if err != nil { - t.Fatal(err) - } - res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data) - if err != nil { - t.Fatal(err) - } - defer res.Body.Close() - body, err := io.ReadAll(res.Body) - if err != nil { - t.Error(err) - } - if res.StatusCode != 200 { - t.Errorf("Expected successful request, got %s: %s", res.Status, string(body)) - } + data, err := json.Marshal(msg) + if err != nil { + t.Fatal(err) + } + // The request could be sent to any of the backend servers. + res, err := performBackendRequest(server1.URL+"/api/v1/room/"+roomId, data) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + if err != nil { + t.Error(err) + } + if res.StatusCode != 200 { + t.Errorf("Expected successful request, got %s: %s", res.Status, string(body)) + } - // TODO: Use event to wait for asynchronous messages. - time.Sleep(10 * time.Millisecond) + // TODO: Use event to wait for asynchronous messages. + time.Sleep(10 * time.Millisecond) - assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA) - assertSessionHasNotPermission(t, session1, PERMISSION_MAY_PUBLISH_SCREEN) - assertSessionHasNotPermission(t, session2, PERMISSION_MAY_PUBLISH_MEDIA) - assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_SCREEN) + assertSessionHasPermission(t, session1, PERMISSION_MAY_PUBLISH_MEDIA) + assertSessionHasNotPermission(t, session1, PERMISSION_MAY_PUBLISH_SCREEN) + assertSessionHasNotPermission(t, session2, PERMISSION_MAY_PUBLISH_MEDIA) + assertSessionHasPermission(t, session2, PERMISSION_MAY_PUBLISH_SCREEN) + }) + } } func TestBackendServer_ParticipantsUpdateEmptyPermissions(t *testing.T) { diff --git a/grpc_client_test.go b/grpc_client_test.go index 692297a..51bebf5 100644 --- a/grpc_client_test.go +++ b/grpc_client_test.go @@ -34,6 +34,21 @@ const ( GrpcSelfTargetForTesting = "testing.grpc.target" ) +func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients { + config := goconf.NewConfigFile() + config.AddOption("grpc", "targets", addr) + + client, err := NewGrpcClients(config, nil) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + client.Close() + }) + + return client +} + func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) *GrpcClients { config := goconf.NewConfigFile() config.AddOption("etcd", "endpoints", etcd.Config().LCUrls[0].String()) diff --git a/hub_test.go b/hub_test.go index 8dc8b95..3bc4db4 100644 --- a/hub_test.go +++ b/hub_test.go @@ -47,6 +47,13 @@ const ( testTimeout = 10 * time.Second ) +var ( + clusteredTests = []string{ + "local", + "clustered", + } +) + // Only used for testing. func (h *Hub) getRoom(id string) *Room { h.ru.RLock() @@ -61,6 +68,10 @@ func (h *Hub) getRoom(id string) *Room { return nil } +func isLocalTest(t *testing.T) bool { + return strings.HasSuffix(t.Name(), "/local") +} + func getTestConfig(server *httptest.Server) (*goconf.ConfigFile, error) { config := goconf.NewConfigFile() u, err := url.Parse(server.URL) @@ -146,6 +157,106 @@ func CreateHubWithMultipleBackendsForTest(t *testing.T) (*Hub, AsyncEvents, *mux return h, events, r, server } +func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Server) (*goconf.ConfigFile, error)) (*Hub, *Hub, *httptest.Server, *httptest.Server) { + r1 := mux.NewRouter() + registerBackendHandler(t, r1) + + server1 := httptest.NewServer(r1) + t.Cleanup(func() { + server1.Close() + }) + + r2 := mux.NewRouter() + registerBackendHandler(t, r2) + + server2 := httptest.NewServer(r2) + t.Cleanup(func() { + server2.Close() + }) + + nats := startLocalNatsServer(t) + grpcServer1, addr1 := NewGrpcServerForTest(t) + grpcServer2, addr2 := NewGrpcServerForTest(t) + + events1, err := NewAsyncEvents(nats) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + events1.Close() + }) + config1, err := getConfigFunc(server1) + if err != nil { + t.Fatal(err) + } + client1 := NewGrpcClientsForTest(t, addr2) + h1, err := NewHub(config1, events1, client1, r1, "no-version") + if err != nil { + t.Fatal(err) + } + b1, err := NewBackendServer(config1, h1, "no-version") + if err != nil { + t.Fatal(err) + } + events2, err := NewAsyncEvents(nats) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + events2.Close() + }) + config2, err := getConfigFunc(server2) + if err != nil { + t.Fatal(err) + } + client2 := NewGrpcClientsForTest(t, addr1) + h2, err := NewHub(config2, events2, client2, r2, "no-version") + if err != nil { + t.Fatal(err) + } + b2, err := NewBackendServer(config2, h2, "no-version") + if err != nil { + t.Fatal(err) + } + if err := b1.Start(r1); err != nil { + t.Fatal(err) + } + if err := b2.Start(r2); err != nil { + t.Fatal(err) + } + + grpcServer1.hub = h1 + grpcServer2.hub = h2 + + go func() { + if err := grpcServer1.Run(); err != nil { + t.Errorf("Could not start RPC server on %s: %s", addr1, err) + } + }() + go func() { + if err := grpcServer2.Run(); err != nil { + t.Errorf("Could not start RPC server on %s: %s", addr2, err) + } + }() + + go h1.Run() + go h2.Run() + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + WaitForHub(ctx, t, h1) + WaitForHub(ctx, t, h2) + }) + + return h1, h2, server1, server2 +} + +func CreateClusteredHubsForTest(t *testing.T) (*Hub, *Hub, *httptest.Server, *httptest.Server) { + return CreateClusteredHubsForTestWithConfig(t, getTestConfig) +} + func WaitForHub(ctx context.Context, t *testing.T, h *Hub) { // Wait for any channel messages to be processed. time.Sleep(10 * time.Millisecond) @@ -1338,6 +1449,152 @@ func TestClientHelloInternal(t *testing.T) { } func TestClientMessageToSessionId(t *testing.T) { + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server + + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) + + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + if hello1.Hello.SessionId == hello2.Hello.SessionId { + t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId) + } + + recipient1 := MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + } + recipient2 := MessageClientMessageRecipient{ + Type: "session", + SessionId: hello2.Hello.SessionId, + } + + data1 := "from-1-to-2" + client1.SendMessage(recipient2, data1) // nolint + data2 := "from-2-to-1" + client2.SendMessage(recipient1, data2) // nolint + + var payload string + if err := checkReceiveClientMessage(ctx, client1, "session", hello2.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data2 { + t.Errorf("Expected payload %s, got %s", data2, payload) + } + if err := checkReceiveClientMessage(ctx, client2, "session", hello1.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data1 { + t.Errorf("Expected payload %s, got %s", data1, payload) + } + }) + } +} + +func TestClientControlToSessionId(t *testing.T) { + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server + + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) + + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + if hello1.Hello.SessionId == hello2.Hello.SessionId { + t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId) + } + + recipient1 := MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + } + recipient2 := MessageClientMessageRecipient{ + Type: "session", + SessionId: hello2.Hello.SessionId, + } + + data1 := "from-1-to-2" + client1.SendControl(recipient2, data1) // nolint + data2 := "from-2-to-1" + client2.SendControl(recipient1, data2) // nolint + + var payload string + if err := checkReceiveClientControl(ctx, client1, "session", hello2.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data2 { + t.Errorf("Expected payload %s, got %s", data2, payload) + } + if err := checkReceiveClientControl(ctx, client2, "session", hello1.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data1 { + t.Errorf("Expected payload %s, got %s", data1, payload) + } + }) + } +} + +func TestClientControlMissingPermissions(t *testing.T) { hub, _, _, server := CreateHubForTest(t) client1 := NewTestClient(t, server, hub) @@ -1367,6 +1624,27 @@ func TestClientMessageToSessionId(t *testing.T) { t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId) } + session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession) + if session1 == nil { + t.Fatalf("Session %s does not exist", hello1.Hello.SessionId) + } + session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession) + if session2 == nil { + t.Fatalf("Session %s does not exist", hello2.Hello.SessionId) + } + + // Client 1 may not send control messages (will be ignored). + session1.SetPermissions([]Permission{ + PERMISSION_MAY_PUBLISH_AUDIO, + PERMISSION_MAY_PUBLISH_VIDEO, + }) + // Client 2 may send control messages. + session2.SetPermissions([]Permission{ + PERMISSION_MAY_PUBLISH_AUDIO, + PERMISSION_MAY_PUBLISH_VIDEO, + PERMISSION_MAY_CONTROL, + }) + recipient1 := MessageClientMessageRecipient{ Type: "session", SessionId: hello1.Hello.SessionId, @@ -1377,20 +1655,26 @@ func TestClientMessageToSessionId(t *testing.T) { } data1 := "from-1-to-2" - client1.SendMessage(recipient2, data1) // nolint + client1.SendControl(recipient2, data1) // nolint data2 := "from-2-to-1" - client2.SendMessage(recipient1, data2) // nolint + client2.SendControl(recipient1, data2) // nolint var payload string - if err := checkReceiveClientMessage(ctx, client1, "session", hello2.Hello, &payload); err != nil { + if err := checkReceiveClientControl(ctx, client1, "session", hello2.Hello, &payload); err != nil { t.Error(err) } else if payload != data2 { t.Errorf("Expected payload %s, got %s", data2, payload) } - if err := checkReceiveClientMessage(ctx, client2, "session", hello1.Hello, &payload); err != nil { - t.Error(err) - } else if payload != data1 { - t.Errorf("Expected payload %s, got %s", data1, payload) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel2() + + if err := checkReceiveClientMessage(ctx2, client2, "session", hello1.Hello, &payload); err != nil { + if err != ErrNoMessageReceived { + t.Error(err) + } + } else { + t.Errorf("Expected no payload, got %+v", payload) } } @@ -1454,6 +1738,66 @@ func TestClientMessageToUserId(t *testing.T) { } } +func TestClientControlToUserId(t *testing.T) { + hub, _, _, server := CreateHubForTest(t) + + client1 := NewTestClient(t, server, hub) + defer client1.CloseWithBye() + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + client2 := NewTestClient(t, server, hub) + defer client2.CloseWithBye() + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + if hello1.Hello.SessionId == hello2.Hello.SessionId { + t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId) + } else if hello1.Hello.UserId == hello2.Hello.UserId { + t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId) + } + + recipient1 := MessageClientMessageRecipient{ + Type: "user", + UserId: hello1.Hello.UserId, + } + recipient2 := MessageClientMessageRecipient{ + Type: "user", + UserId: hello2.Hello.UserId, + } + + data1 := "from-1-to-2" + client1.SendControl(recipient2, data1) // nolint + data2 := "from-2-to-1" + client2.SendControl(recipient1, data2) // nolint + + var payload string + if err := checkReceiveClientControl(ctx, client1, "user", hello2.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data2 { + t.Errorf("Expected payload %s, got %s", data2, payload) + } + + if err := checkReceiveClientControl(ctx, client2, "user", hello1.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data1 { + t.Errorf("Expected payload %s, got %s", data1, payload) + } +} + func TestClientMessageToUserIdMultipleSessions(t *testing.T) { hub, _, _, server := CreateHubForTest(t) @@ -1577,76 +1921,182 @@ func WaitForUsersJoined(ctx context.Context, t *testing.T, client1 *TestClient, } func TestClientMessageToRoom(t *testing.T) { - hub, _, _, server := CreateHubForTest(t) + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server - ctx, cancel := context.WithTimeout(context.Background(), testTimeout) - defer cancel() + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) - client1 := NewTestClient(t, server, hub) - defer client1.CloseWithBye() - if err := client1.SendHello(testDefaultUserId + "1"); err != nil { - t.Fatal(err) - } - hello1, err := client1.RunUntilHello(ctx) - if err != nil { - t.Fatal(err) + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + if hello1.Hello.SessionId == hello2.Hello.SessionId { + t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId) + } else if hello1.Hello.UserId == hello2.Hello.UserId { + t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId) + } + + // Join room by id. + roomId := "test-room" + if room, err := client1.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // Give message processing some time. + time.Sleep(10 * time.Millisecond) + + if room, err := client2.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2) + + recipient := MessageClientMessageRecipient{ + Type: "room", + } + + data1 := "from-1-to-2" + client1.SendMessage(recipient, data1) // nolint + data2 := "from-2-to-1" + client2.SendMessage(recipient, data2) // nolint + + var payload string + if err := checkReceiveClientMessage(ctx, client1, "room", hello2.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data2 { + t.Errorf("Expected payload %s, got %s", data2, payload) + } + + if err := checkReceiveClientMessage(ctx, client2, "room", hello1.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data1 { + t.Errorf("Expected payload %s, got %s", data1, payload) + } + }) } +} - client2 := NewTestClient(t, server, hub) - defer client2.CloseWithBye() - if err := client2.SendHello(testDefaultUserId + "2"); err != nil { - t.Fatal(err) - } - hello2, err := client2.RunUntilHello(ctx) - if err != nil { - t.Fatal(err) - } +func TestClientControlToRoom(t *testing.T) { + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server - if hello1.Hello.SessionId == hello2.Hello.SessionId { - t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId) - } else if hello1.Hello.UserId == hello2.Hello.UserId { - t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId) - } + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) - // Join room by id. - roomId := "test-room" - if room, err := client1.JoinRoom(ctx, roomId); err != nil { - t.Fatal(err) - } else if room.Room.RoomId != roomId { - t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) - } + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } - // Give message processing some time. - time.Sleep(10 * time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() - if room, err := client2.JoinRoom(ctx, roomId); err != nil { - t.Fatal(err) - } else if room.Room.RoomId != roomId { - t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) - } + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } - WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2) + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } - recipient := MessageClientMessageRecipient{ - Type: "room", - } + if hello1.Hello.SessionId == hello2.Hello.SessionId { + t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId) + } else if hello1.Hello.UserId == hello2.Hello.UserId { + t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId) + } - data1 := "from-1-to-2" - client1.SendMessage(recipient, data1) // nolint - data2 := "from-2-to-1" - client2.SendMessage(recipient, data2) // nolint + // Join room by id. + roomId := "test-room" + if room, err := client1.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } - var payload string - if err := checkReceiveClientMessage(ctx, client1, "room", hello2.Hello, &payload); err != nil { - t.Error(err) - } else if payload != data2 { - t.Errorf("Expected payload %s, got %s", data2, payload) - } + // Give message processing some time. + time.Sleep(10 * time.Millisecond) - if err := checkReceiveClientMessage(ctx, client2, "room", hello1.Hello, &payload); err != nil { - t.Error(err) - } else if payload != data1 { - t.Errorf("Expected payload %s, got %s", data1, payload) + if room, err := client2.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2) + + recipient := MessageClientMessageRecipient{ + Type: "room", + } + + data1 := "from-1-to-2" + client1.SendControl(recipient, data1) // nolint + data2 := "from-2-to-1" + client2.SendControl(recipient, data2) // nolint + + var payload string + if err := checkReceiveClientControl(ctx, client1, "room", hello2.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data2 { + t.Errorf("Expected payload %s, got %s", data2, payload) + } + + if err := checkReceiveClientControl(ctx, client2, "room", hello1.Hello, &payload); err != nil { + t.Error(err) + } else if payload != data1 { + t.Errorf("Expected payload %s, got %s", data1, payload) + } + }) } } @@ -2913,187 +3363,206 @@ loop: } func TestClientRequestOfferNotInRoom(t *testing.T) { - hub, _, _, server := CreateHubForTest(t) + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) - mcu, err := NewTestMCU() - if err != nil { - t.Fatal(err) - } else if err := mcu.Start(); err != nil { - t.Fatal(err) - } - defer mcu.Stop() + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } - hub.SetMcu(mcu) + mcu, err := NewTestMCU() + if err != nil { + t.Fatal(err) + } else if err := mcu.Start(); err != nil { + t.Fatal(err) + } + defer mcu.Stop() - ctx, cancel := context.WithTimeout(context.Background(), testTimeout) - defer cancel() + hub1.SetMcu(mcu) + hub2.SetMcu(mcu) - client1 := NewTestClient(t, server, hub) - defer client1.CloseWithBye() + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() - if err := client1.SendHello(testDefaultUserId + "1"); err != nil { - t.Fatal(err) - } + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() - hello1, err := client1.RunUntilHello(ctx) - if err != nil { - t.Fatal(err) - } + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } - client2 := NewTestClient(t, server, hub) - defer client2.CloseWithBye() + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } - if err := client2.SendHello(testDefaultUserId + "2"); err != nil { - t.Fatal(err) - } + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() - hello2, err := client2.RunUntilHello(ctx) - if err != nil { - t.Fatal(err) - } + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } - // Join room by id. - roomId := "test-room" - if room, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1"); err != nil { - t.Fatal(err) - } else if room.Room.RoomId != roomId { - t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) - } + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } - // We will receive a "joined" event. - if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil { - t.Error(err) - } + // Join room by id. + roomId := "test-room" + if room, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1"); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } - // Client 2 may not request an offer (he is not in the room yet). - if err := client2.SendMessage(MessageClientMessageRecipient{ - Type: "session", - SessionId: hello1.Hello.SessionId, - }, MessageClientMessageData{ - Type: "requestoffer", - Sid: "12345", - RoomType: "screen", - }); err != nil { - t.Fatal(err) - } + // We will receive a "joined" event. + if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil { + t.Error(err) + } - if msg, err := client2.RunUntilMessage(ctx); err != nil { - t.Fatal(err) - } else { - if err := checkMessageError(msg, "not_allowed"); err != nil { - t.Fatal(err) - } - } + // Client 2 may not request an offer (he is not in the room yet). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } - if room, err := client2.JoinRoom(ctx, roomId); err != nil { - t.Fatal(err) - } else if room.Room.RoomId != roomId { - t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) - } + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "not_allowed"); err != nil { + t.Fatal(err) + } + } - // We will receive a "joined" event. - if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil { - t.Error(err) - } - if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil { - t.Error(err) - } + if room, err := client2.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } - // Client 2 may not request an offer (he is not in the call yet). - if err := client2.SendMessage(MessageClientMessageRecipient{ - Type: "session", - SessionId: hello1.Hello.SessionId, - }, MessageClientMessageData{ - Type: "requestoffer", - Sid: "12345", - RoomType: "screen", - }); err != nil { - t.Fatal(err) - } + // We will receive a "joined" event. + if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil { + t.Error(err) + } + if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil { + t.Error(err) + } - if msg, err := client2.RunUntilMessage(ctx); err != nil { - t.Fatal(err) - } else { - if err := checkMessageError(msg, "not_allowed"); err != nil { - t.Fatal(err) - } - } + // Client 2 may not request an offer (he is not in the call yet). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } - // Simulate request from the backend that somebody joined the call. - users1 := []map[string]interface{}{ - { - "sessionId": hello2.Hello.SessionId, - "inCall": 1, - }, - } - room := hub.getRoom(roomId) - if room == nil { - t.Fatalf("Could not find room %s", roomId) - } - room.PublishUsersInCallChanged(users1, users1) - if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil { - t.Error(err) - } - if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil { - t.Error(err) - } + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "not_allowed"); err != nil { + t.Fatal(err) + } + } - // Client 2 may not request an offer (recipient is not in the call yet). - if err := client2.SendMessage(MessageClientMessageRecipient{ - Type: "session", - SessionId: hello1.Hello.SessionId, - }, MessageClientMessageData{ - Type: "requestoffer", - Sid: "12345", - RoomType: "screen", - }); err != nil { - t.Fatal(err) - } + // Simulate request from the backend that somebody joined the call. + users1 := []map[string]interface{}{ + { + "sessionId": hello2.Hello.SessionId, + "inCall": 1, + }, + } + room2 := hub2.getRoom(roomId) + if room2 == nil { + t.Fatalf("Could not find room %s", roomId) + } + room2.PublishUsersInCallChanged(users1, users1) + if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil { + t.Error(err) + } + if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil { + t.Error(err) + } - if msg, err := client2.RunUntilMessage(ctx); err != nil { - t.Fatal(err) - } else { - if err := checkMessageError(msg, "not_allowed"); err != nil { - t.Fatal(err) - } - } + // Client 2 may not request an offer (recipient is not in the call yet). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } - // Simulate request from the backend that somebody joined the call. - users2 := []map[string]interface{}{ - { - "sessionId": hello1.Hello.SessionId, - "inCall": 1, - }, - } - room.PublishUsersInCallChanged(users2, users2) - if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil { - t.Error(err) - } - if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil { - t.Error(err) - } + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "not_allowed"); err != nil { + t.Fatal(err) + } + } - // Client 2 may request an offer now (both are in the same room and call). - if err := client2.SendMessage(MessageClientMessageRecipient{ - Type: "session", - SessionId: hello1.Hello.SessionId, - }, MessageClientMessageData{ - Type: "requestoffer", - Sid: "12345", - RoomType: "screen", - }); err != nil { - t.Fatal(err) - } + // Simulate request from the backend that somebody joined the call. + users2 := []map[string]interface{}{ + { + "sessionId": hello1.Hello.SessionId, + "inCall": 1, + }, + } + room1 := hub1.getRoom(roomId) + if room1 == nil { + t.Fatalf("Could not find room %s", roomId) + } + room1.PublishUsersInCallChanged(users2, users2) + if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil { + t.Error(err) + } + if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil { + t.Error(err) + } - if msg, err := client2.RunUntilMessage(ctx); err != nil { - t.Fatal(err) - } else { - // We check for "client_not_found" as the testing MCU doesn't support publishing/subscribing. - if err := checkMessageError(msg, "client_not_found"); err != nil { - t.Fatal(err) - } - } + // Client 2 may request an offer now (both are in the same room and call). + if err := client2.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "requestoffer", + Sid: "12345", + RoomType: "screen", + }); err != nil { + t.Fatal(err) + } + if msg, err := client2.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + // We check for "client_not_found" as the testing MCU doesn't support publishing/subscribing. + if err := checkMessageError(msg, "client_not_found"); err != nil { + t.Fatal(err) + } + } + }) + } } func TestNoSendBetweenSessionsOnDifferentBackends(t *testing.T) { @@ -3275,3 +3744,115 @@ func TestNoSameRoomOnDifferentBackends(t *testing.T) { t.Errorf("Expected no payload, got %+v", payload) } } + +func TestClientSendOffer(t *testing.T) { + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) + + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } + + mcu, err := NewTestMCU() + if err != nil { + t.Fatal(err) + } else if err := mcu.Start(); err != nil { + t.Fatal(err) + } + defer mcu.Stop() + + hub1.SetMcu(mcu) + hub2.SetMcu(mcu) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + + if err := client1.SendHello(testDefaultUserId + "1"); err != nil { + t.Fatal(err) + } + + hello1, err := client1.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + + if err := client2.SendHello(testDefaultUserId + "2"); err != nil { + t.Fatal(err) + } + + hello2, err := client2.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + // Join room by id. + roomId := "test-room" + if room, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1"); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // Give message processing some time. + time.Sleep(10 * time.Millisecond) + + if room, err := client2.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2) + + if err := client1.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, MessageClientMessageData{ + Type: "offer", + Sid: "12345", + RoomType: "video", + Payload: map[string]interface{}{ + "sdp": MockSdpOfferAudioAndVideo, + }, + }); err != nil { + t.Fatal(err) + } + + if err := client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo); err != nil { + t.Fatal(err) + } + + if err := client1.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello2.Hello.SessionId, + }, MessageClientMessageData{ + Type: "sendoffer", + RoomType: "video", + }); err != nil { + t.Fatal(err) + } + + if msg, err := client1.RunUntilMessage(ctx); err != nil { + t.Fatal(err) + } else { + if err := checkMessageError(msg, "client_not_found"); err != nil { + t.Fatal(err) + } + } + }) + } +} diff --git a/testclient_test.go b/testclient_test.go index 264d818..21b3588 100644 --- a/testclient_test.go +++ b/testclient_test.go @@ -124,14 +124,14 @@ func checkMessageType(message *ServerMessage, expectedType string) error { return nil } -func checkMessageSender(hub *Hub, message *MessageServerMessage, senderType string, hello *HelloServerMessage) error { - if message.Sender.Type != senderType { - return fmt.Errorf("Expected sender type %s, got %s", senderType, message.Sender.SessionId) - } else if message.Sender.SessionId != hello.SessionId { +func checkMessageSender(hub *Hub, sender *MessageServerMessageSender, senderType string, hello *HelloServerMessage) error { + if sender.Type != senderType { + return fmt.Errorf("Expected sender type %s, got %s", senderType, sender.SessionId) + } else if sender.SessionId != hello.SessionId { return fmt.Errorf("Expected session id %+v, got %+v", - getPubliceSessionIdData(hub, hello.SessionId), getPubliceSessionIdData(hub, message.Sender.SessionId)) - } else if message.Sender.UserId != hello.UserId { - return fmt.Errorf("Expected user id %s, got %s", hello.UserId, message.Sender.UserId) + getPubliceSessionIdData(hub, hello.SessionId), getPubliceSessionIdData(hub, sender.SessionId)) + } else if sender.UserId != hello.UserId { + return fmt.Errorf("Expected user id %s, got %s", hello.UserId, sender.UserId) } return nil @@ -143,7 +143,7 @@ func checkReceiveClientMessageWithSender(ctx context.Context, client *TestClient return err } else if err := checkMessageType(message, "message"); err != nil { return err - } else if err := checkMessageSender(client.hub, message.Message, senderType, hello); err != nil { + } else if err := checkMessageSender(client.hub, message.Message.Sender, senderType, hello); err != nil { return err } else { if err := json.Unmarshal(*message.Message.Data, payload); err != nil { @@ -160,6 +160,29 @@ func checkReceiveClientMessage(ctx context.Context, client *TestClient, senderTy return checkReceiveClientMessageWithSender(ctx, client, senderType, hello, payload, nil) } +func checkReceiveClientControlWithSender(ctx context.Context, client *TestClient, senderType string, hello *HelloServerMessage, payload interface{}, sender **MessageServerMessageSender) error { + message, err := client.RunUntilMessage(ctx) + if err := checkUnexpectedClose(err); err != nil { + return err + } else if err := checkMessageType(message, "control"); err != nil { + return err + } else if err := checkMessageSender(client.hub, message.Control.Sender, senderType, hello); err != nil { + return err + } else { + if err := json.Unmarshal(*message.Control.Data, payload); err != nil { + return err + } + } + if sender != nil { + *sender = message.Message.Sender + } + return nil +} + +func checkReceiveClientControl(ctx context.Context, client *TestClient, senderType string, hello *HelloServerMessage, payload interface{}) error { + return checkReceiveClientControlWithSender(ctx, client, senderType, hello, payload, nil) +} + func checkReceiveClientEvent(ctx context.Context, client *TestClient, eventType string, msg **EventServerMessage) error { message, err := client.RunUntilMessage(ctx) if err := checkUnexpectedClose(err); err != nil { @@ -414,6 +437,25 @@ func (c *TestClient) SendMessage(recipient MessageClientMessageRecipient, data i return c.WriteJSON(message) } +func (c *TestClient) SendControl(recipient MessageClientMessageRecipient, data interface{}) error { + payload, err := json.Marshal(data) + if err != nil { + c.t.Fatal(err) + } + + message := &ClientMessage{ + Id: "abcd", + Type: "control", + Control: &ControlClientMessage{ + MessageClientMessage: MessageClientMessage{ + Recipient: recipient, + Data: (*json.RawMessage)(&payload), + }, + }, + } + return c.WriteJSON(message) +} + func (c *TestClient) SetTransientData(key string, value interface{}) error { payload, err := json.Marshal(value) if err != nil { diff --git a/virtualsession_test.go b/virtualsession_test.go index 1c8890e..d8a0ffb 100644 --- a/virtualsession_test.go +++ b/virtualsession_test.go @@ -265,7 +265,7 @@ func TestVirtualSession(t *testing.T) { t.Fatal(err) } else if err := checkMessageType(msg2, "message"); err != nil { t.Fatal(err) - } else if err := checkMessageSender(hub, msg2.Message, "session", hello.Hello); err != nil { + } else if err := checkMessageSender(hub, msg2.Message.Sender, "session", hello.Hello); err != nil { t.Error(err) }