From d7bd809c543db1cac7cda62b937a9b30802125b6 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Fri, 4 Jun 2021 14:26:03 +0200 Subject: [PATCH] Add class no notify named waiters. --- notifier.go | 100 +++++++++++++++++++++++++++++++++++++++ notifier_test.go | 120 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 220 insertions(+) create mode 100644 notifier.go create mode 100644 notifier_test.go diff --git a/notifier.go b/notifier.go new file mode 100644 index 0000000..0c46d6a --- /dev/null +++ b/notifier.go @@ -0,0 +1,100 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "sync" +) + +type Waiter struct { + key string + ch chan bool +} + +func (w *Waiter) Wait(ctx context.Context) error { + select { + case <-w.ch: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +type Notifier struct { + sync.Mutex + + waiters map[string]*Waiter +} + +func (n *Notifier) NewWaiter(key string) *Waiter { + n.Lock() + defer n.Unlock() + + _, found := n.waiters[key] + if found { + panic("already waiting") + } + + waiter := &Waiter{ + key: key, + ch: make(chan bool, 1), + } + if n.waiters == nil { + n.waiters = make(map[string]*Waiter) + } + n.waiters[key] = waiter + return waiter +} + +func (n *Notifier) Reset() { + n.Lock() + defer n.Unlock() + + for _, w := range n.waiters { + close(w.ch) + } + n.waiters = nil +} + +func (n *Notifier) Release(w *Waiter) { + n.Lock() + defer n.Unlock() + + if _, found := n.waiters[w.key]; found { + delete(n.waiters, w.key) + close(w.ch) + } +} + +func (n *Notifier) Notify(key string) { + n.Lock() + defer n.Unlock() + + if w, found := n.waiters[key]; found { + select { + case w.ch <- true: + default: + // Ignore, already notified + } + } +} diff --git a/notifier_test.go b/notifier_test.go new file mode 100644 index 0000000..bade2d7 --- /dev/null +++ b/notifier_test.go @@ -0,0 +1,120 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2021 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "sync" + "testing" + "time" +) + +func TestNotifierNoWaiter(t *testing.T) { + var notifier Notifier + + // Notifications can be sent even if no waiter exists. + notifier.Notify("foo") +} + +func TestNotifierSimple(t *testing.T) { + var notifier Notifier + + var wg sync.WaitGroup + wg.Add(1) + + waiter := notifier.NewWaiter("foo") + defer notifier.Release(waiter) + + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := waiter.Wait(ctx); err != nil { + t.Error(err) + } + }() + + notifier.Notify("foo") + wg.Wait() +} + +func TestNotifierMultiNotify(t *testing.T) { + var notifier Notifier + + waiter := notifier.NewWaiter("foo") + defer notifier.Release(waiter) + + notifier.Notify("foo") + // The second notification will be ignored while the first is still pending. + notifier.Notify("foo") +} + +func TestNotifierWaitClosed(t *testing.T) { + var notifier Notifier + + waiter := notifier.NewWaiter("foo") + notifier.Release(waiter) + + if err := waiter.Wait(context.Background()); err != nil { + t.Error(err) + } +} + +func TestNotifierResetWillNotify(t *testing.T) { + var notifier Notifier + + var wg sync.WaitGroup + wg.Add(1) + + waiter := notifier.NewWaiter("foo") + defer notifier.Release(waiter) + + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := waiter.Wait(ctx); err != nil { + t.Error(err) + } + }() + + notifier.Reset() + wg.Wait() +} + +func TestNotifierDuplicate(t *testing.T) { + var notifier Notifier + + waiter := notifier.NewWaiter("foo") + defer notifier.Release(waiter) + + defer func() { + if e := recover(); e != nil { + if e.(string) != "already waiting" { + t.Errorf("Expected error about already waiting, got %+v", e) + } + } + }() + + // Creating a waiter for an existing key will panic. + notifier.NewWaiter("foo") +}