Use "sync.Cond" to wait for publisher connection.

This commit is contained in:
Joachim Bauch 2026-03-10 09:05:59 +01:00
commit a6a1c35347
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
2 changed files with 32 additions and 40 deletions

View file

@ -46,7 +46,6 @@ import (
"github.com/gorilla/websocket"
"github.com/strukturag/nextcloud-spreed-signaling/v2/api"
"github.com/strukturag/nextcloud-spreed-signaling/v2/async"
"github.com/strukturag/nextcloud-spreed-signaling/v2/config"
"github.com/strukturag/nextcloud-spreed-signaling/v2/dns"
"github.com/strukturag/nextcloud-spreed-signaling/v2/etcd"
@ -1540,9 +1539,8 @@ type proxySFU struct {
mu sync.RWMutex
// +checklocks:mu
publishers map[sfu.StreamId]*proxyConnection
publisherWaiters async.ChannelWaiters
publishers map[sfu.StreamId]*proxyConnection
publishersCond sync.Cond
continentsMap atomic.Value
@ -1595,6 +1593,7 @@ func NewProxySFU(ctx context.Context, config *goconf.ConfigFile, etcdClient etcd
rpcClients: rpcClients,
}
mcu.publishersCond.L = &mcu.mu
if err := mcu.loadContinentsMap(config); err != nil {
return nil, err
@ -2132,9 +2131,9 @@ func (m *proxySFU) createPublisher(ctx context.Context, listener sfu.Listener, i
}
m.mu.Lock()
defer m.mu.Unlock()
m.publishers[sfu.GetStreamId(id, streamType)] = conn
m.mu.Unlock()
m.publisherWaiters.Wakeup()
m.publishersCond.Broadcast()
return publisher
}
@ -2211,25 +2210,21 @@ func (m *proxySFU) waitForPublisherConnection(ctx context.Context, publisher api
return conn
}
ch := make(chan struct{}, 1)
id := m.publisherWaiters.Add(ch)
defer m.publisherWaiters.Remove(id)
stop := context.AfterFunc(ctx, func() {
m.publishersCond.Broadcast()
})
defer stop()
sfuinternal.StatsWaitingForPublisherTotal.WithLabelValues(string(streamType)).Inc()
for {
m.mu.Unlock()
select {
case <-ch:
m.mu.Lock()
conn = m.publishers[sfu.GetStreamId(publisher, streamType)]
if conn != nil {
return conn
}
case <-ctx.Done():
m.mu.Lock()
for conn == nil {
if err := ctx.Err(); err != nil {
return nil
}
m.publishersCond.Wait()
conn = m.publishers[sfu.GetStreamId(publisher, streamType)]
}
return conn
}
type proxyPublisherInfo struct {

View file

@ -42,7 +42,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/v2/api"
"github.com/strukturag/nextcloud-spreed-signaling/v2/async"
dnstest "github.com/strukturag/nextcloud-spreed-signaling/v2/dns/test"
"github.com/strukturag/nextcloud-spreed-signaling/v2/etcd"
etcdtest "github.com/strukturag/nextcloud-spreed-signaling/v2/etcd/test"
@ -1248,14 +1247,16 @@ type publisherHub struct {
mu sync.Mutex
// +checklocks:mu
publishers map[api.PublicSessionId]*proxyPublisher
waiter async.ChannelWaiters // +checklocksignore: Has its own locking.
publishers map[api.PublicSessionId]*proxyPublisher
publishersCond sync.Cond
}
func newPublisherHub() *publisherHub {
return &publisherHub{
hub := &publisherHub{
publishers: make(map[api.PublicSessionId]*proxyPublisher),
}
hub.publishersCond.L = &hub.mu
return hub
}
func (h *publisherHub) addPublisher(publisher *proxyPublisher) {
@ -1263,30 +1264,26 @@ func (h *publisherHub) addPublisher(publisher *proxyPublisher) {
defer h.mu.Unlock()
h.publishers[publisher.PublisherId()] = publisher
h.waiter.Wakeup()
h.publishersCond.Broadcast()
}
func (h *publisherHub) GetPublisherIdForSessionId(ctx context.Context, sessionId api.PublicSessionId, streamType sfu.StreamType) (*grpc.GetPublisherIdReply, error) {
h.mu.Lock()
defer h.mu.Unlock()
pub, found := h.publishers[sessionId]
if !found {
ch := make(chan struct{}, 1)
id := h.waiter.Add(ch)
defer h.waiter.Remove(id)
stop := context.AfterFunc(ctx, func() {
h.publishersCond.Broadcast()
})
defer stop()
for !found {
h.mu.Unlock()
select {
case <-ch:
h.mu.Lock()
pub, found = h.publishers[sessionId]
case <-ctx.Done():
h.mu.Lock()
return nil, ctx.Err()
}
pub, found := h.publishers[sessionId]
for !found {
if err := ctx.Err(); err != nil {
return nil, err
}
h.publishersCond.Wait()
pub, found = h.publishers[sessionId]
}
connToken, err := pub.conn.proxy.CreateToken("")