Use "sync.Cond" to wait for publisher to be created.

This commit is contained in:
Joachim Bauch 2026-03-10 08:59:43 +01:00
commit a9e58ec60a
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02

View file

@ -36,7 +36,6 @@ import (
"github.com/pion/sdp/v3"
"github.com/strukturag/nextcloud-spreed-signaling/v2/api"
"github.com/strukturag/nextcloud-spreed-signaling/v2/async"
"github.com/strukturag/nextcloud-spreed-signaling/v2/async/events"
"github.com/strukturag/nextcloud-spreed-signaling/v2/internal"
"github.com/strukturag/nextcloud-spreed-signaling/v2/log"
@ -98,7 +97,7 @@ type ClientSession struct {
// +checklocks:roomSessionIdLock
roomSessionId api.RoomSessionId
publisherWaiters async.ChannelWaiters // +checklocksignore
publishersCond sync.Cond
// +checklocks:mu
publishers map[sfu.StreamType]sfu.Publisher
@ -148,6 +147,7 @@ func NewClientSession(hub *Hub, privateId api.PrivateSessionId, publicId api.Pub
backend: backend,
asyncCh: make(events.AsyncChannel, events.DefaultAsyncChannelSize),
}
s.publishersCond.L = &s.mu
if s.clientType == api.HelloClientTypeInternal {
s.backendUrl = hello.Auth.InternalParams.Backend
s.parsedBackendUrl = hello.Auth.InternalParams.ParsedBackend
@ -991,7 +991,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu sfu.SFU, s
s.publishers[streamType] = publisher
}
s.logger.Printf("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId())
s.publisherWaiters.Wakeup()
s.publishersCond.Broadcast()
} else {
publisher.SetMedia(mediaTypes)
}
@ -1020,24 +1020,20 @@ func (s *ClientSession) GetOrWaitForPublisher(ctx context.Context, streamType sf
return publisher
}
ch := make(chan struct{}, 1)
id := s.publisherWaiters.Add(ch)
defer s.publisherWaiters.Remove(id)
stop := context.AfterFunc(ctx, func() {
s.publishersCond.Broadcast()
})
defer stop()
for {
s.mu.Unlock()
select {
case <-ch:
s.mu.Lock()
publisher := s.getPublisherLocked(streamType)
if publisher != nil {
return publisher
}
case <-ctx.Done():
s.mu.Lock()
for publisher == nil {
if err := ctx.Err(); err != nil {
return nil
}
s.publishersCond.Wait()
publisher = s.getPublisherLocked(streamType)
}
return publisher
}
func (s *ClientSession) GetOrCreateSubscriber(ctx context.Context, mcu sfu.SFU, id api.PublicSessionId, streamType sfu.StreamType) (sfu.Subscriber, error) {