Simplify API for waiter releasing.

This commit is contained in:
Joachim Bauch 2026-03-10 10:04:18 +01:00
commit 182a0b78e2
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
3 changed files with 26 additions and 26 deletions

View file

@ -58,7 +58,9 @@ type Notifier struct {
waiterMap map[string]map[*Waiter]bool
}
func (n *Notifier) NewWaiter(key string) *Waiter {
type ReleaseFunc func()
func (n *Notifier) NewWaiter(key string) (*Waiter, ReleaseFunc) {
n.Lock()
defer n.Unlock()
@ -86,7 +88,10 @@ func (n *Notifier) NewWaiter(key string) *Waiter {
ch: waiter.ch,
}
n.waiterMap[key][w] = true
return w
releaseFunc := func() {
n.release(w)
}
return w, releaseFunc
}
func (n *Notifier) Reset() {
@ -100,7 +105,7 @@ func (n *Notifier) Reset() {
n.waiterMap = nil
}
func (n *Notifier) Release(w *Waiter) {
func (n *Notifier) release(w *Waiter) {
n.Lock()
defer n.Unlock()

View file

@ -54,8 +54,8 @@ func TestNotifierWaitTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond)
defer cancel()
waiter := notifier.NewWaiter("foo")
defer notifier.Release(waiter)
waiter, release := notifier.NewWaiter("foo")
defer release()
err := waiter.Wait(ctx)
assert.ErrorIs(t, err, context.DeadlineExceeded)
@ -69,8 +69,8 @@ func TestNotifierSimple(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter := notifier.NewWaiter("foo")
defer notifier.Release(waiter)
waiter, release := notifier.NewWaiter("foo")
defer release()
var wg sync.WaitGroup
wg.Go(func() {
@ -87,8 +87,8 @@ func TestNotifierMultiNotify(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter := notifier.NewWaiter("foo")
defer notifier.Release(waiter)
_, release := notifier.NewWaiter("foo")
defer release()
notifier.Notify("foo")
// The second notification will be ignored while the first is still pending.
@ -99,8 +99,8 @@ func TestNotifierWaitClosed(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter := notifier.NewWaiter("foo")
notifier.Release(waiter)
waiter, release := notifier.NewWaiter("foo")
release()
assert.NoError(t, waiter.Wait(context.Background()))
}
@ -109,10 +109,10 @@ func TestNotifierWaitClosedMulti(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter1 := notifier.NewWaiter("foo")
waiter2 := notifier.NewWaiter("foo")
notifier.Release(waiter1)
notifier.Release(waiter2)
waiter1, release1 := notifier.NewWaiter("foo")
waiter2, release2 := notifier.NewWaiter("foo")
release1()
release2()
assert.NoError(t, waiter1.Wait(context.Background()))
assert.NoError(t, waiter2.Wait(context.Background()))
@ -122,8 +122,8 @@ func TestNotifierResetWillNotify(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter := notifier.NewWaiter("foo")
defer notifier.Release(waiter)
waiter, release := notifier.NewWaiter("foo")
defer release()
var wg sync.WaitGroup
wg.Go(func() {
@ -144,8 +144,8 @@ func TestNotifierDuplicate(t *testing.T) {
for range 2 {
done.Go(func() {
waiter := notifier.NewWaiter("foo")
defer notifier.Release(waiter)
waiter, release := notifier.NewWaiter("foo")
defer release()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

View file

@ -888,15 +888,10 @@ func (m *janusSFU) notifyPublisherConnected(id api.PublicSessionId, streamType s
m.publisherConnected.Notify(string(key))
}
func (m *janusSFU) newPublisherConnectedWaiter(id api.PublicSessionId, streamType sfu.StreamType) (*async.Waiter, func()) {
func (m *janusSFU) newPublisherConnectedWaiter(id api.PublicSessionId, streamType sfu.StreamType) (*async.Waiter, async.ReleaseFunc) {
key := sfu.GetStreamId(id, streamType)
waiter := m.publisherConnected.NewWaiter(string(key))
stopFunc := func() {
m.publisherConnected.Release(waiter)
}
return waiter, stopFunc
return m.publisherConnected.NewWaiter(string(key))
}
func (m *janusSFU) getPublisher(ctx context.Context, publisher api.PublicSessionId, streamType sfu.StreamType) (*janusPublisher, error) {