Use "sync.Cond" instead of SingleNotifier.

This commit is contained in:
Joachim Bauch 2026-03-10 08:36:50 +01:00
commit f195492f8e
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02

View file

@ -390,7 +390,8 @@ type proxyConnection struct {
// +checklocks:mu
conn *websocket.Conn
helloProcessed atomic.Bool
// +checklocks:mu
helloProcessed bool
connectedSince atomic.Int64
reconnectTimer *time.Timer
reconnectInterval atomic.Int64
@ -399,7 +400,7 @@ type proxyConnection struct {
trackClose atomic.Bool
temporary atomic.Bool
connectedNotifier async.SingleNotifier
connectedCond sync.Cond
msgId atomic.Int64
helloMsgId string
@ -444,6 +445,7 @@ func newProxyConnection(proxy *proxySFU, baseUrl string, ip net.IP, token string
publisherIds: make(map[sfu.StreamId]api.PublicSessionId),
subscribers: make(map[string]*proxySubscriber),
}
conn.connectedCond.L = &conn.mu
conn.reconnectInterval.Store(int64(initialReconnectInterval))
conn.load.Store(loadNotConnected)
conn.bandwidth.Store(nil)
@ -588,7 +590,7 @@ func (c *proxyConnection) SessionId() api.PublicSessionId {
func (c *proxyConnection) IsConnected() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.conn != nil && c.helloProcessed.Load() && c.SessionId() != ""
return c.conn != nil && c.helloProcessed && c.SessionId() != ""
}
func (c *proxyConnection) IsTemporary() bool {
@ -746,12 +748,10 @@ func (c *proxyConnection) stop(ctx context.Context) {
}
func (c *proxyConnection) close() {
c.helloProcessed.Store(false)
c.mu.Lock()
defer c.mu.Unlock()
c.connectedNotifier.Reset()
c.helloProcessed = false
if c.conn != nil {
c.conn.Close()
@ -857,10 +857,10 @@ func (c *proxyConnection) reconnect() {
c.logger.Printf("Connected to %s", c)
c.closed.Store(false)
c.helloProcessed.Store(false)
c.connectedSince.Store(time.Now().UnixMicro())
c.mu.Lock()
c.helloProcessed = false
c.conn = conn
c.mu.Unlock()
@ -887,12 +887,20 @@ func (c *proxyConnection) waitUntilConnected(ctx context.Context) error {
return nil
}
waiter := c.connectedNotifier.NewWaiter()
defer c.connectedNotifier.Release(waiter)
stop := context.AfterFunc(ctx, func() {
c.connectedCond.Broadcast()
})
defer stop()
c.mu.Unlock()
defer c.mu.Lock()
return waiter.Wait(ctx)
for !c.helloProcessed {
if err := ctx.Err(); err != nil {
return err
}
c.connectedCond.Wait()
}
return nil
}
func (c *proxyConnection) removePublisher(publisher *proxyPublisher) {
@ -1057,8 +1065,10 @@ func (c *proxyConnection) processMessage(msg *proxy.ServerMessage) {
statsConnectedProxyBackendsCurrent.WithLabelValues(string(c.Country())).Inc()
}
c.helloProcessed.Store(true)
c.connectedNotifier.Notify()
c.mu.Lock()
c.helloProcessed = true
c.connectedCond.Broadcast()
c.mu.Unlock()
default:
c.logger.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c)
c.scheduleReconnect()