From ffd8a30f61ccd8656f493e2444fcf4db39556b10 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 10 Jan 2022 14:30:35 +0100 Subject: [PATCH] Clients can provide the maximum publishing bandwidth in offer requests. This will still be capped to any backend / proxy / Janus limits. --- api_signaling.go | 1 + clientsession.go | 12 ++- clientsession_test.go | 178 ++++++++++++++++++++++++++++++++++++++++++ mcu_test.go | 18 +++++ 4 files changed, 206 insertions(+), 3 deletions(-) diff --git a/api_signaling.go b/api_signaling.go index e101ba9..a20eed3 100644 --- a/api_signaling.go +++ b/api_signaling.go @@ -411,6 +411,7 @@ type MessageClientMessageData struct { Type string `json:"type"` Sid string `json:"sid"` RoomType string `json:"roomType"` + Bitrate int `json:"bitrate,omitempty"` Payload map[string]interface{} `json:"payload"` } diff --git a/clientsession.go b/clientsession.go index acc92bb..0e24176 100644 --- a/clientsession.go +++ b/clientsession.go @@ -820,12 +820,18 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea client := s.getClientUnlocked() s.mu.Unlock() - var bitrate int + bitrate := data.Bitrate if backend := s.Backend(); backend != nil { + var maxBitrate int if streamType == streamTypeScreen { - bitrate = backend.maxScreenBitrate + maxBitrate = backend.maxScreenBitrate } else { - bitrate = backend.maxStreamBitrate + maxBitrate = backend.maxStreamBitrate + } + if bitrate <= 0 { + bitrate = maxBitrate + } else if maxBitrate > 0 && bitrate > maxBitrate { + bitrate = maxBitrate } } var err error diff --git a/clientsession_test.go b/clientsession_test.go index 1dadd3d..f0604ef 100644 --- a/clientsession_test.go +++ b/clientsession_test.go @@ -22,6 +22,8 @@ package signaling import ( + "context" + "net/url" "strconv" "testing" ) @@ -122,3 +124,179 @@ func Test_permissionsEqual(t *testing.T) { }) } } + +func TestBandwidth_Client(t *testing.T) { + hub, _, _, server, shutdown := CreateHubForTest(t) + defer shutdown() + + mcu, err := NewTestMCU() + if err != nil { + t.Fatal(err) + } else if err := mcu.Start(); err != nil { + t.Fatal(err) + } + defer mcu.Stop() + + hub.SetMcu(mcu) + + client := NewTestClient(t, server, hub) + defer client.CloseWithBye() + + if err := client.SendHello(testDefaultUserId); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + hello, err := client.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + // Join room by id. + roomId := "test-room" + if room, err := client.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // We will receive a "joined" event. + if err := client.RunUntilJoined(ctx, hello.Hello); err != nil { + t.Error(err) + } + + // Client may not send an offer with audio and video. + bitrate := 10000 + if err := client.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello.Hello.SessionId, + }, MessageClientMessageData{ + Type: "offer", + Sid: "54321", + RoomType: "video", + Bitrate: bitrate, + Payload: map[string]interface{}{ + "sdp": MockSdpOfferAudioAndVideo, + }, + }); err != nil { + t.Fatal(err) + } + + if err := client.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo); err != nil { + t.Fatal(err) + } + + pub := mcu.GetPublisher(hello.Hello.SessionId) + if pub == nil { + t.Fatal("Could not find publisher") + } + + if pub.bitrate != bitrate { + t.Errorf("Expected bitrate %d, got %d", bitrate, pub.bitrate) + } +} + +func TestBandwidth_Backend(t *testing.T) { + hub, _, _, server, shutdown := CreateHubWithMultipleBackendsForTest(t) + defer shutdown() + + u, err := url.Parse(server.URL + "/one") + if err != nil { + t.Fatal(err) + } + backend := hub.backend.GetBackend(u) + if backend == nil { + t.Fatal("Could not get backend") + } + + backend.maxScreenBitrate = 1000 + backend.maxStreamBitrate = 2000 + + mcu, err := NewTestMCU() + if err != nil { + t.Fatal(err) + } else if err := mcu.Start(); err != nil { + t.Fatal(err) + } + defer mcu.Stop() + + hub.SetMcu(mcu) + + streamTypes := []string{ + streamTypeVideo, + streamTypeScreen, + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + for _, streamType := range streamTypes { + t.Run(streamType, func(t *testing.T) { + client := NewTestClient(t, server, hub) + defer client.CloseWithBye() + + params := TestBackendClientAuthParams{ + UserId: testDefaultUserId, + } + if err := client.SendHelloParams(server.URL+"/one", "client", params); err != nil { + t.Fatal(err) + } + + hello, err := client.RunUntilHello(ctx) + if err != nil { + t.Fatal(err) + } + + // Join room by id. + roomId := "test-room" + if room, err := client.JoinRoom(ctx, roomId); err != nil { + t.Fatal(err) + } else if room.Room.RoomId != roomId { + t.Fatalf("Expected room %s, got %s", roomId, room.Room.RoomId) + } + + // We will receive a "joined" event. + if err := client.RunUntilJoined(ctx, hello.Hello); err != nil { + t.Error(err) + } + + // Client may not send an offer with audio and video. + bitrate := 10000 + if err := client.SendMessage(MessageClientMessageRecipient{ + Type: "session", + SessionId: hello.Hello.SessionId, + }, MessageClientMessageData{ + Type: "offer", + Sid: "54321", + RoomType: streamType, + Bitrate: bitrate, + Payload: map[string]interface{}{ + "sdp": MockSdpOfferAudioAndVideo, + }, + }); err != nil { + t.Fatal(err) + } + + if err := client.RunUntilAnswer(ctx, MockSdpAnswerAudioAndVideo); err != nil { + t.Fatal(err) + } + + pub := mcu.GetPublisher(hello.Hello.SessionId) + if pub == nil { + t.Fatal("Could not find publisher") + } + + var expectBitrate int + if streamType == streamTypeVideo { + expectBitrate = backend.maxStreamBitrate + } else { + expectBitrate = backend.maxScreenBitrate + } + if pub.bitrate != expectBitrate { + t.Errorf("Expected bitrate %d, got %d", expectBitrate, pub.bitrate) + } + }) + } +} diff --git a/mcu_test.go b/mcu_test.go index a08c9d0..605f0a9 100644 --- a/mcu_test.go +++ b/mcu_test.go @@ -31,6 +31,11 @@ import ( "github.com/dlintw/goconf" ) +const ( + TestMaxBitrateScreen = 12345678 + TestMaxBitrateVideo = 23456789 +) + type TestMCU struct { mu sync.Mutex publishers map[string]*TestMCUPublisher @@ -63,6 +68,17 @@ func (m *TestMCU) GetStats() interface{} { } func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { + var maxBitrate int + if streamType == streamTypeScreen { + maxBitrate = TestMaxBitrateScreen + } else { + maxBitrate = TestMaxBitrateVideo + } + if bitrate <= 0 { + bitrate = maxBitrate + } else if bitrate > maxBitrate { + bitrate = maxBitrate + } pub := &TestMCUPublisher{ TestMCUClient: TestMCUClient{ id: id, @@ -70,6 +86,7 @@ func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id str }, mediaTypes: mediaTypes, + bitrate: bitrate, } m.mu.Lock() @@ -129,6 +146,7 @@ type TestMCUPublisher struct { TestMCUClient mediaTypes MediaType + bitrate int } func (p *TestMCUPublisher) HasMedia(mt MediaType) bool {