Add tests for publishers on different hubs.

This commit is contained in:
Joachim Bauch 2024-04-29 12:13:38 +02:00
parent 70cdc7a1aa
commit 499f41c849
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02

View file

@ -31,6 +31,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"net/url"
"path"
"strings"
"sync"
@ -1394,3 +1395,331 @@ func Test_ProxySubscriberBandwidthOverload(t *testing.T) {
t.Errorf("expected server %s, go %s", serverDE.URL, sub.(*mcuProxySubscriber).conn.rawUrl)
}
}
type mockGrpcServerHub struct {
sessionsLock sync.Mutex
sessionByPublicId map[string]Session
}
func (h *mockGrpcServerHub) addSession(session *ClientSession) {
h.sessionsLock.Lock()
defer h.sessionsLock.Unlock()
if h.sessionByPublicId == nil {
h.sessionByPublicId = make(map[string]Session)
}
h.sessionByPublicId[session.PublicId()] = session
}
func (h *mockGrpcServerHub) removeSession(session *ClientSession) {
h.sessionsLock.Lock()
defer h.sessionsLock.Unlock()
delete(h.sessionByPublicId, session.PublicId())
}
func (h *mockGrpcServerHub) GetSessionByResumeId(resumeId string) Session {
return nil
}
func (h *mockGrpcServerHub) GetSessionByPublicId(sessionId string) Session {
h.sessionsLock.Lock()
defer h.sessionsLock.Unlock()
return h.sessionByPublicId[sessionId]
}
func (h *mockGrpcServerHub) GetSessionIdByRoomSessionId(roomSessionId string) (string, error) {
return "", nil
}
func (h *mockGrpcServerHub) GetBackend(u *url.URL) *Backend {
return nil
}
func Test_ProxyRemotePublisher(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
etcd := NewEtcdForTest(t)
grpcServer1, addr1 := NewGrpcServerForTest(t)
grpcServer2, addr2 := NewGrpcServerForTest(t)
hub1 := &mockGrpcServerHub{}
hub2 := &mockGrpcServerHub{}
grpcServer1.hub = hub1
grpcServer2.hub = hub2
SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}"))
SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}"))
server1 := NewProxyServerForTest(t, "DE")
server2 := NewProxyServerForTest(t, "DE")
mcu1 := newMcuProxyForTestWithOptions(t, proxyTestOptions{
etcd: etcd,
servers: []*TestProxyServerHandler{
server1,
server2,
},
})
mcu2 := newMcuProxyForTestWithOptions(t, proxyTestOptions{
etcd: etcd,
servers: []*TestProxyServerHandler{
server1,
server2,
},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
pubId := "the-publisher"
pubSid := "1234567890"
pubListener := &MockMcuListener{
publicId: pubId + "-public",
}
pubInitiator := &MockMcuInitiator{
country: "DE",
}
session1 := &ClientSession{
publicId: pubId,
publishers: make(map[StreamType]McuPublisher),
}
hub1.addSession(session1)
defer hub1.removeSession(session1)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
if err != nil {
t.Fatal(err)
}
defer pub.Close(context.Background())
session1.mu.Lock()
session1.publishers[StreamTypeVideo] = pub
session1.publisherWaiters.Wakeup()
session1.mu.Unlock()
subListener := &MockMcuListener{
publicId: "subscriber-public",
}
subInitiator := &MockMcuInitiator{
country: "DE",
}
sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator)
if err != nil {
t.Fatal(err)
}
defer sub.Close(context.Background())
}
func Test_ProxyRemotePublisherWait(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
etcd := NewEtcdForTest(t)
grpcServer1, addr1 := NewGrpcServerForTest(t)
grpcServer2, addr2 := NewGrpcServerForTest(t)
hub1 := &mockGrpcServerHub{}
hub2 := &mockGrpcServerHub{}
grpcServer1.hub = hub1
grpcServer2.hub = hub2
SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}"))
SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}"))
server1 := NewProxyServerForTest(t, "DE")
server2 := NewProxyServerForTest(t, "DE")
mcu1 := newMcuProxyForTestWithOptions(t, proxyTestOptions{
etcd: etcd,
servers: []*TestProxyServerHandler{
server1,
server2,
},
})
mcu2 := newMcuProxyForTestWithOptions(t, proxyTestOptions{
etcd: etcd,
servers: []*TestProxyServerHandler{
server1,
server2,
},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
pubId := "the-publisher"
pubSid := "1234567890"
pubListener := &MockMcuListener{
publicId: pubId + "-public",
}
pubInitiator := &MockMcuInitiator{
country: "DE",
}
session1 := &ClientSession{
publicId: pubId,
publishers: make(map[StreamType]McuPublisher),
}
hub1.addSession(session1)
defer hub1.removeSession(session1)
subListener := &MockMcuListener{
publicId: "subscriber-public",
}
subInitiator := &MockMcuInitiator{
country: "DE",
}
done := make(chan struct{})
go func() {
defer close(done)
sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator)
if err != nil {
t.Error(err)
return
}
defer sub.Close(context.Background())
}()
// Give subscriber goroutine some time to start
time.Sleep(100 * time.Millisecond)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
if err != nil {
t.Fatal(err)
}
defer pub.Close(context.Background())
session1.mu.Lock()
session1.publishers[StreamTypeVideo] = pub
session1.publisherWaiters.Wakeup()
session1.mu.Unlock()
select {
case <-done:
case <-ctx.Done():
t.Error(ctx.Err())
}
}
func Test_ProxyRemotePublisherTemporary(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
etcd := NewEtcdForTest(t)
grpcServer1, addr1 := NewGrpcServerForTest(t)
grpcServer2, addr2 := NewGrpcServerForTest(t)
hub1 := &mockGrpcServerHub{}
hub2 := &mockGrpcServerHub{}
grpcServer1.hub = hub1
grpcServer2.hub = hub2
SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}"))
SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}"))
server1 := NewProxyServerForTest(t, "DE")
server2 := NewProxyServerForTest(t, "DE")
mcu1 := newMcuProxyForTestWithOptions(t, proxyTestOptions{
etcd: etcd,
servers: []*TestProxyServerHandler{
server1,
},
})
mcu2 := newMcuProxyForTestWithOptions(t, proxyTestOptions{
etcd: etcd,
servers: []*TestProxyServerHandler{
server2,
},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
pubId := "the-publisher"
pubSid := "1234567890"
pubListener := &MockMcuListener{
publicId: pubId + "-public",
}
pubInitiator := &MockMcuInitiator{
country: "DE",
}
session1 := &ClientSession{
publicId: pubId,
publishers: make(map[StreamType]McuPublisher),
}
hub1.addSession(session1)
defer hub1.removeSession(session1)
pub, err := mcu1.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
if err != nil {
t.Fatal(err)
}
defer pub.Close(context.Background())
session1.mu.Lock()
session1.publishers[StreamTypeVideo] = pub
session1.publisherWaiters.Wakeup()
session1.mu.Unlock()
mcu2.connectionsMu.RLock()
count := len(mcu2.connections)
mcu2.connectionsMu.RUnlock()
if expected := 1; count != expected {
t.Errorf("expected %d connections, got %+v", expected, count)
}
subListener := &MockMcuListener{
publicId: "subscriber-public",
}
subInitiator := &MockMcuInitiator{
country: "DE",
}
sub, err := mcu2.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator)
if err != nil {
t.Fatal(err)
}
defer sub.Close(context.Background())
if sub.(*mcuProxySubscriber).conn.rawUrl != server1.URL {
t.Errorf("expected server %s, go %s", server1.URL, sub.(*mcuProxySubscriber).conn.rawUrl)
}
// The temporary connection has been added
mcu2.connectionsMu.RLock()
count = len(mcu2.connections)
mcu2.connectionsMu.RUnlock()
if expected := 2; count != expected {
t.Errorf("expected %d connections, got %+v", expected, count)
}
sub.Close(context.Background())
// Wait for temporary connection to be removed.
loop:
for {
select {
case <-ctx.Done():
t.Error(ctx.Err())
default:
mcu2.connectionsMu.RLock()
count = len(mcu2.connections)
mcu2.connectionsMu.RUnlock()
if count == 1 {
break loop
}
}
}
}