From 5fc709434bba3ce4e3f1ea10ec3d5c471f7ca97c Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 2 Feb 2026 11:42:57 +0100 Subject: [PATCH] Add tests for SFU-related session events. --- server/clientsession_test.go | 175 +++++++++++++++++++++++++++++++++++ sfu/test/sfu.go | 12 +++ 2 files changed, 187 insertions(+) diff --git a/server/clientsession_test.go b/server/clientsession_test.go index 5466618..5c2def2 100644 --- a/server/clientsession_test.go +++ b/server/clientsession_test.go @@ -742,3 +742,178 @@ func TestPermissionHideDisplayNames(t *testing.T) { t.Run("without-hide-displaynames", testFunc(false)) // nolint:paralleltest t.Run("with-hide-displaynames", testFunc(true)) // nolint:paralleltest } + +func Test_ClientSessionPublisherEvents(t *testing.T) { + t.Parallel() + require := require.New(t) + assert := assert.New(t) + hub, _, _, server := CreateHubForTest(t) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + mcu := test.NewSFU(t) + require.NoError(mcu.Start(ctx)) + defer mcu.Stop() + + hub.SetMcu(mcu) + + client, hello := NewTestClientWithHello(ctx, t, server, hub, testDefaultUserId) + defer client.CloseWithBye() + + roomId := "test-room" + roomMsg := MustSucceed2(t, client.JoinRoom, ctx, roomId) + require.Equal(roomId, roomMsg.Room.RoomId) + + client.RunUntilJoined(ctx, hello.Hello) + + room := hub.getRoom(roomId) + require.NotNil(room) + + session := hub.GetSessionByPublicId(hello.Hello.SessionId).(*ClientSession) + require.NotNil(session, "Session %s does not exist", hello.Hello.SessionId) + + require.NoError(client.SendMessage(api.MessageClientMessageRecipient{ + Type: "session", + SessionId: hello.Hello.SessionId, + }, api.MessageClientMessageData{ + Type: "offer", + Sid: "54321", + RoomType: string(sfu.StreamTypeVideo), + Payload: api.StringMap{ + "sdp": mock.MockSdpOfferAudioAndVideo, + }, + })) + + require.True(client.RunUntilAnswer(ctx, mock.MockSdpAnswerAudioAndVideo)) + + pub := mcu.GetPublisher(hello.Hello.SessionId) + require.NotNil(pub) + + assert.Equal(pub, session.GetPublisher(sfu.StreamTypeVideo)) + session.OnIceCandidate(pub, "test-candidate") + + if message, ok := client.RunUntilMessage(ctx); ok { + assert.Equal("message", message.Type) + if msg := message.Message; assert.NotNil(msg) { + if sender := msg.Sender; assert.NotNil(sender) { + assert.Equal("session", sender.Type) + assert.Equal(hello.Hello.SessionId, sender.SessionId) + } + var ao api.AnswerOfferMessage + if assert.NoError(json.Unmarshal(msg.Data, &ao)) { + assert.Equal(hello.Hello.SessionId, ao.From) + assert.Equal(hello.Hello.SessionId, ao.To) + assert.Equal("candidate", ao.Type) + assert.EqualValues(sfu.StreamTypeVideo, ao.RoomType) + assert.Equal("test-candidate", ao.Payload["candidate"]) + } + } + } + + // No-op + session.OnIceCompleted(pub) + + session.PublisherClosed(pub) + assert.Nil(session.GetPublisher(sfu.StreamTypeVideo)) +} + +func Test_ClientSessionSubscriberEvents(t *testing.T) { + t.Parallel() + require := require.New(t) + assert := assert.New(t) + hub, _, _, server := CreateHubForTest(t) + hub.allowSubscribeAnyStream = true + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + mcu := test.NewSFU(t) + require.NoError(mcu.Start(ctx)) + defer mcu.Stop() + + hub.SetMcu(mcu) + + client1, hello1 := NewTestClientWithHello(ctx, t, server, hub, testDefaultUserId+"1") + defer client1.CloseWithBye() + client2, hello2 := NewTestClientWithHello(ctx, t, server, hub, testDefaultUserId+"2") + defer client2.CloseWithBye() + + roomId := "test-room" + roomMsg := MustSucceed2(t, client1.JoinRoom, ctx, roomId) + require.Equal(roomId, roomMsg.Room.RoomId) + roomMsg = MustSucceed2(t, client2.JoinRoom, ctx, roomId) + require.Equal(roomId, roomMsg.Room.RoomId) + + WaitForUsersJoined(ctx, t, client1, hello1, client2, hello2) + + room := hub.getRoom(roomId) + require.NotNil(room) + + session1 := hub.GetSessionByPublicId(hello1.Hello.SessionId).(*ClientSession) + require.NotNil(session1, "Session %s does not exist", hello1.Hello.SessionId) + session2 := hub.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession) + require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId) + + require.NoError(client1.SendMessage(api.MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, api.MessageClientMessageData{ + Type: "offer", + Sid: "54321", + RoomType: string(sfu.StreamTypeVideo), + Payload: api.StringMap{ + "sdp": mock.MockSdpOfferAudioAndVideo, + }, + })) + + require.True(client1.RunUntilAnswer(ctx, mock.MockSdpAnswerAudioAndVideo)) + + require.NoError(client2.SendMessage(api.MessageClientMessageRecipient{ + Type: "session", + SessionId: hello1.Hello.SessionId, + }, api.MessageClientMessageData{ + Type: "requestoffer", + Sid: "54321", + RoomType: string(sfu.StreamTypeVideo), + })) + + require.True(client2.RunUntilOffer(ctx, mock.MockSdpOfferAudioAndVideo)) + + sub := mcu.GetSubscriber(hello1.Hello.SessionId, sfu.StreamTypeVideo) + require.NotNil(sub) + + assert.Equal(sub, session2.GetSubscriber(hello1.Hello.SessionId, sfu.StreamTypeVideo)) + session2.OnIceCandidate(sub, "test-candidate") + + if message, ok := client2.RunUntilMessage(ctx); ok { + assert.Equal("message", message.Type) + if msg := message.Message; assert.NotNil(msg) { + if sender := msg.Sender; assert.NotNil(sender) { + assert.Equal("session", sender.Type) + assert.Equal(hello1.Hello.SessionId, sender.SessionId) + } + var ao api.AnswerOfferMessage + if assert.NoError(json.Unmarshal(msg.Data, &ao)) { + assert.Equal(hello1.Hello.SessionId, ao.From) + assert.Equal(hello2.Hello.SessionId, ao.To) + assert.Equal("candidate", ao.Type) + assert.EqualValues(sfu.StreamTypeVideo, ao.RoomType) + assert.Equal("test-candidate", ao.Payload["candidate"]) + } + } + } + + // No-op + session2.OnIceCompleted(sub) + + session2.OnUpdateOffer(sub, api.StringMap{ + "type": "offer", + "sdp": mock.MockSdpOfferAudioOnly, + }) + + require.True(client2.RunUntilOffer(ctx, mock.MockSdpOfferAudioOnly)) + + session2.SubscriberClosed(sub) + assert.Nil(session2.GetSubscriber(hello1.Hello.SessionId, sfu.StreamTypeVideo)) +} diff --git a/sfu/test/sfu.go b/sfu/test/sfu.go index 7f77726..dabd659 100644 --- a/sfu/test/sfu.go +++ b/sfu/test/sfu.go @@ -32,6 +32,7 @@ import ( "testing" "github.com/dlintw/goconf" + "github.com/stretchr/testify/assert" "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/internal" @@ -145,6 +146,14 @@ func (m *SFU) GetPublisher(id api.PublicSessionId) *SFUPublisher { return m.publishers[id] } +func (m *SFU) GetSubscriber(id api.PublicSessionId, streamType sfu.StreamType) *SFUSubscriber { + m.mu.Lock() + defer m.mu.Unlock() + + key := fmt.Sprintf("%s|%s", id, streamType) + return m.subscribers[key] +} + func (m *SFU) NewSubscriber(ctx context.Context, listener sfu.Listener, publisher api.PublicSessionId, streamType sfu.StreamType, initiator sfu.Initiator) (sfu.Subscriber, error) { m.mu.Lock() defer m.mu.Unlock() @@ -164,6 +173,9 @@ func (m *SFU) NewSubscriber(ctx context.Context, listener sfu.Listener, publishe publisher: pub, } + key := fmt.Sprintf("%s|%s", publisher, streamType) + assert.Empty(m.t, m.subscribers[key], "duplicate subscriber") + m.subscribers[key] = sub return sub, nil }