Calculate proxy load based on maximum bandwidth.

Take maximum bandwidth of connected clients into account when calculating
load as screensharing requires more than regular audio/video.
This commit is contained in:
Joachim Bauch 2024-02-27 14:23:52 +01:00
parent 7d09c71ab9
commit 8f4fc2db6d
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
6 changed files with 51 additions and 20 deletions

View file

@ -215,6 +215,8 @@ func (m *CommandProxyClientMessage) CheckValid() error {
type CommandProxyServerMessage struct {
Id string `json:"id,omitempty"`
Sid string `json:"sid,omitempty"`
Bitrate int `json:"bitrate,omitempty"`
}
// Type "payload"

View file

@ -104,6 +104,7 @@ type McuClient interface {
Id() string
Sid() string
StreamType() StreamType
MaxBitrate() int
Close(ctx context.Context)

View file

@ -438,6 +438,7 @@ type mcuJanusClient struct {
roomId uint64
sid string
streamType StreamType
maxBitrate int
handle *JanusHandle
handleId uint64
@ -464,6 +465,10 @@ func (c *mcuJanusClient) StreamType() StreamType {
return c.streamType
}
func (c *mcuJanusClient) MaxBitrate() int {
return c.maxBitrate
}
func (c *mcuJanusClient) Close(ctx context.Context) {
}
@ -715,14 +720,14 @@ func min(a, b int) int {
return b
}
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType StreamType, bitrate int) (*JanusHandle, uint64, uint64, error) {
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType StreamType, bitrate int) (*JanusHandle, uint64, uint64, int, error) {
session := m.session
if session == nil {
return nil, 0, 0, ErrNotConnected
return nil, 0, 0, 0, ErrNotConnected
}
handle, err := session.Attach(ctx, pluginVideoRoom)
if err != nil {
return nil, 0, 0, err
return nil, 0, 0, 0, err
}
log.Printf("Attached %s as publisher %d to plugin %s in session %d", streamType, handle.Id, pluginVideoRoom, session.Id)
@ -752,7 +757,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
}
return nil, 0, 0, err
return nil, 0, 0, 0, err
}
roomId := getPluginIntValue(create_response.PluginData, pluginVideoRoom, "room")
@ -760,7 +765,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
if _, err := handle.Detach(ctx); err != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err)
}
return nil, 0, 0, fmt.Errorf("No room id received: %+v", create_response)
return nil, 0, 0, 0, fmt.Errorf("No room id received: %+v", create_response)
}
log.Println("Created room", roomId, create_response.PluginData)
@ -777,10 +782,10 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
if _, err2 := handle.Detach(ctx); err2 != nil {
log.Printf("Error detaching handle %d: %s", handle.Id, err2)
}
return nil, 0, 0, err
return nil, 0, 0, 0, err
}
return handle, response.Session, roomId, nil
return handle, response.Session, roomId, bitrate, nil
}
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
@ -788,7 +793,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
return nil, fmt.Errorf("Unsupported stream type %s", streamType)
}
handle, session, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate)
handle, session, roomId, maxBitrate, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate)
if err != nil {
return nil, err
}
@ -803,6 +808,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
roomId: roomId,
sid: sid,
streamType: streamType,
maxBitrate: maxBitrate,
handle: handle,
handleId: handle.Id,
@ -892,7 +898,7 @@ func (p *mcuJanusPublisher) SetMedia(mt MediaType) {
func (p *mcuJanusPublisher) NotifyReconnected() {
ctx := context.TODO()
handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
handle, session, roomId, _, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
if err != nil {
log.Printf("Could not reconnect publisher %s: %s", p.id, err)
// TODO(jojo): Retry
@ -1043,6 +1049,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
roomId: pub.roomId,
sid: strconv.FormatUint(handle.Id, 10),
streamType: streamType,
maxBitrate: pub.MaxBitrate(),
handle: handle,
handleId: handle.Id,

View file

@ -77,6 +77,7 @@ type McuProxy interface {
type mcuProxyPubSubCommon struct {
sid string
streamType StreamType
maxBitrate int
proxyId string
conn *mcuProxyConnection
listener McuListener
@ -94,6 +95,10 @@ func (c *mcuProxyPubSubCommon) StreamType() StreamType {
return c.streamType
}
func (c *mcuProxyPubSubCommon) MaxBitrate() int {
return c.maxBitrate
}
func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClientMessage, callback func(error, map[string]interface{})) {
c.conn.performAsyncRequest(ctx, msg, func(err error, response *ProxyServerMessage) {
if err != nil {
@ -132,11 +137,12 @@ type mcuProxyPublisher struct {
mediaTypes MediaType
}
func newMcuProxyPublisher(id string, sid string, streamType StreamType, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher {
func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitrate int, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher {
return &mcuProxyPublisher{
mcuProxyPubSubCommon: mcuProxyPubSubCommon{
sid: sid,
streamType: streamType,
maxBitrate: maxBitrate,
proxyId: proxyId,
conn: conn,
listener: listener,
@ -217,11 +223,12 @@ type mcuProxySubscriber struct {
publisherId string
}
func newMcuProxySubscriber(publisherId string, sid string, streamType StreamType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber {
func newMcuProxySubscriber(publisherId string, sid string, streamType StreamType, maxBitrate int, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber {
return &mcuProxySubscriber{
mcuProxyPubSubCommon: mcuProxyPubSubCommon{
sid: sid,
streamType: streamType,
maxBitrate: maxBitrate,
proxyId: proxyId,
conn: conn,
listener: listener,
@ -1054,7 +1061,7 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe
proxyId := response.Command.Id
log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id)
publisher := newMcuProxyPublisher(id, sid, streamType, mediaTypes, proxyId, c, listener)
publisher := newMcuProxyPublisher(id, sid, streamType, response.Command.Bitrate, mediaTypes, proxyId, c, listener)
c.publishersLock.Lock()
c.publishers[proxyId] = publisher
c.publisherIds[getStreamId(id, streamType)] = proxyId
@ -1084,7 +1091,7 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList
proxyId := response.Command.Id
log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisherSessionId)
subscriber := newMcuProxySubscriber(publisherSessionId, response.Command.Sid, streamType, proxyId, c, listener)
subscriber := newMcuProxySubscriber(publisherSessionId, response.Command.Sid, streamType, response.Command.Bitrate, proxyId, c, listener)
c.subscribersLock.Lock()
c.subscribers[proxyId] = subscriber
c.subscribersLock.Unlock()

View file

@ -158,6 +158,10 @@ func (c *TestMCUClient) StreamType() StreamType {
return c.streamType
}
func (c *TestMCUClient) MaxBitrate() int {
return 0
}
func (c *TestMCUClient) Close(ctx context.Context) {
if c.closed.CompareAndSwap(false, true) {
log.Printf("Close MCU client %s", c.id)

View file

@ -305,9 +305,7 @@ loop:
}
func (s *ProxyServer) updateLoad() {
// TODO: Take maximum bandwidth of clients into account when calculating
// load (screensharing requires more than regular audio/video).
load := s.GetClientCount()
load := s.GetClientsLoad()
if load == s.load.Load() {
return
}
@ -390,7 +388,7 @@ func (s *ProxyServer) ScheduleShutdown() {
session.sendMessage(msg)
})
if s.GetClientCount() == 0 {
if !s.HasClients() {
go close(s.shutdownChannel)
}
}
@ -653,7 +651,8 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
Id: message.Id,
Type: "command",
Command: &signaling.CommandProxyServerMessage{
Id: id,
Id: id,
Bitrate: int(publisher.MaxBitrate()),
},
}
session.sendMessage(response)
@ -978,10 +977,21 @@ func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) bool {
return true
}
func (s *ProxyServer) GetClientCount() int64 {
func (s *ProxyServer) HasClients() bool {
s.clientsLock.RLock()
defer s.clientsLock.RUnlock()
return int64(len(s.clients))
return len(s.clients) > 0
}
func (s *ProxyServer) GetClientsLoad() int64 {
s.clientsLock.RLock()
defer s.clientsLock.RUnlock()
var load int64
for _, c := range s.clients {
load += int64(c.MaxBitrate())
}
return load / 1024
}
func (s *ProxyServer) GetClient(id string) signaling.McuClient {