From f195492f8e42a92314feb69611d4751e5e9ef191 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 10 Mar 2026 08:36:50 +0100 Subject: [PATCH 1/7] Use "sync.Cond" instead of SingleNotifier. --- sfu/proxy/proxy.go | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/sfu/proxy/proxy.go b/sfu/proxy/proxy.go index 12ea56f..77c60a4 100644 --- a/sfu/proxy/proxy.go +++ b/sfu/proxy/proxy.go @@ -390,7 +390,8 @@ type proxyConnection struct { // +checklocks:mu conn *websocket.Conn - helloProcessed atomic.Bool + // +checklocks:mu + helloProcessed bool connectedSince atomic.Int64 reconnectTimer *time.Timer reconnectInterval atomic.Int64 @@ -399,7 +400,7 @@ type proxyConnection struct { trackClose atomic.Bool temporary atomic.Bool - connectedNotifier async.SingleNotifier + connectedCond sync.Cond msgId atomic.Int64 helloMsgId string @@ -444,6 +445,7 @@ func newProxyConnection(proxy *proxySFU, baseUrl string, ip net.IP, token string publisherIds: make(map[sfu.StreamId]api.PublicSessionId), subscribers: make(map[string]*proxySubscriber), } + conn.connectedCond.L = &conn.mu conn.reconnectInterval.Store(int64(initialReconnectInterval)) conn.load.Store(loadNotConnected) conn.bandwidth.Store(nil) @@ -588,7 +590,7 @@ func (c *proxyConnection) SessionId() api.PublicSessionId { func (c *proxyConnection) IsConnected() bool { c.mu.Lock() defer c.mu.Unlock() - return c.conn != nil && c.helloProcessed.Load() && c.SessionId() != "" + return c.conn != nil && c.helloProcessed && c.SessionId() != "" } func (c *proxyConnection) IsTemporary() bool { @@ -746,12 +748,10 @@ func (c *proxyConnection) stop(ctx context.Context) { } func (c *proxyConnection) close() { - c.helloProcessed.Store(false) - c.mu.Lock() defer c.mu.Unlock() - c.connectedNotifier.Reset() + c.helloProcessed = false if c.conn != nil { c.conn.Close() @@ -857,10 +857,10 @@ func (c *proxyConnection) reconnect() { c.logger.Printf("Connected to %s", c) c.closed.Store(false) - c.helloProcessed.Store(false) c.connectedSince.Store(time.Now().UnixMicro()) c.mu.Lock() + c.helloProcessed = false c.conn = conn c.mu.Unlock() @@ -887,12 +887,20 @@ func (c *proxyConnection) waitUntilConnected(ctx context.Context) error { return nil } - waiter := c.connectedNotifier.NewWaiter() - defer c.connectedNotifier.Release(waiter) + stop := context.AfterFunc(ctx, func() { + c.connectedCond.Broadcast() + }) + defer stop() - c.mu.Unlock() - defer c.mu.Lock() - return waiter.Wait(ctx) + for !c.helloProcessed { + if err := ctx.Err(); err != nil { + return err + } + + c.connectedCond.Wait() + } + + return nil } func (c *proxyConnection) removePublisher(publisher *proxyPublisher) { @@ -1057,8 +1065,10 @@ func (c *proxyConnection) processMessage(msg *proxy.ServerMessage) { statsConnectedProxyBackendsCurrent.WithLabelValues(string(c.Country())).Inc() } - c.helloProcessed.Store(true) - c.connectedNotifier.Notify() + c.mu.Lock() + c.helloProcessed = true + c.connectedCond.Broadcast() + c.mu.Unlock() default: c.logger.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c) c.scheduleReconnect() From 9c10675867fa2af161d1472c034f65955447b3ee Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 10 Mar 2026 08:50:45 +0100 Subject: [PATCH 2/7] Simplify notifier code and remove unused. --- async/notifier.go | 68 ++++++++++------- async/notifier_test.go | 26 +++++++ async/single_notifier.go | 128 -------------------------------- async/single_notifier_test.go | 134 ---------------------------------- 4 files changed, 67 insertions(+), 289 deletions(-) delete mode 100644 async/single_notifier.go delete mode 100644 async/single_notifier_test.go diff --git a/async/notifier.go b/async/notifier.go index 06305aa..ab9138a 100644 --- a/async/notifier.go +++ b/async/notifier.go @@ -26,21 +26,34 @@ import ( "sync" ) +type rootWaiter struct { + key string + ch chan struct{} +} + +func (w *rootWaiter) notify() { + close(w.ch) +} + type Waiter struct { key string - - sw *SingleWaiter + ch <-chan struct{} } func (w *Waiter) Wait(ctx context.Context) error { - return w.sw.Wait(ctx) + select { + case <-w.ch: + return nil + case <-ctx.Done(): + return ctx.Err() + } } type Notifier struct { sync.Mutex // +checklocks:Mutex - waiters map[string]*Waiter + waiters map[string]*rootWaiter // +checklocks:Mutex waiterMap map[string]map[*Waiter]bool } @@ -50,31 +63,30 @@ func (n *Notifier) NewWaiter(key string) *Waiter { defer n.Unlock() waiter, found := n.waiters[key] - if found { - w := &Waiter{ + if !found { + waiter = &rootWaiter{ key: key, - sw: waiter.sw, + ch: make(chan struct{}), + } + + if n.waiters == nil { + n.waiters = make(map[string]*rootWaiter) + } + if n.waiterMap == nil { + n.waiterMap = make(map[string]map[*Waiter]bool) + } + n.waiters[key] = waiter + if _, found := n.waiterMap[key]; !found { + n.waiterMap[key] = make(map[*Waiter]bool) } - n.waiterMap[key][w] = true - return w } - waiter = &Waiter{ + w := &Waiter{ key: key, - sw: newSingleWaiter(), + ch: waiter.ch, } - if n.waiters == nil { - n.waiters = make(map[string]*Waiter) - } - if n.waiterMap == nil { - n.waiterMap = make(map[string]map[*Waiter]bool) - } - n.waiters[key] = waiter - if _, found := n.waiterMap[key]; !found { - n.waiterMap[key] = make(map[*Waiter]bool) - } - n.waiterMap[key][waiter] = true - return waiter + n.waiterMap[key][w] = true + return w } func (n *Notifier) Reset() { @@ -82,7 +94,7 @@ func (n *Notifier) Reset() { defer n.Unlock() for _, w := range n.waiters { - w.sw.cancel() + w.notify() } n.waiters = nil n.waiterMap = nil @@ -96,8 +108,10 @@ func (n *Notifier) Release(w *Waiter) { if _, found := waiters[w]; found { delete(waiters, w) if len(waiters) == 0 { - delete(n.waiters, w.key) - w.sw.cancel() + if root, found := n.waiters[w.key]; found { + delete(n.waiters, w.key) + root.notify() + } } } } @@ -108,7 +122,7 @@ func (n *Notifier) Notify(key string) { defer n.Unlock() if w, found := n.waiters[key]; found { - w.sw.cancel() + w.notify() delete(n.waiters, w.key) delete(n.waiterMap, w.key) } diff --git a/async/notifier_test.go b/async/notifier_test.go index 21ba219..0596767 100644 --- a/async/notifier_test.go +++ b/async/notifier_test.go @@ -39,6 +39,32 @@ func TestNotifierNoWaiter(t *testing.T) { notifier.Notify("foo") } +func TestNotifierWaitTimeout(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + var notifier Notifier + + notified := make(chan struct{}) + go func() { + defer close(notified) + time.Sleep(time.Second) + notifier.Notify("foo") + }() + + ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond) + defer cancel() + + waiter := notifier.NewWaiter("foo") + defer notifier.Release(waiter) + + err := waiter.Wait(ctx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + <-notified + + assert.NoError(t, waiter.Wait(t.Context())) + }) +} + func TestNotifierSimple(t *testing.T) { t.Parallel() diff --git a/async/single_notifier.go b/async/single_notifier.go deleted file mode 100644 index 28c3a45..0000000 --- a/async/single_notifier.go +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Standalone signaling server for the Nextcloud Spreed app. - * Copyright (C) 2022 struktur AG - * - * @author Joachim Bauch - * - * @license GNU AGPL version 3 or any later version - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package async - -import ( - "context" - "sync" -) - -type SingleWaiter struct { - root bool - ch chan struct{} - once sync.Once -} - -func newSingleWaiter() *SingleWaiter { - return &SingleWaiter{ - root: true, - ch: make(chan struct{}), - } -} - -func (w *SingleWaiter) subWaiter() *SingleWaiter { - return &SingleWaiter{ - ch: w.ch, - } -} - -func (w *SingleWaiter) Wait(ctx context.Context) error { - select { - case <-w.ch: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -func (w *SingleWaiter) cancel() { - if !w.root { - return - } - - w.once.Do(func() { - close(w.ch) - }) -} - -type SingleNotifier struct { - sync.Mutex - - // +checklocks:Mutex - waiter *SingleWaiter - // +checklocks:Mutex - waiters map[*SingleWaiter]bool -} - -func (n *SingleNotifier) NewWaiter() *SingleWaiter { - n.Lock() - defer n.Unlock() - - if n.waiter == nil { - n.waiter = newSingleWaiter() - } - - if n.waiters == nil { - n.waiters = make(map[*SingleWaiter]bool) - } - - w := n.waiter.subWaiter() - n.waiters[w] = true - return w -} - -func (n *SingleNotifier) Reset() { - n.Lock() - defer n.Unlock() - - if n.waiter != nil { - n.waiter.cancel() - n.waiter = nil - } - n.waiters = nil -} - -func (n *SingleNotifier) Release(w *SingleWaiter) { - n.Lock() - defer n.Unlock() - - if _, found := n.waiters[w]; found { - delete(n.waiters, w) - if len(n.waiters) == 0 { - n.waiters = nil - if n.waiter != nil { - n.waiter.cancel() - n.waiter = nil - } - } - } -} - -func (n *SingleNotifier) Notify() { - n.Lock() - defer n.Unlock() - - if n.waiter != nil { - n.waiter.cancel() - } - n.waiters = nil -} diff --git a/async/single_notifier_test.go b/async/single_notifier_test.go deleted file mode 100644 index 83de3e6..0000000 --- a/async/single_notifier_test.go +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Standalone signaling server for the Nextcloud Spreed app. - * Copyright (C) 2022 struktur AG - * - * @author Joachim Bauch - * - * @license GNU AGPL version 3 or any later version - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package async - -import ( - "context" - "sync" - "testing" - "testing/synctest" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestSingleNotifierNoWaiter(t *testing.T) { - t.Parallel() - var notifier SingleNotifier - - // Notifications can be sent even if no waiter exists. - notifier.Notify() -} - -func TestSingleNotifierSimple(t *testing.T) { - t.Parallel() - - var notifier SingleNotifier - waiter := notifier.NewWaiter() - defer notifier.Release(waiter) - - var wg sync.WaitGroup - wg.Go(func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - assert.NoError(t, waiter.Wait(ctx)) - }) - - notifier.Notify() - wg.Wait() -} - -func TestSingleNotifierMultiNotify(t *testing.T) { - t.Parallel() - var notifier SingleNotifier - - waiter := notifier.NewWaiter() - defer notifier.Release(waiter) - - notifier.Notify() - // The second notification will be ignored while the first is still pending. - notifier.Notify() -} - -func TestSingleNotifierWaitClosed(t *testing.T) { - t.Parallel() - var notifier SingleNotifier - - waiter := notifier.NewWaiter() - notifier.Release(waiter) - - assert.NoError(t, waiter.Wait(context.Background())) -} - -func TestSingleNotifierWaitClosedMulti(t *testing.T) { - t.Parallel() - var notifier SingleNotifier - - waiter1 := notifier.NewWaiter() - waiter2 := notifier.NewWaiter() - notifier.Release(waiter1) - notifier.Release(waiter2) - - assert.NoError(t, waiter1.Wait(context.Background())) - assert.NoError(t, waiter2.Wait(context.Background())) -} - -func TestSingleNotifierResetWillNotify(t *testing.T) { - t.Parallel() - - var notifier SingleNotifier - waiter := notifier.NewWaiter() - defer notifier.Release(waiter) - - var wg sync.WaitGroup - wg.Go(func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - assert.NoError(t, waiter.Wait(ctx)) - }) - - notifier.Reset() - wg.Wait() -} - -func TestSingleNotifierDuplicate(t *testing.T) { - t.Parallel() - synctest.Test(t, func(t *testing.T) { - var notifier SingleNotifier - var done sync.WaitGroup - - for range 2 { - done.Go(func() { - waiter := notifier.NewWaiter() - defer notifier.Release(waiter) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - assert.NoError(t, waiter.Wait(ctx)) - }) - } - - synctest.Wait() - notifier.Notify() - done.Wait() - }) -} From a9e58ec60ae019d5feed8303e0f2c8198ffb6be4 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 10 Mar 2026 08:59:43 +0100 Subject: [PATCH 3/7] Use "sync.Cond" to wait for publisher to be created. --- server/clientsession.go | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/server/clientsession.go b/server/clientsession.go index e087366..2ad59c1 100644 --- a/server/clientsession.go +++ b/server/clientsession.go @@ -36,7 +36,6 @@ import ( "github.com/pion/sdp/v3" "github.com/strukturag/nextcloud-spreed-signaling/v2/api" - "github.com/strukturag/nextcloud-spreed-signaling/v2/async" "github.com/strukturag/nextcloud-spreed-signaling/v2/async/events" "github.com/strukturag/nextcloud-spreed-signaling/v2/internal" "github.com/strukturag/nextcloud-spreed-signaling/v2/log" @@ -98,7 +97,7 @@ type ClientSession struct { // +checklocks:roomSessionIdLock roomSessionId api.RoomSessionId - publisherWaiters async.ChannelWaiters // +checklocksignore + publishersCond sync.Cond // +checklocks:mu publishers map[sfu.StreamType]sfu.Publisher @@ -148,6 +147,7 @@ func NewClientSession(hub *Hub, privateId api.PrivateSessionId, publicId api.Pub backend: backend, asyncCh: make(events.AsyncChannel, events.DefaultAsyncChannelSize), } + s.publishersCond.L = &s.mu if s.clientType == api.HelloClientTypeInternal { s.backendUrl = hello.Auth.InternalParams.Backend s.parsedBackendUrl = hello.Auth.InternalParams.ParsedBackend @@ -991,7 +991,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu sfu.SFU, s s.publishers[streamType] = publisher } s.logger.Printf("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId()) - s.publisherWaiters.Wakeup() + s.publishersCond.Broadcast() } else { publisher.SetMedia(mediaTypes) } @@ -1020,24 +1020,20 @@ func (s *ClientSession) GetOrWaitForPublisher(ctx context.Context, streamType sf return publisher } - ch := make(chan struct{}, 1) - id := s.publisherWaiters.Add(ch) - defer s.publisherWaiters.Remove(id) + stop := context.AfterFunc(ctx, func() { + s.publishersCond.Broadcast() + }) + defer stop() - for { - s.mu.Unlock() - select { - case <-ch: - s.mu.Lock() - publisher := s.getPublisherLocked(streamType) - if publisher != nil { - return publisher - } - case <-ctx.Done(): - s.mu.Lock() + for publisher == nil { + if err := ctx.Err(); err != nil { return nil } + + s.publishersCond.Wait() + publisher = s.getPublisherLocked(streamType) } + return publisher } func (s *ClientSession) GetOrCreateSubscriber(ctx context.Context, mcu sfu.SFU, id api.PublicSessionId, streamType sfu.StreamType) (sfu.Subscriber, error) { From a6a1c3534746452087b75ef615dd922a406b9482 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 10 Mar 2026 09:05:59 +0100 Subject: [PATCH 4/7] Use "sync.Cond" to wait for publisher connection. --- sfu/proxy/proxy.go | 35 +++++++++++++++-------------------- sfu/proxy/proxy_test.go | 37 +++++++++++++++++-------------------- 2 files changed, 32 insertions(+), 40 deletions(-) diff --git a/sfu/proxy/proxy.go b/sfu/proxy/proxy.go index 77c60a4..52278c9 100644 --- a/sfu/proxy/proxy.go +++ b/sfu/proxy/proxy.go @@ -46,7 +46,6 @@ import ( "github.com/gorilla/websocket" "github.com/strukturag/nextcloud-spreed-signaling/v2/api" - "github.com/strukturag/nextcloud-spreed-signaling/v2/async" "github.com/strukturag/nextcloud-spreed-signaling/v2/config" "github.com/strukturag/nextcloud-spreed-signaling/v2/dns" "github.com/strukturag/nextcloud-spreed-signaling/v2/etcd" @@ -1540,9 +1539,8 @@ type proxySFU struct { mu sync.RWMutex // +checklocks:mu - publishers map[sfu.StreamId]*proxyConnection - - publisherWaiters async.ChannelWaiters + publishers map[sfu.StreamId]*proxyConnection + publishersCond sync.Cond continentsMap atomic.Value @@ -1595,6 +1593,7 @@ func NewProxySFU(ctx context.Context, config *goconf.ConfigFile, etcdClient etcd rpcClients: rpcClients, } + mcu.publishersCond.L = &mcu.mu if err := mcu.loadContinentsMap(config); err != nil { return nil, err @@ -2132,9 +2131,9 @@ func (m *proxySFU) createPublisher(ctx context.Context, listener sfu.Listener, i } m.mu.Lock() + defer m.mu.Unlock() m.publishers[sfu.GetStreamId(id, streamType)] = conn - m.mu.Unlock() - m.publisherWaiters.Wakeup() + m.publishersCond.Broadcast() return publisher } @@ -2211,25 +2210,21 @@ func (m *proxySFU) waitForPublisherConnection(ctx context.Context, publisher api return conn } - ch := make(chan struct{}, 1) - id := m.publisherWaiters.Add(ch) - defer m.publisherWaiters.Remove(id) + stop := context.AfterFunc(ctx, func() { + m.publishersCond.Broadcast() + }) + defer stop() sfuinternal.StatsWaitingForPublisherTotal.WithLabelValues(string(streamType)).Inc() - for { - m.mu.Unlock() - select { - case <-ch: - m.mu.Lock() - conn = m.publishers[sfu.GetStreamId(publisher, streamType)] - if conn != nil { - return conn - } - case <-ctx.Done(): - m.mu.Lock() + for conn == nil { + if err := ctx.Err(); err != nil { return nil } + + m.publishersCond.Wait() + conn = m.publishers[sfu.GetStreamId(publisher, streamType)] } + return conn } type proxyPublisherInfo struct { diff --git a/sfu/proxy/proxy_test.go b/sfu/proxy/proxy_test.go index 932177e..9bab3e9 100644 --- a/sfu/proxy/proxy_test.go +++ b/sfu/proxy/proxy_test.go @@ -42,7 +42,6 @@ import ( "github.com/stretchr/testify/require" "github.com/strukturag/nextcloud-spreed-signaling/v2/api" - "github.com/strukturag/nextcloud-spreed-signaling/v2/async" dnstest "github.com/strukturag/nextcloud-spreed-signaling/v2/dns/test" "github.com/strukturag/nextcloud-spreed-signaling/v2/etcd" etcdtest "github.com/strukturag/nextcloud-spreed-signaling/v2/etcd/test" @@ -1248,14 +1247,16 @@ type publisherHub struct { mu sync.Mutex // +checklocks:mu - publishers map[api.PublicSessionId]*proxyPublisher - waiter async.ChannelWaiters // +checklocksignore: Has its own locking. + publishers map[api.PublicSessionId]*proxyPublisher + publishersCond sync.Cond } func newPublisherHub() *publisherHub { - return &publisherHub{ + hub := &publisherHub{ publishers: make(map[api.PublicSessionId]*proxyPublisher), } + hub.publishersCond.L = &hub.mu + return hub } func (h *publisherHub) addPublisher(publisher *proxyPublisher) { @@ -1263,30 +1264,26 @@ func (h *publisherHub) addPublisher(publisher *proxyPublisher) { defer h.mu.Unlock() h.publishers[publisher.PublisherId()] = publisher - h.waiter.Wakeup() + h.publishersCond.Broadcast() } func (h *publisherHub) GetPublisherIdForSessionId(ctx context.Context, sessionId api.PublicSessionId, streamType sfu.StreamType) (*grpc.GetPublisherIdReply, error) { h.mu.Lock() defer h.mu.Unlock() - pub, found := h.publishers[sessionId] - if !found { - ch := make(chan struct{}, 1) - id := h.waiter.Add(ch) - defer h.waiter.Remove(id) + stop := context.AfterFunc(ctx, func() { + h.publishersCond.Broadcast() + }) + defer stop() - for !found { - h.mu.Unlock() - select { - case <-ch: - h.mu.Lock() - pub, found = h.publishers[sessionId] - case <-ctx.Done(): - h.mu.Lock() - return nil, ctx.Err() - } + pub, found := h.publishers[sessionId] + for !found { + if err := ctx.Err(); err != nil { + return nil, err } + + h.publishersCond.Wait() + pub, found = h.publishers[sessionId] } connToken, err := pub.conn.proxy.CreateToken("") From 2bf6d00a13335efa3a3cef25245196cd866a9234 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 10 Mar 2026 09:06:23 +0100 Subject: [PATCH 5/7] Remove deprecated ChannelWaiter. --- async/channel_waiter.go | 64 --------------------------------- async/channel_waiter_test.go | 69 ------------------------------------ 2 files changed, 133 deletions(-) delete mode 100644 async/channel_waiter.go delete mode 100644 async/channel_waiter_test.go diff --git a/async/channel_waiter.go b/async/channel_waiter.go deleted file mode 100644 index 928c7d5..0000000 --- a/async/channel_waiter.go +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Standalone signaling server for the Nextcloud Spreed app. - * Copyright (C) 2023 struktur AG - * - * @author Joachim Bauch - * - * @license GNU AGPL version 3 or any later version - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package async - -import ( - "sync" -) - -type ChannelWaiters struct { - mu sync.RWMutex - // +checklocks:mu - id uint64 - // +checklocks:mu - waiters map[uint64]chan struct{} -} - -func (w *ChannelWaiters) Wakeup() { - w.mu.RLock() - defer w.mu.RUnlock() - for _, ch := range w.waiters { - select { - case ch <- struct{}{}: - default: - // Receiver is still processing previous wakeup. - } - } -} - -func (w *ChannelWaiters) Add(ch chan struct{}) uint64 { - w.mu.Lock() - defer w.mu.Unlock() - if w.waiters == nil { - w.waiters = make(map[uint64]chan struct{}) - } - id := w.id - w.id++ - w.waiters[id] = ch - return id -} - -func (w *ChannelWaiters) Remove(id uint64) { - w.mu.Lock() - defer w.mu.Unlock() - delete(w.waiters, id) -} diff --git a/async/channel_waiter_test.go b/async/channel_waiter_test.go deleted file mode 100644 index 3528e64..0000000 --- a/async/channel_waiter_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Standalone signaling server for the Nextcloud Spreed app. - * Copyright (C) 2023 struktur AG - * - * @author Joachim Bauch - * - * @license GNU AGPL version 3 or any later version - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package async - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestChannelWaiters(t *testing.T) { - t.Parallel() - var waiters ChannelWaiters - - ch1 := make(chan struct{}, 1) - id1 := waiters.Add(ch1) - defer waiters.Remove(id1) - - ch2 := make(chan struct{}, 1) - id2 := waiters.Add(ch2) - defer waiters.Remove(id2) - - waiters.Wakeup() - <-ch1 - <-ch2 - - select { - case <-ch1: - assert.Fail(t, "should have not received another event") - case <-ch2: - assert.Fail(t, "should have not received another event") - default: - } - - ch3 := make(chan struct{}, 1) - id3 := waiters.Add(ch3) - waiters.Remove(id3) - - // Multiple wakeups work even without processing. - waiters.Wakeup() - waiters.Wakeup() - waiters.Wakeup() - <-ch1 - <-ch2 - select { - case <-ch3: - assert.Fail(t, "should have not received another event") - default: - } -} From bd3c06c9ebf1efcff19baa1c630eab0aa32bf50e Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 10 Mar 2026 09:38:26 +0100 Subject: [PATCH 6/7] Use "sync.Cond" to wait for created publishers. --- sfu/janus/janus.go | 96 ++++++++++++++++++++++++++--------- sfu/janus/publisher.go | 2 +- sfu/janus/remote_publisher.go | 2 +- sfu/janus/subscriber.go | 4 +- 4 files changed, 77 insertions(+), 27 deletions(-) diff --git a/sfu/janus/janus.go b/sfu/janus/janus.go index 6f1baac..aacdf76 100644 --- a/sfu/janus/janus.go +++ b/sfu/janus/janus.go @@ -231,6 +231,11 @@ func (s *prometheusJanusStats) DecSubscriber(streamType sfu.StreamType) { sfuinternal.StatsSubscribersCurrent.WithLabelValues(string(streamType)).Dec() } +type refcountCond struct { + ref int + cond *sync.Cond +} + type janusSFU struct { logger log.Logger @@ -257,8 +262,9 @@ type janusSFU struct { clientId atomic.Uint64 // +checklocks:mu - publishers map[sfu.StreamId]*janusPublisher - publisherCreated async.Notifier + publishers map[sfu.StreamId]*janusPublisher + // +checklocks:mu + publisherCreated map[sfu.StreamId]*refcountCond publisherConnected async.Notifier // +checklocks:mu remotePublishers map[sfu.StreamId]*janusRemotePublisher @@ -289,6 +295,7 @@ func NewJanusSFU(ctx context.Context, url string, config *goconf.ConfigFile) (sf clients: make(map[uint64]clientInterface), publishers: make(map[sfu.StreamId]*janusPublisher), + publisherCreated: make(map[sfu.StreamId]*refcountCond), remotePublishers: make(map[sfu.StreamId]*janusRemotePublisher), createJanusGateway: func(ctx context.Context, wsURL string, listener janus.GatewayListener) (janus.GatewayInterface, error) { @@ -438,7 +445,10 @@ func (m *janusSFU) doReconnect(ctx context.Context) { m.logger.Println("Reconnection to Janus gateway successful") m.mu.Lock() clear(m.publishers) - m.publisherCreated.Reset() + for _, c := range m.publisherCreated { + c.cond.Broadcast() + } + clear(m.publisherCreated) m.publisherConnected.Reset() m.reconnectInterval = initialReconnectInterval m.mu.Unlock() @@ -854,40 +864,80 @@ func (m *janusSFU) NewPublisher(ctx context.Context, listener sfu.Listener, id a m.registerClient(client) m.logger.Printf("Publisher %s is using handle %d", client.id, handle.Id) go client.run(handle, client.closeChan) - m.mu.Lock() - m.publishers[sfu.GetStreamId(id, streamType)] = client - m.publisherCreated.Notify(string(sfu.GetStreamId(id, streamType))) - m.mu.Unlock() + + m.notifyPublisherCreated(id, streamType, client) sfuinternal.StatsPublishersCurrent.WithLabelValues(string(streamType)).Inc() sfuinternal.StatsPublishersTotal.WithLabelValues(string(streamType)).Inc() return client, nil } +func (m *janusSFU) notifyPublisherCreated(id api.PublicSessionId, streamType sfu.StreamType, client *janusPublisher) { + key := sfu.GetStreamId(id, streamType) + m.mu.Lock() + defer m.mu.Unlock() + + m.publishers[key] = client + if c, found := m.publisherCreated[key]; found { + c.cond.Broadcast() + delete(m.publisherCreated, key) + } +} + +func (m *janusSFU) notifyPublisherConnected(id api.PublicSessionId, streamType sfu.StreamType) { + key := sfu.GetStreamId(id, streamType) + m.publisherConnected.Notify(string(key)) +} + +func (m *janusSFU) newPublisherConnectedWaiter(id api.PublicSessionId, streamType sfu.StreamType) (*async.Waiter, func()) { + key := sfu.GetStreamId(id, streamType) + + waiter := m.publisherConnected.NewWaiter(string(key)) + stopFunc := func() { + m.publisherConnected.Release(waiter) + } + + return waiter, stopFunc +} + func (m *janusSFU) getPublisher(ctx context.Context, publisher api.PublicSessionId, streamType sfu.StreamType) (*janusPublisher, error) { // Do the direct check immediately as this should be the normal case. key := sfu.GetStreamId(publisher, streamType) m.mu.Lock() - if result, found := m.publishers[key]; found { - m.mu.Unlock() + defer m.mu.Unlock() + result, found := m.publishers[key] + if found { return result, nil } - waiter := m.publisherCreated.NewWaiter(string(key)) - m.mu.Unlock() - defer m.publisherCreated.Release(waiter) - - for { - m.mu.Lock() - result := m.publishers[key] - m.mu.Unlock() - if result != nil { - return result, nil - } - - if err := waiter.Wait(ctx); err != nil { - return nil, err + c, found := m.publisherCreated[key] + if !found { + c = &refcountCond{ + cond: sync.NewCond(&m.mu), } + m.publisherCreated[key] = c } + c.ref++ + + stop := context.AfterFunc(ctx, func() { + c.cond.Broadcast() + }) + defer stop() + + for result == nil && ctx.Err() == nil { + c.cond.Wait() + result = m.publishers[key] + } + + c.ref-- + if c.ref == 0 { + delete(m.publisherCreated, key) + } + + if err := ctx.Err(); err != nil { + return nil, err + } + + return result, nil } func (m *janusSFU) getOrCreateSubscriberHandle(ctx context.Context, publisher api.PublicSessionId, streamType sfu.StreamType) (*janus.Handle, *janusPublisher, error) { diff --git a/sfu/janus/publisher.go b/sfu/janus/publisher.go index a7b2fab..3a2a511 100644 --- a/sfu/janus/publisher.go +++ b/sfu/janus/publisher.go @@ -93,7 +93,7 @@ func (p *janusPublisher) handleDetached(event *janus.DetachedMsg) { func (p *janusPublisher) handleConnected(event *janus.WebRTCUpMsg) { p.logger.Printf("Publisher %d received connected", p.handleId.Load()) - p.mcu.publisherConnected.Notify(string(sfu.GetStreamId(p.id, p.streamType))) + p.mcu.notifyPublisherConnected(p.id, p.streamType) } func (p *janusPublisher) handleSlowLink(event *janus.SlowLinkMsg) { diff --git a/sfu/janus/remote_publisher.go b/sfu/janus/remote_publisher.go index 6276d2e..dd85657 100644 --- a/sfu/janus/remote_publisher.go +++ b/sfu/janus/remote_publisher.go @@ -86,7 +86,7 @@ func (p *janusRemotePublisher) handleDetached(event *janus.DetachedMsg) { func (p *janusRemotePublisher) handleConnected(event *janus.WebRTCUpMsg) { p.logger.Printf("Remote publisher %d received connected", p.handleId.Load()) - p.mcu.publisherConnected.Notify(string(sfu.GetStreamId(p.id, p.streamType))) + p.mcu.notifyPublisherConnected(p.id, p.streamType) } func (p *janusRemotePublisher) handleSlowLink(event *janus.SlowLinkMsg) { diff --git a/sfu/janus/subscriber.go b/sfu/janus/subscriber.go index e74c5aa..0d3bd73 100644 --- a/sfu/janus/subscriber.go +++ b/sfu/janus/subscriber.go @@ -169,8 +169,8 @@ func (p *janusSubscriber) joinRoom(ctx context.Context, stream *streamSelection, return } - waiter := p.mcu.publisherConnected.NewWaiter(string(sfu.GetStreamId(p.publisher, p.streamType))) - defer p.mcu.publisherConnected.Release(waiter) + waiter, stop := p.mcu.newPublisherConnectedWaiter(p.publisher, p.streamType) + defer stop() loggedNotPublishingYet := false retry: From 182a0b78e2f6607694d169e279488e5632046429 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 10 Mar 2026 10:04:18 +0100 Subject: [PATCH 7/7] Simplify API for waiter releasing. --- async/notifier.go | 11 ++++++++--- async/notifier_test.go | 32 ++++++++++++++++---------------- sfu/janus/janus.go | 9 ++------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/async/notifier.go b/async/notifier.go index ab9138a..747ec5e 100644 --- a/async/notifier.go +++ b/async/notifier.go @@ -58,7 +58,9 @@ type Notifier struct { waiterMap map[string]map[*Waiter]bool } -func (n *Notifier) NewWaiter(key string) *Waiter { +type ReleaseFunc func() + +func (n *Notifier) NewWaiter(key string) (*Waiter, ReleaseFunc) { n.Lock() defer n.Unlock() @@ -86,7 +88,10 @@ func (n *Notifier) NewWaiter(key string) *Waiter { ch: waiter.ch, } n.waiterMap[key][w] = true - return w + releaseFunc := func() { + n.release(w) + } + return w, releaseFunc } func (n *Notifier) Reset() { @@ -100,7 +105,7 @@ func (n *Notifier) Reset() { n.waiterMap = nil } -func (n *Notifier) Release(w *Waiter) { +func (n *Notifier) release(w *Waiter) { n.Lock() defer n.Unlock() diff --git a/async/notifier_test.go b/async/notifier_test.go index 0596767..74f88ed 100644 --- a/async/notifier_test.go +++ b/async/notifier_test.go @@ -54,8 +54,8 @@ func TestNotifierWaitTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond) defer cancel() - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + defer release() err := waiter.Wait(ctx) assert.ErrorIs(t, err, context.DeadlineExceeded) @@ -69,8 +69,8 @@ func TestNotifierSimple(t *testing.T) { t.Parallel() var notifier Notifier - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + defer release() var wg sync.WaitGroup wg.Go(func() { @@ -87,8 +87,8 @@ func TestNotifierMultiNotify(t *testing.T) { t.Parallel() var notifier Notifier - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + _, release := notifier.NewWaiter("foo") + defer release() notifier.Notify("foo") // The second notification will be ignored while the first is still pending. @@ -99,8 +99,8 @@ func TestNotifierWaitClosed(t *testing.T) { t.Parallel() var notifier Notifier - waiter := notifier.NewWaiter("foo") - notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + release() assert.NoError(t, waiter.Wait(context.Background())) } @@ -109,10 +109,10 @@ func TestNotifierWaitClosedMulti(t *testing.T) { t.Parallel() var notifier Notifier - waiter1 := notifier.NewWaiter("foo") - waiter2 := notifier.NewWaiter("foo") - notifier.Release(waiter1) - notifier.Release(waiter2) + waiter1, release1 := notifier.NewWaiter("foo") + waiter2, release2 := notifier.NewWaiter("foo") + release1() + release2() assert.NoError(t, waiter1.Wait(context.Background())) assert.NoError(t, waiter2.Wait(context.Background())) @@ -122,8 +122,8 @@ func TestNotifierResetWillNotify(t *testing.T) { t.Parallel() var notifier Notifier - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + defer release() var wg sync.WaitGroup wg.Go(func() { @@ -144,8 +144,8 @@ func TestNotifierDuplicate(t *testing.T) { for range 2 { done.Go(func() { - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + defer release() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/sfu/janus/janus.go b/sfu/janus/janus.go index aacdf76..9efbd9b 100644 --- a/sfu/janus/janus.go +++ b/sfu/janus/janus.go @@ -888,15 +888,10 @@ func (m *janusSFU) notifyPublisherConnected(id api.PublicSessionId, streamType s m.publisherConnected.Notify(string(key)) } -func (m *janusSFU) newPublisherConnectedWaiter(id api.PublicSessionId, streamType sfu.StreamType) (*async.Waiter, func()) { +func (m *janusSFU) newPublisherConnectedWaiter(id api.PublicSessionId, streamType sfu.StreamType) (*async.Waiter, async.ReleaseFunc) { key := sfu.GetStreamId(id, streamType) - waiter := m.publisherConnected.NewWaiter(string(key)) - stopFunc := func() { - m.publisherConnected.Release(waiter) - } - - return waiter, stopFunc + return m.publisherConnected.NewWaiter(string(key)) } func (m *janusSFU) getPublisher(ctx context.Context, publisher api.PublicSessionId, streamType sfu.StreamType) (*janusPublisher, error) {