mirror of
https://mau.dev/mautrix/go.git
synced 2026-03-14 14:25:53 +01:00
bridgev2/backfill: actually run backfill queue
This commit is contained in:
parent
18bca337a5
commit
edc71a5ee3
7 changed files with 28 additions and 7 deletions
|
|
@ -28,11 +28,19 @@ func (br *Bridge) WakeupBackfillQueue() {
|
|||
}
|
||||
|
||||
func (br *Bridge) RunBackfillQueue() {
|
||||
if !br.Matrix.GetCapabilities().BatchSending {
|
||||
if !br.Config.Backfill.Queue.Enabled || !br.Config.Backfill.Enabled {
|
||||
return
|
||||
}
|
||||
log := br.Log.With().Str("component", "backfill queue").Logger()
|
||||
ctx := log.WithContext(context.Background())
|
||||
if !br.Matrix.GetCapabilities().BatchSending {
|
||||
log.Warn().Msg("Backfill queue is enabled in config, but Matrix server doesn't support batch sending")
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithCancel(log.WithContext(context.Background()))
|
||||
go func() {
|
||||
<-br.stopBackfillQueue
|
||||
cancel()
|
||||
}()
|
||||
batchDelay := time.Duration(br.Config.Backfill.Queue.BatchDelay) * time.Second
|
||||
afterTimer := time.NewTimer(batchDelay)
|
||||
for {
|
||||
|
|
@ -54,6 +62,10 @@ func (br *Bridge) RunBackfillQueue() {
|
|||
afterTimer.Reset(nextDelay)
|
||||
select {
|
||||
case <-br.wakeupBackfillQueue:
|
||||
case <-br.stopBackfillQueue:
|
||||
afterTimer.Stop()
|
||||
log.Info().Msg("Stopping backfill queue")
|
||||
return
|
||||
case <-afterTimer.C:
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,6 +49,7 @@ type Bridge struct {
|
|||
cacheLock sync.Mutex
|
||||
|
||||
wakeupBackfillQueue chan struct{}
|
||||
stopBackfillQueue chan struct{}
|
||||
}
|
||||
|
||||
func NewBridge(
|
||||
|
|
@ -76,6 +77,7 @@ func NewBridge(
|
|||
ghostsByID: make(map[networkid.UserID]*Ghost),
|
||||
|
||||
wakeupBackfillQueue: make(chan struct{}),
|
||||
stopBackfillQueue: make(chan struct{}),
|
||||
}
|
||||
if br.Config == nil {
|
||||
br.Config = &bridgeconfig.BridgeConfig{CommandPrefix: "!bridge"}
|
||||
|
|
@ -149,6 +151,7 @@ func (br *Bridge) Start() error {
|
|||
br.Log.Info().Msg("No user logins found")
|
||||
br.SendGlobalBridgeState(status.BridgeState{StateEvent: status.StateUnconfigured})
|
||||
}
|
||||
go br.RunBackfillQueue()
|
||||
|
||||
br.Log.Info().Msg("Bridge started")
|
||||
return nil
|
||||
|
|
@ -156,6 +159,7 @@ func (br *Bridge) Start() error {
|
|||
|
||||
func (br *Bridge) Stop() {
|
||||
br.Log.Info().Msg("Shutting down bridge")
|
||||
close(br.stopBackfillQueue)
|
||||
br.Matrix.Stop()
|
||||
br.cacheLock.Lock()
|
||||
var wg sync.WaitGroup
|
||||
|
|
|
|||
|
|
@ -21,9 +21,10 @@ type BackfillThreadsConfig struct {
|
|||
}
|
||||
|
||||
type BackfillQueueConfig struct {
|
||||
BatchSize int `yaml:"batch_size"`
|
||||
BatchDelay int `yaml:"batch_delay"`
|
||||
MaxBatches int `yaml:"max_batches"`
|
||||
Enabled bool `yaml:"enabled"`
|
||||
BatchSize int `yaml:"batch_size"`
|
||||
BatchDelay int `yaml:"batch_delay"`
|
||||
MaxBatches int `yaml:"max_batches"`
|
||||
|
||||
MaxBatchesOverride map[string]int `yaml:"max_batches_override"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -96,7 +96,8 @@ func doUpgrade(helper up.Helper) {
|
|||
helper.Copy(up.Int, "backfill", "max_initial_messages")
|
||||
helper.Copy(up.Int, "backfill", "max_catchup_messages")
|
||||
helper.Copy(up.Int, "backfill", "unread_hours_threshold")
|
||||
helper.Copy(up.Int, "backfill", "threads", "max_initial_messages")
|
||||
helper.Copy(up.Bool, "backfill", "threads", "max_initial_messages")
|
||||
helper.Copy(up.Int, "backfill", "queue", "enabled")
|
||||
helper.Copy(up.Int, "backfill", "queue", "batch_size")
|
||||
helper.Copy(up.Int, "backfill", "queue", "batch_delay")
|
||||
helper.Copy(up.Int, "backfill", "queue", "max_batches")
|
||||
|
|
|
|||
|
|
@ -200,6 +200,8 @@ backfill:
|
|||
# Settings for the backwards backfill queue. This only applies when connecting to
|
||||
# Beeper as standard Matrix servers don't support inserting messages into history.
|
||||
queue:
|
||||
# Should the backfill queue be enabled?
|
||||
enabled: false
|
||||
# Number of messages to backfill in one batch.
|
||||
batch_size: 100
|
||||
# Delay between batches in seconds.
|
||||
|
|
|
|||
|
|
@ -1674,7 +1674,7 @@ func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLo
|
|||
}
|
||||
}
|
||||
backfillChecker, ok := evt.(RemoteChatResyncBackfill)
|
||||
if ok {
|
||||
if portal.Bridge.Config.Backfill.Enabled && ok {
|
||||
latestMessage, err := portal.Bridge.DB.Message.GetLastPartAtOrBeforeTime(ctx, portal.PortalKey, time.Now().Add(10*time.Second))
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to get last message in portal to check if backfill is necessary")
|
||||
|
|
|
|||
|
|
@ -57,6 +57,7 @@ func (portal *Portal) doForwardBackfill(ctx context.Context, source *UserLogin,
|
|||
log.Debug().Msg("No messages to backfill")
|
||||
return
|
||||
}
|
||||
// TODO mark backfill queue task as done if last message is nil (-> room was empty) and HasMore is false?
|
||||
resp.Messages = cutoffMessages(&log, resp.Messages, true, lastMessage)
|
||||
if len(resp.Messages) == 0 {
|
||||
log.Warn().Msg("No messages left to backfill after cutting off old messages")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue