Use "sync.Cond" to wait for created publishers.

This commit is contained in:
Joachim Bauch 2026-03-10 09:38:26 +01:00
commit bd3c06c9eb
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
4 changed files with 76 additions and 26 deletions

View file

@ -231,6 +231,11 @@ func (s *prometheusJanusStats) DecSubscriber(streamType sfu.StreamType) {
sfuinternal.StatsSubscribersCurrent.WithLabelValues(string(streamType)).Dec()
}
type refcountCond struct {
ref int
cond *sync.Cond
}
type janusSFU struct {
logger log.Logger
@ -257,8 +262,9 @@ type janusSFU struct {
clientId atomic.Uint64
// +checklocks:mu
publishers map[sfu.StreamId]*janusPublisher
publisherCreated async.Notifier
publishers map[sfu.StreamId]*janusPublisher
// +checklocks:mu
publisherCreated map[sfu.StreamId]*refcountCond
publisherConnected async.Notifier
// +checklocks:mu
remotePublishers map[sfu.StreamId]*janusRemotePublisher
@ -289,6 +295,7 @@ func NewJanusSFU(ctx context.Context, url string, config *goconf.ConfigFile) (sf
clients: make(map[uint64]clientInterface),
publishers: make(map[sfu.StreamId]*janusPublisher),
publisherCreated: make(map[sfu.StreamId]*refcountCond),
remotePublishers: make(map[sfu.StreamId]*janusRemotePublisher),
createJanusGateway: func(ctx context.Context, wsURL string, listener janus.GatewayListener) (janus.GatewayInterface, error) {
@ -438,7 +445,10 @@ func (m *janusSFU) doReconnect(ctx context.Context) {
m.logger.Println("Reconnection to Janus gateway successful")
m.mu.Lock()
clear(m.publishers)
m.publisherCreated.Reset()
for _, c := range m.publisherCreated {
c.cond.Broadcast()
}
clear(m.publisherCreated)
m.publisherConnected.Reset()
m.reconnectInterval = initialReconnectInterval
m.mu.Unlock()
@ -854,40 +864,80 @@ func (m *janusSFU) NewPublisher(ctx context.Context, listener sfu.Listener, id a
m.registerClient(client)
m.logger.Printf("Publisher %s is using handle %d", client.id, handle.Id)
go client.run(handle, client.closeChan)
m.mu.Lock()
m.publishers[sfu.GetStreamId(id, streamType)] = client
m.publisherCreated.Notify(string(sfu.GetStreamId(id, streamType)))
m.mu.Unlock()
m.notifyPublisherCreated(id, streamType, client)
sfuinternal.StatsPublishersCurrent.WithLabelValues(string(streamType)).Inc()
sfuinternal.StatsPublishersTotal.WithLabelValues(string(streamType)).Inc()
return client, nil
}
func (m *janusSFU) notifyPublisherCreated(id api.PublicSessionId, streamType sfu.StreamType, client *janusPublisher) {
key := sfu.GetStreamId(id, streamType)
m.mu.Lock()
defer m.mu.Unlock()
m.publishers[key] = client
if c, found := m.publisherCreated[key]; found {
c.cond.Broadcast()
delete(m.publisherCreated, key)
}
}
func (m *janusSFU) notifyPublisherConnected(id api.PublicSessionId, streamType sfu.StreamType) {
key := sfu.GetStreamId(id, streamType)
m.publisherConnected.Notify(string(key))
}
func (m *janusSFU) newPublisherConnectedWaiter(id api.PublicSessionId, streamType sfu.StreamType) (*async.Waiter, func()) {
key := sfu.GetStreamId(id, streamType)
waiter := m.publisherConnected.NewWaiter(string(key))
stopFunc := func() {
m.publisherConnected.Release(waiter)
}
return waiter, stopFunc
}
func (m *janusSFU) getPublisher(ctx context.Context, publisher api.PublicSessionId, streamType sfu.StreamType) (*janusPublisher, error) {
// Do the direct check immediately as this should be the normal case.
key := sfu.GetStreamId(publisher, streamType)
m.mu.Lock()
if result, found := m.publishers[key]; found {
m.mu.Unlock()
defer m.mu.Unlock()
result, found := m.publishers[key]
if found {
return result, nil
}
waiter := m.publisherCreated.NewWaiter(string(key))
m.mu.Unlock()
defer m.publisherCreated.Release(waiter)
for {
m.mu.Lock()
result := m.publishers[key]
m.mu.Unlock()
if result != nil {
return result, nil
}
if err := waiter.Wait(ctx); err != nil {
return nil, err
c, found := m.publisherCreated[key]
if !found {
c = &refcountCond{
cond: sync.NewCond(&m.mu),
}
m.publisherCreated[key] = c
}
c.ref++
stop := context.AfterFunc(ctx, func() {
c.cond.Broadcast()
})
defer stop()
for result == nil && ctx.Err() == nil {
c.cond.Wait()
result = m.publishers[key]
}
c.ref--
if c.ref == 0 {
delete(m.publisherCreated, key)
}
if err := ctx.Err(); err != nil {
return nil, err
}
return result, nil
}
func (m *janusSFU) getOrCreateSubscriberHandle(ctx context.Context, publisher api.PublicSessionId, streamType sfu.StreamType) (*janus.Handle, *janusPublisher, error) {

View file

@ -93,7 +93,7 @@ func (p *janusPublisher) handleDetached(event *janus.DetachedMsg) {
func (p *janusPublisher) handleConnected(event *janus.WebRTCUpMsg) {
p.logger.Printf("Publisher %d received connected", p.handleId.Load())
p.mcu.publisherConnected.Notify(string(sfu.GetStreamId(p.id, p.streamType)))
p.mcu.notifyPublisherConnected(p.id, p.streamType)
}
func (p *janusPublisher) handleSlowLink(event *janus.SlowLinkMsg) {

View file

@ -86,7 +86,7 @@ func (p *janusRemotePublisher) handleDetached(event *janus.DetachedMsg) {
func (p *janusRemotePublisher) handleConnected(event *janus.WebRTCUpMsg) {
p.logger.Printf("Remote publisher %d received connected", p.handleId.Load())
p.mcu.publisherConnected.Notify(string(sfu.GetStreamId(p.id, p.streamType)))
p.mcu.notifyPublisherConnected(p.id, p.streamType)
}
func (p *janusRemotePublisher) handleSlowLink(event *janus.SlowLinkMsg) {

View file

@ -169,8 +169,8 @@ func (p *janusSubscriber) joinRoom(ctx context.Context, stream *streamSelection,
return
}
waiter := p.mcu.publisherConnected.NewWaiter(string(sfu.GetStreamId(p.publisher, p.streamType)))
defer p.mcu.publisherConnected.Release(waiter)
waiter, stop := p.mcu.newPublisherConnectedWaiter(p.publisher, p.streamType)
defer stop()
loggedNotPublishingYet := false
retry: