/** * Standalone signaling server for the Nextcloud Spreed app. * Copyright (C) 2020 struktur AG * * @author Joachim Bauch * * @license GNU AGPL version 3 or any later version * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package signaling import ( "context" "crypto/rand" "crypto/rsa" "encoding/json" "errors" "fmt" "io" "net/http" "net/http/httptest" "net/url" "path" "strings" "sync" "sync/atomic" "testing" "time" "github.com/dlintw/goconf" "github.com/gorilla/websocket" "go.etcd.io/etcd/server/v3/embed" ) func TestMcuProxyStats(t *testing.T) { collectAndLint(t, proxyMcuStats...) } func newProxyConnectionWithCountry(country string) *mcuProxyConnection { conn := &mcuProxyConnection{} conn.country.Store(country) return conn } func Test_sortConnectionsForCountry(t *testing.T) { conn_de := newProxyConnectionWithCountry("DE") conn_at := newProxyConnectionWithCountry("AT") conn_jp := newProxyConnectionWithCountry("JP") conn_us := newProxyConnectionWithCountry("US") testcases := map[string][][]*mcuProxyConnection{ // Direct country match "DE": { {conn_at, conn_jp, conn_de}, {conn_de, conn_at, conn_jp}, }, // Direct country match "AT": { {conn_at, conn_jp, conn_de}, {conn_at, conn_de, conn_jp}, }, // Continent match "CH": { {conn_de, conn_jp, conn_at}, {conn_de, conn_at, conn_jp}, }, // Direct country match "JP": { {conn_de, conn_jp, conn_at}, {conn_jp, conn_de, conn_at}, }, // Continent match "CN": { {conn_de, conn_jp, conn_at}, {conn_jp, conn_de, conn_at}, }, // Continent match "RU": { {conn_us, conn_de, conn_jp, conn_at}, {conn_de, conn_at, conn_us, conn_jp}, }, // No match "AU": { {conn_us, conn_de, conn_jp, conn_at}, {conn_us, conn_de, conn_jp, conn_at}, }, } for country, test := range testcases { country := country test := test t.Run(country, func(t *testing.T) { sorted := sortConnectionsForCountry(test[0], country, nil) for idx, conn := range sorted { if test[1][idx] != conn { t.Errorf("Index %d for %s: expected %s, got %s", idx, country, test[1][idx].Country(), conn.Country()) } } }) } } func Test_sortConnectionsForCountryWithOverride(t *testing.T) { conn_de := newProxyConnectionWithCountry("DE") conn_at := newProxyConnectionWithCountry("AT") conn_jp := newProxyConnectionWithCountry("JP") conn_us := newProxyConnectionWithCountry("US") testcases := map[string][][]*mcuProxyConnection{ // Direct country match "DE": { {conn_at, conn_jp, conn_de}, {conn_de, conn_at, conn_jp}, }, // Direct country match "AT": { {conn_at, conn_jp, conn_de}, {conn_at, conn_de, conn_jp}, }, // Continent match "CH": { {conn_de, conn_jp, conn_at}, {conn_de, conn_at, conn_jp}, }, // Direct country match "JP": { {conn_de, conn_jp, conn_at}, {conn_jp, conn_de, conn_at}, }, // Continent match "CN": { {conn_de, conn_jp, conn_at}, {conn_jp, conn_de, conn_at}, }, // Continent match "RU": { {conn_us, conn_de, conn_jp, conn_at}, {conn_de, conn_at, conn_us, conn_jp}, }, // No match "AR": { {conn_us, conn_de, conn_jp, conn_at}, {conn_us, conn_de, conn_jp, conn_at}, }, // No match but override (OC -> AS / NA) "AU": { {conn_us, conn_jp}, {conn_us, conn_jp}, }, // No match but override (AF -> EU) "ZA": { {conn_de, conn_at}, {conn_de, conn_at}, }, } continentMap := map[string][]string{ // Use European connections for Africa. "AF": {"EU"}, // Use Asian and North American connections for Oceania. "OC": {"AS", "NA"}, } for country, test := range testcases { country := country test := test t.Run(country, func(t *testing.T) { sorted := sortConnectionsForCountry(test[0], country, continentMap) for idx, conn := range sorted { if test[1][idx] != conn { t.Errorf("Index %d for %s: expected %s, got %s", idx, country, test[1][idx].Country(), conn.Country()) } } }) } } type proxyServerClientHandler func(msg *ProxyClientMessage) (*ProxyServerMessage, error) type testProxyServerPublisher struct { id string } type testProxyServerSubscriber struct { id string sid string pub *testProxyServerPublisher remoteUrl string } type testProxyServerClient struct { t *testing.T server *TestProxyServerHandler ws *websocket.Conn processMessage proxyServerClientHandler mu sync.Mutex sessionId string } func (c *testProxyServerClient) processHello(msg *ProxyClientMessage) (*ProxyServerMessage, error) { if msg.Type != "hello" { return nil, fmt.Errorf("expected hello, got %+v", msg) } response := &ProxyServerMessage{ Id: msg.Id, Type: "hello", Hello: &HelloProxyServerMessage{ Version: "1.0", SessionId: c.sessionId, Server: &WelcomeServerMessage{ Version: "1.0", Country: c.server.country, }, }, } c.processMessage = c.processRegularMessage return response, nil } func (c *testProxyServerClient) processRegularMessage(msg *ProxyClientMessage) (*ProxyServerMessage, error) { var handler proxyServerClientHandler switch msg.Type { case "command": handler = c.processCommandMessage } if handler == nil { response := msg.NewWrappedErrorServerMessage(fmt.Errorf("type \"%s\" is not implemented", msg.Type)) return response, nil } return handler(msg) } func (c *testProxyServerClient) processCommandMessage(msg *ProxyClientMessage) (*ProxyServerMessage, error) { var response *ProxyServerMessage switch msg.Command.Type { case "create-publisher": pub := c.server.createPublisher() response = &ProxyServerMessage{ Id: msg.Id, Type: "command", Command: &CommandProxyServerMessage{ Id: pub.id, Bitrate: msg.Command.Bitrate, }, } c.server.updateLoad(1) case "delete-publisher": if pub, found := c.server.deletePublisher(msg.Command.ClientId); !found { response = msg.NewWrappedErrorServerMessage(fmt.Errorf("publisher %s not found", msg.Command.ClientId)) } else { response = &ProxyServerMessage{ Id: msg.Id, Type: "command", Command: &CommandProxyServerMessage{ Id: pub.id, }, } c.server.updateLoad(-1) } case "create-subscriber": var pub *testProxyServerPublisher if msg.Command.RemoteUrl != "" { for _, server := range c.server.servers { if server.URL != msg.Command.RemoteUrl { continue } pub = server.getPublisher(msg.Command.PublisherId) break } } else { pub = c.server.getPublisher(msg.Command.PublisherId) } if pub == nil { response = msg.NewWrappedErrorServerMessage(fmt.Errorf("publisher %s not found", msg.Command.PublisherId)) } else { sub := c.server.createSubscriber(pub) response = &ProxyServerMessage{ Id: msg.Id, Type: "command", Command: &CommandProxyServerMessage{ Id: sub.id, Sid: sub.sid, }, } c.server.updateLoad(1) } case "delete-subscriber": if sub, found := c.server.deleteSubscriber(msg.Command.ClientId); !found { response = msg.NewWrappedErrorServerMessage(fmt.Errorf("subscriber %s not found", msg.Command.ClientId)) } else { if msg.Command.RemoteUrl != sub.remoteUrl { response = msg.NewWrappedErrorServerMessage(fmt.Errorf("remote subscriber %s not found", msg.Command.ClientId)) return response, nil } response = &ProxyServerMessage{ Id: msg.Id, Type: "command", Command: &CommandProxyServerMessage{ Id: sub.id, }, } c.server.updateLoad(-1) } } if response == nil { response = msg.NewWrappedErrorServerMessage(fmt.Errorf("command \"%s\" is not implemented", msg.Command.Type)) } return response, nil } func (c *testProxyServerClient) close() { c.mu.Lock() defer c.mu.Unlock() c.ws.Close() c.ws = nil } func (c *testProxyServerClient) handleSendMessageError(fmt string, msg *ProxyServerMessage, err error) { c.t.Helper() if !errors.Is(err, websocket.ErrCloseSent) || msg.Type != "event" || msg.Event.Type != "update-load" { c.t.Errorf(fmt, msg, err) } } func (c *testProxyServerClient) sendMessage(msg *ProxyServerMessage) { c.mu.Lock() defer c.mu.Unlock() if c.ws == nil { return } data, err := json.Marshal(msg) if err != nil { c.handleSendMessageError("error marshalling %+v: %s", msg, err) return } w, err := c.ws.NextWriter(websocket.TextMessage) if err != nil { c.handleSendMessageError("error creating writer for %+v: %s", msg, err) return } if _, err := w.Write(data); err != nil { c.handleSendMessageError("error sending %+v: %s", msg, err) return } if err := w.Close(); err != nil { c.handleSendMessageError("error during close of sending %+v: %s", msg, err) } } func (c *testProxyServerClient) run() { defer func() { c.mu.Lock() defer c.mu.Unlock() c.server.removeClient(c) c.ws = nil }() c.processMessage = c.processHello for { c.mu.Lock() ws := c.ws c.mu.Unlock() if ws == nil { break } msgType, reader, err := ws.NextReader() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { c.t.Error(err) } return } body, err := io.ReadAll(reader) if err != nil { c.t.Error(err) continue } if msgType != websocket.TextMessage { c.t.Errorf("unexpected message type %q (%s)", msgType, string(body)) continue } var msg ProxyClientMessage if err := json.Unmarshal(body, &msg); err != nil { c.t.Errorf("could not decode message %s: %s", string(body), err) continue } if err := msg.CheckValid(); err != nil { c.t.Errorf("invalid message %s: %s", string(body), err) continue } response, err := c.processMessage(&msg) if err != nil { c.t.Error(err) continue } c.sendMessage(response) if response.Type == "hello" { c.server.sendLoad(c) } } } type TestProxyServerHandler struct { t *testing.T URL string server *httptest.Server servers []*TestProxyServerHandler upgrader *websocket.Upgrader country string mu sync.Mutex load atomic.Int64 incoming atomic.Pointer[float64] outgoing atomic.Pointer[float64] clients map[string]*testProxyServerClient publishers map[string]*testProxyServerPublisher subscribers map[string]*testProxyServerSubscriber } func (h *TestProxyServerHandler) createPublisher() *testProxyServerPublisher { h.mu.Lock() defer h.mu.Unlock() pub := &testProxyServerPublisher{ id: newRandomString(32), } for { if _, found := h.publishers[pub.id]; !found { break } pub.id = newRandomString(32) } h.publishers[pub.id] = pub return pub } func (h *TestProxyServerHandler) getPublisher(id string) *testProxyServerPublisher { h.mu.Lock() defer h.mu.Unlock() return h.publishers[id] } func (h *TestProxyServerHandler) deletePublisher(id string) (*testProxyServerPublisher, bool) { h.mu.Lock() defer h.mu.Unlock() pub, found := h.publishers[id] if !found { return nil, false } delete(h.publishers, id) return pub, true } func (h *TestProxyServerHandler) createSubscriber(pub *testProxyServerPublisher) *testProxyServerSubscriber { h.mu.Lock() defer h.mu.Unlock() sub := &testProxyServerSubscriber{ id: newRandomString(32), sid: newRandomString(8), pub: pub, } for { if _, found := h.subscribers[sub.id]; !found { break } sub.id = newRandomString(32) } h.subscribers[sub.id] = sub return sub } func (h *TestProxyServerHandler) deleteSubscriber(id string) (*testProxyServerSubscriber, bool) { h.mu.Lock() defer h.mu.Unlock() sub, found := h.subscribers[id] if !found { return nil, false } delete(h.subscribers, id) return sub, true } func (h *TestProxyServerHandler) UpdateBandwidth(incoming float64, outgoing float64) { h.incoming.Store(&incoming) h.outgoing.Store(&outgoing) h.mu.Lock() defer h.mu.Unlock() msg := h.getLoadMessage(h.load.Load()) for _, c := range h.clients { c.sendMessage(msg) } } func (h *TestProxyServerHandler) Clear(incoming bool, outgoing bool) { if incoming { h.incoming.Store(nil) } if outgoing { h.outgoing.Store(nil) } h.mu.Lock() defer h.mu.Unlock() msg := h.getLoadMessage(h.load.Load()) for _, c := range h.clients { c.sendMessage(msg) } } func (h *TestProxyServerHandler) getLoadMessage(load int64) *ProxyServerMessage { msg := &ProxyServerMessage{ Type: "event", Event: &EventProxyServerMessage{ Type: "update-load", Load: load, }, } incoming := h.incoming.Load() outgoing := h.outgoing.Load() if incoming != nil || outgoing != nil { msg.Event.Bandwidth = &EventProxyServerBandwidth{ Incoming: incoming, Outgoing: outgoing, } } return msg } func (h *TestProxyServerHandler) updateLoad(delta int64) { if delta == 0 { return } load := h.load.Add(delta) h.mu.Lock() defer h.mu.Unlock() msg := h.getLoadMessage(load) for _, c := range h.clients { go c.sendMessage(msg) } } func (h *TestProxyServerHandler) sendLoad(c *testProxyServerClient) { msg := h.getLoadMessage(h.load.Load()) c.sendMessage(msg) } func (h *TestProxyServerHandler) removeClient(client *testProxyServerClient) { h.mu.Lock() defer h.mu.Unlock() delete(h.clients, client.sessionId) } func (h *TestProxyServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ws, err := h.upgrader.Upgrade(w, r, nil) if err != nil { h.t.Error(err) return } client := &testProxyServerClient{ t: h.t, server: h, ws: ws, sessionId: newRandomString(32), } h.mu.Lock() h.clients[client.sessionId] = client h.mu.Unlock() go client.run() } func NewProxyServerForTest(t *testing.T, country string) *TestProxyServerHandler { t.Helper() upgrader := websocket.Upgrader{} proxyHandler := &TestProxyServerHandler{ t: t, upgrader: &upgrader, country: country, clients: make(map[string]*testProxyServerClient), publishers: make(map[string]*testProxyServerPublisher), subscribers: make(map[string]*testProxyServerSubscriber), } server := httptest.NewServer(proxyHandler) proxyHandler.server = server proxyHandler.URL = server.URL t.Cleanup(func() { server.Close() proxyHandler.mu.Lock() defer proxyHandler.mu.Unlock() for _, c := range proxyHandler.clients { c.close() } }) return proxyHandler } type proxyTestOptions struct { etcd *embed.Etcd servers []*TestProxyServerHandler } func newMcuProxyForTestWithOptions(t *testing.T, options proxyTestOptions) *mcuProxy { t.Helper() if options.etcd == nil { options.etcd = NewEtcdForTest(t) } grpcClients, dnsMonitor := NewGrpcClientsWithEtcdForTest(t, options.etcd) tokenKey, err := rsa.GenerateKey(rand.Reader, 1024) if err != nil { t.Fatal(err) } dir := t.TempDir() privkeyFile := path.Join(dir, "privkey.pem") pubkeyFile := path.Join(dir, "pubkey.pem") WritePrivateKey(tokenKey, privkeyFile) // nolint WritePublicKey(&tokenKey.PublicKey, pubkeyFile) // nolint cfg := goconf.NewConfigFile() cfg.AddOption("mcu", "urltype", "static") var urls []string waitingMap := make(map[string]bool) if len(options.servers) == 0 { options.servers = []*TestProxyServerHandler{ NewProxyServerForTest(t, "DE"), } } for _, s := range options.servers { s.servers = options.servers urls = append(urls, s.URL) waitingMap[s.URL] = true } cfg.AddOption("mcu", "url", strings.Join(urls, " ")) cfg.AddOption("mcu", "token_id", "test-token") cfg.AddOption("mcu", "token_key", privkeyFile) etcdConfig := goconf.NewConfigFile() etcdConfig.AddOption("etcd", "endpoints", options.etcd.Config().ListenClientUrls[0].String()) etcdConfig.AddOption("etcd", "loglevel", "error") etcdClient, err := NewEtcdClient(etcdConfig, "") if err != nil { t.Fatal(err) } t.Cleanup(func() { if err := etcdClient.Close(); err != nil { t.Error(err) } }) mcu, err := NewMcuProxy(cfg, etcdClient, grpcClients, dnsMonitor) if err != nil { t.Fatal(err) } t.Cleanup(func() { mcu.Stop() }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() if err := mcu.Start(ctx); err != nil { t.Fatal(err) } proxy := mcu.(*mcuProxy) if err := proxy.WaitForConnections(ctx); err != nil { t.Fatal(err) } for len(waitingMap) > 0 { if err := ctx.Err(); err != nil { t.Fatal(err) } for u := range waitingMap { proxy.connectionsMu.RLock() connections := proxy.connections proxy.connectionsMu.RUnlock() for _, c := range connections { if c.rawUrl == u && c.IsConnected() && c.SessionId() != "" { delete(waitingMap, u) break } } } time.Sleep(time.Millisecond) } return proxy } func newMcuProxyForTestWithServers(t *testing.T, servers []*TestProxyServerHandler) *mcuProxy { t.Helper() return newMcuProxyForTestWithOptions(t, proxyTestOptions{ servers: servers, }) } func newMcuProxyForTest(t *testing.T) *mcuProxy { t.Helper() server := NewProxyServerForTest(t, "DE") return newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{server}) } func Test_ProxyPublisherSubscriber(t *testing.T) { CatchLogForTest(t) t.Parallel() mcu := newMcuProxyForTest(t) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubId := "the-publisher" pubSid := "1234567890" pubListener := &MockMcuListener{ publicId: pubId + "-public", } pubInitiator := &MockMcuInitiator{ country: "DE", } pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) if err != nil { t.Fatal(err) } defer pub.Close(context.Background()) subListener := &MockMcuListener{ publicId: "subscriber-public", } subInitiator := &MockMcuInitiator{ country: "DE", } sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) if err != nil { t.Fatal(err) } defer sub.Close(context.Background()) } func Test_ProxyWaitForPublisher(t *testing.T) { CatchLogForTest(t) t.Parallel() mcu := newMcuProxyForTest(t) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubId := "the-publisher" pubSid := "1234567890" pubListener := &MockMcuListener{ publicId: pubId + "-public", } pubInitiator := &MockMcuInitiator{ country: "DE", } subListener := &MockMcuListener{ publicId: "subscriber-public", } subInitiator := &MockMcuInitiator{ country: "DE", } done := make(chan struct{}) go func() { defer close(done) sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) if err != nil { t.Error(err) return } defer sub.Close(context.Background()) }() // Give subscriber goroutine some time to start time.Sleep(100 * time.Millisecond) pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) if err != nil { t.Fatal(err) } select { case <-done: case <-ctx.Done(): t.Error(ctx.Err()) } defer pub.Close(context.Background()) } func Test_ProxyPublisherBandwidth(t *testing.T) { CatchLogForTest(t) t.Parallel() server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ server1, server2, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pub1Id := "the-publisher-1" pub1Sid := "1234567890" pub1Listener := &MockMcuListener{ publicId: pub1Id + "-public", } pub1Initiator := &MockMcuInitiator{ country: "DE", } pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator) if err != nil { t.Fatal(err) } defer pub1.Close(context.Background()) if pub1.(*mcuProxyPublisher).conn.rawUrl == server1.URL { server1.UpdateBandwidth(100, 0) } else { server2.UpdateBandwidth(100, 0) } // Wait until proxy has been updated for ctx.Err() == nil { mcu.connectionsMu.RLock() connections := mcu.connections mcu.connectionsMu.RUnlock() missing := true for _, c := range connections { if c.Bandwidth() != nil { missing = false break } } if !missing { break } time.Sleep(time.Millisecond) } pub2Id := "the-publisher-2" pub2id := "1234567890" pub2Listener := &MockMcuListener{ publicId: pub2Id + "-public", } pub2Initiator := &MockMcuInitiator{ country: "DE", } pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator) if err != nil { t.Fatal(err) } defer pub2.Close(context.Background()) if pub1.(*mcuProxyPublisher).conn.rawUrl == pub2.(*mcuProxyPublisher).conn.rawUrl { t.Errorf("servers should be different, got %s", pub1.(*mcuProxyPublisher).conn.rawUrl) } } func Test_ProxyPublisherBandwidthOverload(t *testing.T) { CatchLogForTest(t) t.Parallel() server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ server1, server2, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pub1Id := "the-publisher-1" pub1Sid := "1234567890" pub1Listener := &MockMcuListener{ publicId: pub1Id + "-public", } pub1Initiator := &MockMcuInitiator{ country: "DE", } pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator) if err != nil { t.Fatal(err) } defer pub1.Close(context.Background()) // If all servers are bandwidth loaded, select the one with the least usage. if pub1.(*mcuProxyPublisher).conn.rawUrl == server1.URL { server1.UpdateBandwidth(100, 0) server2.UpdateBandwidth(102, 0) } else { server1.UpdateBandwidth(102, 0) server2.UpdateBandwidth(100, 0) } // Wait until proxy has been updated for ctx.Err() == nil { mcu.connectionsMu.RLock() connections := mcu.connections mcu.connectionsMu.RUnlock() missing := false for _, c := range connections { if c.Bandwidth() == nil { missing = true break } } if !missing { break } time.Sleep(time.Millisecond) } pub2Id := "the-publisher-2" pub2id := "1234567890" pub2Listener := &MockMcuListener{ publicId: pub2Id + "-public", } pub2Initiator := &MockMcuInitiator{ country: "DE", } pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator) if err != nil { t.Fatal(err) } defer pub2.Close(context.Background()) if pub1.(*mcuProxyPublisher).conn.rawUrl != pub2.(*mcuProxyPublisher).conn.rawUrl { t.Errorf("servers should be the same, got %s / %s", pub1.(*mcuProxyPublisher).conn.rawUrl, pub2.(*mcuProxyPublisher).conn.rawUrl) } } func Test_ProxyPublisherLoad(t *testing.T) { CatchLogForTest(t) t.Parallel() server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ server1, server2, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pub1Id := "the-publisher-1" pub1Sid := "1234567890" pub1Listener := &MockMcuListener{ publicId: pub1Id + "-public", } pub1Initiator := &MockMcuInitiator{ country: "DE", } pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator) if err != nil { t.Fatal(err) } defer pub1.Close(context.Background()) // Make sure connections are re-sorted. mcu.nextSort.Store(0) time.Sleep(100 * time.Millisecond) pub2Id := "the-publisher-2" pub2id := "1234567890" pub2Listener := &MockMcuListener{ publicId: pub2Id + "-public", } pub2Initiator := &MockMcuInitiator{ country: "DE", } pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator) if err != nil { t.Fatal(err) } defer pub2.Close(context.Background()) if pub1.(*mcuProxyPublisher).conn.rawUrl == pub2.(*mcuProxyPublisher).conn.rawUrl { t.Errorf("servers should be different, got %s", pub1.(*mcuProxyPublisher).conn.rawUrl) } } func Test_ProxyPublisherCountry(t *testing.T) { CatchLogForTest(t) t.Parallel() serverDE := NewProxyServerForTest(t, "DE") serverUS := NewProxyServerForTest(t, "US") mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ serverDE, serverUS, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubDEId := "the-publisher-de" pubDESid := "1234567890" pubDEListener := &MockMcuListener{ publicId: pubDEId + "-public", } pubDEInitiator := &MockMcuInitiator{ country: "DE", } pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubDEInitiator) if err != nil { t.Fatal(err) } defer pubDE.Close(context.Background()) if pubDE.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL { t.Errorf("expected server %s, go %s", serverDE.URL, pubDE.(*mcuProxyPublisher).conn.rawUrl) } pubUSId := "the-publisher-us" pubUSSid := "1234567890" pubUSListener := &MockMcuListener{ publicId: pubUSId + "-public", } pubUSInitiator := &MockMcuInitiator{ country: "US", } pubUS, err := mcu.NewPublisher(ctx, pubUSListener, pubUSId, pubUSSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubUSInitiator) if err != nil { t.Fatal(err) } defer pubUS.Close(context.Background()) if pubUS.(*mcuProxyPublisher).conn.rawUrl != serverUS.URL { t.Errorf("expected server %s, go %s", serverUS.URL, pubUS.(*mcuProxyPublisher).conn.rawUrl) } } func Test_ProxyPublisherContinent(t *testing.T) { CatchLogForTest(t) t.Parallel() serverDE := NewProxyServerForTest(t, "DE") serverUS := NewProxyServerForTest(t, "US") mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ serverDE, serverUS, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubDEId := "the-publisher-de" pubDESid := "1234567890" pubDEListener := &MockMcuListener{ publicId: pubDEId + "-public", } pubDEInitiator := &MockMcuInitiator{ country: "DE", } pubDE, err := mcu.NewPublisher(ctx, pubDEListener, pubDEId, pubDESid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubDEInitiator) if err != nil { t.Fatal(err) } defer pubDE.Close(context.Background()) if pubDE.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL { t.Errorf("expected server %s, go %s", serverDE.URL, pubDE.(*mcuProxyPublisher).conn.rawUrl) } pubFRId := "the-publisher-fr" pubFRSid := "1234567890" pubFRListener := &MockMcuListener{ publicId: pubFRId + "-public", } pubFRInitiator := &MockMcuInitiator{ country: "FR", } pubFR, err := mcu.NewPublisher(ctx, pubFRListener, pubFRId, pubFRSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubFRInitiator) if err != nil { t.Fatal(err) } defer pubFR.Close(context.Background()) if pubFR.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL { t.Errorf("expected server %s, go %s", serverDE.URL, pubFR.(*mcuProxyPublisher).conn.rawUrl) } } func Test_ProxySubscriberCountry(t *testing.T) { CatchLogForTest(t) t.Parallel() serverDE := NewProxyServerForTest(t, "DE") serverUS := NewProxyServerForTest(t, "US") mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ serverDE, serverUS, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubId := "the-publisher" pubSid := "1234567890" pubListener := &MockMcuListener{ publicId: pubId + "-public", } pubInitiator := &MockMcuInitiator{ country: "DE", } pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) if err != nil { t.Fatal(err) } defer pub.Close(context.Background()) if pub.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL { t.Errorf("expected server %s, go %s", serverDE.URL, pub.(*mcuProxyPublisher).conn.rawUrl) } subListener := &MockMcuListener{ publicId: "subscriber-public", } subInitiator := &MockMcuInitiator{ country: "US", } sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) if err != nil { t.Fatal(err) } defer sub.Close(context.Background()) if sub.(*mcuProxySubscriber).conn.rawUrl != serverUS.URL { t.Errorf("expected server %s, go %s", serverUS.URL, sub.(*mcuProxySubscriber).conn.rawUrl) } } func Test_ProxySubscriberContinent(t *testing.T) { CatchLogForTest(t) t.Parallel() serverDE := NewProxyServerForTest(t, "DE") serverUS := NewProxyServerForTest(t, "US") mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ serverDE, serverUS, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubId := "the-publisher" pubSid := "1234567890" pubListener := &MockMcuListener{ publicId: pubId + "-public", } pubInitiator := &MockMcuInitiator{ country: "DE", } pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) if err != nil { t.Fatal(err) } defer pub.Close(context.Background()) if pub.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL { t.Errorf("expected server %s, go %s", serverDE.URL, pub.(*mcuProxyPublisher).conn.rawUrl) } subListener := &MockMcuListener{ publicId: "subscriber-public", } subInitiator := &MockMcuInitiator{ country: "FR", } sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) if err != nil { t.Fatal(err) } defer sub.Close(context.Background()) if sub.(*mcuProxySubscriber).conn.rawUrl != serverDE.URL { t.Errorf("expected server %s, go %s", serverDE.URL, sub.(*mcuProxySubscriber).conn.rawUrl) } } func Test_ProxySubscriberBandwidth(t *testing.T) { CatchLogForTest(t) t.Parallel() serverDE := NewProxyServerForTest(t, "DE") serverUS := NewProxyServerForTest(t, "US") mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ serverDE, serverUS, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubId := "the-publisher" pubSid := "1234567890" pubListener := &MockMcuListener{ publicId: pubId + "-public", } pubInitiator := &MockMcuInitiator{ country: "DE", } pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) if err != nil { t.Fatal(err) } defer pub.Close(context.Background()) if pub.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL { t.Errorf("expected server %s, go %s", serverDE.URL, pub.(*mcuProxyPublisher).conn.rawUrl) } serverDE.UpdateBandwidth(0, 100) // Wait until proxy has been updated for ctx.Err() == nil { mcu.connectionsMu.RLock() connections := mcu.connections mcu.connectionsMu.RUnlock() missing := true for _, c := range connections { if c.Bandwidth() != nil { missing = false break } } if !missing { break } time.Sleep(time.Millisecond) } subListener := &MockMcuListener{ publicId: "subscriber-public", } subInitiator := &MockMcuInitiator{ country: "US", } sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) if err != nil { t.Fatal(err) } defer sub.Close(context.Background()) if sub.(*mcuProxySubscriber).conn.rawUrl != serverUS.URL { t.Errorf("expected server %s, go %s", serverUS.URL, sub.(*mcuProxySubscriber).conn.rawUrl) } } func Test_ProxySubscriberBandwidthOverload(t *testing.T) { CatchLogForTest(t) t.Parallel() serverDE := NewProxyServerForTest(t, "DE") serverUS := NewProxyServerForTest(t, "US") mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ serverDE, serverUS, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubId := "the-publisher" pubSid := "1234567890" pubListener := &MockMcuListener{ publicId: pubId + "-public", } pubInitiator := &MockMcuInitiator{ country: "DE", } pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) if err != nil { t.Fatal(err) } defer pub.Close(context.Background()) if pub.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL { t.Errorf("expected server %s, go %s", serverDE.URL, pub.(*mcuProxyPublisher).conn.rawUrl) } serverDE.UpdateBandwidth(0, 100) serverUS.UpdateBandwidth(0, 102) // Wait until proxy has been updated for ctx.Err() == nil { mcu.connectionsMu.RLock() connections := mcu.connections mcu.connectionsMu.RUnlock() missing := false for _, c := range connections { if c.Bandwidth() == nil { missing = true break } } if !missing { break } time.Sleep(time.Millisecond) } subListener := &MockMcuListener{ publicId: "subscriber-public", } subInitiator := &MockMcuInitiator{ country: "US", } sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) if err != nil { t.Fatal(err) } defer sub.Close(context.Background()) if sub.(*mcuProxySubscriber).conn.rawUrl != serverDE.URL { t.Errorf("expected server %s, go %s", serverDE.URL, sub.(*mcuProxySubscriber).conn.rawUrl) } } type mockGrpcServerHub struct { sessionsLock sync.Mutex sessionByPublicId map[string]Session } func (h *mockGrpcServerHub) addSession(session *ClientSession) { h.sessionsLock.Lock() defer h.sessionsLock.Unlock() if h.sessionByPublicId == nil { h.sessionByPublicId = make(map[string]Session) } h.sessionByPublicId[session.PublicId()] = session } func (h *mockGrpcServerHub) removeSession(session *ClientSession) { h.sessionsLock.Lock() defer h.sessionsLock.Unlock() delete(h.sessionByPublicId, session.PublicId()) } func (h *mockGrpcServerHub) GetSessionByResumeId(resumeId string) Session { return nil } func (h *mockGrpcServerHub) GetSessionByPublicId(sessionId string) Session { h.sessionsLock.Lock() defer h.sessionsLock.Unlock() return h.sessionByPublicId[sessionId] } func (h *mockGrpcServerHub) GetSessionIdByRoomSessionId(roomSessionId string) (string, error) { return "", nil } func (h *mockGrpcServerHub) GetBackend(u *url.URL) *Backend { return nil } func Test_ProxyRemotePublisher(t *testing.T) { CatchLogForTest(t) t.Parallel() etcd := NewEtcdForTest(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) hub1 := &mockGrpcServerHub{} hub2 := &mockGrpcServerHub{} grpcServer1.hub = hub1 grpcServer2.hub = hub2 SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") mcu1 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ etcd: etcd, servers: []*TestProxyServerHandler{ server1, server2, }, }) mcu2 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ etcd: etcd, servers: []*TestProxyServerHandler{ server1, server2, }, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubId := "the-publisher" pubSid := "1234567890" pubListener := &MockMcuListener{ publicId: pubId + "-public", } pubInitiator := &MockMcuInitiator{ country: "DE", } session1 := &ClientSession{ publicId: pubId, publishers: make(map[StreamType]McuPublisher), } hub1.addSession(session1) defer hub1.removeSession(session1) pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) if err != nil { t.Fatal(err) } defer pub.Close(context.Background()) session1.mu.Lock() session1.publishers[StreamTypeVideo] = pub session1.publisherWaiters.Wakeup() session1.mu.Unlock() subListener := &MockMcuListener{ publicId: "subscriber-public", } subInitiator := &MockMcuInitiator{ country: "DE", } sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) if err != nil { t.Fatal(err) } defer sub.Close(context.Background()) } func Test_ProxyRemotePublisherWait(t *testing.T) { CatchLogForTest(t) t.Parallel() etcd := NewEtcdForTest(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) hub1 := &mockGrpcServerHub{} hub2 := &mockGrpcServerHub{} grpcServer1.hub = hub1 grpcServer2.hub = hub2 SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") mcu1 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ etcd: etcd, servers: []*TestProxyServerHandler{ server1, server2, }, }) mcu2 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ etcd: etcd, servers: []*TestProxyServerHandler{ server1, server2, }, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubId := "the-publisher" pubSid := "1234567890" pubListener := &MockMcuListener{ publicId: pubId + "-public", } pubInitiator := &MockMcuInitiator{ country: "DE", } session1 := &ClientSession{ publicId: pubId, publishers: make(map[StreamType]McuPublisher), } hub1.addSession(session1) defer hub1.removeSession(session1) subListener := &MockMcuListener{ publicId: "subscriber-public", } subInitiator := &MockMcuInitiator{ country: "DE", } done := make(chan struct{}) go func() { defer close(done) sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) if err != nil { t.Error(err) return } defer sub.Close(context.Background()) }() // Give subscriber goroutine some time to start time.Sleep(100 * time.Millisecond) pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) if err != nil { t.Fatal(err) } defer pub.Close(context.Background()) session1.mu.Lock() session1.publishers[StreamTypeVideo] = pub session1.publisherWaiters.Wakeup() session1.mu.Unlock() select { case <-done: case <-ctx.Done(): t.Error(ctx.Err()) } } func Test_ProxyRemotePublisherTemporary(t *testing.T) { CatchLogForTest(t) t.Parallel() etcd := NewEtcdForTest(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) hub1 := &mockGrpcServerHub{} hub2 := &mockGrpcServerHub{} grpcServer1.hub = hub1 grpcServer2.hub = hub2 SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") mcu1 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ etcd: etcd, servers: []*TestProxyServerHandler{ server1, }, }) mcu2 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ etcd: etcd, servers: []*TestProxyServerHandler{ server2, }, }) ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() pubId := "the-publisher" pubSid := "1234567890" pubListener := &MockMcuListener{ publicId: pubId + "-public", } pubInitiator := &MockMcuInitiator{ country: "DE", } session1 := &ClientSession{ publicId: pubId, publishers: make(map[StreamType]McuPublisher), } hub1.addSession(session1) defer hub1.removeSession(session1) pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) if err != nil { t.Fatal(err) } defer pub.Close(context.Background()) session1.mu.Lock() session1.publishers[StreamTypeVideo] = pub session1.publisherWaiters.Wakeup() session1.mu.Unlock() mcu2.connectionsMu.RLock() count := len(mcu2.connections) mcu2.connectionsMu.RUnlock() if expected := 1; count != expected { t.Errorf("expected %d connections, got %+v", expected, count) } subListener := &MockMcuListener{ publicId: "subscriber-public", } subInitiator := &MockMcuInitiator{ country: "DE", } sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) if err != nil { t.Fatal(err) } defer sub.Close(context.Background()) if sub.(*mcuProxySubscriber).conn.rawUrl != server1.URL { t.Errorf("expected server %s, go %s", server1.URL, sub.(*mcuProxySubscriber).conn.rawUrl) } // The temporary connection has been added mcu2.connectionsMu.RLock() count = len(mcu2.connections) mcu2.connectionsMu.RUnlock() if expected := 2; count != expected { t.Errorf("expected %d connections, got %+v", expected, count) } sub.Close(context.Background()) // Wait for temporary connection to be removed. loop: for { select { case <-ctx.Done(): t.Error(ctx.Err()) default: mcu2.connectionsMu.RLock() count = len(mcu2.connections) mcu2.connectionsMu.RUnlock() if count == 1 { break loop } } } }