Use object to store MCU settings.

This commit is contained in:
Joachim Bauch 2024-10-31 14:46:02 +01:00
commit 46712ba589
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
6 changed files with 153 additions and 88 deletions

View file

@ -24,6 +24,9 @@ package signaling
import (
"context"
"fmt"
"log"
"sync/atomic"
"time"
"github.com/dlintw/goconf"
)
@ -33,6 +36,9 @@ const (
McuTypeProxy = "proxy"
McuTypeDefault = McuTypeJanus
defaultMaxStreamBitrate = 1024 * 1024
defaultMaxScreenBitrate = 2048 * 1024
)
var (
@ -65,6 +71,54 @@ type McuInitiator interface {
Country() string
}
type McuSettings interface {
MaxStreamBitrate() int32
MaxScreenBitrate() int32
Timeout() time.Duration
Reload(config *goconf.ConfigFile)
}
type mcuCommonSettings struct {
maxStreamBitrate atomic.Int32
maxScreenBitrate atomic.Int32
timeout atomic.Int64
}
func (s *mcuCommonSettings) MaxStreamBitrate() int32 {
return s.maxStreamBitrate.Load()
}
func (s *mcuCommonSettings) MaxScreenBitrate() int32 {
return s.maxScreenBitrate.Load()
}
func (s *mcuCommonSettings) Timeout() time.Duration {
return time.Duration(s.timeout.Load())
}
func (s *mcuCommonSettings) setTimeout(timeout time.Duration) {
s.timeout.Store(int64(timeout))
}
func (s *mcuCommonSettings) load(config *goconf.ConfigFile) error {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", maxStreamBitrate)
s.maxStreamBitrate.Store(int32(maxStreamBitrate))
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", maxScreenBitrate)
s.maxScreenBitrate.Store(int32(maxScreenBitrate))
return nil
}
type Mcu interface {
Start(ctx context.Context) error
Stop()

View file

@ -46,9 +46,6 @@ const (
initialReconnectInterval = 1 * time.Second
maxReconnectInterval = 32 * time.Second
defaultMaxStreamBitrate = 1024 * 1024
defaultMaxScreenBitrate = 2048 * 1024
)
var (
@ -133,13 +130,45 @@ type clientInterface interface {
NotifyReconnected()
}
type mcuJanusSettings struct {
mcuCommonSettings
}
func newMcuJanusSettings(config *goconf.ConfigFile) (McuSettings, error) {
settings := &mcuJanusSettings{}
if err := settings.load(config); err != nil {
return nil, err
}
return settings, nil
}
func (s *mcuJanusSettings) load(config *goconf.ConfigFile) error {
if err := s.mcuCommonSettings.load(config); err != nil {
return err
}
mcuTimeoutSeconds, _ := config.GetInt("mcu", "timeout")
if mcuTimeoutSeconds <= 0 {
mcuTimeoutSeconds = defaultMcuTimeoutSeconds
}
mcuTimeout := time.Duration(mcuTimeoutSeconds) * time.Second
log.Printf("Using a timeout of %s for MCU requests", mcuTimeout)
s.setTimeout(mcuTimeout)
return nil
}
func (s *mcuJanusSettings) Reload(config *goconf.ConfigFile) {
if err := s.load(config); err != nil {
log.Printf("Error reloading MCU settings: %s", err)
}
}
type mcuJanus struct {
url string
mu sync.Mutex
maxStreamBitrate atomic.Int32
maxScreenBitrate atomic.Int32
mcuTimeout time.Duration
settings McuSettings
gw *JanusGateway
session *JanusSession
@ -170,33 +199,22 @@ func emptyOnConnected() {}
func emptyOnDisconnected() {}
func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mcu, error) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
settings, err := newMcuJanusSettings(config)
if err != nil {
return nil, err
}
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
mcuTimeoutSeconds, _ := config.GetInt("mcu", "timeout")
if mcuTimeoutSeconds <= 0 {
mcuTimeoutSeconds = defaultMcuTimeoutSeconds
}
mcuTimeout := time.Duration(mcuTimeoutSeconds) * time.Second
mcu := &mcuJanus{
url: url,
mcuTimeout: mcuTimeout,
closeChan: make(chan struct{}, 1),
clients: make(map[clientInterface]bool),
url: url,
settings: settings,
closeChan: make(chan struct{}, 1),
clients: make(map[clientInterface]bool),
publishers: make(map[string]*mcuJanusPublisher),
remotePublishers: make(map[string]*mcuJanusRemotePublisher),
reconnectInterval: initialReconnectInterval,
}
mcu.maxStreamBitrate.Store(int32(maxStreamBitrate))
mcu.maxScreenBitrate.Store(int32(maxScreenBitrate))
mcu.onConnected.Store(emptyOnConnected)
mcu.onDisconnected.Store(emptyOnDisconnected)
@ -323,8 +341,6 @@ func (m *mcuJanus) Start(ctx context.Context) error {
} else {
log.Println("Full-Trickle is enabled")
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
if m.session, err = m.gw.Create(ctx); err != nil {
m.disconnect()
@ -378,19 +394,7 @@ func (m *mcuJanus) Stop() {
}
func (m *mcuJanus) Reload(config *goconf.ConfigFile) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
m.maxStreamBitrate.Store(int32(maxStreamBitrate))
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
m.maxScreenBitrate.Store(int32(maxScreenBitrate))
m.settings.Reload(config)
}
func (m *mcuJanus) SetOnConnected(f func()) {
@ -486,9 +490,9 @@ func (m *mcuJanus) createPublisherRoom(ctx context.Context, handle *JanusHandle,
}
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = int(m.maxScreenBitrate.Load())
maxBitrate = int(m.settings.MaxScreenBitrate())
} else {
maxBitrate = int(m.maxStreamBitrate.Load())
maxBitrate = int(m.settings.MaxStreamBitrate())
}
if bitrate <= 0 {
bitrate = maxBitrate

View file

@ -184,7 +184,7 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
// TODO Tear down previous publisher and get a new one if sid does
// not match?
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
p.sendOffer(msgctx, jsep_msg, func(err error, jsep map[string]interface{}) {
@ -221,7 +221,7 @@ func (p *mcuJanusPublisher) SendMessage(ctx context.Context, message *MessageCli
}
case "candidate":
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
if data.Sid == "" || data.Sid == p.Sid() {

View file

@ -88,7 +88,7 @@ func (p *mcuJanusRemoteSubscriber) handleMedia(event *janus.MediaMsg) {
}
func (p *mcuJanusRemoteSubscriber) NotifyReconnected() {
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
if err != nil {

View file

@ -92,7 +92,7 @@ func (p *mcuJanusSubscriber) handleMedia(event *janus.MediaMsg) {
}
func (p *mcuJanusSubscriber) NotifyReconnected() {
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
ctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
handle, pub, err := p.mcu.getOrCreateSubscriberHandle(ctx, p.publisher, p.streamType)
if err != nil {
@ -256,7 +256,7 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
fallthrough
case "sendoffer":
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
stream, err := parseStreamSelection(jsep_msg)
@ -273,7 +273,7 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
}
case "answer":
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
if data.Sid == "" || data.Sid == p.Sid() {
@ -284,7 +284,7 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
}
case "candidate":
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
if data.Sid == "" || data.Sid == p.Sid() {
@ -309,7 +309,7 @@ func (p *mcuJanusSubscriber) SendMessage(ctx context.Context, message *MessageCl
}
p.deferred <- func() {
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.mcuTimeout)
msgctx, cancel := context.WithTimeout(context.Background(), p.mcu.settings.Timeout())
defer cancel()
p.selectStream(msgctx, stream, callback)

View file

@ -1242,6 +1242,40 @@ func (c *mcuProxyConnection) newRemoteSubscriber(ctx context.Context, listener M
return subscriber, nil
}
type mcuProxySettings struct {
mcuCommonSettings
}
func newMcuProxySettings(config *goconf.ConfigFile) (McuSettings, error) {
settings := &mcuProxySettings{}
if err := settings.load(config); err != nil {
return nil, err
}
return settings, nil
}
func (s *mcuProxySettings) load(config *goconf.ConfigFile) error {
if err := s.mcuCommonSettings.load(config); err != nil {
return err
}
proxyTimeoutSeconds, _ := config.GetInt("mcu", "proxytimeout")
if proxyTimeoutSeconds <= 0 {
proxyTimeoutSeconds = defaultProxyTimeoutSeconds
}
proxyTimeout := time.Duration(proxyTimeoutSeconds) * time.Second
log.Printf("Using a timeout of %s for proxy requests", proxyTimeout)
s.setTimeout(proxyTimeout)
return nil
}
func (s *mcuProxySettings) Reload(config *goconf.ConfigFile) {
if err := s.load(config); err != nil {
log.Printf("Error reloading proxy settings: %s", err)
}
}
type mcuProxy struct {
urlType string
tokenId string
@ -1252,12 +1286,10 @@ type mcuProxy struct {
connections []*mcuProxyConnection
connectionsMap map[string][]*mcuProxyConnection
connectionsMu sync.RWMutex
proxyTimeout time.Duration
connRequests atomic.Int64
nextSort atomic.Int64
maxStreamBitrate atomic.Int32
maxScreenBitrate atomic.Int32
settings McuSettings
mu sync.RWMutex
publishers map[string]*mcuProxyConnection
@ -1292,20 +1324,9 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *
return nil, fmt.Errorf("Could not parse private key from %s: %s", tokenKeyFilename, err)
}
proxyTimeoutSeconds, _ := config.GetInt("mcu", "proxytimeout")
if proxyTimeoutSeconds <= 0 {
proxyTimeoutSeconds = defaultProxyTimeoutSeconds
}
proxyTimeout := time.Duration(proxyTimeoutSeconds) * time.Second
log.Printf("Using a timeout of %s for proxy requests", proxyTimeout)
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
settings, err := newMcuProxySettings((config))
if err != nil {
return nil, err
}
mcu := &mcuProxy{
@ -1315,19 +1336,16 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *
dialer: &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: proxyTimeout,
HandshakeTimeout: settings.Timeout(),
},
connectionsMap: make(map[string][]*mcuProxyConnection),
proxyTimeout: proxyTimeout,
settings: settings,
publishers: make(map[string]*mcuProxyConnection),
rpcClients: rpcClients,
}
mcu.maxStreamBitrate.Store(int32(maxStreamBitrate))
mcu.maxScreenBitrate.Store(int32(maxScreenBitrate))
if err := mcu.loadContinentsMap(config); err != nil {
return nil, err
}
@ -1397,9 +1415,6 @@ func (m *mcuProxy) loadContinentsMap(config *goconf.ConfigFile) error {
}
func (m *mcuProxy) Start(ctx context.Context) error {
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
return m.config.Start()
}
@ -1557,19 +1572,11 @@ func (m *mcuProxy) KeepConnection(url string, ips ...net.IP) {
}
func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
maxStreamBitrate = defaultMaxStreamBitrate
}
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate.Load())
m.maxStreamBitrate.Store(int32(maxStreamBitrate))
m.settings.Reload(config)
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
if maxScreenBitrate <= 0 {
maxScreenBitrate = defaultMaxScreenBitrate
if m.settings.Timeout() != m.dialer.HandshakeTimeout {
m.dialer.HandshakeTimeout = m.settings.Timeout()
}
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate.Load())
m.maxScreenBitrate.Store(int32(maxScreenBitrate))
if err := m.loadContinentsMap(config); err != nil {
log.Printf("Error loading continents map: %s", err)
@ -1766,9 +1773,9 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) {
func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher {
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = int(m.maxScreenBitrate.Load())
maxBitrate = int(m.settings.MaxScreenBitrate())
} else {
maxBitrate = int(m.maxStreamBitrate.Load())
maxBitrate = int(m.settings.MaxStreamBitrate())
}
if bitrate <= 0 {
bitrate = maxBitrate
@ -1781,7 +1788,7 @@ func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id
continue
}
subctx, cancel := context.WithTimeout(ctx, m.proxyTimeout)
subctx, cancel := context.WithTimeout(ctx, m.settings.Timeout())
defer cancel()
publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes)