bridgev2/bridgestate: send transient disconnect notices if they persist

This commit is contained in:
Tulir Asokan 2025-10-20 11:38:21 +03:00
commit 7b70ec6d52

View file

@ -15,6 +15,7 @@ import (
"time"
"github.com/rs/zerolog"
"go.mau.fi/util/exfmt"
"maunium.net/go/mautrix/bridgev2/status"
"maunium.net/go/mautrix/event"
@ -29,6 +30,9 @@ type BridgeStateQueue struct {
bridge *Bridge
login *UserLogin
firstTransientDisconnect time.Time
cancelScheduledNotice atomic.Pointer[context.CancelFunc]
stopChan chan struct{}
stopReconnect atomic.Pointer[context.CancelFunc]
}
@ -74,6 +78,9 @@ func (bsq *BridgeStateQueue) StopUnknownErrorReconnect() {
if cancelFn := bsq.stopReconnect.Swap(nil); cancelFn != nil {
(*cancelFn)()
}
if cancelFn := bsq.cancelScheduledNotice.Swap(nil); cancelFn != nil {
(*cancelFn)()
}
}
func (bsq *BridgeStateQueue) loop() {
@ -91,14 +98,41 @@ func (bsq *BridgeStateQueue) loop() {
}
}
func (bsq *BridgeStateQueue) sendNotice(ctx context.Context, state status.BridgeState) {
func (bsq *BridgeStateQueue) scheduleNotice(ctx context.Context, triggeredBy status.BridgeState) {
log := bsq.login.Log.With().Str("action", "transient disconnect notice").Logger()
ctx = log.WithContext(bsq.bridge.BackgroundCtx)
if !bsq.waitForTransientDisconnectReconnect(ctx) {
return
}
prevUnsent := bsq.GetPrevUnsent()
prev := bsq.GetPrev()
if triggeredBy.Timestamp != prev.Timestamp || len(bsq.ch) > 0 || bsq.errorSent ||
prevUnsent.StateEvent != status.StateTransientDisconnect || prev.StateEvent != status.StateTransientDisconnect {
log.Trace().Any("triggered_by", triggeredBy).Msg("Not sending delayed transient disconnect notice")
return
}
log.Debug().Any("triggered_by", triggeredBy).Msg("Sending delayed transient disconnect notice")
bsq.sendNotice(ctx, triggeredBy, true)
}
func (bsq *BridgeStateQueue) sendNotice(ctx context.Context, state status.BridgeState, isDelayed bool) {
noticeConfig := bsq.bridge.Config.BridgeStatusNotices
isError := state.StateEvent == status.StateBadCredentials ||
state.StateEvent == status.StateUnknownError ||
state.UserAction == status.UserActionOpenNative
state.UserAction == status.UserActionOpenNative ||
(isDelayed && state.StateEvent == status.StateTransientDisconnect)
sendNotice := noticeConfig == "all" || (noticeConfig == "errors" &&
(isError || (bsq.errorSent && state.StateEvent == status.StateConnected)))
if state.StateEvent != status.StateTransientDisconnect && state.StateEvent != status.StateUnknownError {
bsq.firstTransientDisconnect = time.Time{}
}
if !sendNotice {
if !isDelayed && noticeConfig == "errors" && state.StateEvent == status.StateTransientDisconnect {
if bsq.firstTransientDisconnect.IsZero() {
bsq.firstTransientDisconnect = time.Now()
}
go bsq.scheduleNotice(ctx, state)
}
return
}
managementRoom, err := bsq.login.User.GetManagementRoom(ctx)
@ -114,6 +148,9 @@ func (bsq *BridgeStateQueue) sendNotice(ctx context.Context, state status.Bridge
if state.Error != "" {
message += fmt.Sprintf(" (`%s`)", state.Error)
}
if isDelayed {
message += fmt.Sprintf(" not resolved after waiting %s", exfmt.Duration(TransientDisconnectNoticeDelay))
}
if state.Message != "" {
message += fmt.Sprintf(": %s", state.Message)
}
@ -171,14 +208,30 @@ func (bsq *BridgeStateQueue) waitForUnknownErrorReconnect(ctx context.Context) b
return false
}
reconnectIn += time.Duration(rand.Int64N(int64(float64(reconnectIn)*0.4)) - int64(float64(reconnectIn)*0.2))
return bsq.waitForReconnect(ctx, reconnectIn, &bsq.stopReconnect)
}
const TransientDisconnectNoticeDelay = 3 * time.Minute
func (bsq *BridgeStateQueue) waitForTransientDisconnectReconnect(ctx context.Context) bool {
timeUntilSchedule := time.Until(bsq.firstTransientDisconnect.Add(TransientDisconnectNoticeDelay))
zerolog.Ctx(ctx).Trace().
Stringer("duration", timeUntilSchedule).
Msg("Waiting before sending notice about transient disconnect")
return bsq.waitForReconnect(ctx, timeUntilSchedule, &bsq.cancelScheduledNotice)
}
func (bsq *BridgeStateQueue) waitForReconnect(
ctx context.Context, reconnectIn time.Duration, ptr *atomic.Pointer[context.CancelFunc],
) bool {
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
if oldCancel := bsq.stopReconnect.Swap(&cancel); oldCancel != nil {
if oldCancel := ptr.Swap(&cancel); oldCancel != nil {
(*oldCancel)()
}
select {
case <-time.After(reconnectIn):
return bsq.stopReconnect.CompareAndSwap(&cancel, nil)
return ptr.CompareAndSwap(&cancel, nil)
case <-cancelCtx.Done():
return false
case <-bsq.stopChan:
@ -198,7 +251,7 @@ func (bsq *BridgeStateQueue) immediateSendBridgeState(state status.BridgeState)
}
ctx := bsq.login.Log.WithContext(context.Background())
bsq.sendNotice(ctx, state)
bsq.sendNotice(ctx, state, false)
retryIn := 2
for {