Support multiple waiters for the same key.

This commit is contained in:
Joachim Bauch 2021-06-07 14:36:52 +02:00
parent 04d315b0a4
commit 60a88b327b
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
2 changed files with 79 additions and 27 deletions

View File

@ -28,12 +28,14 @@ import (
type Waiter struct { type Waiter struct {
key string key string
ch chan bool
ctx context.Context
cancel context.CancelFunc
} }
func (w *Waiter) Wait(ctx context.Context) error { func (w *Waiter) Wait(ctx context.Context) error {
select { select {
case <-w.ch: case <-w.ctx.Done():
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
@ -43,26 +45,42 @@ func (w *Waiter) Wait(ctx context.Context) error {
type Notifier struct { type Notifier struct {
sync.Mutex sync.Mutex
waiters map[string]*Waiter waiters map[string]*Waiter
waiterMap map[string]map[*Waiter]bool
} }
func (n *Notifier) NewWaiter(key string) *Waiter { func (n *Notifier) NewWaiter(key string) *Waiter {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
_, found := n.waiters[key] waiter, found := n.waiters[key]
if found { if found {
panic("already waiting") w := &Waiter{
key: key,
ctx: waiter.ctx,
cancel: waiter.cancel,
}
n.waiterMap[key][w] = true
return w
} }
waiter := &Waiter{ ctx, cancel := context.WithCancel(context.Background())
key: key, waiter = &Waiter{
ch: make(chan bool, 1), key: key,
ctx: ctx,
cancel: cancel,
} }
if n.waiters == nil { if n.waiters == nil {
n.waiters = make(map[string]*Waiter) n.waiters = make(map[string]*Waiter)
} }
if n.waiterMap == nil {
n.waiterMap = make(map[string]map[*Waiter]bool)
}
n.waiters[key] = waiter n.waiters[key] = waiter
if _, found := n.waiterMap[key]; !found {
n.waiterMap[key] = make(map[*Waiter]bool)
}
n.waiterMap[key][waiter] = true
return waiter return waiter
} }
@ -71,18 +89,24 @@ func (n *Notifier) Reset() {
defer n.Unlock() defer n.Unlock()
for _, w := range n.waiters { for _, w := range n.waiters {
close(w.ch) w.cancel()
} }
n.waiters = nil n.waiters = nil
n.waiterMap = nil
} }
func (n *Notifier) Release(w *Waiter) { func (n *Notifier) Release(w *Waiter) {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
if _, found := n.waiters[w.key]; found { if waiters, found := n.waiterMap[w.key]; found {
delete(n.waiters, w.key) if _, found := waiters[w]; found {
close(w.ch) delete(waiters, w)
if len(waiters) == 0 {
delete(n.waiters, w.key)
w.cancel()
}
}
} }
} }
@ -91,10 +115,8 @@ func (n *Notifier) Notify(key string) {
defer n.Unlock() defer n.Unlock()
if w, found := n.waiters[key]; found { if w, found := n.waiters[key]; found {
select { w.cancel()
case w.ch <- true: delete(n.waiters, w.key)
default: delete(n.waiterMap, w.key)
// Ignore, already notified
}
} }
} }

View File

@ -79,6 +79,22 @@ func TestNotifierWaitClosed(t *testing.T) {
} }
} }
func TestNotifierWaitClosedMulti(t *testing.T) {
var notifier Notifier
waiter1 := notifier.NewWaiter("foo")
waiter2 := notifier.NewWaiter("foo")
notifier.Release(waiter1)
notifier.Release(waiter2)
if err := waiter1.Wait(context.Background()); err != nil {
t.Error(err)
}
if err := waiter2.Wait(context.Background()); err != nil {
t.Error(err)
}
}
func TestNotifierResetWillNotify(t *testing.T) { func TestNotifierResetWillNotify(t *testing.T) {
var notifier Notifier var notifier Notifier
@ -103,18 +119,32 @@ func TestNotifierResetWillNotify(t *testing.T) {
func TestNotifierDuplicate(t *testing.T) { func TestNotifierDuplicate(t *testing.T) {
var notifier Notifier var notifier Notifier
var wgStart sync.WaitGroup
var wgEnd sync.WaitGroup
waiter := notifier.NewWaiter("foo") for i := 0; i < 2; i++ {
defer notifier.Release(waiter) wgStart.Add(1)
wgEnd.Add(1)
defer func() { go func() {
if e := recover(); e != nil { defer wgEnd.Done()
if e.(string) != "already waiting" { waiter := notifier.NewWaiter("foo")
t.Errorf("Expected error about already waiting, got %+v", e) defer notifier.Release(waiter)
// Goroutine has created the waiter and is ready.
wgStart.Done()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := waiter.Wait(ctx); err != nil {
t.Error(err)
} }
} }()
}() }
// Creating a waiter for an existing key will panic. wgStart.Wait()
notifier.NewWaiter("foo")
time.Sleep(100 * time.Millisecond)
notifier.Notify("foo")
wgEnd.Wait()
} }