From edc71a5ee386dbc1025b093dc9368bc7a6a5e48e Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 18 Jul 2024 16:30:17 +0300 Subject: [PATCH] bridgev2/backfill: actually run backfill queue --- bridgev2/backfillqueue.go | 16 ++++++++++++++-- bridgev2/bridge.go | 4 ++++ bridgev2/bridgeconfig/backfill.go | 7 ++++--- bridgev2/bridgeconfig/upgrade.go | 3 ++- bridgev2/matrix/mxmain/example-config.yaml | 2 ++ bridgev2/portal.go | 2 +- bridgev2/portalbackfill.go | 1 + 7 files changed, 28 insertions(+), 7 deletions(-) diff --git a/bridgev2/backfillqueue.go b/bridgev2/backfillqueue.go index d234b60d..085513f7 100644 --- a/bridgev2/backfillqueue.go +++ b/bridgev2/backfillqueue.go @@ -28,11 +28,19 @@ func (br *Bridge) WakeupBackfillQueue() { } func (br *Bridge) RunBackfillQueue() { - if !br.Matrix.GetCapabilities().BatchSending { + if !br.Config.Backfill.Queue.Enabled || !br.Config.Backfill.Enabled { return } log := br.Log.With().Str("component", "backfill queue").Logger() - ctx := log.WithContext(context.Background()) + if !br.Matrix.GetCapabilities().BatchSending { + log.Warn().Msg("Backfill queue is enabled in config, but Matrix server doesn't support batch sending") + return + } + ctx, cancel := context.WithCancel(log.WithContext(context.Background())) + go func() { + <-br.stopBackfillQueue + cancel() + }() batchDelay := time.Duration(br.Config.Backfill.Queue.BatchDelay) * time.Second afterTimer := time.NewTimer(batchDelay) for { @@ -54,6 +62,10 @@ func (br *Bridge) RunBackfillQueue() { afterTimer.Reset(nextDelay) select { case <-br.wakeupBackfillQueue: + case <-br.stopBackfillQueue: + afterTimer.Stop() + log.Info().Msg("Stopping backfill queue") + return case <-afterTimer.C: } } diff --git a/bridgev2/bridge.go b/bridgev2/bridge.go index 945a6f90..76a5d2c8 100644 --- a/bridgev2/bridge.go +++ b/bridgev2/bridge.go @@ -49,6 +49,7 @@ type Bridge struct { cacheLock sync.Mutex wakeupBackfillQueue chan struct{} + stopBackfillQueue chan struct{} } func NewBridge( @@ -76,6 +77,7 @@ func NewBridge( ghostsByID: make(map[networkid.UserID]*Ghost), wakeupBackfillQueue: make(chan struct{}), + stopBackfillQueue: make(chan struct{}), } if br.Config == nil { br.Config = &bridgeconfig.BridgeConfig{CommandPrefix: "!bridge"} @@ -149,6 +151,7 @@ func (br *Bridge) Start() error { br.Log.Info().Msg("No user logins found") br.SendGlobalBridgeState(status.BridgeState{StateEvent: status.StateUnconfigured}) } + go br.RunBackfillQueue() br.Log.Info().Msg("Bridge started") return nil @@ -156,6 +159,7 @@ func (br *Bridge) Start() error { func (br *Bridge) Stop() { br.Log.Info().Msg("Shutting down bridge") + close(br.stopBackfillQueue) br.Matrix.Stop() br.cacheLock.Lock() var wg sync.WaitGroup diff --git a/bridgev2/bridgeconfig/backfill.go b/bridgev2/bridgeconfig/backfill.go index 34218cc4..fe464569 100644 --- a/bridgev2/bridgeconfig/backfill.go +++ b/bridgev2/bridgeconfig/backfill.go @@ -21,9 +21,10 @@ type BackfillThreadsConfig struct { } type BackfillQueueConfig struct { - BatchSize int `yaml:"batch_size"` - BatchDelay int `yaml:"batch_delay"` - MaxBatches int `yaml:"max_batches"` + Enabled bool `yaml:"enabled"` + BatchSize int `yaml:"batch_size"` + BatchDelay int `yaml:"batch_delay"` + MaxBatches int `yaml:"max_batches"` MaxBatchesOverride map[string]int `yaml:"max_batches_override"` } diff --git a/bridgev2/bridgeconfig/upgrade.go b/bridgev2/bridgeconfig/upgrade.go index 1d5ee0ae..6b7493d2 100644 --- a/bridgev2/bridgeconfig/upgrade.go +++ b/bridgev2/bridgeconfig/upgrade.go @@ -96,7 +96,8 @@ func doUpgrade(helper up.Helper) { helper.Copy(up.Int, "backfill", "max_initial_messages") helper.Copy(up.Int, "backfill", "max_catchup_messages") helper.Copy(up.Int, "backfill", "unread_hours_threshold") - helper.Copy(up.Int, "backfill", "threads", "max_initial_messages") + helper.Copy(up.Bool, "backfill", "threads", "max_initial_messages") + helper.Copy(up.Int, "backfill", "queue", "enabled") helper.Copy(up.Int, "backfill", "queue", "batch_size") helper.Copy(up.Int, "backfill", "queue", "batch_delay") helper.Copy(up.Int, "backfill", "queue", "max_batches") diff --git a/bridgev2/matrix/mxmain/example-config.yaml b/bridgev2/matrix/mxmain/example-config.yaml index 3399297c..92d4647c 100644 --- a/bridgev2/matrix/mxmain/example-config.yaml +++ b/bridgev2/matrix/mxmain/example-config.yaml @@ -200,6 +200,8 @@ backfill: # Settings for the backwards backfill queue. This only applies when connecting to # Beeper as standard Matrix servers don't support inserting messages into history. queue: + # Should the backfill queue be enabled? + enabled: false # Number of messages to backfill in one batch. batch_size: 100 # Delay between batches in seconds. diff --git a/bridgev2/portal.go b/bridgev2/portal.go index 5e867665..38159e9c 100644 --- a/bridgev2/portal.go +++ b/bridgev2/portal.go @@ -1674,7 +1674,7 @@ func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLo } } backfillChecker, ok := evt.(RemoteChatResyncBackfill) - if ok { + if portal.Bridge.Config.Backfill.Enabled && ok { latestMessage, err := portal.Bridge.DB.Message.GetLastPartAtOrBeforeTime(ctx, portal.PortalKey, time.Now().Add(10*time.Second)) if err != nil { log.Err(err).Msg("Failed to get last message in portal to check if backfill is necessary") diff --git a/bridgev2/portalbackfill.go b/bridgev2/portalbackfill.go index 2c7f39be..5b0c2361 100644 --- a/bridgev2/portalbackfill.go +++ b/bridgev2/portalbackfill.go @@ -57,6 +57,7 @@ func (portal *Portal) doForwardBackfill(ctx context.Context, source *UserLogin, log.Debug().Msg("No messages to backfill") return } + // TODO mark backfill queue task as done if last message is nil (-> room was empty) and HasMore is false? resp.Messages = cutoffMessages(&log, resp.Messages, true, lastMessage) if len(resp.Messages) == 0 { log.Warn().Msg("No messages left to backfill after cutting off old messages")