From 499f41c8490c72c758414ed9f5252d205c2b5c03 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 29 Apr 2024 12:13:38 +0200 Subject: [PATCH] Add tests for publishers on different hubs. --- mcu_proxy_test.go | 329 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 329 insertions(+) diff --git a/mcu_proxy_test.go b/mcu_proxy_test.go index 2e88f6c..b6fe38d 100644 --- a/mcu_proxy_test.go +++ b/mcu_proxy_test.go @@ -31,6 +31,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" "path" "strings" "sync" @@ -1394,3 +1395,331 @@ func Test_ProxySubscriberBandwidthOverload(t *testing.T) { t.Errorf("expected server %s, go %s", serverDE.URL, sub.(*mcuProxySubscriber).conn.rawUrl) } } + +type mockGrpcServerHub struct { + sessionsLock sync.Mutex + sessionByPublicId map[string]Session +} + +func (h *mockGrpcServerHub) addSession(session *ClientSession) { + h.sessionsLock.Lock() + defer h.sessionsLock.Unlock() + if h.sessionByPublicId == nil { + h.sessionByPublicId = make(map[string]Session) + } + h.sessionByPublicId[session.PublicId()] = session +} + +func (h *mockGrpcServerHub) removeSession(session *ClientSession) { + h.sessionsLock.Lock() + defer h.sessionsLock.Unlock() + delete(h.sessionByPublicId, session.PublicId()) +} + +func (h *mockGrpcServerHub) GetSessionByResumeId(resumeId string) Session { + return nil +} + +func (h *mockGrpcServerHub) GetSessionByPublicId(sessionId string) Session { + h.sessionsLock.Lock() + defer h.sessionsLock.Unlock() + return h.sessionByPublicId[sessionId] +} + +func (h *mockGrpcServerHub) GetSessionIdByRoomSessionId(roomSessionId string) (string, error) { + return "", nil +} + +func (h *mockGrpcServerHub) GetBackend(u *url.URL) *Backend { + return nil +} + +func Test_ProxyRemotePublisher(t *testing.T) { + CatchLogForTest(t) + t.Parallel() + + etcd := NewEtcdForTest(t) + + grpcServer1, addr1 := NewGrpcServerForTest(t) + grpcServer2, addr2 := NewGrpcServerForTest(t) + + hub1 := &mockGrpcServerHub{} + hub2 := &mockGrpcServerHub{} + grpcServer1.hub = hub1 + grpcServer2.hub = hub2 + + SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + + server1 := NewProxyServerForTest(t, "DE") + server2 := NewProxyServerForTest(t, "DE") + + mcu1 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ + etcd: etcd, + servers: []*TestProxyServerHandler{ + server1, + server2, + }, + }) + mcu2 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ + etcd: etcd, + servers: []*TestProxyServerHandler{ + server1, + server2, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + pubId := "the-publisher" + pubSid := "1234567890" + pubListener := &MockMcuListener{ + publicId: pubId + "-public", + } + pubInitiator := &MockMcuInitiator{ + country: "DE", + } + + session1 := &ClientSession{ + publicId: pubId, + publishers: make(map[StreamType]McuPublisher), + } + hub1.addSession(session1) + defer hub1.removeSession(session1) + + pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) + if err != nil { + t.Fatal(err) + } + + defer pub.Close(context.Background()) + + session1.mu.Lock() + session1.publishers[StreamTypeVideo] = pub + session1.publisherWaiters.Wakeup() + session1.mu.Unlock() + + subListener := &MockMcuListener{ + publicId: "subscriber-public", + } + subInitiator := &MockMcuInitiator{ + country: "DE", + } + sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) + if err != nil { + t.Fatal(err) + } + + defer sub.Close(context.Background()) +} + +func Test_ProxyRemotePublisherWait(t *testing.T) { + CatchLogForTest(t) + t.Parallel() + + etcd := NewEtcdForTest(t) + + grpcServer1, addr1 := NewGrpcServerForTest(t) + grpcServer2, addr2 := NewGrpcServerForTest(t) + + hub1 := &mockGrpcServerHub{} + hub2 := &mockGrpcServerHub{} + grpcServer1.hub = hub1 + grpcServer2.hub = hub2 + + SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + + server1 := NewProxyServerForTest(t, "DE") + server2 := NewProxyServerForTest(t, "DE") + + mcu1 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ + etcd: etcd, + servers: []*TestProxyServerHandler{ + server1, + server2, + }, + }) + mcu2 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ + etcd: etcd, + servers: []*TestProxyServerHandler{ + server1, + server2, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + pubId := "the-publisher" + pubSid := "1234567890" + pubListener := &MockMcuListener{ + publicId: pubId + "-public", + } + pubInitiator := &MockMcuInitiator{ + country: "DE", + } + + session1 := &ClientSession{ + publicId: pubId, + publishers: make(map[StreamType]McuPublisher), + } + hub1.addSession(session1) + defer hub1.removeSession(session1) + + subListener := &MockMcuListener{ + publicId: "subscriber-public", + } + subInitiator := &MockMcuInitiator{ + country: "DE", + } + + done := make(chan struct{}) + go func() { + defer close(done) + sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) + if err != nil { + t.Error(err) + return + } + + defer sub.Close(context.Background()) + }() + + // Give subscriber goroutine some time to start + time.Sleep(100 * time.Millisecond) + + pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) + if err != nil { + t.Fatal(err) + } + + defer pub.Close(context.Background()) + + session1.mu.Lock() + session1.publishers[StreamTypeVideo] = pub + session1.publisherWaiters.Wakeup() + session1.mu.Unlock() + + select { + case <-done: + case <-ctx.Done(): + t.Error(ctx.Err()) + } +} + +func Test_ProxyRemotePublisherTemporary(t *testing.T) { + CatchLogForTest(t) + t.Parallel() + + etcd := NewEtcdForTest(t) + + grpcServer1, addr1 := NewGrpcServerForTest(t) + grpcServer2, addr2 := NewGrpcServerForTest(t) + + hub1 := &mockGrpcServerHub{} + hub2 := &mockGrpcServerHub{} + grpcServer1.hub = hub1 + grpcServer2.hub = hub2 + + SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + + server1 := NewProxyServerForTest(t, "DE") + server2 := NewProxyServerForTest(t, "DE") + + mcu1 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ + etcd: etcd, + servers: []*TestProxyServerHandler{ + server1, + }, + }) + mcu2 := newMcuProxyForTestWithOptions(t, proxyTestOptions{ + etcd: etcd, + servers: []*TestProxyServerHandler{ + server2, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + pubId := "the-publisher" + pubSid := "1234567890" + pubListener := &MockMcuListener{ + publicId: pubId + "-public", + } + pubInitiator := &MockMcuInitiator{ + country: "DE", + } + + session1 := &ClientSession{ + publicId: pubId, + publishers: make(map[StreamType]McuPublisher), + } + hub1.addSession(session1) + defer hub1.removeSession(session1) + + pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) + if err != nil { + t.Fatal(err) + } + + defer pub.Close(context.Background()) + + session1.mu.Lock() + session1.publishers[StreamTypeVideo] = pub + session1.publisherWaiters.Wakeup() + session1.mu.Unlock() + + mcu2.connectionsMu.RLock() + count := len(mcu2.connections) + mcu2.connectionsMu.RUnlock() + if expected := 1; count != expected { + t.Errorf("expected %d connections, got %+v", expected, count) + } + + subListener := &MockMcuListener{ + publicId: "subscriber-public", + } + subInitiator := &MockMcuInitiator{ + country: "DE", + } + sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) + if err != nil { + t.Fatal(err) + } + + defer sub.Close(context.Background()) + + if sub.(*mcuProxySubscriber).conn.rawUrl != server1.URL { + t.Errorf("expected server %s, go %s", server1.URL, sub.(*mcuProxySubscriber).conn.rawUrl) + } + + // The temporary connection has been added + mcu2.connectionsMu.RLock() + count = len(mcu2.connections) + mcu2.connectionsMu.RUnlock() + if expected := 2; count != expected { + t.Errorf("expected %d connections, got %+v", expected, count) + } + + sub.Close(context.Background()) + + // Wait for temporary connection to be removed. +loop: + for { + select { + case <-ctx.Done(): + t.Error(ctx.Err()) + default: + mcu2.connectionsMu.RLock() + count = len(mcu2.connections) + mcu2.connectionsMu.RUnlock() + if count == 1 { + break loop + } + } + } +}