From 1ceb806c201b2a9b5eaf93fc1deb61a9af912dd9 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 21 Jan 2021 14:39:33 +0100 Subject: [PATCH] Support defining maximum bandwidths at diferent levels. - Individually for each backend. - For the proxy client (i.e. signaling server using cluster of proxies). - For the proxy server / MCU. The smallest bandwidth configured will be used, e.g. if a backend has a larger limit than the MCU assigned for the stream, the limit of the MCU server will be used. --- api_proxy.go | 1 + backend_configuration.go | 15 +++++++++++++++ clientsession.go | 11 ++++++++++- mcu_common.go | 2 +- mcu_janus.go | 33 +++++++++++++++++++++++++-------- mcu_proxy.go | 37 ++++++++++++++++++++++++++++++++++--- mcu_test.go | 2 +- proxy/proxy_server.go | 2 +- server.conf.in | 18 ++++++++++++++---- 9 files changed, 102 insertions(+), 19 deletions(-) diff --git a/api_proxy.go b/api_proxy.go index 8ba5b60..6b03c18 100644 --- a/api_proxy.go +++ b/api_proxy.go @@ -174,6 +174,7 @@ type CommandProxyClientMessage struct { StreamType string `json:"streamType,omitempty"` PublisherId string `json:"publisherId,omitempty"` ClientId string `json:"clientId,omitempty"` + Bitrate int `json:"bitrate,omitempty"` } func (m *CommandProxyClientMessage) CheckValid() error { diff --git a/backend_configuration.go b/backend_configuration.go index 8ff1536..93be997 100644 --- a/backend_configuration.go +++ b/backend_configuration.go @@ -41,6 +41,9 @@ type Backend struct { secret []byte compat bool + maxStreamBitrate int + maxScreenBitrate int + sessionLimit uint64 sessionsLock sync.Mutex sessions map[string]bool @@ -269,11 +272,23 @@ func getConfiguredHosts(backendIds string, config *goconf.ConfigFile) (hosts map log.Printf("Backend %s allows a maximum of %d sessions", id, sessionLimit) } + maxStreamBitrate, err := config.GetInt(id, "maxstreambitrate") + if err != nil || maxStreamBitrate < 0 { + maxStreamBitrate = 0 + } + maxScreenBitrate, err := config.GetInt(id, "maxscreenbitrate") + if err != nil || maxScreenBitrate < 0 { + maxScreenBitrate = 0 + } + hosts[parsed.Host] = append(hosts[parsed.Host], &Backend{ id: id, url: u, secret: []byte(secret), + maxStreamBitrate: maxStreamBitrate, + maxScreenBitrate: maxScreenBitrate, + sessionLimit: uint64(sessionLimit), }) } diff --git a/clientsession.go b/clientsession.go index 7792ea9..2962234 100644 --- a/clientsession.go +++ b/clientsession.go @@ -627,8 +627,17 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea if !found { client := s.getClientUnlocked() s.mu.Unlock() + + var bitrate int + if backend := s.Backend(); backend != nil { + if streamType == streamTypeScreen { + bitrate = backend.maxScreenBitrate + } else { + bitrate = backend.maxStreamBitrate + } + } var err error - publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType, client) + publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType, bitrate, client) s.mu.Lock() if err != nil { return nil, err diff --git a/mcu_common.go b/mcu_common.go index f30b851..d4425da 100644 --- a/mcu_common.go +++ b/mcu_common.go @@ -63,7 +63,7 @@ type Mcu interface { GetStats() interface{} - NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) + NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) } diff --git a/mcu_janus.go b/mcu_janus.go index 7f44f15..dd92b7e 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -574,10 +574,19 @@ func (c *mcuJanusClient) handleTrickle(event *TrickleMsg) { type mcuJanusPublisher struct { mcuJanusClient - id string + id string + bitrate int } -func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType string) (*JanusHandle, uint64, uint64, error) { +func min(a, b int) int { + if a <= b { + return a + } else { + return b + } +} + +func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType string, bitrate int) (*JanusHandle, uint64, uint64, error) { session := m.session if session == nil { return nil, 0, 0, ErrNotConnected @@ -603,11 +612,18 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st // orientation changes in Firefox. "videoorient_ext": false, } + var maxBitrate int if streamType == streamTypeScreen { - create_msg["bitrate"] = m.maxScreenBitrate + maxBitrate = m.maxScreenBitrate } else { - create_msg["bitrate"] = m.maxStreamBitrate + maxBitrate = m.maxStreamBitrate } + if bitrate <= 0 { + bitrate = maxBitrate + } else { + bitrate = min(bitrate, maxBitrate) + } + create_msg["bitrate"] = bitrate create_response, err := handle.Request(ctx, create_msg) if err != nil { handle.Detach(ctx) @@ -641,12 +657,12 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st return handle, response.Session, roomId, nil } -func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) { +func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error) { if _, found := streamTypeUserIds[streamType]; !found { return nil, fmt.Errorf("Unsupported stream type %s", streamType) } - handle, session, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType) + handle, session, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate) if err != nil { return nil, err } @@ -666,7 +682,8 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st closeChan: make(chan bool, 1), deferred: make(chan func(), 64), }, - id: id, + id: id, + bitrate: bitrate, } client.mcuJanusClient.handleEvent = client.handleEvent client.mcuJanusClient.handleHangup = client.handleHangup @@ -734,7 +751,7 @@ func (p *mcuJanusPublisher) publishNats(messageType string) error { func (p *mcuJanusPublisher) NotifyReconnected() { ctx := context.TODO() - handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType) + handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate) if err != nil { log.Printf("Could not reconnect publisher %s: %s\n", p.id, err) // TODO(jojo): Retry diff --git a/mcu_proxy.go b/mcu_proxy.go index 1c4ca91..95559ac 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -903,12 +903,13 @@ func (c *mcuProxyConnection) performSyncRequest(ctx context.Context, msg *ProxyC } } -func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) { +func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int) (McuPublisher, error) { msg := &ProxyClientMessage{ Type: "command", Command: &CommandProxyClientMessage{ Type: "create-publisher", StreamType: streamType, + Bitrate: bitrate, }, } @@ -980,6 +981,9 @@ type mcuProxy struct { connectionsMu sync.RWMutex proxyTimeout time.Duration + maxStreamBitrate int + maxScreenBitrate int + mu sync.RWMutex publishers map[string]*mcuProxyConnection @@ -1014,6 +1018,15 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) { proxyTimeout := time.Duration(proxyTimeoutSeconds) * time.Second log.Printf("Using a timeout of %s for proxy requests", proxyTimeout) + maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate") + if maxStreamBitrate <= 0 { + maxStreamBitrate = defaultMaxStreamBitrate + } + maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate") + if maxScreenBitrate <= 0 { + maxScreenBitrate = defaultMaxScreenBitrate + } + mcu := &mcuProxy{ tokenId: tokenId, tokenKey: tokenKey, @@ -1025,6 +1038,9 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) { connectionsMap: make(map[string]*mcuProxyConnection), proxyTimeout: proxyTimeout, + maxStreamBitrate: maxStreamBitrate, + maxScreenBitrate: maxScreenBitrate, + publishers: make(map[string]*mcuProxyConnection), publisherWaiters: make(map[uint64]chan bool), @@ -1083,6 +1099,9 @@ func (m *mcuProxy) Start() error { m.connectionsMu.RLock() defer m.connectionsMu.RUnlock() + log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate) + log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate) + for _, c := range m.connections { if err := c.start(); err != nil { return err @@ -1570,7 +1589,7 @@ func (m *mcuProxy) removeWaiter(id uint64) { delete(m.publisherWaiters, id) } -func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) { +func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error) { connections := m.getSortedConnections(initiator) for _, conn := range connections { if conn.IsShutdownScheduled() { @@ -1579,7 +1598,19 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st subctx, cancel := context.WithTimeout(ctx, m.proxyTimeout) defer cancel() - publisher, err := conn.newPublisher(subctx, listener, id, streamType) + + var maxBitrate int + if streamType == streamTypeScreen { + maxBitrate = m.maxScreenBitrate + } else { + maxBitrate = m.maxStreamBitrate + } + if bitrate <= 0 { + bitrate = maxBitrate + } else { + bitrate = min(bitrate, maxBitrate) + } + publisher, err := conn.newPublisher(subctx, listener, id, streamType, bitrate) if err != nil { log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn.url, err) continue diff --git a/mcu_test.go b/mcu_test.go index 102fd3a..6e59a81 100644 --- a/mcu_test.go +++ b/mcu_test.go @@ -55,7 +55,7 @@ func (m *TestMCU) GetStats() interface{} { return nil } -func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) { +func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error) { return nil, fmt.Errorf("Not implemented") } diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 4bd521b..5dca9ff 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -630,7 +630,7 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s } id := uuid.New().String() - publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType, &emptyInitiator{}) + publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType, cmd.Bitrate, &emptyInitiator{}) if err == context.DeadlineExceeded { log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId()) session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher)) diff --git a/server.conf.in b/server.conf.in index ef9c9f2..8a00834 100644 --- a/server.conf.in +++ b/server.conf.in @@ -86,6 +86,14 @@ connectionsperhost = 8 # Omit or set to 0 to not limit the number of sessions. #sessionlimit = 10 +# The maximum bitrate per publishing stream (in bits per second). +# Defaults to the maximum bitrate configured for the proxy / MCU. +#maxstreambitrate = 1048576 + +# The maximum bitrate per screensharing stream (in bits per second). +# Defaults to the maximum bitrate configured for the proxy / MCU. +#maxscreenbitrate = 2097152 + #[another-backend] # URL of the Nextcloud instance #url = https://cloud.otherdomain.invalid @@ -110,14 +118,16 @@ connectionsperhost = 8 # For type "proxy": a space-separated list of proxy URLs to connect to. #url = -# For type "janus": the maximum bitrate per publishing stream (in bits per -# second). +# The maximum bitrate per publishing stream (in bits per second). # Defaults to 1 mbit/sec. +# For type "proxy": will be capped to the maximum bitrate configured at the +# proxy server that is used. #maxstreambitrate = 1048576 -# For type "janus": the maximum bitrate per screensharing stream (in bits per -# second). +# The maximum bitrate per screensharing stream (in bits per second). # Default is 2 mbit/sec. +# For type "proxy": will be capped to the maximum bitrate configured at the +# proxy server that is used. #maxscreenbitrate = 2097152 # For type "proxy": timeout in seconds for requests to the proxy server.