nextcloud-spreed-signaling/hub_test.go

5562 lines
151 KiB
Go
Raw Permalink Normal View History

/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2017 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/ed25519"
"crypto/elliptic"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/base64"
"encoding/json"
"encoding/pem"
2022-07-12 12:01:20 +02:00
"errors"
2022-04-05 12:48:27 +02:00
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/dlintw/goconf"
"github.com/golang-jwt/jwt/v4"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
const (
testDefaultUserId = "test-userid"
authAnonymousUserId = "anonymous-userid"
testTimeout = 10 * time.Second
)
var (
testRoomProperties = []byte("{\"prop1\":\"value1\"}")
)
2022-06-28 17:18:35 +02:00
var (
clusteredTests = []string{
"local",
"clustered",
}
testHelloV2Algorithms = []string{
"RSA",
"ECDSA",
"Ed25519",
"Ed25519_Nextcloud",
}
2022-06-28 17:18:35 +02:00
)
// Only used for testing.
func (h *Hub) getRoom(id string) *Room {
h.ru.RLock()
defer h.ru.RUnlock()
// TODO: The same room might exist on different backends.
for _, room := range h.rooms {
if room.Id() == id {
return room
}
}
return nil
}
2022-06-28 17:18:35 +02:00
func isLocalTest(t *testing.T) bool {
return strings.HasSuffix(t.Name(), "/local")
}
func getTestConfig(server *httptest.Server) (*goconf.ConfigFile, error) {
config := goconf.NewConfigFile()
u, err := url.Parse(server.URL)
if err != nil {
return nil, err
}
config.AddOption("backend", "allowed", u.Host)
if u.Scheme == "http" {
config.AddOption("backend", "allowhttp", "true")
}
config.AddOption("backend", "secret", string(testBackendSecret))
config.AddOption("sessions", "hashkey", "12345678901234567890123456789012")
config.AddOption("sessions", "blockkey", "09876543210987654321098765432109")
config.AddOption("clients", "internalsecret", string(testInternalSecret))
config.AddOption("geoip", "url", "none")
return config, nil
}
func getTestConfigWithMultipleBackends(server *httptest.Server) (*goconf.ConfigFile, error) {
config, err := getTestConfig(server)
if err != nil {
return nil, err
}
config.RemoveOption("backend", "allowed")
config.RemoveOption("backend", "secret")
config.AddOption("backend", "backends", "backend1, backend2")
config.AddOption("backend1", "url", server.URL+"/one")
config.AddOption("backend1", "secret", string(testBackendSecret))
config.AddOption("backend2", "url", server.URL+"/two/")
config.AddOption("backend2", "secret", string(testBackendSecret))
return config, nil
}
func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Server) (*goconf.ConfigFile, error)) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) {
r := mux.NewRouter()
registerBackendHandler(t, r)
server := httptest.NewServer(r)
t.Cleanup(func() {
server.Close()
})
events := getAsyncEventsForTest(t)
config, err := getConfigFunc(server)
if err != nil {
t.Fatal(err)
}
h, err := NewHub(config, events, nil, nil, nil, r, "no-version")
if err != nil {
t.Fatal(err)
}
b, err := NewBackendServer(config, h, "no-version")
if err != nil {
t.Fatal(err)
}
if err := b.Start(r); err != nil {
t.Fatal(err)
}
go h.Run()
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
WaitForHub(ctx, t, h)
})
return h, events, r, server
}
func CreateHubForTest(t *testing.T) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) {
return CreateHubForTestWithConfig(t, getTestConfig)
}
func CreateHubWithMultipleBackendsForTest(t *testing.T) (*Hub, AsyncEvents, *mux.Router, *httptest.Server) {
h, events, r, server := CreateHubForTestWithConfig(t, getTestConfigWithMultipleBackends)
registerBackendHandlerUrl(t, r, "/one")
registerBackendHandlerUrl(t, r, "/two")
return h, events, r, server
}
func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Server) (*goconf.ConfigFile, error)) (*Hub, *Hub, *mux.Router, *mux.Router, *httptest.Server, *httptest.Server) {
2022-06-28 17:18:35 +02:00
r1 := mux.NewRouter()
registerBackendHandler(t, r1)
server1 := httptest.NewServer(r1)
t.Cleanup(func() {
server1.Close()
})
r2 := mux.NewRouter()
registerBackendHandler(t, r2)
server2 := httptest.NewServer(r2)
t.Cleanup(func() {
server2.Close()
})
nats := startLocalNatsServer(t)
grpcServer1, addr1 := NewGrpcServerForTest(t)
grpcServer2, addr2 := NewGrpcServerForTest(t)
events1, err := NewAsyncEvents(nats)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
events1.Close()
})
config1, err := getConfigFunc(server1)
if err != nil {
t.Fatal(err)
}
client1, _ := NewGrpcClientsForTest(t, addr2)
h1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version")
2022-06-28 17:18:35 +02:00
if err != nil {
t.Fatal(err)
}
b1, err := NewBackendServer(config1, h1, "no-version")
if err != nil {
t.Fatal(err)
}
events2, err := NewAsyncEvents(nats)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
events2.Close()
})
config2, err := getConfigFunc(server2)
if err != nil {
t.Fatal(err)
}
client2, _ := NewGrpcClientsForTest(t, addr1)
h2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version")
2022-06-28 17:18:35 +02:00
if err != nil {
t.Fatal(err)
}
b2, err := NewBackendServer(config2, h2, "no-version")
if err != nil {
t.Fatal(err)
}
if err := b1.Start(r1); err != nil {
t.Fatal(err)
}
if err := b2.Start(r2); err != nil {
t.Fatal(err)
}
go h1.Run()
go h2.Run()
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
WaitForHub(ctx, t, h1)
WaitForHub(ctx, t, h2)
})
return h1, h2, r1, r2, server1, server2
2022-06-28 17:18:35 +02:00
}
func CreateClusteredHubsForTest(t *testing.T) (*Hub, *Hub, *httptest.Server, *httptest.Server) {
h1, h2, _, _, server1, server2 := CreateClusteredHubsForTestWithConfig(t, getTestConfig)
return h1, h2, server1, server2
}
func WaitForHub(ctx context.Context, t *testing.T, h *Hub) {
// Wait for any channel messages to be processed.
time.Sleep(10 * time.Millisecond)
h.Stop()
for {
h.mu.Lock()
clients := len(h.clients)
sessions := len(h.sessions)
h.mu.Unlock()
h.ru.Lock()
rooms := len(h.rooms)
h.ru.Unlock()
2023-06-15 13:36:53 +02:00
readActive := h.readPumpActive.Load()
writeActive := h.writePumpActive.Load()
if clients == 0 && rooms == 0 && sessions == 0 && readActive == 0 && writeActive == 0 {
break
}
select {
case <-ctx.Done():
h.mu.Lock()
h.ru.Lock()
dumpGoroutines("", os.Stderr)
t.Errorf("Error waiting for clients %+v / rooms %+v / sessions %+v / %d read / %d write to terminate: %s", h.clients, h.rooms, h.sessions, readActive, writeActive, ctx.Err())
h.ru.Unlock()
h.mu.Unlock()
return
default:
time.Sleep(time.Millisecond)
}
}
}
func validateBackendChecksum(t *testing.T, f func(http.ResponseWriter, *http.Request, *BackendClientRequest) *BackendClientResponse) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
2022-04-05 12:48:27 +02:00
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal("Error reading body: ", err)
}
rnd := r.Header.Get(HeaderBackendSignalingRandom)
checksum := r.Header.Get(HeaderBackendSignalingChecksum)
if rnd == "" || checksum == "" {
t.Fatalf("No checksum headers found in request to %s", r.URL)
}
if verify := CalculateBackendChecksum(rnd, body, testBackendSecret); verify != checksum {
t.Fatalf("Backend checksum verification failed for request to %s", r.URL)
}
var request BackendClientRequest
if err := json.Unmarshal(body, &request); err != nil {
t.Fatal(err)
}
response := f(w, r, &request)
if response == nil {
// Function already returned a response.
return
}
data, err := json.Marshal(response)
if err != nil {
t.Fatal(err)
}
if r.Header.Get("OCS-APIRequest") != "" {
var ocs OcsResponse
ocs.Ocs = &OcsBody{
Meta: OcsMeta{
Status: "ok",
StatusCode: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
Data: (*json.RawMessage)(&data),
}
if data, err = json.Marshal(ocs); err != nil {
t.Fatal(err)
}
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(data) // nolint
}
}
func processAuthRequest(t *testing.T, w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
if request.Type != "auth" || request.Auth == nil {
t.Fatalf("Expected an auth backend request, got %+v", request)
}
var params TestBackendClientAuthParams
if request.Auth.Params != nil && len(*request.Auth.Params) > 0 {
if err := json.Unmarshal(*request.Auth.Params, &params); err != nil {
t.Fatal(err)
}
}
if params.UserId == "" {
params.UserId = testDefaultUserId
} else if params.UserId == authAnonymousUserId {
params.UserId = ""
}
response := &BackendClientResponse{
Type: "auth",
Auth: &BackendClientAuthResponse{
Version: BackendVersion,
UserId: params.UserId,
},
}
userdata := map[string]string{
"displayname": "Displayname " + params.UserId,
}
if data, err := json.Marshal(userdata); err != nil {
t.Fatal(err)
} else {
response.Auth.User = (*json.RawMessage)(&data)
}
return response
}
func processRoomRequest(t *testing.T, w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
if request.Type != "room" || request.Room == nil {
t.Fatalf("Expected an room backend request, got %+v", request)
}
switch request.Room.RoomId {
case "test-room-slow":
time.Sleep(100 * time.Millisecond)
case "test-room-takeover-room-session":
// Additional checks for testcase "TestClientTakeoverRoomSession"
if request.Room.Action == "leave" && request.Room.UserId == "test-userid1" {
t.Errorf("Should not receive \"leave\" event for first user, received %+v", request.Room)
}
}
// Allow joining any room.
response := &BackendClientResponse{
Type: "room",
Room: &BackendClientRoomResponse{
Version: BackendVersion,
RoomId: request.Room.RoomId,
Properties: (*json.RawMessage)(&testRoomProperties),
},
}
switch request.Room.RoomId {
case "test-room-with-sessiondata":
data := map[string]string{
"userid": "userid-from-sessiondata",
}
tmp, err := json.Marshal(data)
if err != nil {
t.Fatalf("Could not marshal %+v: %s", data, err)
}
response.Room.Session = (*json.RawMessage)(&tmp)
case "test-room-initial-permissions":
permissions := []Permission{PERMISSION_MAY_PUBLISH_AUDIO}
response.Room.Permissions = &permissions
}
return response
}
2022-07-12 12:01:20 +02:00
var (
sessionRequestHander struct {
sync.Mutex
handlers map[*testing.T]func(*BackendClientSessionRequest)
}
)
func setSessionRequestHandler(t *testing.T, f func(*BackendClientSessionRequest)) {
sessionRequestHander.Lock()
defer sessionRequestHander.Unlock()
if sessionRequestHander.handlers == nil {
sessionRequestHander.handlers = make(map[*testing.T]func(*BackendClientSessionRequest))
}
if _, found := sessionRequestHander.handlers[t]; !found {
t.Cleanup(func() {
sessionRequestHander.Lock()
defer sessionRequestHander.Unlock()
delete(sessionRequestHander.handlers, t)
})
}
sessionRequestHander.handlers[t] = f
}
func clearSessionRequestHandler(t *testing.T) { // nolint
sessionRequestHander.Lock()
defer sessionRequestHander.Unlock()
delete(sessionRequestHander.handlers, t)
}
func processSessionRequest(t *testing.T, w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
if request.Type != "session" || request.Session == nil {
t.Fatalf("Expected an session backend request, got %+v", request)
}
2022-07-12 12:01:20 +02:00
sessionRequestHander.Lock()
defer sessionRequestHander.Unlock()
if f, found := sessionRequestHander.handlers[t]; found {
f(request.Session)
}
response := &BackendClientResponse{
Type: "session",
Session: &BackendClientSessionResponse{
Version: BackendVersion,
RoomId: request.Session.RoomId,
},
}
return response
}
var pingRequests map[*testing.T][]*BackendClientRequest
func getPingRequests(t *testing.T) []*BackendClientRequest {
return pingRequests[t]
}
func clearPingRequests(t *testing.T) {
delete(pingRequests, t)
}
func storePingRequest(t *testing.T, request *BackendClientRequest) {
if entries, found := pingRequests[t]; !found {
if pingRequests == nil {
pingRequests = make(map[*testing.T][]*BackendClientRequest)
}
pingRequests[t] = []*BackendClientRequest{
request,
}
t.Cleanup(func() {
clearPingRequests(t)
})
} else {
pingRequests[t] = append(entries, request)
}
}
func processPingRequest(t *testing.T, w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
if request.Type != "ping" || request.Ping == nil {
t.Fatalf("Expected an ping backend request, got %+v", request)
}
if request.Ping.RoomId == "test-room-with-sessiondata" {
if entries := request.Ping.Entries; len(entries) != 1 {
t.Errorf("Expected one entry, got %+v", entries)
} else {
if entries[0].UserId != "" {
t.Errorf("Expected empty userid, got %+v", entries[0])
}
}
}
storePingRequest(t, request)
response := &BackendClientResponse{
Type: "ping",
Ping: &BackendClientRingResponse{
Version: BackendVersion,
RoomId: request.Ping.RoomId,
},
}
return response
}
func ensureAuthTokens(t *testing.T) (string, string) {
if privateKey := os.Getenv("PRIVATE_AUTH_TOKEN_" + t.Name()); privateKey != "" {
publicKey := os.Getenv("PUBLIC_AUTH_TOKEN_" + t.Name())
if publicKey == "" {
// should not happen, always both keys are created
t.Fatal("public key is empty")
}
return privateKey, publicKey
}
var private []byte
var public []byte
if strings.Contains(t.Name(), "ECDSA") {
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
t.Fatal(err)
}
private, err = x509.MarshalECPrivateKey(key)
if err != nil {
t.Fatal(err)
}
private = pem.EncodeToMemory(&pem.Block{
Type: "ECDSA PRIVATE KEY",
Bytes: private,
})
public, err = x509.MarshalPKIXPublicKey(&key.PublicKey)
if err != nil {
t.Fatal(err)
}
public = pem.EncodeToMemory(&pem.Block{
Type: "ECDSA PUBLIC KEY",
Bytes: public,
})
} else if strings.Contains(t.Name(), "Ed25519") {
publicKey, privateKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatal(err)
}
private, err = x509.MarshalPKCS8PrivateKey(privateKey)
if err != nil {
t.Fatal(err)
}
private = pem.EncodeToMemory(&pem.Block{
Type: "Ed25519 PRIVATE KEY",
Bytes: private,
})
public, err = x509.MarshalPKIXPublicKey(publicKey)
if err != nil {
t.Fatal(err)
}
public = pem.EncodeToMemory(&pem.Block{
Type: "Ed25519 PUBLIC KEY",
Bytes: public,
})
} else {
key, err := rsa.GenerateKey(rand.Reader, 1024)
if err != nil {
t.Fatal(err)
}
private = pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(key),
})
public, err = x509.MarshalPKIXPublicKey(&key.PublicKey)
if err != nil {
t.Fatal(err)
}
public = pem.EncodeToMemory(&pem.Block{
Type: "RSA PUBLIC KEY",
Bytes: public,
})
}
privateKey := base64.StdEncoding.EncodeToString(private)
t.Setenv("PRIVATE_AUTH_TOKEN_"+t.Name(), privateKey)
publicKey := base64.StdEncoding.EncodeToString(public)
t.Setenv("PUBLIC_AUTH_TOKEN_"+t.Name(), publicKey)
return privateKey, publicKey
}
func getPrivateAuthToken(t *testing.T) (key interface{}) {
private, _ := ensureAuthTokens(t)
data, err := base64.StdEncoding.DecodeString(private)
if err != nil {
t.Fatal(err)
}
if strings.Contains(t.Name(), "ECDSA") {
key, err = jwt.ParseECPrivateKeyFromPEM(data)
} else if strings.Contains(t.Name(), "Ed25519") {
key, err = jwt.ParseEdPrivateKeyFromPEM(data)
} else {
key, err = jwt.ParseRSAPrivateKeyFromPEM(data)
}
if err != nil {
t.Fatal(err)
}
return key
}
func getPublicAuthToken(t *testing.T) (key interface{}) {
_, public := ensureAuthTokens(t)
data, err := base64.StdEncoding.DecodeString(public)
if err != nil {
t.Fatal(err)
}
if strings.Contains(t.Name(), "ECDSA") {
key, err = jwt.ParseECPublicKeyFromPEM(data)
} else if strings.Contains(t.Name(), "Ed25519") {
key, err = jwt.ParseEdPublicKeyFromPEM(data)
} else {
key, err = jwt.ParseRSAPublicKeyFromPEM(data)
}
if err != nil {
t.Fatal(err)
}
return key
}
func registerBackendHandler(t *testing.T, router *mux.Router) {
registerBackendHandlerUrl(t, router, "/")
}
func registerBackendHandlerUrl(t *testing.T, router *mux.Router, url string) {
handleFunc := validateBackendChecksum(t, func(w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse {
switch request.Type {
case "auth":
return processAuthRequest(t, w, r, request)
case "room":
return processRoomRequest(t, w, r, request)
case "session":
return processSessionRequest(t, w, r, request)
case "ping":
return processPingRequest(t, w, r, request)
default:
t.Fatalf("Unsupported request received: %+v", request)
return nil
}
})
router.HandleFunc(url, handleFunc)
if !strings.HasSuffix(url, "/") {
url += "/"
}
handleCapabilitiesFunc := func(w http.ResponseWriter, r *http.Request) {
features := []string{
"foo",
"bar",
}
if strings.Contains(t.Name(), "V3Api") {
features = append(features, "signaling-v3")
}
signaling := map[string]interface{}{
"foo": "bar",
"baz": 42,
}
config := map[string]interface{}{
"signaling": signaling,
}
if strings.Contains(t.Name(), "MultiRoom") {
signaling[ConfigKeySessionPingLimit] = 2
}
useV2 := true
if os.Getenv("SKIP_V2_CAPABILITIES") != "" {
useV2 = false
}
if strings.Contains(t.Name(), "V2") && useV2 {
key := getPublicAuthToken(t)
public, err := x509.MarshalPKIXPublicKey(key)
if err != nil {
t.Fatal(err)
}
var pemType string
if strings.Contains(t.Name(), "ECDSA") {
pemType = "ECDSA PUBLIC KEY"
} else if strings.Contains(t.Name(), "Ed25519") {
pemType = "Ed25519 PUBLIC KEY"
} else {
pemType = "RSA PUBLIC KEY"
}
public = pem.EncodeToMemory(&pem.Block{
Type: pemType,
Bytes: public,
})
if strings.Contains(t.Name(), "Ed25519_Nextcloud") {
// Simulate Nextcloud which returns the Ed25519 key as base64-encoded data.
encoded := base64.StdEncoding.EncodeToString(key.(ed25519.PublicKey))
signaling[ConfigKeyHelloV2TokenKey] = encoded
} else {
signaling[ConfigKeyHelloV2TokenKey] = string(public)
}
}
2022-07-04 13:53:02 +02:00
spreedCapa, _ := json.Marshal(map[string]interface{}{
"features": features,
"config": config,
})
response := &CapabilitiesResponse{
Version: CapabilitiesVersion{
Major: 20,
},
2022-07-04 13:53:02 +02:00
Capabilities: map[string]*json.RawMessage{
"spreed": (*json.RawMessage)(&spreedCapa),
},
}
data, err := json.Marshal(response)
if err != nil {
t.Errorf("Could not marshal %+v: %s", response, err)
}
var ocs OcsResponse
ocs.Ocs = &OcsBody{
Meta: OcsMeta{
Status: "ok",
StatusCode: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
Data: (*json.RawMessage)(&data),
}
if data, err = json.Marshal(ocs); err != nil {
t.Fatal(err)
}
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(data) // nolint
}
router.HandleFunc(url+"ocs/v2.php/cloud/capabilities", handleCapabilitiesFunc)
if strings.Contains(t.Name(), "V3Api") {
router.HandleFunc(url+"ocs/v2.php/apps/spreed/api/v3/signaling/backend", handleFunc)
} else {
router.HandleFunc(url+"ocs/v2.php/apps/spreed/api/v1/signaling/backend", handleFunc)
}
}
func performHousekeeping(hub *Hub, now time.Time) *sync.WaitGroup {
var wg sync.WaitGroup
wg.Add(1)
go func() {
hub.performHousekeeping(now)
wg.Done()
}()
return &wg
}
func TestInitialWelcome(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewTestClientContext(ctx, t, server, hub)
defer client.CloseWithBye()
msg, err := client.RunUntilMessage(ctx)
if err != nil {
t.Fatal(err)
}
if msg.Type != "welcome" {
t.Errorf("Expected \"welcome\" message, got %+v", msg)
} else if msg.Welcome.Version == "" {
t.Errorf("Expected welcome version, got %+v", msg)
} else if len(msg.Welcome.Features) == 0 {
t.Errorf("Expected welcome features, got %+v", msg)
}
}
func TestExpectClientHello(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
// The server will send an error and close the connection if no "Hello"
// is sent.
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
// Perform housekeeping in the future, this will cause the connection to
// be terminated due to the missing "Hello" request.
performHousekeeping(hub, time.Now().Add(initialHelloTimeout+time.Second))
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
message, err := client.RunUntilMessage(ctx)
if err := checkUnexpectedClose(err); err != nil {
t.Fatal(err)
}
message2, err := client.RunUntilMessage(ctx)
if message2 != nil {
t.Fatalf("Received multiple messages, already have %+v, also got %+v", message, message2)
}
if err := checkUnexpectedClose(err); err != nil {
t.Fatal(err)
}
if err := checkMessageType(message, "bye"); err != nil {
t.Error(err)
} else if message.Bye.Reason != "hello_timeout" {
t.Errorf("Expected \"hello_timeout\" reason, got %+v", message.Bye)
}
}
func TestExpectClientHelloUnsupportedVersion(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
params := TestBackendClientAuthParams{
UserId: testDefaultUserId,
}
if err := client.SendHelloParams(server.URL, "0.0", "", nil, params); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
message, err := client.RunUntilMessage(ctx)
if err := checkUnexpectedClose(err); err != nil {
t.Fatal(err)
}
if err := checkMessageType(message, "error"); err != nil {
t.Error(err)
} else if message.Error.Code != "invalid_hello_version" {
t.Errorf("Expected \"invalid_hello_version\" reason, got %+v", message.Error)
}
}
func TestClientHelloV1(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
if hello, err := client.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
}
}
func TestClientHelloV2(t *testing.T) {
for _, algo := range testHelloV2Algorithms {
t.Run(algo, func(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloV2(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
data := hub.decodeSessionId(hello.Hello.SessionId, publicSessionName)
if data == nil {
t.Fatalf("Could not decode session id: %s", hello.Hello.SessionId)
}
hub.mu.RLock()
session := hub.sessions[data.Sid]
hub.mu.RUnlock()
if session == nil {
t.Fatalf("Could not get session for id %+v", data)
}
var userdata map[string]string
if err := json.Unmarshal(*session.UserData(), &userdata); err != nil {
t.Fatal(err)
}
if expected := "Displayname " + testDefaultUserId; userdata["displayname"] != expected {
t.Errorf("Expected displayname %s, got %s", expected, userdata["displayname"])
}
})
}
}
func TestClientHelloV2_IssuedInFuture(t *testing.T) {
for _, algo := range testHelloV2Algorithms {
t.Run(algo, func(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
issuedAt := time.Now().Add(time.Minute)
expiresAt := issuedAt.Add(time.Second)
if err := client.SendHelloV2WithTimes(testDefaultUserId, issuedAt, expiresAt); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
message, err := client.RunUntilMessage(ctx)
if err := checkUnexpectedClose(err); err != nil {
t.Fatal(err)
}
if err := checkMessageType(message, "error"); err != nil {
t.Error(err)
} else if message.Error.Code != "token_not_valid_yet" {
t.Errorf("Expected \"token_not_valid_yet\" reason, got %+v", message.Error)
}
})
}
}
func TestClientHelloV2_Expired(t *testing.T) {
for _, algo := range testHelloV2Algorithms {
t.Run(algo, func(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
issuedAt := time.Now().Add(-time.Minute)
if err := client.SendHelloV2WithTimes(testDefaultUserId, issuedAt, issuedAt.Add(time.Second)); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
message, err := client.RunUntilMessage(ctx)
if err := checkUnexpectedClose(err); err != nil {
t.Fatal(err)
}
if err := checkMessageType(message, "error"); err != nil {
t.Error(err)
} else if message.Error.Code != "token_expired" {
t.Errorf("Expected \"token_expired\" reason, got %+v", message.Error)
}
})
}
}
func TestClientHelloV2_IssuedAtMissing(t *testing.T) {
for _, algo := range testHelloV2Algorithms {
t.Run(algo, func(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
var issuedAt time.Time
expiresAt := time.Now().Add(time.Minute)
if err := client.SendHelloV2WithTimes(testDefaultUserId, issuedAt, expiresAt); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
message, err := client.RunUntilMessage(ctx)
if err := checkUnexpectedClose(err); err != nil {
t.Fatal(err)
}
if err := checkMessageType(message, "error"); err != nil {
t.Error(err)
} else if message.Error.Code != "token_not_valid_yet" {
t.Errorf("Expected \"token_not_valid_yet\" reason, got %+v", message.Error)
}
})
}
}
func TestClientHelloV2_ExpiresAtMissing(t *testing.T) {
for _, algo := range testHelloV2Algorithms {
t.Run(algo, func(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
issuedAt := time.Now().Add(-time.Minute)
var expiresAt time.Time
if err := client.SendHelloV2WithTimes(testDefaultUserId, issuedAt, expiresAt); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
message, err := client.RunUntilMessage(ctx)
if err := checkUnexpectedClose(err); err != nil {
t.Fatal(err)
}
if err := checkMessageType(message, "error"); err != nil {
t.Error(err)
} else if message.Error.Code != "token_expired" {
t.Errorf("Expected \"token_expired\" reason, got %+v", message.Error)
}
})
}
}
func TestClientHelloV2_CachedCapabilities(t *testing.T) {
for _, algo := range testHelloV2Algorithms {
t.Run(algo, func(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
// Simulate old-style Nextcloud without capabilities for Hello V2.
t.Setenv("SKIP_V2_CAPABILITIES", "1")
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHelloV1(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.UserId != testDefaultUserId+"1" {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId+"1", hello1.Hello)
}
if hello1.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello1.Hello)
}
// Simulate updated Nextcloud with capabilities for Hello V2.
t.Setenv("SKIP_V2_CAPABILITIES", "")
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHelloV2(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello2.Hello.UserId != testDefaultUserId+"2" {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId+"2", hello2.Hello)
}
if hello2.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello2.Hello)
}
})
}
}
func TestClientHelloWithSpaces(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
userId := "test user with spaces"
if err := client.SendHello(userId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
if hello, err := client.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != userId {
t.Errorf("Expected \"%s\", got %+v", userId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
}
}
func TestClientHelloAllowAll(t *testing.T) {
hub, _, _, server := CreateHubForTestWithConfig(t, func(server *httptest.Server) (*goconf.ConfigFile, error) {
config, err := getTestConfig(server)
if err != nil {
return nil, err
}
config.RemoveOption("backend", "allowed")
config.AddOption("backend", "allowall", "true")
return config, nil
})
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
if hello, err := client.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
}
}
func TestClientHelloSessionLimit(t *testing.T) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
var router1 *mux.Router
hub1, _, router1, server1 = CreateHubForTestWithConfig(t, func(server *httptest.Server) (*goconf.ConfigFile, error) {
config, err := getTestConfig(server)
if err != nil {
return nil, err
}
config.RemoveOption("backend", "allowed")
config.RemoveOption("backend", "secret")
config.AddOption("backend", "backends", "backend1, backend2")
config.AddOption("backend1", "url", server.URL+"/one")
config.AddOption("backend1", "secret", string(testBackendSecret))
config.AddOption("backend1", "sessionlimit", "1")
config.AddOption("backend2", "url", server.URL+"/two")
config.AddOption("backend2", "secret", string(testBackendSecret))
return config, nil
})
registerBackendHandlerUrl(t, router1, "/one")
registerBackendHandlerUrl(t, router1, "/two")
hub2 = hub1
server2 = server1
} else {
var router1 *mux.Router
var router2 *mux.Router
hub1, hub2, router1, router2, server1, server2 = CreateClusteredHubsForTestWithConfig(t, func(server *httptest.Server) (*goconf.ConfigFile, error) {
// Make sure all backends use the same server
if server1 == nil {
server1 = server
} else {
server = server1
}
config, err := getTestConfig(server)
if err != nil {
return nil, err
}
config.RemoveOption("backend", "allowed")
config.RemoveOption("backend", "secret")
config.AddOption("backend", "backends", "backend1, backend2")
config.AddOption("backend1", "url", server.URL+"/one")
config.AddOption("backend1", "secret", string(testBackendSecret))
config.AddOption("backend1", "sessionlimit", "1")
config.AddOption("backend2", "url", server.URL+"/two")
config.AddOption("backend2", "secret", string(testBackendSecret))
return config, nil
})
registerBackendHandlerUrl(t, router1, "/one")
registerBackendHandlerUrl(t, router1, "/two")
registerBackendHandlerUrl(t, router2, "/one")
registerBackendHandlerUrl(t, router2, "/two")
}
client := NewTestClient(t, server1, hub1)
defer client.CloseWithBye()
params1 := TestBackendClientAuthParams{
UserId: testDefaultUserId,
}
if err := client.SendHelloParams(server1.URL+"/one", HelloVersionV1, "client", nil, params1); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
if hello, err := client.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
}
// The second client can't connect as it would exceed the session limit.
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
params2 := TestBackendClientAuthParams{
UserId: testDefaultUserId + "2",
}
if err := client2.SendHelloParams(server1.URL+"/one", HelloVersionV1, "client", nil, params2); err != nil {
t.Fatal(err)
}
msg, err := client2.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
} else {
if msg.Type != "error" || msg.Error == nil {
t.Errorf("Expected error message, got %+v", msg)
} else if msg.Error.Code != "session_limit_exceeded" {
t.Errorf("Expected error \"session_limit_exceeded\", got %+v", msg.Error.Code)
}
}
// The client can connect to a different backend.
if err := client2.SendHelloParams(server1.URL+"/two", HelloVersionV1, "client", nil, params2); err != nil {
t.Fatal(err)
}
if hello, err := client2.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId+"2" {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId+"2", hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
}
// If the first client disconnects (and releases the session), a new one can connect.
client.CloseWithBye()
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
client3 := NewTestClient(t, server2, hub2)
defer client3.CloseWithBye()
params3 := TestBackendClientAuthParams{
UserId: testDefaultUserId + "3",
}
if err := client3.SendHelloParams(server1.URL+"/one", HelloVersionV1, "client", nil, params3); err != nil {
t.Fatal(err)
}
if hello, err := client3.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId+"3" {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId+"3", hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
}
})
}
}
func TestSessionIdsUnordered(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
publicSessionIds := make([]string, 0)
for i := 0; i < 20; i++ {
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
if hello, err := client.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
break
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
break
}
data := hub.decodeSessionId(hello.Hello.SessionId, publicSessionName)
if data == nil {
t.Errorf("Could not decode session id: %s", hello.Hello.SessionId)
break
}
hub.mu.RLock()
session := hub.sessions[data.Sid]
hub.mu.RUnlock()
if session == nil {
t.Errorf("Could not get session for id %+v", data)
break
}
publicSessionIds = append(publicSessionIds, session.PublicId())
}
}
if len(publicSessionIds) == 0 {
t.Fatal("no session ids decoded")
}
larger := 0
smaller := 0
prevSid := ""
for i, sid := range publicSessionIds {
if i > 0 {
if sid > prevSid {
larger++
} else if sid < prevSid {
smaller--
} else {
t.Error("should not have received the same session id twice")
}
}
prevSid = sid
}
// Public session ids should not be ordered.
if len(publicSessionIds) == larger {
t.Error("the session ids are all larger than the previous ones")
} else if len(publicSessionIds) == smaller {
t.Error("the session ids are all smaller than the previous ones")
}
}
func TestClientHelloResume(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
client.Close()
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
client = NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloResume(hello.Hello.ResumeId); err != nil {
t.Fatal(err)
}
hello2, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello2.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello2.Hello)
}
if hello2.Hello.SessionId != hello.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello.Hello.SessionId, hello2.Hello)
}
if hello2.Hello.ResumeId != hello.Hello.ResumeId {
t.Errorf("Expected resume id %s, got %+v", hello.Hello.ResumeId, hello2.Hello)
}
}
}
func TestClientHelloResumeExpired(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
client.Close()
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
// Perform housekeeping in the future, this will cause the session to be
// cleaned up after it is expired.
performHousekeeping(hub, time.Now().Add(sessionExpireDuration+time.Second)).Wait()
client = NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloResume(hello.Hello.ResumeId); err != nil {
t.Fatal(err)
}
msg, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
} else {
if msg.Type != "error" || msg.Error == nil {
t.Errorf("Expected error message, got %+v", msg)
} else if msg.Error.Code != "no_such_session" {
t.Errorf("Expected error \"no_such_session\", got %+v", msg.Error.Code)
}
}
}
func TestClientHelloResumeTakeover(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client1.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHelloResume(hello.Hello.ResumeId); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello2.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello2.Hello)
}
if hello2.Hello.SessionId != hello.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello.Hello.SessionId, hello2.Hello)
}
if hello2.Hello.ResumeId != hello.Hello.ResumeId {
t.Errorf("Expected resume id %s, got %+v", hello.Hello.ResumeId, hello2.Hello)
}
}
// The first client got disconnected with a reason in a "Bye" message.
msg, err := client1.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
} else {
if msg.Type != "bye" || msg.Bye == nil {
t.Errorf("Expected bye message, got %+v", msg)
} else if msg.Bye.Reason != "session_resumed" {
t.Errorf("Expected reason \"session_resumed\", got %+v", msg.Bye.Reason)
}
}
if msg, err := client1.RunUntilMessage(ctx); err == nil {
t.Errorf("Expected error but received %+v", msg)
} else if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
t.Errorf("Expected close error but received %+v", err)
}
}
func TestClientHelloResumeOtherHub(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
client.Close()
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
// Simulate a restart of the hub.
2023-06-15 13:36:53 +02:00
hub.sid.Store(0)
sessions := make([]Session, 0)
hub.mu.Lock()
for _, session := range hub.sessions {
sessions = append(sessions, session)
}
hub.mu.Unlock()
for _, session := range sessions {
session.Close()
}
hub.mu.Lock()
count := len(hub.sessions)
hub.mu.Unlock()
if count > 0 {
t.Errorf("Should have removed all sessions (still has %d)", count)
}
// The new client will get the same (internal) sid for his session.
newClient := NewTestClient(t, server, hub)
defer newClient.CloseWithBye()
if err := newClient.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
if hello, err := newClient.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
// The previous session (which had the same internal sid) can't be resumed.
client = NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloResume(hello.Hello.ResumeId); err != nil {
t.Fatal(err)
}
msg, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
} else {
if msg.Type != "error" || msg.Error == nil {
t.Errorf("Expected error message, got %+v", msg)
} else if msg.Error.Code != "no_such_session" {
t.Errorf("Expected error \"no_such_session\", got %+v", msg.Error.Code)
}
}
// Expire old sessions
hub.performHousekeeping(time.Now().Add(2 * sessionExpireDuration))
}
func TestClientHelloResumePublicId(t *testing.T) {
// Test that a client can't resume a "public" session of another user.
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
}
recipient2 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello2.Hello.SessionId,
}
data := "from-1-to-2"
client1.SendMessage(recipient2, data) // nolint
var payload string
var sender *MessageServerMessageSender
if err := checkReceiveClientMessageWithSender(ctx, client2, "session", hello1.Hello, &payload, &sender); err != nil {
t.Error(err)
} else if payload != data {
t.Errorf("Expected payload %s, got %s", data, payload)
}
client1.Close()
if err := client1.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
client1 = NewTestClient(t, server, hub)
defer client1.CloseWithBye()
// Can't resume a session with the id received from messages of a client.
if err := client1.SendHelloResume(sender.SessionId); err != nil {
t.Fatal(err)
}
msg, err := client1.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
} else {
if msg.Type != "error" || msg.Error == nil {
t.Errorf("Expected error message, got %+v", msg)
} else if msg.Error.Code != "no_such_session" {
t.Errorf("Expected error \"no_such_session\", got %+v", msg.Error.Code)
}
}
// Expire old sessions
hub.performHousekeeping(time.Now().Add(2 * sessionExpireDuration))
}
func TestClientHelloByeResume(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
if err := client.SendBye(); err != nil {
t.Fatal(err)
}
if message, err := client.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else {
if err := checkMessageType(message, "bye"); err != nil {
t.Error(err)
}
}
client.Close()
if err := client.WaitForSessionRemoved(ctx, hello.Hello.SessionId); err != nil {
t.Error(err)
}
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
client = NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloResume(hello.Hello.ResumeId); err != nil {
t.Fatal(err)
}
msg, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
} else {
if msg.Type != "error" || msg.Error == nil {
t.Errorf("Expected \"error\", got %+v", *msg)
} else if msg.Error.Code != "no_such_session" {
t.Errorf("Expected error \"no_such_session\", got %+v", *msg)
}
}
}
func TestClientHelloResumeAndJoin(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
client.Close()
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
client = NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloResume(hello.Hello.ResumeId); err != nil {
t.Fatal(err)
}
hello2, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello2.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello2.Hello)
}
if hello2.Hello.SessionId != hello.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello.Hello.SessionId, hello2.Hello)
}
if hello2.Hello.ResumeId != hello.Hello.ResumeId {
t.Errorf("Expected resume id %s, got %+v", hello.Hello.ResumeId, hello2.Hello)
}
}
// Join room by id.
roomId := "test-room"
if room, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
}
func TestClientHelloClient(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloClient(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
if hello, err := client.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
}
func TestClientHelloClient_V3Api(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
params := TestBackendClientAuthParams{
UserId: testDefaultUserId,
}
// The "/api/v1/signaling/" URL will be changed to use "v3" as the "signaling-v3"
// feature is returned by the capabilities endpoint.
if err := client.SendHelloParams(server.URL+"/ocs/v2.php/apps/spreed/api/v1/signaling/backend", HelloVersionV1, "client", nil, params); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
if hello, err := client.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
}
func TestClientHelloInternal(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHelloInternal(); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
if hello, err := client.RunUntilHello(ctx); err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != "" {
t.Errorf("Expected empty user id, got %+v", hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
}
func TestClientMessageToSessionId(t *testing.T) {
2022-06-28 17:18:35 +02:00
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
}
recipient1 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}
recipient2 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello2.Hello.SessionId,
}
data1 := "from-1-to-2"
client1.SendMessage(recipient2, data1) // nolint
data2 := "from-2-to-1"
client2.SendMessage(recipient1, data2) // nolint
var payload string
if err := checkReceiveClientMessage(ctx, client1, "session", hello2.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data2 {
t.Errorf("Expected payload %s, got %s", data2, payload)
}
if err := checkReceiveClientMessage(ctx, client2, "session", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data1 {
t.Errorf("Expected payload %s, got %s", data1, payload)
}
})
}
}
func TestClientControlToSessionId(t *testing.T) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
}
recipient1 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}
recipient2 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello2.Hello.SessionId,
}
data1 := "from-1-to-2"
client1.SendControl(recipient2, data1) // nolint
data2 := "from-2-to-1"
client2.SendControl(recipient1, data2) // nolint
var payload string
if err := checkReceiveClientControl(ctx, client1, "session", hello2.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data2 {
t.Errorf("Expected payload %s, got %s", data2, payload)
}
if err := checkReceiveClientControl(ctx, client2, "session", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data1 {
t.Errorf("Expected payload %s, got %s", data1, payload)
}
})
}
}
func TestClientControlMissingPermissions(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
}
2022-06-28 17:18:35 +02:00
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
if session1 == nil {
t.Fatalf("Session %s does not exist", hello1.Hello.SessionId)
}
session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
if session2 == nil {
t.Fatalf("Session %s does not exist", hello2.Hello.SessionId)
}
// Client 1 may not send control messages (will be ignored).
session1.SetPermissions([]Permission{
PERMISSION_MAY_PUBLISH_AUDIO,
PERMISSION_MAY_PUBLISH_VIDEO,
})
// Client 2 may send control messages.
session2.SetPermissions([]Permission{
PERMISSION_MAY_PUBLISH_AUDIO,
PERMISSION_MAY_PUBLISH_VIDEO,
PERMISSION_MAY_CONTROL,
})
recipient1 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}
recipient2 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello2.Hello.SessionId,
}
data1 := "from-1-to-2"
2022-06-28 17:18:35 +02:00
client1.SendControl(recipient2, data1) // nolint
data2 := "from-2-to-1"
2022-06-28 17:18:35 +02:00
client2.SendControl(recipient1, data2) // nolint
var payload string
2022-06-28 17:18:35 +02:00
if err := checkReceiveClientControl(ctx, client1, "session", hello2.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data2 {
t.Errorf("Expected payload %s, got %s", data2, payload)
}
2022-06-28 17:18:35 +02:00
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
if err := checkReceiveClientMessage(ctx2, client2, "session", hello1.Hello, &payload); err != nil {
if err != ErrNoMessageReceived {
t.Error(err)
}
} else {
t.Errorf("Expected no payload, got %+v", payload)
}
}
func TestClientMessageToUserId(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
} else if hello1.Hello.UserId == hello2.Hello.UserId {
t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId)
}
recipient1 := MessageClientMessageRecipient{
Type: "user",
UserId: hello1.Hello.UserId,
}
recipient2 := MessageClientMessageRecipient{
Type: "user",
UserId: hello2.Hello.UserId,
}
data1 := "from-1-to-2"
client1.SendMessage(recipient2, data1) // nolint
data2 := "from-2-to-1"
client2.SendMessage(recipient1, data2) // nolint
var payload string
if err := checkReceiveClientMessage(ctx, client1, "user", hello2.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data2 {
t.Errorf("Expected payload %s, got %s", data2, payload)
}
if err := checkReceiveClientMessage(ctx, client2, "user", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data1 {
t.Errorf("Expected payload %s, got %s", data1, payload)
}
}
2022-06-28 17:18:35 +02:00
func TestClientControlToUserId(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
} else if hello1.Hello.UserId == hello2.Hello.UserId {
t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId)
}
recipient1 := MessageClientMessageRecipient{
Type: "user",
UserId: hello1.Hello.UserId,
}
recipient2 := MessageClientMessageRecipient{
Type: "user",
UserId: hello2.Hello.UserId,
}
data1 := "from-1-to-2"
client1.SendControl(recipient2, data1) // nolint
data2 := "from-2-to-1"
client2.SendControl(recipient1, data2) // nolint
var payload string
if err := checkReceiveClientControl(ctx, client1, "user", hello2.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data2 {
t.Errorf("Expected payload %s, got %s", data2, payload)
}
if err := checkReceiveClientControl(ctx, client2, "user", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data1 {
t.Errorf("Expected payload %s, got %s", data1, payload)
}
}
func TestClientMessageToUserIdMultipleSessions(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2a := NewTestClient(t, server, hub)
defer client2a.CloseWithBye()
if err := client2a.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
client2b := NewTestClient(t, server, hub)
defer client2b.CloseWithBye()
if err := client2b.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2a, err := client2a.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2b, err := client2b.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2a.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
} else if hello1.Hello.SessionId == hello2b.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
} else if hello2a.Hello.SessionId == hello2b.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello2a.Hello.SessionId)
}
if hello1.Hello.UserId == hello2a.Hello.UserId {
t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId)
} else if hello1.Hello.UserId == hello2b.Hello.UserId {
t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId)
} else if hello2a.Hello.UserId != hello2b.Hello.UserId {
t.Fatalf("Expected the same user ids, got %s and %s", hello2a.Hello.UserId, hello2b.Hello.UserId)
}
recipient := MessageClientMessageRecipient{
Type: "user",
UserId: hello2a.Hello.UserId,
}
data1 := "from-1-to-2"
client1.SendMessage(recipient, data1) // nolint
// Both clients will receive the message as it was sent to the user.
var payload string
if err := checkReceiveClientMessage(ctx, client2a, "user", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data1 {
t.Errorf("Expected payload %s, got %s", data1, payload)
}
if err := checkReceiveClientMessage(ctx, client2b, "user", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data1 {
t.Errorf("Expected payload %s, got %s", data1, payload)
}
}
func WaitForUsersJoined(ctx context.Context, t *testing.T, client1 *TestClient, hello1 *ServerMessage, client2 *TestClient, hello2 *ServerMessage) {
// We will receive "joined" events for all clients. The ordering is not
// defined as messages are processed and sent by asynchronous event handlers.
if err := client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
}
func TestClientMessageToRoom(t *testing.T) {
2022-06-28 17:18:35 +02:00
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
2022-06-28 17:18:35 +02:00
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
2022-06-28 17:18:35 +02:00
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
} else if hello1.Hello.UserId == hello2.Hello.UserId {
t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId)
}
2022-06-28 17:18:35 +02:00
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
2022-06-28 17:18:35 +02:00
// Give message processing some time.
time.Sleep(10 * time.Millisecond)
2022-06-28 17:18:35 +02:00
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
2022-06-28 17:18:35 +02:00
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
2022-06-28 17:18:35 +02:00
recipient := MessageClientMessageRecipient{
Type: "room",
}
2022-06-28 17:18:35 +02:00
data1 := "from-1-to-2"
client1.SendMessage(recipient, data1) // nolint
data2 := "from-2-to-1"
client2.SendMessage(recipient, data2) // nolint
2022-06-28 17:18:35 +02:00
var payload string
if err := checkReceiveClientMessage(ctx, client1, "room", hello2.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data2 {
t.Errorf("Expected payload %s, got %s", data2, payload)
}
if err := checkReceiveClientMessage(ctx, client2, "room", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data1 {
t.Errorf("Expected payload %s, got %s", data1, payload)
}
})
}
2022-06-28 17:18:35 +02:00
}
2022-06-28 17:18:35 +02:00
func TestClientControlToRoom(t *testing.T) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
} else if hello1.Hello.UserId == hello2.Hello.UserId {
t.Fatalf("Expected different user ids, got %s twice", hello1.Hello.UserId)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// Give message processing some time.
time.Sleep(10 * time.Millisecond)
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
recipient := MessageClientMessageRecipient{
Type: "room",
}
data1 := "from-1-to-2"
client1.SendControl(recipient, data1) // nolint
data2 := "from-2-to-1"
client2.SendControl(recipient, data2) // nolint
var payload string
if err := checkReceiveClientControl(ctx, client1, "room", hello2.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data2 {
t.Errorf("Expected payload %s, got %s", data2, payload)
}
if err := checkReceiveClientControl(ctx, client2, "room", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if payload != data1 {
t.Errorf("Expected payload %s, got %s", data1, payload)
}
})
}
}
func TestJoinRoom(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event.
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
// Leave room.
if room, err := client.JoinRoom(ctx, ""); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != "" {
t.Fatalf("Expected empty room, got %s", room.Room.RoomId)
}
}
func TestJoinRoomTwice(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
} else if !bytes.Equal(testRoomProperties, *room.Room.Properties) {
t.Fatalf("Expected room properties %s, got %s", string(testRoomProperties), string(*room.Room.Properties))
}
// We will receive a "joined" event.
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
msg := &ClientMessage{
Id: "ABCD",
Type: "room",
Room: &RoomClientMessage{
RoomId: roomId,
SessionId: roomId + "-" + client.publicId + "-2",
},
}
if err := client.WriteJSON(msg); err != nil {
t.Fatal(err)
}
message, err := client.RunUntilMessage(ctx)
if err != nil {
t.Fatal(err)
}
if err := checkUnexpectedClose(err); err != nil {
t.Fatal(err)
}
if msg.Id != message.Id {
t.Errorf("expected message id %s, got %s", msg.Id, message.Id)
} else if err := checkMessageType(message, "error"); err != nil {
t.Fatal(err)
} else if expected := "already_joined"; message.Error.Code != expected {
t.Errorf("expected error %s, got %s", expected, message.Error.Code)
} else if message.Error.Details == nil {
t.Fatal("expected error details")
}
var roomMsg RoomErrorDetails
if err := json.Unmarshal(message.Error.Details, &roomMsg); err != nil {
t.Fatal(err)
} else if roomMsg.Room == nil {
t.Fatalf("expected room details, got %+v", message)
}
if roomMsg.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %+v", roomId, roomMsg.Room)
} else if !bytes.Equal(testRoomProperties, *roomMsg.Room.Properties) {
t.Fatalf("Expected room properties %s, got %s", string(testRoomProperties), string(*roomMsg.Room.Properties))
}
}
func TestExpectAnonymousJoinRoom(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(authAnonymousUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != "" {
t.Errorf("Expected an anonymous user, got %+v", hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
// Perform housekeeping in the future, this will cause the connection to
// be terminated because the anonymous client didn't join a room.
performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second))
message, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
}
if err := checkMessageType(message, "bye"); err != nil {
t.Error(err)
} else if message.Bye.Reason != "room_join_timeout" {
t.Errorf("Expected \"room_join_timeout\" reason, got %+v", message.Bye)
}
// Both the client and the session get removed from the hub.
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
if err := client.WaitForSessionRemoved(ctx, hello.Hello.SessionId); err != nil {
t.Error(err)
}
}
func TestExpectAnonymousJoinRoomAfterLeave(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(authAnonymousUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello.Hello.UserId != "" {
t.Errorf("Expected an anonymous user, got %+v", hello.Hello)
}
if hello.Hello.SessionId == "" {
t.Errorf("Expected session id, got %+v", hello.Hello)
}
if hello.Hello.ResumeId == "" {
t.Errorf("Expected resume id, got %+v", hello.Hello)
}
}
// Join room by id.
roomId := "test-room"
if room, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event.
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
// Perform housekeeping in the future, this will keep the connection as the
// session joined a room.
performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second))
// No message about the closing is sent to the new connection.
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
// Leave room
if room, err := client.JoinRoom(ctx, ""); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != "" {
t.Fatalf("Expected room %s, got %s", "", room.Room.RoomId)
}
// Perform housekeeping in the future, this will cause the connection to
// be terminated because the anonymous client didn't join a room.
performHousekeeping(hub, time.Now().Add(anonmyousJoinRoomTimeout+time.Second))
message, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
}
if err := checkMessageType(message, "bye"); err != nil {
t.Error(err)
} else if message.Bye.Reason != "room_join_timeout" {
t.Errorf("Expected \"room_join_timeout\" reason, got %+v", message.Bye)
}
// Both the client and the session get removed from the hub.
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
if err := client.WaitForSessionRemoved(ctx, hello.Hello.SessionId); err != nil {
t.Error(err)
}
}
func TestJoinRoomChange(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event.
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
// Change room.
roomId = "other-test-room"
if room, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event.
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
// Leave room.
if room, err := client.JoinRoom(ctx, ""); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != "" {
t.Fatalf("Expected empty room, got %s", room.Room.RoomId)
}
}
func TestJoinMultiple(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
}
// Join room by id (first client).
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event.
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
t.Error(err)
}
// Join room by id (second client).
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event for the first and the second client.
if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
// The first client will also receive a "joined" event from the second client.
if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
// Leave room.
if room, err := client1.JoinRoom(ctx, ""); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != "" {
t.Fatalf("Expected empty room, got %s", room.Room.RoomId)
}
// The second client will now receive a "left" event
if err := client2.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
if room, err := client2.JoinRoom(ctx, ""); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != "" {
t.Fatalf("Expected empty room, got %s", room.Room.RoomId)
}
}
func TestJoinDisplaynamesPermission(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
if session2 == nil {
t.Fatalf("Session %s does not exist", hello2.Hello.SessionId)
}
// Client 2 may not receive display names.
session2.SetPermissions([]Permission{PERMISSION_HIDE_DISPLAYNAMES})
// Join room by id (first client).
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event.
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
t.Error(err)
}
// Join room by id (second client).
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event for the first and the second client.
if events, unexpected, err := client2.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
} else {
if len(unexpected) > 0 {
t.Errorf("Received unexpected messages: %+v", unexpected)
} else if len(events) != 2 {
t.Errorf("Expected two event, got %+v", events)
} else if events[0].User != nil {
t.Errorf("Expected empty userdata for first event, got %+v", events[0].User)
} else if events[1].User != nil {
t.Errorf("Expected empty userdata for second event, got %+v", events[1].User)
}
}
// The first client will also receive a "joined" event from the second client.
if events, unexpected, err := client1.RunUntilJoinedAndReturn(ctx, hello2.Hello); err != nil {
t.Error(err)
} else {
if len(unexpected) > 0 {
t.Errorf("Received unexpected messages: %+v", unexpected)
} else if len(events) != 1 {
t.Errorf("Expected one event, got %+v", events)
} else if events[0].User == nil {
t.Errorf("Expected userdata for first event, got nothing")
}
}
}
func TestInitialRoomPermissions(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room-initial-permissions"
if room, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
session := hub.GetSessionByPublicId(hello.Hello.SessionId).(*ClientSession)
if session == nil {
t.Fatalf("Session %s does not exist", hello.Hello.SessionId)
}
if !session.HasPermission(PERMISSION_MAY_PUBLISH_AUDIO) {
t.Errorf("Session %s should have %s, got %+v", session.PublicId(), PERMISSION_MAY_PUBLISH_AUDIO, session.permissions)
}
if session.HasPermission(PERMISSION_MAY_PUBLISH_VIDEO) {
t.Errorf("Session %s should not have %s, got %+v", session.PublicId(), PERMISSION_MAY_PUBLISH_VIDEO, session.permissions)
}
}
func TestJoinRoomSwitchClient(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room-slow"
msg := &ClientMessage{
Id: "ABCD",
Type: "room",
Room: &RoomClientMessage{
RoomId: roomId,
SessionId: roomId + "-" + hello.Hello.SessionId,
},
}
if err := client.WriteJSON(msg); err != nil {
t.Fatal(err)
}
// Wait a bit to make sure request is sent before closing client.
time.Sleep(1 * time.Millisecond)
client.Close()
if err := client.WaitForClientRemoved(ctx); err != nil {
t.Fatal(err)
}
// The client needs some time to reconnect.
time.Sleep(200 * time.Millisecond)
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHelloResume(hello.Hello.ResumeId); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello2.Hello.UserId != testDefaultUserId {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId, hello2.Hello)
}
if hello2.Hello.SessionId != hello.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello.Hello.SessionId, hello2.Hello)
}
if hello2.Hello.ResumeId != hello.Hello.ResumeId {
t.Errorf("Expected resume id %s, got %+v", hello.Hello.ResumeId, hello2.Hello)
}
}
room, err := client2.RunUntilMessage(ctx)
if err != nil {
t.Fatal(err)
}
if err := checkUnexpectedClose(err); err != nil {
t.Fatal(err)
}
if err := checkMessageType(room, "room"); err != nil {
t.Fatal(err)
}
if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// We will receive a "joined" event.
if err := client2.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
// Leave room.
if room, err := client2.JoinRoom(ctx, ""); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != "" {
t.Fatalf("Expected empty room, got %s", room.Room.RoomId)
}
}
func TestGetRealUserIP(t *testing.T) {
REMOTE_ATTR := "192.168.1.2"
request := &http.Request{
RemoteAddr: REMOTE_ATTR,
}
if ip := getRealUserIP(request); ip != REMOTE_ATTR {
t.Errorf("Expected %s but got %s", REMOTE_ATTR, ip)
}
X_REAL_IP := "192.168.10.11"
request.Header = http.Header{
http.CanonicalHeaderKey("x-real-ip"): []string{X_REAL_IP},
}
if ip := getRealUserIP(request); ip != X_REAL_IP {
t.Errorf("Expected %s but got %s", X_REAL_IP, ip)
}
// "X-Real-IP" has preference before "X-Forwarded-For"
X_FORWARDED_FOR_IP := "192.168.20.21"
X_FORWARDED_FOR := X_FORWARDED_FOR_IP + ", 192.168.30.32"
request.Header = http.Header{
http.CanonicalHeaderKey("x-real-ip"): []string{X_REAL_IP},
http.CanonicalHeaderKey("x-forwarded-for"): []string{X_FORWARDED_FOR},
}
if ip := getRealUserIP(request); ip != X_REAL_IP {
t.Errorf("Expected %s but got %s", X_REAL_IP, ip)
}
request.Header = http.Header{
http.CanonicalHeaderKey("x-forwarded-for"): []string{X_FORWARDED_FOR},
}
if ip := getRealUserIP(request); ip != X_FORWARDED_FOR_IP {
t.Errorf("Expected %s but got %s", X_FORWARDED_FOR_IP, ip)
}
}
func TestClientMessageToSessionIdWhileDisconnected(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
}
client2.Close()
if err := client2.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
recipient2 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello2.Hello.SessionId,
}
// The two chat messages should get combined into one when receiving pending messages.
chat_refresh := "{\"type\":\"chat\",\"chat\":{\"refresh\":true}}"
var data1 map[string]interface{}
if err := json.Unmarshal([]byte(chat_refresh), &data1); err != nil {
t.Fatal(err)
}
client1.SendMessage(recipient2, data1) // nolint
client1.SendMessage(recipient2, data1) // nolint
2021-05-31 16:53:52 +02:00
// Simulate some time until client resumes the session.
time.Sleep(10 * time.Millisecond)
client2 = NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHelloResume(hello2.Hello.ResumeId); err != nil {
t.Fatal(err)
}
hello3, err := client2.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello3.Hello.UserId != testDefaultUserId+"2" {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId+"2", hello3.Hello)
}
if hello3.Hello.SessionId != hello2.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello2.Hello.SessionId, hello3.Hello)
}
if hello3.Hello.ResumeId != hello2.Hello.ResumeId {
t.Errorf("Expected resume id %s, got %+v", hello2.Hello.ResumeId, hello3.Hello)
}
}
var payload map[string]interface{}
if err := checkReceiveClientMessage(ctx, client2, "session", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(payload, data1) {
t.Errorf("Expected payload %+v, got %+v", data1, payload)
}
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
if err := checkReceiveClientMessage(ctx2, client2, "session", hello1.Hello, &payload); err != nil {
2022-02-11 10:58:23 +01:00
if err != ErrNoMessageReceived {
t.Error(err)
}
} else {
t.Errorf("Expected no payload, got %+v", payload)
}
}
func TestRoomParticipantsListUpdateWhileDisconnected(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if hello1.Hello.SessionId == hello2.Hello.SessionId {
t.Fatalf("Expected different session ids, got %s twice", hello1.Hello.SessionId)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// Give message processing some time.
time.Sleep(10 * time.Millisecond)
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
// Simulate request from the backend that somebody joined the call.
users := []map[string]interface{}{
{
"sessionId": "the-session-id",
"inCall": 1,
},
}
room := hub.getRoom(roomId)
if room == nil {
t.Fatalf("Could not find room %s", roomId)
}
room.PublishUsersInCallChanged(users, users)
if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil {
t.Error(err)
}
client2.Close()
if err := client2.WaitForClientRemoved(ctx); err != nil {
t.Error(err)
}
room.PublishUsersInCallChanged(users, users)
// Give asynchronous events some time to be processed.
time.Sleep(100 * time.Millisecond)
recipient2 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello2.Hello.SessionId,
}
chat_refresh := "{\"type\":\"chat\",\"chat\":{\"refresh\":true}}"
var data1 map[string]interface{}
if err := json.Unmarshal([]byte(chat_refresh), &data1); err != nil {
t.Fatal(err)
}
client1.SendMessage(recipient2, data1) // nolint
client2 = NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHelloResume(hello2.Hello.ResumeId); err != nil {
t.Fatal(err)
}
hello3, err := client2.RunUntilHello(ctx)
if err != nil {
t.Error(err)
} else {
if hello3.Hello.UserId != testDefaultUserId+"2" {
t.Errorf("Expected \"%s\", got %+v", testDefaultUserId+"2", hello3.Hello)
}
if hello3.Hello.SessionId != hello2.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello2.Hello.SessionId, hello3.Hello)
}
if hello3.Hello.ResumeId != hello2.Hello.ResumeId {
t.Errorf("Expected resume id %s, got %+v", hello2.Hello.ResumeId, hello3.Hello)
}
}
// The participants list update event is triggered again after the session resume.
// TODO(jojo): Check contents of message and try with multiple users.
if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil {
t.Error(err)
}
var payload map[string]interface{}
if err := checkReceiveClientMessage(ctx, client2, "session", hello1.Hello, &payload); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(payload, data1) {
t.Errorf("Expected payload %+v, got %+v", data1, payload)
}
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
if err := checkReceiveClientMessage(ctx2, client2, "session", hello1.Hello, &payload); err != nil {
2022-02-11 10:58:23 +01:00
if err != ErrNoMessageReceived {
t.Error(err)
}
} else {
t.Errorf("Expected no payload, got %+v", payload)
}
}
func TestClientTakeoverRoomSession(t *testing.T) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
RunTestClientTakeoverRoomSession(t)
})
}
}
func RunTestClientTakeoverRoomSession(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room-takeover-room-session"
roomSessionid := "room-session-id"
if room, err := client1.JoinRoomWithRoomSession(ctx, roomId, roomSessionid); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if hubRoom := hub1.getRoom(roomId); hubRoom == nil {
t.Fatalf("Room %s does not exist", roomId)
}
if session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId); session1 == nil {
t.Fatalf("There should be a session %s", hello1.Hello.SessionId)
}
client3 := NewTestClient(t, server2, hub2)
defer client3.CloseWithBye()
if err := client3.SendHello(testDefaultUserId + "3"); err != nil {
t.Fatal(err)
}
hello3, err := client3.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if room, err := client3.JoinRoomWithRoomSession(ctx, roomId, roomSessionid+"other"); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// Wait until both users have joined.
WaitForUsersJoined(ctx, t, client1, hello1, client3, hello3)
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if room, err := client2.JoinRoomWithRoomSession(ctx, roomId, roomSessionid); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// The first client got disconnected with a reason in a "Bye" message.
msg, err := client1.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
} else {
if msg.Type != "bye" || msg.Bye == nil {
t.Errorf("Expected bye message, got %+v", msg)
} else if msg.Bye.Reason != "room_session_reconnected" {
t.Errorf("Expected reason \"room_session_reconnected\", got %+v", msg.Bye.Reason)
}
}
if msg, err := client1.RunUntilMessage(ctx); err == nil {
t.Errorf("Expected error but received %+v", msg)
} else if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
t.Errorf("Expected close error but received %+v", err)
}
// The first session has been closed
if session1 := hub1.GetSessionByPublicId(hello1.Hello.SessionId); session1 != nil {
t.Errorf("The session %s should have been removed", hello1.Hello.SessionId)
}
// The new client will receive "joined" events for the existing client3 and
// himself.
if err := client2.RunUntilJoined(ctx, hello3.Hello, hello2.Hello); err != nil {
t.Error(err)
}
// No message about the closing is sent to the new connection.
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
// The permanently connected client will receive a "left" event from the
// overridden session and a "joined" for the new session. In that order as
// both were on the same server.
if err := client3.RunUntilLeft(ctx, hello1.Hello); err != nil {
t.Error(err)
}
if err := client3.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
}
func TestClientSendOfferPermissions(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
mcu, err := NewTestMCU()
if err != nil {
t.Fatal(err)
} else if err := mcu.Start(); err != nil {
t.Fatal(err)
}
defer mcu.Stop()
hub.SetMcu(mcu)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// Give message processing some time.
time.Sleep(10 * time.Millisecond)
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
if session1 == nil {
t.Fatalf("Session %s does not exist", hello1.Hello.SessionId)
}
session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
if session2 == nil {
t.Fatalf("Session %s does not exist", hello2.Hello.SessionId)
}
// Client 1 is the moderator
session1.SetPermissions([]Permission{PERMISSION_MAY_PUBLISH_MEDIA, PERMISSION_MAY_PUBLISH_SCREEN})
// Client 2 is a guest participant.
session2.SetPermissions([]Permission{})
// Client 2 may not send an offer (he doesn't have the necessary permissions).
if err := client2.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "sendoffer",
Sid: "12345",
RoomType: "screen",
}); err != nil {
t.Fatal(err)
}
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageError(msg, "not_allowed"); err != nil {
t.Fatal(err)
}
}
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "offer",
Sid: "12345",
RoomType: "screen",
Payload: map[string]interface{}{
"sdp": MockSdpOfferAudioAndVideo,
},
}); err != nil {
t.Fatal(err)
}
if err := client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo); err != nil {
t.Fatal(err)
}
// Client 1 may send an offer.
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello2.Hello.SessionId,
}, MessageClientMessageData{
Type: "sendoffer",
Sid: "54321",
RoomType: "screen",
}); err != nil {
t.Fatal(err)
}
// The sender won't get a reply...
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client1.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
// ...but the other peer will get an offer.
if err := client2.RunUntilOffer(ctx, MockSdpOfferAudioAndVideo); err != nil {
t.Fatal(err)
}
}
func TestClientSendOfferPermissionsAudioOnly(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
mcu, err := NewTestMCU()
if err != nil {
t.Fatal(err)
} else if err := mcu.Start(); err != nil {
t.Fatal(err)
}
defer mcu.Stop()
hub.SetMcu(mcu)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
t.Error(err)
}
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
if session1 == nil {
t.Fatalf("Session %s does not exist", hello1.Hello.SessionId)
}
// Client is allowed to send audio only.
session1.SetPermissions([]Permission{PERMISSION_MAY_PUBLISH_AUDIO})
// Client may not send an offer with audio and video.
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "offer",
Sid: "54321",
RoomType: "video",
Payload: map[string]interface{}{
"sdp": MockSdpOfferAudioAndVideo,
},
}); err != nil {
t.Fatal(err)
}
if msg, err := client1.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageError(msg, "not_allowed"); err != nil {
t.Fatal(err)
}
}
// Client may send an offer (audio only).
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "offer",
Sid: "54321",
RoomType: "video",
Payload: map[string]interface{}{
"sdp": MockSdpOfferAudioOnly,
},
}); err != nil {
t.Fatal(err)
}
if err := client1.RunUntilAnswer(ctx, MockSdpAnswerAudioOnly); err != nil {
t.Fatal(err)
}
}
func TestClientSendOfferPermissionsAudioVideo(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
mcu, err := NewTestMCU()
if err != nil {
t.Fatal(err)
} else if err := mcu.Start(); err != nil {
t.Fatal(err)
}
defer mcu.Stop()
hub.SetMcu(mcu)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
t.Error(err)
}
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
if session1 == nil {
t.Fatalf("Session %s does not exist", hello1.Hello.SessionId)
}
// Client is allowed to send audio and video.
session1.SetPermissions([]Permission{PERMISSION_MAY_PUBLISH_AUDIO, PERMISSION_MAY_PUBLISH_VIDEO})
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "offer",
Sid: "54321",
RoomType: "video",
Payload: map[string]interface{}{
"sdp": MockSdpOfferAudioAndVideo,
},
}); err != nil {
t.Fatal(err)
}
if err := client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo); err != nil {
t.Fatal(err)
}
// Client is no longer allowed to send video, this will stop the publisher.
msg := &BackendServerRoomRequest{
Type: "participants",
Participants: &BackendRoomParticipantsRequest{
Changed: []map[string]interface{}{
{
"sessionId": roomId + "-" + hello1.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_AUDIO},
},
},
Users: []map[string]interface{}{
{
"sessionId": roomId + "-" + hello1.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_AUDIO},
},
},
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
2022-04-05 12:48:27 +02:00
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
ctx2, cancel2 := context.WithTimeout(ctx, time.Second)
defer cancel2()
pubs := mcu.GetPublishers()
if len(pubs) != 1 {
t.Fatalf("expected one publisher, got %+v", pubs)
}
loop:
for {
if err := ctx2.Err(); err != nil {
t.Errorf("publisher was not closed: %s", err)
}
for _, pub := range pubs {
if pub.isClosed() {
break loop
}
}
// Give some time to async processing.
time.Sleep(time.Millisecond)
}
}
func TestClientSendOfferPermissionsAudioVideoMedia(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
mcu, err := NewTestMCU()
if err != nil {
t.Fatal(err)
} else if err := mcu.Start(); err != nil {
t.Fatal(err)
}
defer mcu.Stop()
hub.SetMcu(mcu)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
t.Error(err)
}
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
if session1 == nil {
t.Fatalf("Session %s does not exist", hello1.Hello.SessionId)
}
// Client is allowed to send audio and video.
session1.SetPermissions([]Permission{PERMISSION_MAY_PUBLISH_MEDIA})
// Client may send an offer (audio and video).
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "offer",
Sid: "54321",
RoomType: "video",
Payload: map[string]interface{}{
"sdp": MockSdpOfferAudioAndVideo,
},
}); err != nil {
t.Fatal(err)
}
if err := client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo); err != nil {
t.Fatal(err)
}
// Client is no longer allowed to send video, this will stop the publisher.
msg := &BackendServerRoomRequest{
Type: "participants",
Participants: &BackendRoomParticipantsRequest{
Changed: []map[string]interface{}{
{
"sessionId": roomId + "-" + hello1.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA, PERMISSION_MAY_CONTROL},
},
},
Users: []map[string]interface{}{
{
"sessionId": roomId + "-" + hello1.Hello.SessionId,
"permissions": []Permission{PERMISSION_MAY_PUBLISH_MEDIA, PERMISSION_MAY_CONTROL},
},
},
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
2022-04-05 12:48:27 +02:00
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
ctx2, cancel2 := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel2()
pubs := mcu.GetPublishers()
if len(pubs) != 1 {
t.Fatalf("expected one publisher, got %+v", pubs)
}
loop:
for {
if err := ctx2.Err(); err != nil {
if err != context.DeadlineExceeded {
t.Errorf("error while waiting for publisher: %s", err)
}
break
}
for _, pub := range pubs {
if pub.isClosed() {
t.Errorf("publisher was closed")
break loop
}
}
// Give some time to async processing.
time.Sleep(time.Millisecond)
}
}
func TestClientRequestOfferNotInRoom(t *testing.T) {
2022-06-28 17:18:35 +02:00
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
2022-06-28 17:18:35 +02:00
mcu, err := NewTestMCU()
if err != nil {
t.Fatal(err)
} else if err := mcu.Start(); err != nil {
t.Fatal(err)
}
defer mcu.Stop()
2022-06-28 17:18:35 +02:00
hub1.SetMcu(mcu)
hub2.SetMcu(mcu)
2022-06-28 17:18:35 +02:00
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
2022-06-28 17:18:35 +02:00
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
2022-06-28 17:18:35 +02:00
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
2022-06-28 17:18:35 +02:00
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1"); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
2022-06-28 17:18:35 +02:00
// We will receive a "joined" event.
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
t.Error(err)
}
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "offer",
Sid: "54321",
RoomType: "screen",
Payload: map[string]interface{}{
"sdp": MockSdpOfferAudioAndVideo,
},
}); err != nil {
t.Fatal(err)
}
if err := client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo); err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
// Client 2 may not request an offer (he is not in the room yet).
if err := client2.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "requestoffer",
Sid: "12345",
RoomType: "screen",
}); err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageError(msg, "not_allowed"); err != nil {
t.Fatal(err)
}
}
2022-06-28 17:18:35 +02:00
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
2022-06-28 17:18:35 +02:00
// We will receive a "joined" event.
if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
2022-06-28 17:18:35 +02:00
// Client 2 may not request an offer (he is not in the call yet).
if err := client2.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "requestoffer",
Sid: "12345",
RoomType: "screen",
}); err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageError(msg, "not_allowed"); err != nil {
t.Fatal(err)
}
}
2022-06-28 17:18:35 +02:00
// Simulate request from the backend that somebody joined the call.
users1 := []map[string]interface{}{
{
"sessionId": hello2.Hello.SessionId,
"inCall": 1,
},
}
room2 := hub2.getRoom(roomId)
if room2 == nil {
t.Fatalf("Could not find room %s", roomId)
}
room2.PublishUsersInCallChanged(users1, users1)
if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil {
t.Error(err)
}
if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil {
t.Error(err)
}
2022-06-28 17:18:35 +02:00
// Client 2 may not request an offer (recipient is not in the call yet).
if err := client2.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "requestoffer",
Sid: "12345",
RoomType: "screen",
}); err != nil {
t.Fatal(err)
}
2022-06-28 17:18:35 +02:00
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageError(msg, "not_allowed"); err != nil {
t.Fatal(err)
}
}
2022-06-28 17:18:35 +02:00
// Simulate request from the backend that somebody joined the call.
users2 := []map[string]interface{}{
{
"sessionId": hello1.Hello.SessionId,
"inCall": 1,
},
}
room1 := hub1.getRoom(roomId)
if room1 == nil {
t.Fatalf("Could not find room %s", roomId)
}
room1.PublishUsersInCallChanged(users2, users2)
if err := checkReceiveClientEvent(ctx, client1, "update", nil); err != nil {
t.Error(err)
}
if err := checkReceiveClientEvent(ctx, client2, "update", nil); err != nil {
t.Error(err)
}
2022-06-28 17:18:35 +02:00
// Client 2 may request an offer now (both are in the same room and call).
if err := client2.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "requestoffer",
Sid: "12345",
RoomType: "screen",
}); err != nil {
t.Fatal(err)
}
if err := client2.RunUntilOffer(ctx, MockSdpOfferAudioAndVideo); err != nil {
2022-06-28 17:18:35 +02:00
t.Fatal(err)
}
})
}
}
func TestNoSendBetweenSessionsOnDifferentBackends(t *testing.T) {
// Clients can't send messages to sessions connected from other backends.
hub, _, _, server := CreateHubWithMultipleBackendsForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
params1 := TestBackendClientAuthParams{
UserId: "user1",
}
if err := client1.SendHelloParams(server.URL+"/one", HelloVersionV1, "client", nil, params1); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
params2 := TestBackendClientAuthParams{
UserId: "user2",
}
if err := client2.SendHelloParams(server.URL+"/two", HelloVersionV1, "client", nil, params2); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
recipient1 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}
recipient2 := MessageClientMessageRecipient{
Type: "session",
SessionId: hello2.Hello.SessionId,
}
data1 := "from-1-to-2"
client1.SendMessage(recipient2, data1) // nolint
data2 := "from-2-to-1"
client2.SendMessage(recipient1, data2) // nolint
var payload string
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
if err := checkReceiveClientMessage(ctx2, client1, "session", hello2.Hello, &payload); err != nil {
2022-02-11 10:58:23 +01:00
if err != ErrNoMessageReceived {
t.Error(err)
}
} else {
t.Errorf("Expected no payload, got %+v", payload)
}
ctx3, cancel3 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel3()
if err := checkReceiveClientMessage(ctx3, client2, "session", hello1.Hello, &payload); err != nil {
2022-02-11 10:58:23 +01:00
if err != ErrNoMessageReceived {
t.Error(err)
}
} else {
t.Errorf("Expected no payload, got %+v", payload)
}
}
func TestNoSameRoomOnDifferentBackends(t *testing.T) {
hub, _, _, server := CreateHubWithMultipleBackendsForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
params1 := TestBackendClientAuthParams{
UserId: "user1",
}
if err := client1.SendHelloParams(server.URL+"/one", HelloVersionV1, "client", nil, params1); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server, hub)
defer client2.CloseWithBye()
params2 := TestBackendClientAuthParams{
UserId: "user2",
}
if err := client2.SendHelloParams(server.URL+"/two", HelloVersionV1, "client", nil, params2); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
msg1, err := client1.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
}
if err := client1.checkMessageJoined(msg1, hello1.Hello); err != nil {
t.Error(err)
}
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
msg2, err := client2.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
}
if err := client2.checkMessageJoined(msg2, hello2.Hello); err != nil {
t.Error(err)
}
hub.ru.RLock()
var rooms []*Room
for _, room := range hub.rooms {
defer room.Close()
rooms = append(rooms, room)
}
hub.ru.RUnlock()
if len(rooms) != 2 {
t.Errorf("Expected 2 rooms, got %+v", rooms)
}
if rooms[0].IsEqual(rooms[1]) {
t.Errorf("Rooms should be different: %+v", rooms)
}
recipient := MessageClientMessageRecipient{
Type: "room",
}
data1 := "from-1-to-2"
client1.SendMessage(recipient, data1) // nolint
data2 := "from-2-to-1"
client2.SendMessage(recipient, data2) // nolint
var payload string
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
if err := checkReceiveClientMessage(ctx2, client1, "session", hello2.Hello, &payload); err != nil {
2022-02-11 10:58:23 +01:00
if err != ErrNoMessageReceived {
t.Error(err)
}
} else {
t.Errorf("Expected no payload, got %+v", payload)
}
ctx3, cancel3 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel3()
if err := checkReceiveClientMessage(ctx3, client2, "session", hello1.Hello, &payload); err != nil {
2022-02-11 10:58:23 +01:00
if err != ErrNoMessageReceived {
t.Error(err)
}
} else {
t.Errorf("Expected no payload, got %+v", payload)
}
}
2022-06-28 17:18:35 +02:00
func TestClientSendOffer(t *testing.T) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
mcu, err := NewTestMCU()
if err != nil {
t.Fatal(err)
} else if err := mcu.Start(); err != nil {
t.Fatal(err)
}
defer mcu.Stop()
hub1.SetMcu(mcu)
hub2.SetMcu(mcu)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoomWithRoomSession(ctx, roomId, "roomsession1"); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
// Give message processing some time.
time.Sleep(10 * time.Millisecond)
if room, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2)
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "offer",
Sid: "12345",
RoomType: "video",
Payload: map[string]interface{}{
"sdp": MockSdpOfferAudioAndVideo,
},
}); err != nil {
t.Fatal(err)
}
if err := client1.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo); err != nil {
t.Fatal(err)
}
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello2.Hello.SessionId,
}, MessageClientMessageData{
Type: "sendoffer",
RoomType: "video",
}); err != nil {
t.Fatal(err)
}
// The sender won't get a reply...
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client1.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
// ...but the other peer will get an offer.
if err := client2.RunUntilOffer(ctx, MockSdpOfferAudioAndVideo); err != nil {
2022-06-28 17:18:35 +02:00
t.Fatal(err)
}
})
}
}
func TestClientUnshareScreen(t *testing.T) {
hub, _, _, server := CreateHubForTest(t)
mcu, err := NewTestMCU()
if err != nil {
t.Fatal(err)
} else if err := mcu.Start(); err != nil {
t.Fatal(err)
}
defer mcu.Stop()
hub.SetMcu(mcu)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server, hub)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
// Join room by id.
roomId := "test-room"
if room, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId {
t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
t.Error(err)
}
session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession)
if session1 == nil {
t.Fatalf("Session %s does not exist", hello1.Hello.SessionId)
}
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "offer",
Sid: "54321",
RoomType: "screen",
Payload: map[string]interface{}{
"sdp": MockSdpOfferAudioOnly,
},
}); err != nil {
t.Fatal(err)
}
if err := client1.RunUntilAnswer(ctx, MockSdpAnswerAudioOnly); err != nil {
t.Fatal(err)
}
publisher := mcu.GetPublisher(hello1.Hello.SessionId)
if publisher == nil {
t.Fatalf("No publisher for %s found", hello1.Hello.SessionId)
} else if publisher.isClosed() {
t.Fatalf("Publisher %s should not be closed", hello1.Hello.SessionId)
}
old := cleanupScreenPublisherDelay
cleanupScreenPublisherDelay = time.Millisecond
defer func() {
cleanupScreenPublisherDelay = old
}()
if err := client1.SendMessage(MessageClientMessageRecipient{
Type: "session",
SessionId: hello1.Hello.SessionId,
}, MessageClientMessageData{
Type: "unshareScreen",
Sid: "54321",
RoomType: "screen",
}); err != nil {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
if !publisher.isClosed() {
t.Fatalf("Publisher %s should be closed", hello1.Hello.SessionId)
}
}
2022-07-12 12:01:20 +02:00
func TestVirtualClientSessions(t *testing.T) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomId := "test-room"
if _, err := client1.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello); err != nil {
t.Error(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHelloInternal(); err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession)
if session2 == nil {
t.Fatalf("Session %s does not exist", hello2.Hello.SessionId)
}
if _, err := client2.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
}
if err := client1.RunUntilJoined(ctx, hello2.Hello); err != nil {
t.Error(err)
}
if msg, err := client1.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if msg, err := checkMessageParticipantsInCall(msg); err != nil {
t.Error(err)
} else if len(msg.Users) != 1 {
t.Errorf("Expected one user, got %+v", msg)
} else if v, ok := msg.Users[0]["internal"].(bool); !ok || !v {
t.Errorf("Expected internal flag, got %+v", msg)
} else if v, ok := msg.Users[0]["sessionId"].(string); !ok || v != hello2.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello2.Hello.SessionId, msg)
} else if v, ok := msg.Users[0]["inCall"].(float64); !ok || v != 3 {
t.Errorf("Expected inCall flag 3, got %+v", msg)
}
_, unexpected, err := client2.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello)
if err != nil {
t.Error(err)
}
if len(unexpected) == 0 {
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else {
unexpected = append(unexpected, msg)
}
}
if len(unexpected) != 1 {
t.Fatalf("expected one message, got %+v", unexpected)
}
if msg, err := checkMessageParticipantsInCall(unexpected[0]); err != nil {
t.Error(err)
} else if len(msg.Users) != 1 {
t.Errorf("Expected one user, got %+v", msg)
} else if v, ok := msg.Users[0]["internal"].(bool); !ok || !v {
t.Errorf("Expected internal flag, got %+v", msg)
} else if v, ok := msg.Users[0]["sessionId"].(string); !ok || v != hello2.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello2.Hello.SessionId, msg)
} else if v, ok := msg.Users[0]["inCall"].(float64); !ok || v != FlagInCall|FlagWithAudio {
t.Errorf("Expected inCall flag %d, got %+v", FlagInCall|FlagWithAudio, msg)
}
calledCtx, calledCancel := context.WithTimeout(ctx, time.Second)
virtualSessionId := "virtual-session-id"
virtualUserId := "virtual-user-id"
generatedSessionId := GetVirtualSessionId(session2, virtualSessionId)
setSessionRequestHandler(t, func(request *BackendClientSessionRequest) {
defer calledCancel()
if request.Action != "add" {
t.Errorf("Expected action add, got %+v", request)
} else if request.RoomId != roomId {
t.Errorf("Expected room id %s, got %+v", roomId, request)
} else if request.SessionId == generatedSessionId {
t.Errorf("Expected generated session id %s, got %+v", generatedSessionId, request)
} else if request.UserId != virtualUserId {
t.Errorf("Expected session id %s, got %+v", virtualUserId, request)
}
})
if err := client2.SendInternalAddSession(&AddSessionInternalClientMessage{
CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{
SessionId: virtualSessionId,
RoomId: roomId,
},
UserId: virtualUserId,
Flags: FLAG_MUTED_SPEAKING,
}); err != nil {
t.Fatal(err)
}
<-calledCtx.Done()
if err := calledCtx.Err(); err != nil && !errors.Is(err, context.Canceled) {
t.Fatal(err)
}
virtualSessions := session2.GetVirtualSessions()
for len(virtualSessions) == 0 {
time.Sleep(time.Millisecond)
virtualSessions = session2.GetVirtualSessions()
}
virtualSession := virtualSessions[0]
if msg, err := client1.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if err := client1.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId); err != nil {
t.Error(err)
}
if msg, err := client1.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if msg, err := checkMessageParticipantsInCall(msg); err != nil {
t.Error(err)
} else if len(msg.Users) != 2 {
t.Errorf("Expected two users, got %+v", msg)
} else if v, ok := msg.Users[0]["internal"].(bool); !ok || !v {
t.Errorf("Expected internal flag, got %+v", msg)
} else if v, ok := msg.Users[0]["sessionId"].(string); !ok || v != hello2.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello2.Hello.SessionId, msg)
} else if v, ok := msg.Users[0]["inCall"].(float64); !ok || v != FlagInCall|FlagWithAudio {
t.Errorf("Expected inCall flag %d, got %+v", FlagInCall|FlagWithAudio, msg)
} else if v, ok := msg.Users[1]["virtual"].(bool); !ok || !v {
t.Errorf("Expected virtual flag, got %+v", msg)
} else if v, ok := msg.Users[1]["sessionId"].(string); !ok || v != virtualSession.PublicId() {
t.Errorf("Expected session id %s, got %+v", virtualSession.PublicId(), msg)
} else if v, ok := msg.Users[1]["inCall"].(float64); !ok || v != FlagInCall|FlagWithPhone {
t.Errorf("Expected inCall flag %d, got %+v", FlagInCall|FlagWithPhone, msg)
}
if msg, err := client1.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if flags, err := checkMessageParticipantFlags(msg); err != nil {
t.Error(err)
} else if flags.RoomId != roomId {
t.Errorf("Expected room id %s, got %+v", roomId, msg)
} else if flags.SessionId != virtualSession.PublicId() {
t.Errorf("Expected session id %s, got %+v", virtualSession.PublicId(), msg)
} else if flags.Flags != FLAG_MUTED_SPEAKING {
t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, msg)
}
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if err := client2.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId); err != nil {
t.Error(err)
}
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if msg, err := checkMessageParticipantsInCall(msg); err != nil {
t.Error(err)
} else if len(msg.Users) != 2 {
t.Errorf("Expected two users, got %+v", msg)
} else if v, ok := msg.Users[0]["internal"].(bool); !ok || !v {
t.Errorf("Expected internal flag, got %+v", msg)
} else if v, ok := msg.Users[0]["sessionId"].(string); !ok || v != hello2.Hello.SessionId {
t.Errorf("Expected session id %s, got %+v", hello2.Hello.SessionId, msg)
} else if v, ok := msg.Users[0]["inCall"].(float64); !ok || v != FlagInCall|FlagWithAudio {
t.Errorf("Expected inCall flag %d, got %+v", FlagInCall|FlagWithAudio, msg)
} else if v, ok := msg.Users[1]["virtual"].(bool); !ok || !v {
t.Errorf("Expected virtual flag, got %+v", msg)
} else if v, ok := msg.Users[1]["sessionId"].(string); !ok || v != virtualSession.PublicId() {
t.Errorf("Expected session id %s, got %+v", virtualSession.PublicId(), msg)
} else if v, ok := msg.Users[1]["inCall"].(float64); !ok || v != FlagInCall|FlagWithPhone {
t.Errorf("Expected inCall flag %d, got %+v", FlagInCall|FlagWithPhone, msg)
}
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if flags, err := checkMessageParticipantFlags(msg); err != nil {
t.Error(err)
} else if flags.RoomId != roomId {
t.Errorf("Expected room id %s, got %+v", roomId, msg)
} else if flags.SessionId != virtualSession.PublicId() {
t.Errorf("Expected session id %s, got %+v", virtualSession.PublicId(), msg)
} else if flags.Flags != FLAG_MUTED_SPEAKING {
t.Errorf("Expected flags %d, got %+v", FLAG_MUTED_SPEAKING, msg)
}
updatedFlags := uint32(0)
if err := client2.SendInternalUpdateSession(&UpdateSessionInternalClientMessage{
CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{
SessionId: virtualSessionId,
RoomId: roomId,
},
Flags: &updatedFlags,
}); err != nil {
t.Fatal(err)
}
if msg, err := client1.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if flags, err := checkMessageParticipantFlags(msg); err != nil {
t.Error(err)
} else if flags.RoomId != roomId {
t.Errorf("Expected room id %s, got %+v", roomId, msg)
} else if flags.SessionId != virtualSession.PublicId() {
t.Errorf("Expected session id %s, got %+v", virtualSession.PublicId(), msg)
} else if flags.Flags != 0 {
t.Errorf("Expected flags %d, got %+v", 0, msg)
}
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if flags, err := checkMessageParticipantFlags(msg); err != nil {
t.Error(err)
} else if flags.RoomId != roomId {
t.Errorf("Expected room id %s, got %+v", roomId, msg)
} else if flags.SessionId != virtualSession.PublicId() {
t.Errorf("Expected session id %s, got %+v", virtualSession.PublicId(), msg)
} else if flags.Flags != 0 {
t.Errorf("Expected flags %d, got %+v", 0, msg)
}
calledCtx, calledCancel = context.WithTimeout(ctx, time.Second)
setSessionRequestHandler(t, func(request *BackendClientSessionRequest) {
defer calledCancel()
if request.Action != "remove" {
t.Errorf("Expected action remove, got %+v", request)
} else if request.RoomId != roomId {
t.Errorf("Expected room id %s, got %+v", roomId, request)
} else if request.SessionId == generatedSessionId {
t.Errorf("Expected generated session id %s, got %+v", generatedSessionId, request)
} else if request.UserId != virtualUserId {
t.Errorf("Expected user id %s, got %+v", virtualUserId, request)
}
})
// Messages to virtual sessions are sent to the associated client session.
virtualRecipient := MessageClientMessageRecipient{
Type: "session",
SessionId: virtualSession.PublicId(),
}
data := "message-to-virtual"
client1.SendMessage(virtualRecipient, data) // nolint
var payload string
var sender *MessageServerMessageSender
var recipient *MessageClientMessageRecipient
if err := checkReceiveClientMessageWithSenderAndRecipient(ctx, client2, "session", hello1.Hello, &payload, &sender, &recipient); err != nil {
t.Error(err)
} else if recipient.SessionId != virtualSessionId {
t.Errorf("Expected session id %s, got %+v", virtualSessionId, recipient)
} else if payload != data {
t.Errorf("Expected payload %s, got %s", data, payload)
}
data = "control-to-virtual"
client1.SendControl(virtualRecipient, data) // nolint
if err := checkReceiveClientControlWithSenderAndRecipient(ctx, client2, "session", hello1.Hello, &payload, &sender, &recipient); err != nil {
t.Error(err)
} else if recipient.SessionId != virtualSessionId {
t.Errorf("Expected session id %s, got %+v", virtualSessionId, recipient)
} else if payload != data {
t.Errorf("Expected payload %s, got %s", data, payload)
}
if err := client2.SendInternalRemoveSession(&RemoveSessionInternalClientMessage{
CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{
SessionId: virtualSessionId,
RoomId: roomId,
},
UserId: virtualUserId,
}); err != nil {
t.Fatal(err)
}
<-calledCtx.Done()
if err := calledCtx.Err(); err != nil && !errors.Is(err, context.Canceled) {
t.Fatal(err)
}
if msg, err := client1.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if err := client1.checkMessageRoomLeaveSession(msg, virtualSession.PublicId()); err != nil {
t.Error(err)
}
if msg, err := client2.RunUntilMessage(ctx); err != nil {
t.Error(err)
} else if err := client2.checkMessageRoomLeaveSession(msg, virtualSession.PublicId()); err != nil {
t.Error(err)
}
})
}
}
2023-01-30 10:08:35 +01:00
func DoTestSwitchToOne(t *testing.T, details map[string]interface{}) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomSessionId1 := "roomsession1"
roomId1 := "test-room"
if room, err := client1.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId1); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId1 {
t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId)
}
roomSessionId2 := "roomsession2"
if room, err := client2.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId2); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId1 {
t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
roomId2 := "test-room-2"
var sessions json.RawMessage
if details != nil {
if sessions, err = json.Marshal(map[string]interface{}{
roomSessionId1: details,
}); err != nil {
t.Fatal(err)
}
} else {
if sessions, err = json.Marshal([]string{
roomSessionId1,
}); err != nil {
t.Fatal(err)
}
}
// Notify first client to switch to different room.
msg := &BackendServerRoomRequest{
Type: "switchto",
SwitchTo: &BackendRoomSwitchToMessageRequest{
RoomId: roomId2,
Sessions: &sessions,
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId1, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
var detailsData json.RawMessage
if details != nil {
if detailsData, err = json.Marshal(details); err != nil {
t.Fatal(err)
}
}
if _, err := client1.RunUntilSwitchTo(ctx, roomId2, detailsData); err != nil {
t.Error(err)
}
// The other client will not receive a message.
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
if message, err := client2.RunUntilMessage(ctx2); err != nil && err != ErrNoMessageReceived && err != context.DeadlineExceeded {
t.Error(err)
} else if message != nil {
t.Errorf("Expected no message, got %+v", message)
}
})
}
}
func TestSwitchToOneMap(t *testing.T) {
DoTestSwitchToOne(t, map[string]interface{}{
"foo": "bar",
})
}
func TestSwitchToOneList(t *testing.T) {
DoTestSwitchToOne(t, nil)
}
func DoTestSwitchToMultiple(t *testing.T, details1 map[string]interface{}, details2 map[string]interface{}) {
for _, subtest := range clusteredTests {
t.Run(subtest, func(t *testing.T) {
var hub1 *Hub
var hub2 *Hub
var server1 *httptest.Server
var server2 *httptest.Server
if isLocalTest(t) {
hub1, _, _, server1 = CreateHubForTest(t)
hub2 = hub1
server2 = server1
} else {
hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t)
}
client1 := NewTestClient(t, server1, hub1)
defer client1.CloseWithBye()
if err := client1.SendHello(testDefaultUserId + "1"); err != nil {
t.Fatal(err)
}
client2 := NewTestClient(t, server2, hub2)
defer client2.CloseWithBye()
if err := client2.SendHello(testDefaultUserId + "2"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
hello1, err := client1.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
hello2, err := client2.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomSessionId1 := "roomsession1"
roomId1 := "test-room"
if room, err := client1.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId1); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId1 {
t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId)
}
roomSessionId2 := "roomsession2"
if room, err := client2.JoinRoomWithRoomSession(ctx, roomId1, roomSessionId2); err != nil {
t.Fatal(err)
} else if room.Room.RoomId != roomId1 {
t.Fatalf("Expected room %s, got %s", roomId1, room.Room.RoomId)
}
if err := client1.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
if err := client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello); err != nil {
t.Error(err)
}
roomId2 := "test-room-2"
var sessions json.RawMessage
if details1 != nil || details2 != nil {
if sessions, err = json.Marshal(map[string]interface{}{
roomSessionId1: details1,
roomSessionId2: details2,
}); err != nil {
t.Fatal(err)
}
} else {
if sessions, err = json.Marshal([]string{
roomSessionId1,
roomSessionId2,
}); err != nil {
t.Fatal(err)
}
}
msg := &BackendServerRoomRequest{
Type: "switchto",
SwitchTo: &BackendRoomSwitchToMessageRequest{
RoomId: roomId2,
Sessions: &sessions,
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId1, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != 200 {
t.Errorf("Expected successful request, got %s: %s", res.Status, string(body))
}
var detailsData1 json.RawMessage
if details1 != nil {
if detailsData1, err = json.Marshal(details1); err != nil {
t.Fatal(err)
}
}
if _, err := client1.RunUntilSwitchTo(ctx, roomId2, detailsData1); err != nil {
t.Error(err)
}
var detailsData2 json.RawMessage
if details2 != nil {
if detailsData2, err = json.Marshal(details2); err != nil {
t.Fatal(err)
}
}
if _, err := client2.RunUntilSwitchTo(ctx, roomId2, detailsData2); err != nil {
t.Error(err)
}
})
}
}
func TestSwitchToMultipleMap(t *testing.T) {
DoTestSwitchToMultiple(t, map[string]interface{}{
"foo": "bar",
}, map[string]interface{}{
"bar": "baz",
})
}
func TestSwitchToMultipleList(t *testing.T) {
DoTestSwitchToMultiple(t, nil, nil)
}
func TestSwitchToMultipleMixed(t *testing.T) {
DoTestSwitchToMultiple(t, map[string]interface{}{
"foo": "bar",
}, nil)
}
func TestGeoipOverrides(t *testing.T) {
country1 := "DE"
country2 := "IT"
country3 := "site1"
hub, _, _, _ := CreateHubForTestWithConfig(t, func(server *httptest.Server) (*goconf.ConfigFile, error) {
conf, err := getTestConfig(server)
if err != nil {
return nil, err
}
conf.AddOption("geoip-overrides", "10.1.0.0/16", country1)
conf.AddOption("geoip-overrides", "10.2.0.0/16", country2)
conf.AddOption("geoip-overrides", "192.168.10.20", country3)
return conf, err
})
if country := hub.OnLookupCountry(&Client{addr: "127.0.0.1"}); country != loopback {
t.Errorf("expected country %s, got %s", loopback, country)
}
if country := hub.OnLookupCountry(&Client{addr: "8.8.8.8"}); country != unknownCountry {
t.Errorf("expected country %s, got %s", unknownCountry, country)
}
if country := hub.OnLookupCountry(&Client{addr: "10.1.1.2"}); country != country1 {
t.Errorf("expected country %s, got %s", country1, country)
}
if country := hub.OnLookupCountry(&Client{addr: "10.2.1.2"}); country != country2 {
t.Errorf("expected country %s, got %s", country2, country)
}
if country := hub.OnLookupCountry(&Client{addr: "192.168.10.20"}); country != strings.ToUpper(country3) {
t.Errorf("expected country %s, got %s", strings.ToUpper(country3), country)
}
}
2023-10-26 10:38:22 +02:00
func TestDialoutStatus(t *testing.T) {
_, _, _, hub, _, server := CreateBackendServerForTest(t)
internalClient := NewTestClient(t, server, hub)
defer internalClient.CloseWithBye()
if err := internalClient.SendHelloInternalWithFeatures([]string{"start-dialout"}); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
_, err := internalClient.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
roomId := "12345"
client := NewTestClient(t, server, hub)
defer client.CloseWithBye()
if err := client.SendHello(testDefaultUserId); err != nil {
t.Fatal(err)
}
hello, err := client.RunUntilHello(ctx)
if err != nil {
t.Fatal(err)
}
if _, err := client.JoinRoom(ctx, roomId); err != nil {
t.Fatal(err)
}
if err := client.RunUntilJoined(ctx, hello.Hello); err != nil {
t.Error(err)
}
callId := "call-123"
stopped := make(chan struct{})
go func(client *TestClient) {
defer close(stopped)
msg, err := client.RunUntilMessage(ctx)
if err != nil {
t.Error(err)
return
}
if msg.Type != "internal" || msg.Internal.Type != "dialout" {
t.Errorf("expected internal dialout message, got %+v", msg)
return
}
if msg.Internal.Dialout.RoomId != roomId {
t.Errorf("expected room id %s, got %+v", roomId, msg)
}
response := &ClientMessage{
Id: msg.Id,
Type: "internal",
Internal: &InternalClientMessage{
Type: "dialout",
Dialout: &DialoutInternalClientMessage{
Type: "status",
RoomId: msg.Internal.Dialout.RoomId,
Status: &DialoutStatusInternalClientMessage{
Status: "accepted",
CallId: callId,
},
},
},
}
if err := client.WriteJSON(response); err != nil {
t.Error(err)
}
}(internalClient)
defer func() {
<-stopped
}()
msg := &BackendServerRoomRequest{
Type: "dialout",
Dialout: &BackendRoomDialoutRequest{
Number: "+1234567890",
},
}
data, err := json.Marshal(msg)
if err != nil {
t.Fatal(err)
}
res, err := performBackendRequest(server.URL+"/api/v1/room/"+roomId, data)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
t.Error(err)
}
if res.StatusCode != http.StatusOK {
t.Fatalf("Expected error %d, got %s: %s", http.StatusOK, res.Status, string(body))
}
var response BackendServerRoomResponse
if err := json.Unmarshal(body, &response); err != nil {
t.Fatal(err)
}
if response.Type != "dialout" || response.Dialout == nil {
t.Fatalf("expected type dialout, got %s", string(body))
}
if response.Dialout.Error != nil {
t.Fatalf("expected dialout success, got %s", string(body))
}
if response.Dialout.CallId != callId {
t.Errorf("expected call id %s, got %s", callId, string(body))
}
key := "callstatus_" + callId
if msg, err := client.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageTransientSet(msg, key, map[string]interface{}{
"callid": callId,
"status": "accepted",
}, nil); err != nil {
t.Error(err)
}
}
if err := internalClient.SendInternalDialout(&DialoutInternalClientMessage{
RoomId: roomId,
Type: "status",
Status: &DialoutStatusInternalClientMessage{
CallId: callId,
Status: "ringing",
},
}); err != nil {
t.Fatal(err)
}
if msg, err := client.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageTransientSet(msg, key, map[string]interface{}{
"callid": callId,
"status": "ringing",
},
map[string]interface{}{
"callid": callId,
"status": "accepted",
}); err != nil {
t.Error(err)
}
}
old := removeCallStatusTTL
defer func() {
removeCallStatusTTL = old
}()
removeCallStatusTTL = 500 * time.Millisecond
clearedCause := "cleared-call"
if err := internalClient.SendInternalDialout(&DialoutInternalClientMessage{
RoomId: roomId,
Type: "status",
Status: &DialoutStatusInternalClientMessage{
CallId: callId,
Status: "cleared",
Cause: clearedCause,
},
}); err != nil {
t.Fatal(err)
}
if msg, err := client.RunUntilMessage(ctx); err != nil {
t.Fatal(err)
} else {
if err := checkMessageTransientSet(msg, key, map[string]interface{}{
"callid": callId,
"status": "cleared",
"cause": clearedCause,
},
map[string]interface{}{
"callid": callId,
"status": "ringing",
}); err != nil {
t.Error(err)
}
}
ctx2, cancel := context.WithTimeout(ctx, removeCallStatusTTL*2)
defer cancel()
if msg, err := client.RunUntilMessage(ctx2); err != nil {
t.Fatal(err)
} else {
if err := checkMessageTransientRemove(msg, key, map[string]interface{}{
"callid": callId,
"status": "cleared",
"cause": clearedCause,
}); err != nil {
t.Error(err)
}
}
}