Merge pull request #1220 from strukturag/simplify-notifier

Simplify async notifier code
This commit is contained in:
Joachim Bauch 2026-03-10 10:11:42 +01:00 committed by GitHub
commit 9d2cda499e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 227 additions and 534 deletions

View file

@ -1,64 +0,0 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2023 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package async
import (
"sync"
)
// ChannelWaiters manages a set of registered channels that can all be
// signaled at once. The zero value is ready to use.
type ChannelWaiters struct {
	mu sync.RWMutex
	// +checklocks:mu
	id uint64
	// +checklocks:mu
	waiters map[uint64]chan struct{}
}

// Wakeup sends a non-blocking signal to every registered channel.
// Channels whose buffer is full are skipped, i.e. a receiver that has
// not consumed a previous wakeup yet will not be signaled again.
func (w *ChannelWaiters) Wakeup() {
	w.mu.RLock()
	defer w.mu.RUnlock()

	for _, waiter := range w.waiters {
		select {
		case waiter <- struct{}{}:
		default:
			// Receiver is still processing previous wakeup.
		}
	}
}

// Add registers ch and returns an id that must later be passed to
// Remove to unregister the channel again.
func (w *ChannelWaiters) Add(ch chan struct{}) uint64 {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.waiters == nil {
		w.waiters = make(map[uint64]chan struct{})
	}

	next := w.id
	w.id++
	w.waiters[next] = ch
	return next
}

// Remove unregisters the channel that was registered under the given id.
func (w *ChannelWaiters) Remove(id uint64) {
	w.mu.Lock()
	defer w.mu.Unlock()

	delete(w.waiters, id)
}

View file

@ -1,69 +0,0 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2023 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package async
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestChannelWaiters verifies registration, wakeup delivery and removal
// semantics of ChannelWaiters.
func TestChannelWaiters(t *testing.T) {
	t.Parallel()

	var waiters ChannelWaiters

	first := make(chan struct{}, 1)
	firstId := waiters.Add(first)
	defer waiters.Remove(firstId)

	second := make(chan struct{}, 1)
	secondId := waiters.Add(second)
	defer waiters.Remove(secondId)

	// Both registered channels receive exactly one wakeup each.
	waiters.Wakeup()
	<-first
	<-second

	select {
	case <-first:
		assert.Fail(t, "should have not received another event")
	case <-second:
		assert.Fail(t, "should have not received another event")
	default:
	}

	// A channel that is removed again never receives wakeups.
	third := make(chan struct{}, 1)
	waiters.Remove(waiters.Add(third))

	// Multiple wakeups work even without processing.
	waiters.Wakeup()
	waiters.Wakeup()
	waiters.Wakeup()
	<-first
	<-second

	select {
	case <-third:
		assert.Fail(t, "should have not received another event")
	default:
	}
}

View file

@ -26,55 +26,72 @@ import (
"sync"
)
type rootWaiter struct {
key string
ch chan struct{}
}
func (w *rootWaiter) notify() {
close(w.ch)
}
type Waiter struct {
key string
sw *SingleWaiter
ch <-chan struct{}
}
func (w *Waiter) Wait(ctx context.Context) error {
return w.sw.Wait(ctx)
select {
case <-w.ch:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
type Notifier struct {
sync.Mutex
// +checklocks:Mutex
waiters map[string]*Waiter
waiters map[string]*rootWaiter
// +checklocks:Mutex
waiterMap map[string]map[*Waiter]bool
}
func (n *Notifier) NewWaiter(key string) *Waiter {
type ReleaseFunc func()
func (n *Notifier) NewWaiter(key string) (*Waiter, ReleaseFunc) {
n.Lock()
defer n.Unlock()
waiter, found := n.waiters[key]
if found {
w := &Waiter{
if !found {
waiter = &rootWaiter{
key: key,
sw: waiter.sw,
ch: make(chan struct{}),
}
if n.waiters == nil {
n.waiters = make(map[string]*rootWaiter)
}
if n.waiterMap == nil {
n.waiterMap = make(map[string]map[*Waiter]bool)
}
n.waiters[key] = waiter
if _, found := n.waiterMap[key]; !found {
n.waiterMap[key] = make(map[*Waiter]bool)
}
n.waiterMap[key][w] = true
return w
}
waiter = &Waiter{
w := &Waiter{
key: key,
sw: newSingleWaiter(),
ch: waiter.ch,
}
if n.waiters == nil {
n.waiters = make(map[string]*Waiter)
n.waiterMap[key][w] = true
releaseFunc := func() {
n.release(w)
}
if n.waiterMap == nil {
n.waiterMap = make(map[string]map[*Waiter]bool)
}
n.waiters[key] = waiter
if _, found := n.waiterMap[key]; !found {
n.waiterMap[key] = make(map[*Waiter]bool)
}
n.waiterMap[key][waiter] = true
return waiter
return w, releaseFunc
}
func (n *Notifier) Reset() {
@ -82,13 +99,13 @@ func (n *Notifier) Reset() {
defer n.Unlock()
for _, w := range n.waiters {
w.sw.cancel()
w.notify()
}
n.waiters = nil
n.waiterMap = nil
}
func (n *Notifier) Release(w *Waiter) {
func (n *Notifier) release(w *Waiter) {
n.Lock()
defer n.Unlock()
@ -96,8 +113,10 @@ func (n *Notifier) Release(w *Waiter) {
if _, found := waiters[w]; found {
delete(waiters, w)
if len(waiters) == 0 {
delete(n.waiters, w.key)
w.sw.cancel()
if root, found := n.waiters[w.key]; found {
delete(n.waiters, w.key)
root.notify()
}
}
}
}
@ -108,7 +127,7 @@ func (n *Notifier) Notify(key string) {
defer n.Unlock()
if w, found := n.waiters[key]; found {
w.sw.cancel()
w.notify()
delete(n.waiters, w.key)
delete(n.waiterMap, w.key)
}

View file

@ -39,12 +39,38 @@ func TestNotifierNoWaiter(t *testing.T) {
notifier.Notify("foo")
}
func TestNotifierWaitTimeout(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
var notifier Notifier
notified := make(chan struct{})
go func() {
defer close(notified)
time.Sleep(time.Second)
notifier.Notify("foo")
}()
ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond)
defer cancel()
waiter, release := notifier.NewWaiter("foo")
defer release()
err := waiter.Wait(ctx)
assert.ErrorIs(t, err, context.DeadlineExceeded)
<-notified
assert.NoError(t, waiter.Wait(t.Context()))
})
}
func TestNotifierSimple(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter := notifier.NewWaiter("foo")
defer notifier.Release(waiter)
waiter, release := notifier.NewWaiter("foo")
defer release()
var wg sync.WaitGroup
wg.Go(func() {
@ -61,8 +87,8 @@ func TestNotifierMultiNotify(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter := notifier.NewWaiter("foo")
defer notifier.Release(waiter)
_, release := notifier.NewWaiter("foo")
defer release()
notifier.Notify("foo")
// The second notification will be ignored while the first is still pending.
@ -73,8 +99,8 @@ func TestNotifierWaitClosed(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter := notifier.NewWaiter("foo")
notifier.Release(waiter)
waiter, release := notifier.NewWaiter("foo")
release()
assert.NoError(t, waiter.Wait(context.Background()))
}
@ -83,10 +109,10 @@ func TestNotifierWaitClosedMulti(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter1 := notifier.NewWaiter("foo")
waiter2 := notifier.NewWaiter("foo")
notifier.Release(waiter1)
notifier.Release(waiter2)
waiter1, release1 := notifier.NewWaiter("foo")
waiter2, release2 := notifier.NewWaiter("foo")
release1()
release2()
assert.NoError(t, waiter1.Wait(context.Background()))
assert.NoError(t, waiter2.Wait(context.Background()))
@ -96,8 +122,8 @@ func TestNotifierResetWillNotify(t *testing.T) {
t.Parallel()
var notifier Notifier
waiter := notifier.NewWaiter("foo")
defer notifier.Release(waiter)
waiter, release := notifier.NewWaiter("foo")
defer release()
var wg sync.WaitGroup
wg.Go(func() {
@ -118,8 +144,8 @@ func TestNotifierDuplicate(t *testing.T) {
for range 2 {
done.Go(func() {
waiter := notifier.NewWaiter("foo")
defer notifier.Release(waiter)
waiter, release := notifier.NewWaiter("foo")
defer release()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

View file

@ -1,128 +0,0 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package async
import (
"context"
"sync"
)
type SingleWaiter struct {
root bool
ch chan struct{}
once sync.Once
}
func newSingleWaiter() *SingleWaiter {
return &SingleWaiter{
root: true,
ch: make(chan struct{}),
}
}
func (w *SingleWaiter) subWaiter() *SingleWaiter {
return &SingleWaiter{
ch: w.ch,
}
}
func (w *SingleWaiter) Wait(ctx context.Context) error {
select {
case <-w.ch:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (w *SingleWaiter) cancel() {
if !w.root {
return
}
w.once.Do(func() {
close(w.ch)
})
}
type SingleNotifier struct {
sync.Mutex
// +checklocks:Mutex
waiter *SingleWaiter
// +checklocks:Mutex
waiters map[*SingleWaiter]bool
}
func (n *SingleNotifier) NewWaiter() *SingleWaiter {
n.Lock()
defer n.Unlock()
if n.waiter == nil {
n.waiter = newSingleWaiter()
}
if n.waiters == nil {
n.waiters = make(map[*SingleWaiter]bool)
}
w := n.waiter.subWaiter()
n.waiters[w] = true
return w
}
func (n *SingleNotifier) Reset() {
n.Lock()
defer n.Unlock()
if n.waiter != nil {
n.waiter.cancel()
n.waiter = nil
}
n.waiters = nil
}
func (n *SingleNotifier) Release(w *SingleWaiter) {
n.Lock()
defer n.Unlock()
if _, found := n.waiters[w]; found {
delete(n.waiters, w)
if len(n.waiters) == 0 {
n.waiters = nil
if n.waiter != nil {
n.waiter.cancel()
n.waiter = nil
}
}
}
}
func (n *SingleNotifier) Notify() {
n.Lock()
defer n.Unlock()
if n.waiter != nil {
n.waiter.cancel()
}
n.waiters = nil
}

View file

@ -1,134 +0,0 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package async
import (
"context"
"sync"
"testing"
"testing/synctest"
"time"
"github.com/stretchr/testify/assert"
)
// TestSingleNotifierNoWaiter ensures Notify is safe when nobody waits.
func TestSingleNotifierNoWaiter(t *testing.T) {
	t.Parallel()

	var n SingleNotifier
	// Notifications can be sent even if no waiter exists.
	n.Notify()
}
// TestSingleNotifierSimple checks that a blocked waiter is released by
// a following Notify.
func TestSingleNotifierSimple(t *testing.T) {
	t.Parallel()

	var n SingleNotifier
	w := n.NewWaiter()
	defer n.Release(w)

	var group sync.WaitGroup
	group.Go(func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		assert.NoError(t, w.Wait(ctx))
	})

	n.Notify()
	group.Wait()
}
// TestSingleNotifierMultiNotify checks that notifying twice in a row is
// safe while the first notification has not been processed yet.
func TestSingleNotifierMultiNotify(t *testing.T) {
	t.Parallel()

	var n SingleNotifier
	w := n.NewWaiter()
	defer n.Release(w)

	n.Notify()
	// The second notification will be ignored while the first is still pending.
	n.Notify()
}
// TestSingleNotifierWaitClosed checks that releasing the only waiter
// signals it, so a later Wait returns immediately.
func TestSingleNotifierWaitClosed(t *testing.T) {
	t.Parallel()

	var n SingleNotifier
	w := n.NewWaiter()
	n.Release(w)

	assert.NoError(t, w.Wait(context.Background()))
}
// TestSingleNotifierWaitClosedMulti checks that releasing the last of
// several waiters signals all of them.
func TestSingleNotifierWaitClosedMulti(t *testing.T) {
	t.Parallel()

	var n SingleNotifier
	first := n.NewWaiter()
	second := n.NewWaiter()
	n.Release(first)
	n.Release(second)

	assert.NoError(t, first.Wait(context.Background()))
	assert.NoError(t, second.Wait(context.Background()))
}
// TestSingleNotifierResetWillNotify checks that Reset signals a waiter
// that is currently blocked in Wait.
func TestSingleNotifierResetWillNotify(t *testing.T) {
	t.Parallel()

	var n SingleNotifier
	w := n.NewWaiter()
	defer n.Release(w)

	var group sync.WaitGroup
	group.Go(func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		assert.NoError(t, w.Wait(ctx))
	})

	n.Reset()
	group.Wait()
}
func TestSingleNotifierDuplicate(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
var notifier SingleNotifier
var done sync.WaitGroup
for range 2 {
done.Go(func() {
waiter := notifier.NewWaiter()
defer notifier.Release(waiter)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
assert.NoError(t, waiter.Wait(ctx))
})
}
synctest.Wait()
notifier.Notify()
done.Wait()
})
}

View file

@ -36,7 +36,6 @@ import (
"github.com/pion/sdp/v3"
"github.com/strukturag/nextcloud-spreed-signaling/v2/api"
"github.com/strukturag/nextcloud-spreed-signaling/v2/async"
"github.com/strukturag/nextcloud-spreed-signaling/v2/async/events"
"github.com/strukturag/nextcloud-spreed-signaling/v2/internal"
"github.com/strukturag/nextcloud-spreed-signaling/v2/log"
@ -98,7 +97,7 @@ type ClientSession struct {
// +checklocks:roomSessionIdLock
roomSessionId api.RoomSessionId
publisherWaiters async.ChannelWaiters // +checklocksignore
publishersCond sync.Cond
// +checklocks:mu
publishers map[sfu.StreamType]sfu.Publisher
@ -148,6 +147,7 @@ func NewClientSession(hub *Hub, privateId api.PrivateSessionId, publicId api.Pub
backend: backend,
asyncCh: make(events.AsyncChannel, events.DefaultAsyncChannelSize),
}
s.publishersCond.L = &s.mu
if s.clientType == api.HelloClientTypeInternal {
s.backendUrl = hello.Auth.InternalParams.Backend
s.parsedBackendUrl = hello.Auth.InternalParams.ParsedBackend
@ -991,7 +991,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu sfu.SFU, s
s.publishers[streamType] = publisher
}
s.logger.Printf("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId())
s.publisherWaiters.Wakeup()
s.publishersCond.Broadcast()
} else {
publisher.SetMedia(mediaTypes)
}
@ -1020,24 +1020,20 @@ func (s *ClientSession) GetOrWaitForPublisher(ctx context.Context, streamType sf
return publisher
}
ch := make(chan struct{}, 1)
id := s.publisherWaiters.Add(ch)
defer s.publisherWaiters.Remove(id)
stop := context.AfterFunc(ctx, func() {
s.publishersCond.Broadcast()
})
defer stop()
for {
s.mu.Unlock()
select {
case <-ch:
s.mu.Lock()
publisher := s.getPublisherLocked(streamType)
if publisher != nil {
return publisher
}
case <-ctx.Done():
s.mu.Lock()
for publisher == nil {
if err := ctx.Err(); err != nil {
return nil
}
s.publishersCond.Wait()
publisher = s.getPublisherLocked(streamType)
}
return publisher
}
func (s *ClientSession) GetOrCreateSubscriber(ctx context.Context, mcu sfu.SFU, id api.PublicSessionId, streamType sfu.StreamType) (sfu.Subscriber, error) {

View file

@ -231,6 +231,11 @@ func (s *prometheusJanusStats) DecSubscriber(streamType sfu.StreamType) {
sfuinternal.StatsSubscribersCurrent.WithLabelValues(string(streamType)).Dec()
}
type refcountCond struct {
ref int
cond *sync.Cond
}
type janusSFU struct {
logger log.Logger
@ -257,8 +262,9 @@ type janusSFU struct {
clientId atomic.Uint64
// +checklocks:mu
publishers map[sfu.StreamId]*janusPublisher
publisherCreated async.Notifier
publishers map[sfu.StreamId]*janusPublisher
// +checklocks:mu
publisherCreated map[sfu.StreamId]*refcountCond
publisherConnected async.Notifier
// +checklocks:mu
remotePublishers map[sfu.StreamId]*janusRemotePublisher
@ -289,6 +295,7 @@ func NewJanusSFU(ctx context.Context, url string, config *goconf.ConfigFile) (sf
clients: make(map[uint64]clientInterface),
publishers: make(map[sfu.StreamId]*janusPublisher),
publisherCreated: make(map[sfu.StreamId]*refcountCond),
remotePublishers: make(map[sfu.StreamId]*janusRemotePublisher),
createJanusGateway: func(ctx context.Context, wsURL string, listener janus.GatewayListener) (janus.GatewayInterface, error) {
@ -438,7 +445,10 @@ func (m *janusSFU) doReconnect(ctx context.Context) {
m.logger.Println("Reconnection to Janus gateway successful")
m.mu.Lock()
clear(m.publishers)
m.publisherCreated.Reset()
for _, c := range m.publisherCreated {
c.cond.Broadcast()
}
clear(m.publisherCreated)
m.publisherConnected.Reset()
m.reconnectInterval = initialReconnectInterval
m.mu.Unlock()
@ -854,40 +864,75 @@ func (m *janusSFU) NewPublisher(ctx context.Context, listener sfu.Listener, id a
m.registerClient(client)
m.logger.Printf("Publisher %s is using handle %d", client.id, handle.Id)
go client.run(handle, client.closeChan)
m.mu.Lock()
m.publishers[sfu.GetStreamId(id, streamType)] = client
m.publisherCreated.Notify(string(sfu.GetStreamId(id, streamType)))
m.mu.Unlock()
m.notifyPublisherCreated(id, streamType, client)
sfuinternal.StatsPublishersCurrent.WithLabelValues(string(streamType)).Inc()
sfuinternal.StatsPublishersTotal.WithLabelValues(string(streamType)).Inc()
return client, nil
}
func (m *janusSFU) notifyPublisherCreated(id api.PublicSessionId, streamType sfu.StreamType, client *janusPublisher) {
key := sfu.GetStreamId(id, streamType)
m.mu.Lock()
defer m.mu.Unlock()
m.publishers[key] = client
if c, found := m.publisherCreated[key]; found {
c.cond.Broadcast()
delete(m.publisherCreated, key)
}
}
func (m *janusSFU) notifyPublisherConnected(id api.PublicSessionId, streamType sfu.StreamType) {
key := sfu.GetStreamId(id, streamType)
m.publisherConnected.Notify(string(key))
}
func (m *janusSFU) newPublisherConnectedWaiter(id api.PublicSessionId, streamType sfu.StreamType) (*async.Waiter, async.ReleaseFunc) {
key := sfu.GetStreamId(id, streamType)
return m.publisherConnected.NewWaiter(string(key))
}
func (m *janusSFU) getPublisher(ctx context.Context, publisher api.PublicSessionId, streamType sfu.StreamType) (*janusPublisher, error) {
// Do the direct check immediately as this should be the normal case.
key := sfu.GetStreamId(publisher, streamType)
m.mu.Lock()
if result, found := m.publishers[key]; found {
m.mu.Unlock()
defer m.mu.Unlock()
result, found := m.publishers[key]
if found {
return result, nil
}
waiter := m.publisherCreated.NewWaiter(string(key))
m.mu.Unlock()
defer m.publisherCreated.Release(waiter)
for {
m.mu.Lock()
result := m.publishers[key]
m.mu.Unlock()
if result != nil {
return result, nil
}
if err := waiter.Wait(ctx); err != nil {
return nil, err
c, found := m.publisherCreated[key]
if !found {
c = &refcountCond{
cond: sync.NewCond(&m.mu),
}
m.publisherCreated[key] = c
}
c.ref++
stop := context.AfterFunc(ctx, func() {
c.cond.Broadcast()
})
defer stop()
for result == nil && ctx.Err() == nil {
c.cond.Wait()
result = m.publishers[key]
}
c.ref--
if c.ref == 0 {
delete(m.publisherCreated, key)
}
if err := ctx.Err(); err != nil {
return nil, err
}
return result, nil
}
func (m *janusSFU) getOrCreateSubscriberHandle(ctx context.Context, publisher api.PublicSessionId, streamType sfu.StreamType) (*janus.Handle, *janusPublisher, error) {

View file

@ -93,7 +93,7 @@ func (p *janusPublisher) handleDetached(event *janus.DetachedMsg) {
func (p *janusPublisher) handleConnected(event *janus.WebRTCUpMsg) {
p.logger.Printf("Publisher %d received connected", p.handleId.Load())
p.mcu.publisherConnected.Notify(string(sfu.GetStreamId(p.id, p.streamType)))
p.mcu.notifyPublisherConnected(p.id, p.streamType)
}
func (p *janusPublisher) handleSlowLink(event *janus.SlowLinkMsg) {

View file

@ -86,7 +86,7 @@ func (p *janusRemotePublisher) handleDetached(event *janus.DetachedMsg) {
func (p *janusRemotePublisher) handleConnected(event *janus.WebRTCUpMsg) {
p.logger.Printf("Remote publisher %d received connected", p.handleId.Load())
p.mcu.publisherConnected.Notify(string(sfu.GetStreamId(p.id, p.streamType)))
p.mcu.notifyPublisherConnected(p.id, p.streamType)
}
func (p *janusRemotePublisher) handleSlowLink(event *janus.SlowLinkMsg) {

View file

@ -169,8 +169,8 @@ func (p *janusSubscriber) joinRoom(ctx context.Context, stream *streamSelection,
return
}
waiter := p.mcu.publisherConnected.NewWaiter(string(sfu.GetStreamId(p.publisher, p.streamType)))
defer p.mcu.publisherConnected.Release(waiter)
waiter, stop := p.mcu.newPublisherConnectedWaiter(p.publisher, p.streamType)
defer stop()
loggedNotPublishingYet := false
retry:

View file

@ -46,7 +46,6 @@ import (
"github.com/gorilla/websocket"
"github.com/strukturag/nextcloud-spreed-signaling/v2/api"
"github.com/strukturag/nextcloud-spreed-signaling/v2/async"
"github.com/strukturag/nextcloud-spreed-signaling/v2/config"
"github.com/strukturag/nextcloud-spreed-signaling/v2/dns"
"github.com/strukturag/nextcloud-spreed-signaling/v2/etcd"
@ -390,7 +389,8 @@ type proxyConnection struct {
// +checklocks:mu
conn *websocket.Conn
helloProcessed atomic.Bool
// +checklocks:mu
helloProcessed bool
connectedSince atomic.Int64
reconnectTimer *time.Timer
reconnectInterval atomic.Int64
@ -399,7 +399,7 @@ type proxyConnection struct {
trackClose atomic.Bool
temporary atomic.Bool
connectedNotifier async.SingleNotifier
connectedCond sync.Cond
msgId atomic.Int64
helloMsgId string
@ -444,6 +444,7 @@ func newProxyConnection(proxy *proxySFU, baseUrl string, ip net.IP, token string
publisherIds: make(map[sfu.StreamId]api.PublicSessionId),
subscribers: make(map[string]*proxySubscriber),
}
conn.connectedCond.L = &conn.mu
conn.reconnectInterval.Store(int64(initialReconnectInterval))
conn.load.Store(loadNotConnected)
conn.bandwidth.Store(nil)
@ -588,7 +589,7 @@ func (c *proxyConnection) SessionId() api.PublicSessionId {
func (c *proxyConnection) IsConnected() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.conn != nil && c.helloProcessed.Load() && c.SessionId() != ""
return c.conn != nil && c.helloProcessed && c.SessionId() != ""
}
func (c *proxyConnection) IsTemporary() bool {
@ -746,12 +747,10 @@ func (c *proxyConnection) stop(ctx context.Context) {
}
func (c *proxyConnection) close() {
c.helloProcessed.Store(false)
c.mu.Lock()
defer c.mu.Unlock()
c.connectedNotifier.Reset()
c.helloProcessed = false
if c.conn != nil {
c.conn.Close()
@ -857,10 +856,10 @@ func (c *proxyConnection) reconnect() {
c.logger.Printf("Connected to %s", c)
c.closed.Store(false)
c.helloProcessed.Store(false)
c.connectedSince.Store(time.Now().UnixMicro())
c.mu.Lock()
c.helloProcessed = false
c.conn = conn
c.mu.Unlock()
@ -887,12 +886,20 @@ func (c *proxyConnection) waitUntilConnected(ctx context.Context) error {
return nil
}
waiter := c.connectedNotifier.NewWaiter()
defer c.connectedNotifier.Release(waiter)
stop := context.AfterFunc(ctx, func() {
c.connectedCond.Broadcast()
})
defer stop()
c.mu.Unlock()
defer c.mu.Lock()
return waiter.Wait(ctx)
for !c.helloProcessed {
if err := ctx.Err(); err != nil {
return err
}
c.connectedCond.Wait()
}
return nil
}
func (c *proxyConnection) removePublisher(publisher *proxyPublisher) {
@ -1057,8 +1064,10 @@ func (c *proxyConnection) processMessage(msg *proxy.ServerMessage) {
statsConnectedProxyBackendsCurrent.WithLabelValues(string(c.Country())).Inc()
}
c.helloProcessed.Store(true)
c.connectedNotifier.Notify()
c.mu.Lock()
c.helloProcessed = true
c.connectedCond.Broadcast()
c.mu.Unlock()
default:
c.logger.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c)
c.scheduleReconnect()
@ -1530,9 +1539,8 @@ type proxySFU struct {
mu sync.RWMutex
// +checklocks:mu
publishers map[sfu.StreamId]*proxyConnection
publisherWaiters async.ChannelWaiters
publishers map[sfu.StreamId]*proxyConnection
publishersCond sync.Cond
continentsMap atomic.Value
@ -1585,6 +1593,7 @@ func NewProxySFU(ctx context.Context, config *goconf.ConfigFile, etcdClient etcd
rpcClients: rpcClients,
}
mcu.publishersCond.L = &mcu.mu
if err := mcu.loadContinentsMap(config); err != nil {
return nil, err
@ -2122,9 +2131,9 @@ func (m *proxySFU) createPublisher(ctx context.Context, listener sfu.Listener, i
}
m.mu.Lock()
defer m.mu.Unlock()
m.publishers[sfu.GetStreamId(id, streamType)] = conn
m.mu.Unlock()
m.publisherWaiters.Wakeup()
m.publishersCond.Broadcast()
return publisher
}
@ -2201,25 +2210,21 @@ func (m *proxySFU) waitForPublisherConnection(ctx context.Context, publisher api
return conn
}
ch := make(chan struct{}, 1)
id := m.publisherWaiters.Add(ch)
defer m.publisherWaiters.Remove(id)
stop := context.AfterFunc(ctx, func() {
m.publishersCond.Broadcast()
})
defer stop()
sfuinternal.StatsWaitingForPublisherTotal.WithLabelValues(string(streamType)).Inc()
for {
m.mu.Unlock()
select {
case <-ch:
m.mu.Lock()
conn = m.publishers[sfu.GetStreamId(publisher, streamType)]
if conn != nil {
return conn
}
case <-ctx.Done():
m.mu.Lock()
for conn == nil {
if err := ctx.Err(); err != nil {
return nil
}
m.publishersCond.Wait()
conn = m.publishers[sfu.GetStreamId(publisher, streamType)]
}
return conn
}
type proxyPublisherInfo struct {

View file

@ -42,7 +42,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/v2/api"
"github.com/strukturag/nextcloud-spreed-signaling/v2/async"
dnstest "github.com/strukturag/nextcloud-spreed-signaling/v2/dns/test"
"github.com/strukturag/nextcloud-spreed-signaling/v2/etcd"
etcdtest "github.com/strukturag/nextcloud-spreed-signaling/v2/etcd/test"
@ -1248,14 +1247,16 @@ type publisherHub struct {
mu sync.Mutex
// +checklocks:mu
publishers map[api.PublicSessionId]*proxyPublisher
waiter async.ChannelWaiters // +checklocksignore: Has its own locking.
publishers map[api.PublicSessionId]*proxyPublisher
publishersCond sync.Cond
}
func newPublisherHub() *publisherHub {
return &publisherHub{
hub := &publisherHub{
publishers: make(map[api.PublicSessionId]*proxyPublisher),
}
hub.publishersCond.L = &hub.mu
return hub
}
func (h *publisherHub) addPublisher(publisher *proxyPublisher) {
@ -1263,30 +1264,26 @@ func (h *publisherHub) addPublisher(publisher *proxyPublisher) {
defer h.mu.Unlock()
h.publishers[publisher.PublisherId()] = publisher
h.waiter.Wakeup()
h.publishersCond.Broadcast()
}
func (h *publisherHub) GetPublisherIdForSessionId(ctx context.Context, sessionId api.PublicSessionId, streamType sfu.StreamType) (*grpc.GetPublisherIdReply, error) {
h.mu.Lock()
defer h.mu.Unlock()
pub, found := h.publishers[sessionId]
if !found {
ch := make(chan struct{}, 1)
id := h.waiter.Add(ch)
defer h.waiter.Remove(id)
stop := context.AfterFunc(ctx, func() {
h.publishersCond.Broadcast()
})
defer stop()
for !found {
h.mu.Unlock()
select {
case <-ch:
h.mu.Lock()
pub, found = h.publishers[sessionId]
case <-ctx.Done():
h.mu.Lock()
return nil, ctx.Err()
}
pub, found := h.publishers[sessionId]
for !found {
if err := ctx.Err(); err != nil {
return nil, err
}
h.publishersCond.Wait()
pub, found = h.publishers[sessionId]
}
connToken, err := pub.conn.proxy.CreateToken("")