From 182a0b78e2f6607694d169e279488e5632046429 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 10 Mar 2026 10:04:18 +0100 Subject: [PATCH] Simplify API for waiter releasing. --- async/notifier.go | 11 ++++++++--- async/notifier_test.go | 32 ++++++++++++++++---------------- sfu/janus/janus.go | 9 ++------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/async/notifier.go b/async/notifier.go index ab9138a..747ec5e 100644 --- a/async/notifier.go +++ b/async/notifier.go @@ -58,7 +58,9 @@ type Notifier struct { waiterMap map[string]map[*Waiter]bool } -func (n *Notifier) NewWaiter(key string) *Waiter { +type ReleaseFunc func() + +func (n *Notifier) NewWaiter(key string) (*Waiter, ReleaseFunc) { n.Lock() defer n.Unlock() @@ -86,7 +88,10 @@ func (n *Notifier) NewWaiter(key string) *Waiter { ch: waiter.ch, } n.waiterMap[key][w] = true - return w + releaseFunc := func() { + n.release(w) + } + return w, releaseFunc } func (n *Notifier) Reset() { @@ -100,7 +105,7 @@ func (n *Notifier) Reset() { n.waiterMap = nil } -func (n *Notifier) Release(w *Waiter) { +func (n *Notifier) release(w *Waiter) { n.Lock() defer n.Unlock() diff --git a/async/notifier_test.go b/async/notifier_test.go index 0596767..74f88ed 100644 --- a/async/notifier_test.go +++ b/async/notifier_test.go @@ -54,8 +54,8 @@ func TestNotifierWaitTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond) defer cancel() - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + defer release() err := waiter.Wait(ctx) assert.ErrorIs(t, err, context.DeadlineExceeded) @@ -69,8 +69,8 @@ func TestNotifierSimple(t *testing.T) { t.Parallel() var notifier Notifier - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + defer release() var wg sync.WaitGroup wg.Go(func() { @@ -87,8 +87,8 @@ func TestNotifierMultiNotify(t *testing.T) { t.Parallel() var notifier Notifier - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + _, release := notifier.NewWaiter("foo") + defer release() notifier.Notify("foo") // The second notification will be ignored while the first is still pending. @@ -99,8 +99,8 @@ func TestNotifierWaitClosed(t *testing.T) { t.Parallel() var notifier Notifier - waiter := notifier.NewWaiter("foo") - notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + release() assert.NoError(t, waiter.Wait(context.Background())) } @@ -109,10 +109,10 @@ func TestNotifierWaitClosedMulti(t *testing.T) { t.Parallel() var notifier Notifier - waiter1 := notifier.NewWaiter("foo") - waiter2 := notifier.NewWaiter("foo") - notifier.Release(waiter1) - notifier.Release(waiter2) + waiter1, release1 := notifier.NewWaiter("foo") + waiter2, release2 := notifier.NewWaiter("foo") + release1() + release2() assert.NoError(t, waiter1.Wait(context.Background())) assert.NoError(t, waiter2.Wait(context.Background())) @@ -122,8 +122,8 @@ func TestNotifierResetWillNotify(t *testing.T) { t.Parallel() var notifier Notifier - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + defer release() var wg sync.WaitGroup wg.Go(func() { @@ -144,8 +144,8 @@ func TestNotifierDuplicate(t *testing.T) { for range 2 { done.Go(func() { - waiter := notifier.NewWaiter("foo") - defer notifier.Release(waiter) + waiter, release := notifier.NewWaiter("foo") + defer release() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/sfu/janus/janus.go b/sfu/janus/janus.go index aacdf76..9efbd9b 100644 --- a/sfu/janus/janus.go +++ b/sfu/janus/janus.go @@ -888,15 +888,10 @@ func (m *janusSFU) notifyPublisherConnected(id api.PublicSessionId, streamType s m.publisherConnected.Notify(string(key)) } -func (m *janusSFU) newPublisherConnectedWaiter(id api.PublicSessionId, streamType sfu.StreamType) (*async.Waiter, func()) { +func (m *janusSFU) newPublisherConnectedWaiter(id api.PublicSessionId, streamType sfu.StreamType) (*async.Waiter, async.ReleaseFunc) { key := sfu.GetStreamId(id, streamType) - waiter := m.publisherConnected.NewWaiter(string(key)) - stopFunc := func() { - m.publisherConnected.Release(waiter) - } - - return waiter, stopFunc + return m.publisherConnected.NewWaiter(string(key)) } func (m *janusSFU) getPublisher(ctx context.Context, publisher api.PublicSessionId, streamType sfu.StreamType) (*janusPublisher, error) {