Support reloading maximum stream / screen bandwidths.

This commit is contained in:
Joachim Bauch 2024-05-28 12:33:02 +02:00
parent 15edeca814
commit c7cccc9287
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
3 changed files with 49 additions and 21 deletions

View file

@ -137,8 +137,8 @@ type mcuJanus struct {
url string
mu sync.Mutex
maxStreamBitrate int
maxScreenBitrate int
maxStreamBitrate atomic.Int32
maxScreenBitrate atomic.Int32
mcuTimeout time.Duration
gw *JanusGateway
@ -185,18 +185,18 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc
mcuTimeout := time.Duration(mcuTimeoutSeconds) * time.Second
mcu := &mcuJanus{
url: url,
maxStreamBitrate: maxStreamBitrate,
maxScreenBitrate: maxScreenBitrate,
mcuTimeout: mcuTimeout,
closeChan: make(chan struct{}, 1),
clients: make(map[clientInterface]bool),
url: url,
mcuTimeout: mcuTimeout,
closeChan: make(chan struct{}, 1),
clients: make(map[clientInterface]bool),
publishers: make(map[string]*mcuJanusPublisher),
remotePublishers: make(map[string]*mcuJanusRemotePublisher),
reconnectInterval: initialReconnectInterval,
}
mcu.maxStreamBitrate.Store(int32(maxStreamBitrate))
mcu.maxScreenBitrate.Store(int32(maxScreenBitrate))
mcu.onConnected.Store(emptyOnConnected)
mcu.onDisconnected.Store(emptyOnDisconnected)
@ -323,8 +323,8 @@ func (m *mcuJanus) Start(ctx context.Context) error {
} else {
log.Println("Full-Trickle is enabled")
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate)
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate)
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
if m.session, err = m.gw.Create(ctx); err != nil {
m.disconnect()
@ -378,6 +378,19 @@ func (m *mcuJanus) Stop() {
}
func (m *mcuJanus) Reload(config *goconf.ConfigFile) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
m.maxStreamBitrate.Store(int32(maxStreamBitrate))
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
m.maxScreenBitrate.Store(int32(maxScreenBitrate))
}
func (m *mcuJanus) SetOnConnected(f func()) {
@ -473,9 +486,9 @@ func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle,
}
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = m.maxScreenBitrate
maxBitrate = int(m.maxScreenBitrate.Load())
} else {
maxBitrate = m.maxStreamBitrate
maxBitrate = int(m.maxStreamBitrate.Load())
}
if bitrate <= 0 {
bitrate = maxBitrate

View file

@ -1255,8 +1255,8 @@ type mcuProxy struct {
connRequests atomic.Int64
nextSort atomic.Int64
maxStreamBitrate int
maxScreenBitrate int
maxStreamBitrate atomic.Int32
maxScreenBitrate atomic.Int32
mu sync.RWMutex
publishers map[string]*mcuProxyConnection
@ -1319,14 +1319,14 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *
connectionsMap: make(map[string][]*mcuProxyConnection),
proxyTimeout: proxyTimeout,
maxStreamBitrate: maxStreamBitrate,
maxScreenBitrate: maxScreenBitrate,
publishers: make(map[string]*mcuProxyConnection),
rpcClients: rpcClients,
}
mcu.maxStreamBitrate.Store(int32(maxStreamBitrate))
mcu.maxScreenBitrate.Store(int32(maxScreenBitrate))
if err := mcu.loadContinentsMap(config); err != nil {
return nil, err
}
@ -1396,8 +1396,8 @@ func (m *mcuProxy) loadContinentsMap(config *goconf.ConfigFile) error {
}
func (m *mcuProxy) Start(ctx context.Context) error {
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate)
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate)
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
return m.config.Start()
}
@ -1556,6 +1556,20 @@ func (m *mcuProxy) KeepConnection(url string, ips ...net.IP) {
}
func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
m.maxStreamBitrate.Store(int32(maxStreamBitrate))
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
m.maxScreenBitrate.Store(int32(maxScreenBitrate))
if err := m.loadContinentsMap(config); err != nil {
log.Printf("Error loading continents map: %s", err)
}
@ -1751,9 +1765,9 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) {
func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher {
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = m.maxScreenBitrate
maxBitrate = int(m.maxScreenBitrate.Load())
} else {
maxBitrate = m.maxStreamBitrate
maxBitrate = int(m.maxStreamBitrate.Load())
}
if bitrate <= 0 {
bitrate = maxBitrate

View file

@ -575,6 +575,7 @@ func (s *ProxyServer) Reload(config *goconf.ConfigFile) {
}
s.tokens.Reload(config)
s.mcu.Reload(config)
}
func (s *ProxyServer) setCommonHeaders(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {