From c7cccc92876c2dedf435c48ed2f4e18f74149371 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 28 May 2024 12:33:02 +0200 Subject: [PATCH] Support reloading maximum stream / screen bandwidths. --- mcu_janus.go | 37 +++++++++++++++++++++++++------------ mcu_proxy.go | 32 +++++++++++++++++++++++--------- proxy/proxy_server.go | 1 + 3 files changed, 49 insertions(+), 21 deletions(-) diff --git a/mcu_janus.go b/mcu_janus.go index 0f70328..5807daf 100644 --- a/mcu_janus.go +++ b/mcu_janus.go @@ -137,8 +137,8 @@ type mcuJanus struct { url string mu sync.Mutex - maxStreamBitrate int - maxScreenBitrate int + maxStreamBitrate atomic.Int32 + maxScreenBitrate atomic.Int32 mcuTimeout time.Duration gw *JanusGateway @@ -185,18 +185,18 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc mcuTimeout := time.Duration(mcuTimeoutSeconds) * time.Second mcu := &mcuJanus{ - url: url, - maxStreamBitrate: maxStreamBitrate, - maxScreenBitrate: maxScreenBitrate, - mcuTimeout: mcuTimeout, - closeChan: make(chan struct{}, 1), - clients: make(map[clientInterface]bool), + url: url, + mcuTimeout: mcuTimeout, + closeChan: make(chan struct{}, 1), + clients: make(map[clientInterface]bool), publishers: make(map[string]*mcuJanusPublisher), remotePublishers: make(map[string]*mcuJanusRemotePublisher), reconnectInterval: initialReconnectInterval, } + mcu.maxStreamBitrate.Store(int32(maxStreamBitrate)) + mcu.maxScreenBitrate.Store(int32(maxScreenBitrate)) mcu.onConnected.Store(emptyOnConnected) mcu.onDisconnected.Store(emptyOnDisconnected) @@ -323,8 +323,8 @@ func (m *mcuJanus) Start(ctx context.Context) error { } else { log.Println("Full-Trickle is enabled") } - log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate) - log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate) + log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load()) + log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load()) if m.session, err = m.gw.Create(ctx); err != nil { m.disconnect() @@ -378,6 +378,19 @@ func (m *mcuJanus) Stop() { } func (m *mcuJanus) Reload(config *goconf.ConfigFile) { + maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate") + if maxStreamBitrate <= 0 { + maxStreamBitrate = defaultMaxStreamBitrate + } + log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load()) + m.maxStreamBitrate.Store(int32(maxStreamBitrate)) + + maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate") + if maxScreenBitrate <= 0 { + maxScreenBitrate = defaultMaxScreenBitrate + } + log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load()) + m.maxScreenBitrate.Store(int32(maxScreenBitrate)) } func (m *mcuJanus) SetOnConnected(f func()) { @@ -473,9 +486,9 @@ func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle, } var maxBitrate int if streamType == StreamTypeScreen { - maxBitrate = m.maxScreenBitrate + maxBitrate = int(m.maxScreenBitrate.Load()) } else { - maxBitrate = m.maxStreamBitrate + maxBitrate = int(m.maxStreamBitrate.Load()) } if bitrate <= 0 { bitrate = maxBitrate diff --git a/mcu_proxy.go b/mcu_proxy.go index 5b34426..427d675 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -1255,8 +1255,8 @@ type mcuProxy struct { connRequests atomic.Int64 nextSort atomic.Int64 - maxStreamBitrate int - maxScreenBitrate int + maxStreamBitrate atomic.Int32 + maxScreenBitrate atomic.Int32 mu sync.RWMutex publishers map[string]*mcuProxyConnection @@ -1319,14 +1319,14 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients * connectionsMap: make(map[string][]*mcuProxyConnection), proxyTimeout: proxyTimeout, - maxStreamBitrate: maxStreamBitrate, - maxScreenBitrate: maxScreenBitrate, - publishers: make(map[string]*mcuProxyConnection), rpcClients: rpcClients, } + mcu.maxStreamBitrate.Store(int32(maxStreamBitrate)) + mcu.maxScreenBitrate.Store(int32(maxScreenBitrate)) + if err := mcu.loadContinentsMap(config); err != nil { return nil, err } @@ -1396,8 +1396,8 @@ func (m *mcuProxy) loadContinentsMap(config *goconf.ConfigFile) error { } func (m *mcuProxy) Start(ctx context.Context) error { - log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate) - log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate) + log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load()) + log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load()) return m.config.Start() } @@ -1556,6 +1556,20 @@ func (m *mcuProxy) KeepConnection(url string, ips ...net.IP) { } func (m *mcuProxy) Reload(config *goconf.ConfigFile) { + maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate") + if maxStreamBitrate <= 0 { + maxStreamBitrate = defaultMaxStreamBitrate + } + log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load()) + m.maxStreamBitrate.Store(int32(maxStreamBitrate)) + + maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate") + if maxScreenBitrate <= 0 { + maxScreenBitrate = defaultMaxScreenBitrate + } + log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load()) + m.maxScreenBitrate.Store(int32(maxScreenBitrate)) + if err := m.loadContinentsMap(config); err != nil { log.Printf("Error loading continents map: %s", err) } @@ -1751,9 +1765,9 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) { func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher { var maxBitrate int if streamType == StreamTypeScreen { - maxBitrate = m.maxScreenBitrate + maxBitrate = int(m.maxScreenBitrate.Load()) } else { - maxBitrate = m.maxStreamBitrate + maxBitrate = int(m.maxStreamBitrate.Load()) } if bitrate <= 0 { bitrate = maxBitrate diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index b679536..6b7ed52 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -575,6 +575,7 @@ func (s *ProxyServer) Reload(config *goconf.ConfigFile) { } s.tokens.Reload(config) + s.mcu.Reload(config) } func (s *ProxyServer) setCommonHeaders(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {