Support defining maximum bandwidths at diferent levels.

- Individually for each backend.
- For the proxy client (i.e. signaling server using cluster of proxies).
- For the proxy server / MCU.

The smallest bandwidth configured will be used, e.g. if a backend has a
larger limit than the MCU assigned for the stream, the limit of the MCU
server will be used.
This commit is contained in:
Joachim Bauch 2021-01-21 14:39:33 +01:00
parent e8012d99d0
commit 1ceb806c20
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
9 changed files with 102 additions and 19 deletions

View File

@ -174,6 +174,7 @@ type CommandProxyClientMessage struct {
StreamType string `json:"streamType,omitempty"` StreamType string `json:"streamType,omitempty"`
PublisherId string `json:"publisherId,omitempty"` PublisherId string `json:"publisherId,omitempty"`
ClientId string `json:"clientId,omitempty"` ClientId string `json:"clientId,omitempty"`
Bitrate int `json:"bitrate,omitempty"`
} }
func (m *CommandProxyClientMessage) CheckValid() error { func (m *CommandProxyClientMessage) CheckValid() error {

View File

@ -41,6 +41,9 @@ type Backend struct {
secret []byte secret []byte
compat bool compat bool
maxStreamBitrate int
maxScreenBitrate int
sessionLimit uint64 sessionLimit uint64
sessionsLock sync.Mutex sessionsLock sync.Mutex
sessions map[string]bool sessions map[string]bool
@ -269,11 +272,23 @@ func getConfiguredHosts(backendIds string, config *goconf.ConfigFile) (hosts map
log.Printf("Backend %s allows a maximum of %d sessions", id, sessionLimit) log.Printf("Backend %s allows a maximum of %d sessions", id, sessionLimit)
} }
maxStreamBitrate, err := config.GetInt(id, "maxstreambitrate")
if err != nil || maxStreamBitrate < 0 {
maxStreamBitrate = 0
}
maxScreenBitrate, err := config.GetInt(id, "maxscreenbitrate")
if err != nil || maxScreenBitrate < 0 {
maxScreenBitrate = 0
}
hosts[parsed.Host] = append(hosts[parsed.Host], &Backend{ hosts[parsed.Host] = append(hosts[parsed.Host], &Backend{
id: id, id: id,
url: u, url: u,
secret: []byte(secret), secret: []byte(secret),
maxStreamBitrate: maxStreamBitrate,
maxScreenBitrate: maxScreenBitrate,
sessionLimit: uint64(sessionLimit), sessionLimit: uint64(sessionLimit),
}) })
} }

View File

@ -627,8 +627,17 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
if !found { if !found {
client := s.getClientUnlocked() client := s.getClientUnlocked()
s.mu.Unlock() s.mu.Unlock()
var bitrate int
if backend := s.Backend(); backend != nil {
if streamType == streamTypeScreen {
bitrate = backend.maxScreenBitrate
} else {
bitrate = backend.maxStreamBitrate
}
}
var err error var err error
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType, client) publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType, bitrate, client)
s.mu.Lock() s.mu.Lock()
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -63,7 +63,7 @@ type Mcu interface {
GetStats() interface{} GetStats() interface{}
NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error)
} }

View File

@ -574,10 +574,19 @@ func (c *mcuJanusClient) handleTrickle(event *TrickleMsg) {
type mcuJanusPublisher struct { type mcuJanusPublisher struct {
mcuJanusClient mcuJanusClient
id string id string
bitrate int
} }
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType string) (*JanusHandle, uint64, uint64, error) { func min(a, b int) int {
if a <= b {
return a
} else {
return b
}
}
func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, streamType string, bitrate int) (*JanusHandle, uint64, uint64, error) {
session := m.session session := m.session
if session == nil { if session == nil {
return nil, 0, 0, ErrNotConnected return nil, 0, 0, ErrNotConnected
@ -603,11 +612,18 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
// orientation changes in Firefox. // orientation changes in Firefox.
"videoorient_ext": false, "videoorient_ext": false,
} }
var maxBitrate int
if streamType == streamTypeScreen { if streamType == streamTypeScreen {
create_msg["bitrate"] = m.maxScreenBitrate maxBitrate = m.maxScreenBitrate
} else { } else {
create_msg["bitrate"] = m.maxStreamBitrate maxBitrate = m.maxStreamBitrate
} }
if bitrate <= 0 {
bitrate = maxBitrate
} else {
bitrate = min(bitrate, maxBitrate)
}
create_msg["bitrate"] = bitrate
create_response, err := handle.Request(ctx, create_msg) create_response, err := handle.Request(ctx, create_msg)
if err != nil { if err != nil {
handle.Detach(ctx) handle.Detach(ctx)
@ -641,12 +657,12 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
return handle, response.Session, roomId, nil return handle, response.Session, roomId, nil
} }
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) { func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error) {
if _, found := streamTypeUserIds[streamType]; !found { if _, found := streamTypeUserIds[streamType]; !found {
return nil, fmt.Errorf("Unsupported stream type %s", streamType) return nil, fmt.Errorf("Unsupported stream type %s", streamType)
} }
handle, session, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType) handle, session, roomId, err := m.getOrCreatePublisherHandle(ctx, id, streamType, bitrate)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -666,7 +682,8 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
closeChan: make(chan bool, 1), closeChan: make(chan bool, 1),
deferred: make(chan func(), 64), deferred: make(chan func(), 64),
}, },
id: id, id: id,
bitrate: bitrate,
} }
client.mcuJanusClient.handleEvent = client.handleEvent client.mcuJanusClient.handleEvent = client.handleEvent
client.mcuJanusClient.handleHangup = client.handleHangup client.mcuJanusClient.handleHangup = client.handleHangup
@ -734,7 +751,7 @@ func (p *mcuJanusPublisher) publishNats(messageType string) error {
func (p *mcuJanusPublisher) NotifyReconnected() { func (p *mcuJanusPublisher) NotifyReconnected() {
ctx := context.TODO() ctx := context.TODO()
handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType) handle, session, roomId, err := p.mcu.getOrCreatePublisherHandle(ctx, p.id, p.streamType, p.bitrate)
if err != nil { if err != nil {
log.Printf("Could not reconnect publisher %s: %s\n", p.id, err) log.Printf("Could not reconnect publisher %s: %s\n", p.id, err)
// TODO(jojo): Retry // TODO(jojo): Retry

View File

@ -903,12 +903,13 @@ func (c *mcuProxyConnection) performSyncRequest(ctx context.Context, msg *ProxyC
} }
} }
func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) { func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int) (McuPublisher, error) {
msg := &ProxyClientMessage{ msg := &ProxyClientMessage{
Type: "command", Type: "command",
Command: &CommandProxyClientMessage{ Command: &CommandProxyClientMessage{
Type: "create-publisher", Type: "create-publisher",
StreamType: streamType, StreamType: streamType,
Bitrate: bitrate,
}, },
} }
@ -980,6 +981,9 @@ type mcuProxy struct {
connectionsMu sync.RWMutex connectionsMu sync.RWMutex
proxyTimeout time.Duration proxyTimeout time.Duration
maxStreamBitrate int
maxScreenBitrate int
mu sync.RWMutex mu sync.RWMutex
publishers map[string]*mcuProxyConnection publishers map[string]*mcuProxyConnection
@ -1014,6 +1018,15 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) {
proxyTimeout := time.Duration(proxyTimeoutSeconds) * time.Second proxyTimeout := time.Duration(proxyTimeoutSeconds) * time.Second
log.Printf("Using a timeout of %s for proxy requests", proxyTimeout) log.Printf("Using a timeout of %s for proxy requests", proxyTimeout)
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
mcu := &mcuProxy{ mcu := &mcuProxy{
tokenId: tokenId, tokenId: tokenId,
tokenKey: tokenKey, tokenKey: tokenKey,
@ -1025,6 +1038,9 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) {
connectionsMap: make(map[string]*mcuProxyConnection), connectionsMap: make(map[string]*mcuProxyConnection),
proxyTimeout: proxyTimeout, proxyTimeout: proxyTimeout,
maxStreamBitrate: maxStreamBitrate,
maxScreenBitrate: maxScreenBitrate,
publishers: make(map[string]*mcuProxyConnection), publishers: make(map[string]*mcuProxyConnection),
publisherWaiters: make(map[uint64]chan bool), publisherWaiters: make(map[uint64]chan bool),
@ -1083,6 +1099,9 @@ func (m *mcuProxy) Start() error {
m.connectionsMu.RLock() m.connectionsMu.RLock()
defer m.connectionsMu.RUnlock() defer m.connectionsMu.RUnlock()
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate)
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate)
for _, c := range m.connections { for _, c := range m.connections {
if err := c.start(); err != nil { if err := c.start(); err != nil {
return err return err
@ -1570,7 +1589,7 @@ func (m *mcuProxy) removeWaiter(id uint64) {
delete(m.publisherWaiters, id) delete(m.publisherWaiters, id)
} }
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) { func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error) {
connections := m.getSortedConnections(initiator) connections := m.getSortedConnections(initiator)
for _, conn := range connections { for _, conn := range connections {
if conn.IsShutdownScheduled() { if conn.IsShutdownScheduled() {
@ -1579,7 +1598,19 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
subctx, cancel := context.WithTimeout(ctx, m.proxyTimeout) subctx, cancel := context.WithTimeout(ctx, m.proxyTimeout)
defer cancel() defer cancel()
publisher, err := conn.newPublisher(subctx, listener, id, streamType)
var maxBitrate int
if streamType == streamTypeScreen {
maxBitrate = m.maxScreenBitrate
} else {
maxBitrate = m.maxStreamBitrate
}
if bitrate <= 0 {
bitrate = maxBitrate
} else {
bitrate = min(bitrate, maxBitrate)
}
publisher, err := conn.newPublisher(subctx, listener, id, streamType, bitrate)
if err != nil { if err != nil {
log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn.url, err) log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn.url, err)
continue continue

View File

@ -55,7 +55,7 @@ func (m *TestMCU) GetStats() interface{} {
return nil return nil
} }
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) { func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, bitrate int, initiator McuInitiator) (McuPublisher, error) {
return nil, fmt.Errorf("Not implemented") return nil, fmt.Errorf("Not implemented")
} }

View File

@ -630,7 +630,7 @@ func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, s
} }
id := uuid.New().String() id := uuid.New().String()
publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType, &emptyInitiator{}) publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType, cmd.Bitrate, &emptyInitiator{})
if err == context.DeadlineExceeded { if err == context.DeadlineExceeded {
log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId()) log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId())
session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher)) session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher))

View File

@ -86,6 +86,14 @@ connectionsperhost = 8
# Omit or set to 0 to not limit the number of sessions. # Omit or set to 0 to not limit the number of sessions.
#sessionlimit = 10 #sessionlimit = 10
# The maximum bitrate per publishing stream (in bits per second).
# Defaults to the maximum bitrate configured for the proxy / MCU.
#maxstreambitrate = 1048576
# The maximum bitrate per screensharing stream (in bits per second).
# Defaults to the maximum bitrate configured for the proxy / MCU.
#maxscreenbitrate = 2097152
#[another-backend] #[another-backend]
# URL of the Nextcloud instance # URL of the Nextcloud instance
#url = https://cloud.otherdomain.invalid #url = https://cloud.otherdomain.invalid
@ -110,14 +118,16 @@ connectionsperhost = 8
# For type "proxy": a space-separated list of proxy URLs to connect to. # For type "proxy": a space-separated list of proxy URLs to connect to.
#url = #url =
# For type "janus": the maximum bitrate per publishing stream (in bits per # The maximum bitrate per publishing stream (in bits per second).
# second).
# Defaults to 1 mbit/sec. # Defaults to 1 mbit/sec.
# For type "proxy": will be capped to the maximum bitrate configured at the
# proxy server that is used.
#maxstreambitrate = 1048576 #maxstreambitrate = 1048576
# For type "janus": the maximum bitrate per screensharing stream (in bits per # The maximum bitrate per screensharing stream (in bits per second).
# second).
# Default is 2 mbit/sec. # Default is 2 mbit/sec.
# For type "proxy": will be capped to the maximum bitrate configured at the
# proxy server that is used.
#maxscreenbitrate = 2097152 #maxscreenbitrate = 2097152
# For type "proxy": timeout in seconds for requests to the proxy server. # For type "proxy": timeout in seconds for requests to the proxy server.