From 710264e3667a82b13b90135c6912e5348ce683f4 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 29 Jan 2026 16:35:50 +0100 Subject: [PATCH] Move SFU proxy tests closer to the checked code. --- server/hub_sfu_proxy_test.go | 653 ----------------------------------- sfu/proxy/proxy_test.go | 510 +++++++++++++++++++++++++++ 2 files changed, 510 insertions(+), 653 deletions(-) delete mode 100644 server/hub_sfu_proxy_test.go diff --git a/server/hub_sfu_proxy_test.go b/server/hub_sfu_proxy_test.go deleted file mode 100644 index 278f3be..0000000 --- a/server/hub_sfu_proxy_test.go +++ /dev/null @@ -1,653 +0,0 @@ -/** - * Standalone signaling server for the Nextcloud Spreed app. - * Copyright (C) 2026 struktur AG - * - * @author Joachim Bauch - * - * @license GNU AGPL version 3 or any later version - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package server - -import ( - "context" - "errors" - "net/url" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/strukturag/nextcloud-spreed-signaling/api" - etcdtest "github.com/strukturag/nextcloud-spreed-signaling/etcd/test" - "github.com/strukturag/nextcloud-spreed-signaling/grpc" - grpctest "github.com/strukturag/nextcloud-spreed-signaling/grpc/test" - "github.com/strukturag/nextcloud-spreed-signaling/sfu" - "github.com/strukturag/nextcloud-spreed-signaling/sfu/mock" - proxytest "github.com/strukturag/nextcloud-spreed-signaling/sfu/proxy/test" - "github.com/strukturag/nextcloud-spreed-signaling/sfu/proxy/testserver" - "github.com/strukturag/nextcloud-spreed-signaling/talk" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -type mockGrpcServerHub struct { - proxy atomic.Pointer[sfu.WithToken] - sessionsLock sync.Mutex - // +checklocks:sessionsLock - sessionByPublicId map[api.PublicSessionId]Session -} - -func (h *mockGrpcServerHub) setProxy(t *testing.T, proxy sfu.SFU) { - t.Helper() - - wt, ok := proxy.(sfu.WithToken) - require.True(t, ok, "need a sfu with token support") - h.proxy.Store(&wt) -} - -func (h *mockGrpcServerHub) getSession(sessionId api.PublicSessionId) Session { - h.sessionsLock.Lock() - defer h.sessionsLock.Unlock() - - return h.sessionByPublicId[sessionId] -} - -func (h *mockGrpcServerHub) addSession(session *ClientSession) { - h.sessionsLock.Lock() - defer h.sessionsLock.Unlock() - if h.sessionByPublicId == nil { - h.sessionByPublicId = make(map[api.PublicSessionId]Session) - } - h.sessionByPublicId[session.PublicId()] = session -} - -func (h *mockGrpcServerHub) removeSession(session *ClientSession) { - h.sessionsLock.Lock() - defer h.sessionsLock.Unlock() - delete(h.sessionByPublicId, session.PublicId()) -} - -func (h *mockGrpcServerHub) GetSessionIdByResumeId(resumeId api.PrivateSessionId) api.PublicSessionId { - return "" -} - -func (h *mockGrpcServerHub) GetSessionIdByRoomSessionId(roomSessionId api.RoomSessionId) (api.PublicSessionId, error) { - return "", nil -} - -func (h *mockGrpcServerHub) IsSessionIdInCall(sessionId api.PublicSessionId, roomId string, backendUrl string) (bool, bool) { - return false, false -} - -func (h *mockGrpcServerHub) DisconnectSessionByRoomSessionId(sessionId api.PublicSessionId, roomSessionId api.RoomSessionId, reason string) { -} - -func (h *mockGrpcServerHub) GetBackend(u *url.URL) *talk.Backend { - return nil -} - -func (h *mockGrpcServerHub) GetInternalSessions(roomId string, backend *talk.Backend) ([]*grpc.InternalSessionData, []*grpc.VirtualSessionData, bool) { - return nil, nil, false -} - -func (h *mockGrpcServerHub) GetTransientEntries(roomId string, backend *talk.Backend) (api.TransientDataEntries, bool) { - return nil, false -} - -func (h *mockGrpcServerHub) GetPublisherIdForSessionId(ctx context.Context, sessionId api.PublicSessionId, streamType sfu.StreamType) (*grpc.GetPublisherIdReply, error) { - session := h.getSession(sessionId) - if session == nil { - return nil, status.Error(codes.NotFound, "no such session") - } - - clientSession, ok := session.(*ClientSession) - if !ok { - return nil, status.Error(codes.NotFound, "no such session") - } - - publisher := clientSession.GetOrWaitForPublisher(ctx, streamType) - if publisher, ok := publisher.(sfu.PublisherWithConnectionUrlAndIP); ok { - connUrl, ip := publisher.GetConnectionURL() - reply := &grpc.GetPublisherIdReply{ - PublisherId: publisher.Id(), - ProxyUrl: connUrl, - } - if len(ip) > 0 { - reply.Ip = ip.String() - } - - if proxy := h.proxy.Load(); proxy != nil { - reply.ConnectToken, _ = (*proxy).CreateToken("") - reply.PublisherToken, _ = (*proxy).CreateToken(publisher.Id()) - } - return reply, nil - } - - return nil, status.Error(codes.NotFound, "no such publisher") -} - -func (h *mockGrpcServerHub) ProxySession(request grpc.RpcSessions_ProxySessionServer) error { - return errors.New("not implemented") -} - -func Test_ProxyRemotePublisher(t *testing.T) { - t.Parallel() - - embedEtcd := etcdtest.NewServerForTest(t) - - grpcServer1, addr1 := grpctest.NewServerForTest(t) - grpcServer2, addr2 := grpctest.NewServerForTest(t) - - hub1 := &mockGrpcServerHub{} - hub2 := &mockGrpcServerHub{} - grpcServer1.SetHub(hub1) - grpcServer2.SetHub(hub2) - - embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) - - server1 := testserver.NewProxyServerForTest(t, "DE") - server2 := testserver.NewProxyServerForTest(t, "DE") - - mcu1, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - server2, - }, - }, 1, nil) - hub1.setProxy(t, mcu1) - mcu2, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - server2, - }, - }, 2, nil) - hub2.setProxy(t, mcu2) - - ctx, cancel := context.WithTimeout(t.Context(), testTimeout) - defer cancel() - - pubId := api.PublicSessionId("the-publisher") - pubSid := "1234567890" - pubListener := mock.NewListener(pubId + "-public") - pubInitiator := mock.NewInitiator("DE") - - session1 := &ClientSession{ - publicId: pubId, - publishers: make(map[sfu.StreamType]sfu.Publisher), - } - hub1.addSession(session1) - defer hub1.removeSession(session1) - - pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ - MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, - }, pubInitiator) - require.NoError(t, err) - - defer pub.Close(context.Background()) - - session1.mu.Lock() - session1.publishers[sfu.StreamTypeVideo] = pub - session1.publisherWaiters.Wakeup() - session1.mu.Unlock() - - subListener := mock.NewListener("subscriber-public") - subInitiator := mock.NewInitiator("DE") - sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) - require.NoError(t, err) - - defer sub.Close(context.Background()) -} - -func Test_ProxyMultipleRemotePublisher(t *testing.T) { - t.Parallel() - - embedEtcd := etcdtest.NewServerForTest(t) - - grpcServer1, addr1 := grpctest.NewServerForTest(t) - grpcServer2, addr2 := grpctest.NewServerForTest(t) - grpcServer3, addr3 := grpctest.NewServerForTest(t) - - hub1 := &mockGrpcServerHub{} - hub2 := &mockGrpcServerHub{} - hub3 := &mockGrpcServerHub{} - grpcServer1.SetHub(hub1) - grpcServer2.SetHub(hub2) - grpcServer3.SetHub(hub3) - - embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) - embedEtcd.SetValue("/grpctargets/three", []byte("{\"address\":\""+addr3+"\"}")) - - server1 := testserver.NewProxyServerForTest(t, "DE") - server2 := testserver.NewProxyServerForTest(t, "US") - server3 := testserver.NewProxyServerForTest(t, "US") - - mcu1, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - server2, - server3, - }, - }, 1, nil) - hub1.setProxy(t, mcu1) - mcu2, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - server2, - server3, - }, - }, 2, nil) - hub2.setProxy(t, mcu2) - mcu3, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - server2, - server3, - }, - }, 3, nil) - hub3.setProxy(t, mcu3) - - ctx, cancel := context.WithTimeout(t.Context(), testTimeout) - defer cancel() - - pubId := api.PublicSessionId("the-publisher") - pubSid := "1234567890" - pubListener := mock.NewListener(pubId + "-public") - pubInitiator := mock.NewInitiator("DE") - - session1 := &ClientSession{ - publicId: pubId, - publishers: make(map[sfu.StreamType]sfu.Publisher), - } - hub1.addSession(session1) - defer hub1.removeSession(session1) - - pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ - MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, - }, pubInitiator) - require.NoError(t, err) - - defer pub.Close(context.Background()) - - session1.mu.Lock() - session1.publishers[sfu.StreamTypeVideo] = pub - session1.publisherWaiters.Wakeup() - session1.mu.Unlock() - - sub1Listener := mock.NewListener("subscriber-public-1") - sub1Initiator := mock.NewInitiator("US") - sub1, err := mcu2.NewSubscriber(ctx, sub1Listener, pubId, sfu.StreamTypeVideo, sub1Initiator) - require.NoError(t, err) - - defer sub1.Close(context.Background()) - - sub2Listener := mock.NewListener("subscriber-public-2") - sub2Initiator := mock.NewInitiator("US") - sub2, err := mcu3.NewSubscriber(ctx, sub2Listener, pubId, sfu.StreamTypeVideo, sub2Initiator) - require.NoError(t, err) - - defer sub2.Close(context.Background()) -} - -func Test_ProxyRemotePublisherWait(t *testing.T) { - t.Parallel() - - embedEtcd := etcdtest.NewServerForTest(t) - - grpcServer1, addr1 := grpctest.NewServerForTest(t) - grpcServer2, addr2 := grpctest.NewServerForTest(t) - - hub1 := &mockGrpcServerHub{} - hub2 := &mockGrpcServerHub{} - grpcServer1.SetHub(hub1) - grpcServer2.SetHub(hub2) - - embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) - - server1 := testserver.NewProxyServerForTest(t, "DE") - server2 := testserver.NewProxyServerForTest(t, "DE") - - mcu1, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - server2, - }, - }, 1, nil) - hub1.setProxy(t, mcu1) - mcu2, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - server2, - }, - }, 2, nil) - hub2.setProxy(t, mcu2) - - ctx, cancel := context.WithTimeout(t.Context(), testTimeout) - defer cancel() - - pubId := api.PublicSessionId("the-publisher") - pubSid := "1234567890" - pubListener := mock.NewListener(pubId + "-public") - pubInitiator := mock.NewInitiator("DE") - - session1 := &ClientSession{ - publicId: pubId, - publishers: make(map[sfu.StreamType]sfu.Publisher), - } - hub1.addSession(session1) - defer hub1.removeSession(session1) - - subListener := mock.NewListener("subscriber-public") - subInitiator := mock.NewInitiator("DE") - - done := make(chan struct{}) - go func() { - defer close(done) - sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) - if !assert.NoError(t, err) { - return - } - - defer sub.Close(context.Background()) - }() - - // Give subscriber goroutine some time to start - time.Sleep(100 * time.Millisecond) - - pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ - MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, - }, pubInitiator) - require.NoError(t, err) - - defer pub.Close(context.Background()) - - session1.mu.Lock() - session1.publishers[sfu.StreamTypeVideo] = pub - session1.publisherWaiters.Wakeup() - session1.mu.Unlock() - - select { - case <-done: - case <-ctx.Done(): - assert.NoError(t, ctx.Err()) - } -} - -func Test_ProxyRemotePublisherTemporary(t *testing.T) { - t.Parallel() - - assert := assert.New(t) - embedEtcd := etcdtest.NewServerForTest(t) - - grpcServer1, addr1 := grpctest.NewServerForTest(t) - grpcServer2, addr2 := grpctest.NewServerForTest(t) - - hub1 := &mockGrpcServerHub{} - hub2 := &mockGrpcServerHub{} - grpcServer1.SetHub(hub1) - grpcServer2.SetHub(hub2) - - embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) - - server1 := testserver.NewProxyServerForTest(t, "DE") - server2 := testserver.NewProxyServerForTest(t, "DE") - - mcu1, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - }, - }, 1, nil) - hub1.setProxy(t, mcu1) - mcu2, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server2, - }, - }, 2, nil) - hub2.setProxy(t, mcu2) - - ctx, cancel := context.WithTimeout(t.Context(), testTimeout) - defer cancel() - - pubId := api.PublicSessionId("the-publisher") - pubSid := "1234567890" - pubListener := mock.NewListener(pubId + "-public") - pubInitiator := mock.NewInitiator("DE") - - session1 := &ClientSession{ - publicId: pubId, - publishers: make(map[sfu.StreamType]sfu.Publisher), - } - hub1.addSession(session1) - defer hub1.removeSession(session1) - - pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ - MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, - }, pubInitiator) - require.NoError(t, err) - - defer pub.Close(context.Background()) - - session1.mu.Lock() - session1.publishers[sfu.StreamTypeVideo] = pub - session1.publisherWaiters.Wakeup() - session1.mu.Unlock() - - type connectionCounter interface { - ConnectionsCount() int - } - - if counter2, ok := mcu2.(connectionCounter); assert.True(ok) { - assert.Equal(1, counter2.ConnectionsCount()) - } - - subListener := mock.NewListener("subscriber-public") - subInitiator := mock.NewInitiator("DE") - sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) - require.NoError(t, err) - - defer sub.Close(context.Background()) - - if connSub, ok := sub.(sfu.SubscriberWithConnectionUrlAndIP); assert.True(ok) { - url, ip := connSub.GetConnectionURL() - assert.Equal(server1.URL(), url) - assert.Empty(ip) - } - - // The temporary connection has been added - if counter2, ok := mcu2.(connectionCounter); assert.True(ok) { - assert.Equal(2, counter2.ConnectionsCount()) - } - - sub.Close(context.Background()) - - // Wait for temporary connection to be removed. -loop: - for { - select { - case <-ctx.Done(): - assert.NoError(ctx.Err()) - default: - if counter2, ok := mcu2.(connectionCounter); assert.True(ok) { - if counter2.ConnectionsCount() == 1 { - break loop - } - } - } - } -} - -func Test_ProxyConnectToken(t *testing.T) { - t.Parallel() - - embedEtcd := etcdtest.NewServerForTest(t) - - grpcServer1, addr1 := grpctest.NewServerForTest(t) - grpcServer2, addr2 := grpctest.NewServerForTest(t) - - hub1 := &mockGrpcServerHub{} - hub2 := &mockGrpcServerHub{} - grpcServer1.SetHub(hub1) - grpcServer2.SetHub(hub2) - - embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) - - server1 := testserver.NewProxyServerForTest(t, "DE") - server2 := testserver.NewProxyServerForTest(t, "DE") - - // Signaling server instances are in a cluster but don't share their proxies, - // i.e. they are only known to their local proxy, not the one of the other - // signaling server - so the connection token must be passed between them. - mcu1, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - }, - }, 1, nil) - hub1.setProxy(t, mcu1) - mcu2, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server2, - }, - }, 2, nil) - hub2.setProxy(t, mcu2) - - ctx, cancel := context.WithTimeout(t.Context(), testTimeout) - defer cancel() - - pubId := api.PublicSessionId("the-publisher") - pubSid := "1234567890" - pubListener := mock.NewListener(pubId + "-public") - pubInitiator := mock.NewInitiator("DE") - - session1 := &ClientSession{ - publicId: pubId, - publishers: make(map[sfu.StreamType]sfu.Publisher), - } - hub1.addSession(session1) - defer hub1.removeSession(session1) - - pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ - MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, - }, pubInitiator) - require.NoError(t, err) - - defer pub.Close(context.Background()) - - session1.mu.Lock() - session1.publishers[sfu.StreamTypeVideo] = pub - session1.publisherWaiters.Wakeup() - session1.mu.Unlock() - - subListener := mock.NewListener("subscriber-public") - subInitiator := mock.NewInitiator("DE") - sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) - require.NoError(t, err) - - defer sub.Close(context.Background()) -} - -func Test_ProxyPublisherToken(t *testing.T) { - t.Parallel() - - embedEtcd := etcdtest.NewServerForTest(t) - - grpcServer1, addr1 := grpctest.NewServerForTest(t) - grpcServer2, addr2 := grpctest.NewServerForTest(t) - - hub1 := &mockGrpcServerHub{} - hub2 := &mockGrpcServerHub{} - grpcServer1.SetHub(hub1) - grpcServer2.SetHub(hub2) - - embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) - - server1 := testserver.NewProxyServerForTest(t, "DE") - server2 := testserver.NewProxyServerForTest(t, "US") - - // Signaling server instances are in a cluster but don't share their proxies, - // i.e. they are only known to their local proxy, not the one of the other - // signaling server - so the connection token must be passed between them. - // Also the subscriber is connecting from a different country, so a remote - // stream will be created that needs a valid token from the remote proxy. - mcu1, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server1, - }, - }, 1, nil) - hub1.setProxy(t, mcu1) - mcu2, _ := proxytest.NewMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ - Etcd: embedEtcd, - Servers: []testserver.ProxyTestServer{ - server2, - }, - }, 2, nil) - hub2.setProxy(t, mcu2) - // Support remote subscribers for the tests. - server1.Servers = append(server1.Servers, server2) - server2.Servers = append(server2.Servers, server1) - - ctx, cancel := context.WithTimeout(t.Context(), testTimeout) - defer cancel() - - pubId := api.PublicSessionId("the-publisher") - pubSid := "1234567890" - pubListener := mock.NewListener(pubId + "-public") - pubInitiator := mock.NewInitiator("DE") - - session1 := &ClientSession{ - publicId: pubId, - publishers: make(map[sfu.StreamType]sfu.Publisher), - } - hub1.addSession(session1) - defer hub1.removeSession(session1) - - pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ - MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, - }, pubInitiator) - require.NoError(t, err) - - defer pub.Close(context.Background()) - - session1.mu.Lock() - session1.publishers[sfu.StreamTypeVideo] = pub - session1.publisherWaiters.Wakeup() - session1.mu.Unlock() - - subListener := mock.NewListener("subscriber-public") - subInitiator := mock.NewInitiator("US") - sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) - require.NoError(t, err) - - defer sub.Close(context.Background()) -} diff --git a/sfu/proxy/proxy_test.go b/sfu/proxy/proxy_test.go index 41049fd..47c225f 100644 --- a/sfu/proxy/proxy_test.go +++ b/sfu/proxy/proxy_test.go @@ -25,6 +25,7 @@ import ( "context" "crypto/rand" "crypto/rsa" + "encoding/json" "fmt" "net" "net/url" @@ -32,6 +33,7 @@ import ( "slices" "strconv" "strings" + "sync" "testing" "time" @@ -40,10 +42,12 @@ import ( "github.com/stretchr/testify/require" "github.com/strukturag/nextcloud-spreed-signaling/api" + "github.com/strukturag/nextcloud-spreed-signaling/async" dnstest "github.com/strukturag/nextcloud-spreed-signaling/dns/test" "github.com/strukturag/nextcloud-spreed-signaling/etcd" etcdtest "github.com/strukturag/nextcloud-spreed-signaling/etcd/test" "github.com/strukturag/nextcloud-spreed-signaling/geoip" + "github.com/strukturag/nextcloud-spreed-signaling/grpc" grpctest "github.com/strukturag/nextcloud-spreed-signaling/grpc/test" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" @@ -1238,3 +1242,509 @@ func Test_ProxyResumeFail(t *testing.T) { assert.NotEqual(sessionId, connections[0].SessionId()) } } + +type publisherHub struct { + grpctest.MockHub + + mu sync.Mutex + // +checklocks:mu + publishers map[api.PublicSessionId]*proxyPublisher + waiter async.ChannelWaiters // +checklocksignore: Has its own locking. +} + +func newPublisherHub() *publisherHub { + return &publisherHub{ + publishers: make(map[api.PublicSessionId]*proxyPublisher), + } +} + +func (h *publisherHub) addPublisher(publisher *proxyPublisher) { + h.mu.Lock() + defer h.mu.Unlock() + + h.publishers[publisher.PublisherId()] = publisher + h.waiter.Wakeup() +} + +func (h *publisherHub) GetPublisherIdForSessionId(ctx context.Context, sessionId api.PublicSessionId, streamType sfu.StreamType) (*grpc.GetPublisherIdReply, error) { + h.mu.Lock() + defer h.mu.Unlock() + + pub, found := h.publishers[sessionId] + if !found { + ch := make(chan struct{}, 1) + id := h.waiter.Add(ch) + defer h.waiter.Remove(id) + + for !found { + h.mu.Unlock() + select { + case <-ch: + h.mu.Lock() + pub, found = h.publishers[sessionId] + case <-ctx.Done(): + h.mu.Lock() + return nil, ctx.Err() + } + } + } + + connToken, err := pub.conn.proxy.CreateToken("") + if err != nil { + return nil, err + } + pubToken, err := pub.conn.proxy.CreateToken(string(pub.Id())) + if err != nil { + return nil, err + } + + reply := &grpc.GetPublisherIdReply{ + PublisherId: pub.Id(), + ProxyUrl: pub.conn.rawUrl, + ConnectToken: connToken, + PublisherToken: pubToken, + } + if ip := pub.conn.ip; len(ip) > 0 { + reply.Ip = ip.String() + } + return reply, nil +} + +func Test_ProxyRemotePublisher(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + embedEtcd := etcdtest.NewServerForTest(t) + + grpcServer1, addr1 := grpctest.NewServerForTest(t) + grpcServer2, addr2 := grpctest.NewServerForTest(t) + + hub1 := newPublisherHub() + hub2 := newPublisherHub() + grpcServer1.SetHub(hub1) + grpcServer2.SetHub(hub2) + + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + + server1 := testserver.NewProxyServerForTest(t, "DE") + server2 := testserver.NewProxyServerForTest(t, "DE") + + mcu1, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + server2, + }, + }, 1, nil) + mcu2, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + server2, + }, + }, 2, nil) + + ctx, cancel := context.WithTimeout(t.Context(), testTimeout) + defer cancel() + + pubId := api.PublicSessionId("the-publisher") + pubSid := "1234567890" + pubListener := mock.NewListener(pubId + "-public") + pubInitiator := mock.NewInitiator("DE") + + pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ + MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, + }, pubInitiator) + require.NoError(t, err) + + defer pub.Close(context.Background()) + + if proxyPub, ok := pub.(*proxyPublisher); assert.True(ok) { + hub1.addPublisher(proxyPub) + } + + subListener := mock.NewListener("subscriber-public") + subInitiator := mock.NewInitiator("DE") + sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) + require.NoError(t, err) + + defer sub.Close(context.Background()) +} + +func Test_ProxyMultipleRemotePublisher(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + embedEtcd := etcdtest.NewServerForTest(t) + + grpcServer1, addr1 := grpctest.NewServerForTest(t) + grpcServer2, addr2 := grpctest.NewServerForTest(t) + grpcServer3, addr3 := grpctest.NewServerForTest(t) + + hub1 := newPublisherHub() + hub2 := newPublisherHub() + hub3 := newPublisherHub() + grpcServer1.SetHub(hub1) + grpcServer2.SetHub(hub2) + grpcServer3.SetHub(hub3) + + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/three", []byte("{\"address\":\""+addr3+"\"}")) + + server1 := testserver.NewProxyServerForTest(t, "DE") + server2 := testserver.NewProxyServerForTest(t, "US") + server3 := testserver.NewProxyServerForTest(t, "US") + + mcu1, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + server2, + server3, + }, + }, 1, nil) + mcu2, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + server2, + server3, + }, + }, 2, nil) + mcu3, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + server2, + server3, + }, + }, 3, nil) + + ctx, cancel := context.WithTimeout(t.Context(), testTimeout) + defer cancel() + + pubId := api.PublicSessionId("the-publisher") + pubSid := "1234567890" + pubListener := mock.NewListener(pubId + "-public") + pubInitiator := mock.NewInitiator("DE") + + pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ + MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, + }, pubInitiator) + require.NoError(t, err) + + defer pub.Close(context.Background()) + + if proxyPub, ok := pub.(*proxyPublisher); assert.True(ok) { + hub1.addPublisher(proxyPub) + } + + sub1Listener := mock.NewListener("subscriber-public-1") + sub1Initiator := mock.NewInitiator("US") + sub1, err := mcu2.NewSubscriber(ctx, sub1Listener, pubId, sfu.StreamTypeVideo, sub1Initiator) + require.NoError(t, err) + + defer sub1.Close(context.Background()) + + sub2Listener := mock.NewListener("subscriber-public-2") + sub2Initiator := mock.NewInitiator("US") + sub2, err := mcu3.NewSubscriber(ctx, sub2Listener, pubId, sfu.StreamTypeVideo, sub2Initiator) + require.NoError(t, err) + + defer sub2.Close(context.Background()) +} + +func Test_ProxyRemotePublisherWait(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + embedEtcd := etcdtest.NewServerForTest(t) + + grpcServer1, addr1 := grpctest.NewServerForTest(t) + grpcServer2, addr2 := grpctest.NewServerForTest(t) + + hub1 := newPublisherHub() + hub2 := newPublisherHub() + grpcServer1.SetHub(hub1) + grpcServer2.SetHub(hub2) + + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + + server1 := testserver.NewProxyServerForTest(t, "DE") + server2 := testserver.NewProxyServerForTest(t, "DE") + + mcu1, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + server2, + }, + }, 1, nil) + mcu2, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + server2, + }, + }, 2, nil) + + ctx, cancel := context.WithTimeout(t.Context(), testTimeout) + defer cancel() + + pubId := api.PublicSessionId("the-publisher") + pubSid := "1234567890" + pubListener := mock.NewListener(pubId + "-public") + pubInitiator := mock.NewInitiator("DE") + + subListener := mock.NewListener("subscriber-public") + subInitiator := mock.NewInitiator("DE") + + done := make(chan struct{}) + go func() { + defer close(done) + sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) + if !assert.NoError(err) { + return + } + + defer sub.Close(context.Background()) + }() + + // Give subscriber goroutine some time to start + time.Sleep(100 * time.Millisecond) + + pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ + MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, + }, pubInitiator) + require.NoError(t, err) + + defer pub.Close(context.Background()) + + if proxyPub, ok := pub.(*proxyPublisher); assert.True(ok) { + hub1.addPublisher(proxyPub) + } + + select { + case <-done: + case <-ctx.Done(): + assert.NoError(ctx.Err()) + } +} + +func Test_ProxyRemotePublisherTemporary(t *testing.T) { + t.Parallel() + + require := require.New(t) + assert := assert.New(t) + embedEtcd := etcdtest.NewServerForTest(t) + + server, addr1 := grpctest.NewServerForTest(t) + hub := newPublisherHub() + server.SetHub(hub) + + target := grpc.TargetInformationEtcd{ + Address: addr1, + } + encoded, err := json.Marshal(target) + require.NoError(err) + embedEtcd.SetValue("/grpctargets/server", encoded) + + server1 := testserver.NewProxyServerForTest(t, "DE") + server2 := testserver.NewProxyServerForTest(t, "DE") + + mcu1, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + }, + }, 1, nil) + mcu2, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server2, + }, + }, 2, nil) + + ctx, cancel := context.WithTimeout(t.Context(), testTimeout) + defer cancel() + + pubId := api.PublicSessionId("the-publisher") + pubSid := "1234567890" + pubListener := mock.NewListener(pubId + "-public") + pubInitiator := mock.NewInitiator("DE") + + pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ + MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, + }, pubInitiator) + require.NoError(err) + + defer pub.Close(context.Background()) + + if proxyPub, ok := pub.(*proxyPublisher); assert.True(ok) { + hub.addPublisher(proxyPub) + } + + subListener := mock.NewListener("subscriber-public") + subInitiator := mock.NewInitiator("DE") + sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) + require.NoError(err) + + defer sub.Close(context.Background()) + + if connSub, ok := sub.(*proxySubscriber); assert.True(ok) { + assert.Equal(server1.URL(), connSub.conn.rawUrl) + assert.Empty(connSub.conn.ip) + } + + // The temporary connection has been added + assert.Equal(2, mcu2.ConnectionsCount()) + + sub.Close(context.Background()) + + // Wait for temporary connection to be removed. +loop: + for { + select { + case <-ctx.Done(): + assert.NoError(ctx.Err()) + default: + if mcu2.ConnectionsCount() == 1 { + break loop + } + } + } +} + +func Test_ProxyConnectToken(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + embedEtcd := etcdtest.NewServerForTest(t) + + grpcServer1, addr1 := grpctest.NewServerForTest(t) + grpcServer2, addr2 := grpctest.NewServerForTest(t) + + hub1 := newPublisherHub() + hub2 := newPublisherHub() + grpcServer1.SetHub(hub1) + grpcServer2.SetHub(hub2) + + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + + server1 := testserver.NewProxyServerForTest(t, "DE") + server2 := testserver.NewProxyServerForTest(t, "DE") + + // Signaling server instances are in a cluster but don't share their proxies, + // i.e. they are only known to their local proxy, not the one of the other + // signaling server - so the connection token must be passed between them. + mcu1, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + }, + }, 1, nil) + mcu2, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server2, + }, + }, 2, nil) + + ctx, cancel := context.WithTimeout(t.Context(), testTimeout) + defer cancel() + + pubId := api.PublicSessionId("the-publisher") + pubSid := "1234567890" + pubListener := mock.NewListener(pubId + "-public") + pubInitiator := mock.NewInitiator("DE") + + pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ + MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, + }, pubInitiator) + require.NoError(t, err) + + defer pub.Close(context.Background()) + + if proxyPub, ok := pub.(*proxyPublisher); assert.True(ok) { + hub1.addPublisher(proxyPub) + } + + subListener := mock.NewListener("subscriber-public") + subInitiator := mock.NewInitiator("DE") + sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) + require.NoError(t, err) + + defer sub.Close(context.Background()) +} + +func Test_ProxyPublisherToken(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + embedEtcd := etcdtest.NewServerForTest(t) + + grpcServer1, addr1 := grpctest.NewServerForTest(t) + grpcServer2, addr2 := grpctest.NewServerForTest(t) + + hub1 := newPublisherHub() + hub2 := newPublisherHub() + grpcServer1.SetHub(hub1) + grpcServer2.SetHub(hub2) + + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + + server1 := testserver.NewProxyServerForTest(t, "DE") + server2 := testserver.NewProxyServerForTest(t, "US") + + // Signaling server instances are in a cluster but don't share their proxies, + // i.e. they are only known to their local proxy, not the one of the other + // signaling server - so the connection token must be passed between them. + // Also the subscriber is connecting from a different country, so a remote + // stream will be created that needs a valid token from the remote proxy. + mcu1, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server1, + }, + }, 1, nil) + mcu2, _ := newMcuProxyForTestWithOptions(t, testserver.ProxyTestOptions{ + Etcd: embedEtcd, + Servers: []testserver.ProxyTestServer{ + server2, + }, + }, 2, nil) + // Support remote subscribers for the tests. + server1.Servers = append(server1.Servers, server2) + server2.Servers = append(server2.Servers, server1) + + ctx, cancel := context.WithTimeout(t.Context(), testTimeout) + defer cancel() + + pubId := api.PublicSessionId("the-publisher") + pubSid := "1234567890" + pubListener := mock.NewListener(pubId + "-public") + pubInitiator := mock.NewInitiator("DE") + + pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, sfu.StreamTypeVideo, sfu.NewPublisherSettings{ + MediaTypes: sfu.MediaTypeVideo | sfu.MediaTypeAudio, + }, pubInitiator) + require.NoError(t, err) + + defer pub.Close(context.Background()) + + if proxyPub, ok := pub.(*proxyPublisher); assert.True(ok) { + hub1.addPublisher(proxyPub) + } + + subListener := mock.NewListener("subscriber-public") + subInitiator := mock.NewInitiator("US") + sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, sfu.StreamTypeVideo, subInitiator) + require.NoError(t, err) + + defer sub.Close(context.Background()) +}