From 2ca9fb21c4000a081dcfd525fe2f9379b054e428 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 22 Jun 2022 16:12:22 +0200 Subject: [PATCH] Add SingleNotifier class. --- notifier.go | 28 +++----- single_notifier.go | 109 +++++++++++++++++++++++++++++ single_notifier_test.go | 150 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 270 insertions(+), 17 deletions(-) create mode 100644 single_notifier.go create mode 100644 single_notifier_test.go diff --git a/notifier.go b/notifier.go index ffe7da3..94af5bd 100644 --- a/notifier.go +++ b/notifier.go @@ -29,17 +29,7 @@ import ( type Waiter struct { key string - ctx context.Context - cancel context.CancelFunc -} - -func (w *Waiter) Wait(ctx context.Context) error { - select { - case <-w.ctx.Done(): - return nil - case <-ctx.Done(): - return ctx.Err() - } + SingleWaiter } type Notifier struct { @@ -56,9 +46,11 @@ func (n *Notifier) NewWaiter(key string) *Waiter { waiter, found := n.waiters[key] if found { w := &Waiter{ - key: key, - ctx: waiter.ctx, - cancel: waiter.cancel, + key: key, + SingleWaiter: SingleWaiter{ + ctx: waiter.ctx, + cancel: waiter.cancel, + }, } n.waiterMap[key][w] = true return w @@ -66,9 +58,11 @@ func (n *Notifier) NewWaiter(key string) *Waiter { ctx, cancel := context.WithCancel(context.Background()) waiter = &Waiter{ - key: key, - ctx: ctx, - cancel: cancel, + key: key, + SingleWaiter: SingleWaiter{ + ctx: ctx, + cancel: cancel, + }, } if n.waiters == nil { n.waiters = make(map[string]*Waiter) diff --git a/single_notifier.go b/single_notifier.go new file mode 100644 index 0000000..91c4b6f --- /dev/null +++ b/single_notifier.go @@ -0,0 +1,109 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "sync" +) + +type SingleWaiter struct { + ctx context.Context + cancel context.CancelFunc +} + +func (w *SingleWaiter) Wait(ctx context.Context) error { + select { + case <-w.ctx.Done(): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +type SingleNotifier struct { + sync.Mutex + + waiter *SingleWaiter + waiters map[*SingleWaiter]bool +} + +func (n *SingleNotifier) NewWaiter() *SingleWaiter { + n.Lock() + defer n.Unlock() + + if n.waiter == nil { + ctx, cancel := context.WithCancel(context.Background()) + n.waiter = &SingleWaiter{ + ctx: ctx, + cancel: cancel, + } + } + + if n.waiters == nil { + n.waiters = make(map[*SingleWaiter]bool) + } + + w := &SingleWaiter{ + ctx: n.waiter.ctx, + cancel: n.waiter.cancel, + } + n.waiters[w] = true + return w +} + +func (n *SingleNotifier) Reset() { + n.Lock() + defer n.Unlock() + + if n.waiter != nil { + n.waiter.cancel() + n.waiter = nil + } + n.waiters = nil +} + +func (n *SingleNotifier) Release(w *SingleWaiter) { + n.Lock() + defer n.Unlock() + + if _, found := n.waiters[w]; found { + delete(n.waiters, w) + if len(n.waiters) == 0 { + n.waiters = nil + if n.waiter != nil { + n.waiter.cancel() + n.waiter = nil + } + } + } +} + +func (n *SingleNotifier) Notify() { + n.Lock() + defer n.Unlock() + + if n.waiter != nil { + n.waiter.cancel() + } + n.waiters = nil +} diff --git a/single_notifier_test.go b/single_notifier_test.go new file mode 100644 index 0000000..7d593d8 --- /dev/null +++ b/single_notifier_test.go @@ -0,0 +1,150 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "sync" + "testing" + "time" +) + +func TestSingleNotifierNoWaiter(t *testing.T) { + var notifier SingleNotifier + + // Notifications can be sent even if no waiter exists. + notifier.Notify() +} + +func TestSingleNotifierSimple(t *testing.T) { + var notifier SingleNotifier + + var wg sync.WaitGroup + wg.Add(1) + + waiter := notifier.NewWaiter() + defer notifier.Release(waiter) + + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := waiter.Wait(ctx); err != nil { + t.Error(err) + } + }() + + notifier.Notify() + wg.Wait() +} + +func TestSingleNotifierMultiNotify(t *testing.T) { + var notifier SingleNotifier + + waiter := notifier.NewWaiter() + defer notifier.Release(waiter) + + notifier.Notify() + // The second notification will be ignored while the first is still pending. + notifier.Notify() +} + +func TestSingleNotifierWaitClosed(t *testing.T) { + var notifier SingleNotifier + + waiter := notifier.NewWaiter() + notifier.Release(waiter) + + if err := waiter.Wait(context.Background()); err != nil { + t.Error(err) + } +} + +func TestSingleNotifierWaitClosedMulti(t *testing.T) { + var notifier SingleNotifier + + waiter1 := notifier.NewWaiter() + waiter2 := notifier.NewWaiter() + notifier.Release(waiter1) + notifier.Release(waiter2) + + if err := waiter1.Wait(context.Background()); err != nil { + t.Error(err) + } + if err := waiter2.Wait(context.Background()); err != nil { + t.Error(err) + } +} + +func TestSingleNotifierResetWillNotify(t *testing.T) { + var notifier SingleNotifier + + var wg sync.WaitGroup + wg.Add(1) + + waiter := notifier.NewWaiter() + defer notifier.Release(waiter) + + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := waiter.Wait(ctx); err != nil { + t.Error(err) + } + }() + + notifier.Reset() + wg.Wait() +} + +func TestSingleNotifierDuplicate(t *testing.T) { + var notifier SingleNotifier + var wgStart sync.WaitGroup + var wgEnd sync.WaitGroup + + for i := 0; i < 2; i++ { + wgStart.Add(1) + wgEnd.Add(1) + + go func() { + defer wgEnd.Done() + waiter := notifier.NewWaiter() + defer notifier.Release(waiter) + + // Goroutine has created the waiter and is ready. + wgStart.Done() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := waiter.Wait(ctx); err != nil { + t.Error(err) + } + }() + } + + wgStart.Wait() + + time.Sleep(100 * time.Millisecond) + notifier.Notify() + wgEnd.Wait() +}