From 091a18d448de14ce78abb08f40da15a2cea303d7 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 12 Aug 2024 19:08:58 +0300 Subject: [PATCH] bridgev2/backfill: allow resync events to have bundled backfill data as an optimization The network connector can provide arbitrary data in RemoteChatResync events, which is passed to FetchMessages if the event triggers a backfill. The network connector can then read the data and avoid refetching those bundled messages. --- bridgev2/networkinterface.go | 21 ++++++++++++++++----- bridgev2/portal.go | 20 +++++++++++++++----- bridgev2/portalbackfill.go | 13 ++++++++++++- bridgev2/portalinternal.go | 8 ++++---- bridgev2/simplevent/chat.go | 12 +++++++++--- 5 files changed, 56 insertions(+), 18 deletions(-) diff --git a/bridgev2/networkinterface.go b/bridgev2/networkinterface.go index 4b6304f5..70854158 100644 --- a/bridgev2/networkinterface.go +++ b/bridgev2/networkinterface.go @@ -372,6 +372,12 @@ type FetchMessagesParams struct { // without any side effects, but the network connector should aim for this number. Count int + // When a forward backfill is triggered by a [RemoteChatResyncBackfillBundle], this will contain + // the bundled data returned by the event. It can be used as an optimization to avoid fetching + // messages that were already provided by the remote network, while still supporting fetching + // more messages if the limit is higher. + BundledData any + // When the messages are being fetched for a queued backfill, this is the task object. Task *database.BackfillTask } @@ -797,6 +803,16 @@ type RemoteChatResyncBackfill interface { CheckNeedsBackfill(ctx context.Context, latestMessage *database.Message) (bool, error) } +type RemoteChatResyncBackfillBundle interface { + RemoteChatResyncBackfill + GetBundledBackfillData() any +} + +type RemoteBackfill interface { + RemoteEvent + GetBackfillData(ctx context.Context, portal *Portal) (*FetchMessagesResponse, error) +} + type RemoteChatDelete interface { RemoteEvent DeleteOnlyForMe() bool @@ -931,11 +947,6 @@ type RemoteTyping interface { GetTimeout() time.Duration } -type RemoteBackfill interface { - RemoteEvent - GetBackfillData(ctx context.Context, portal *Portal) (*FetchMessagesResponse, error) -} - type TypingType int const ( diff --git a/bridgev2/portal.go b/bridgev2/portal.go index 3c2db930..c8e4d3c9 100644 --- a/bridgev2/portal.go +++ b/bridgev2/portal.go @@ -302,7 +302,7 @@ func (portal *Portal) handleCreateEvent(evt *portalCreateEvent) { evt.cb(fmt.Errorf("portal creation panicked")) } }() - evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info)) + evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil)) } func (portal *Portal) FindPreferredLogin(ctx context.Context, user *User, allowRelay bool) (*UserLogin, *database.UserPortal, error) { @@ -1329,7 +1329,12 @@ func (portal *Portal) handleRemoteEvent(source *UserLogin, evt RemoteEvent) { log.Err(err).Msg("Failed to get chat info for portal creation from chat resync event") } } - err = portal.createMatrixRoomInLoop(ctx, source, info) + bundleProvider, ok := evt.(RemoteChatResyncBackfillBundle) + var bundle any + if ok { + bundle = bundleProvider.GetBundledBackfillData() + } + err = portal.createMatrixRoomInLoop(ctx, source, info, bundle) if err != nil { log.Err(err).Msg("Failed to create portal to handle event") // TODO error @@ -2279,7 +2284,12 @@ func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLo } else if needsBackfill, err := backfillChecker.CheckNeedsBackfill(ctx, latestMessage); err != nil { log.Err(err).Msg("Failed to check if backfill is needed") } else if needsBackfill { - portal.doForwardBackfill(ctx, source, latestMessage) + bundleProvider, ok := evt.(RemoteChatResyncBackfillBundle) + var bundle any + if ok { + bundle = bundleProvider.GetBundledBackfillData() + } + portal.doForwardBackfill(ctx, source, latestMessage, bundle) } } } @@ -3077,7 +3087,7 @@ func (portal *Portal) CreateMatrixRoom(ctx context.Context, source *UserLogin, i } } -func (portal *Portal) createMatrixRoomInLoop(ctx context.Context, source *UserLogin, info *ChatInfo) error { +func (portal *Portal) createMatrixRoomInLoop(ctx context.Context, source *UserLogin, info *ChatInfo, backfillBundle any) error { portal.roomCreateLock.Lock() defer portal.roomCreateLock.Unlock() if portal.MXID != "" { @@ -3259,7 +3269,7 @@ func (portal *Portal) createMatrixRoomInLoop(ctx context.Context, source *UserLo } } } - portal.doForwardBackfill(ctx, source, nil) + portal.doForwardBackfill(ctx, source, nil, backfillBundle) return nil } diff --git a/bridgev2/portalbackfill.go b/bridgev2/portalbackfill.go index b5ac08bb..f8f4bdb2 100644 --- a/bridgev2/portalbackfill.go +++ b/bridgev2/portalbackfill.go @@ -23,7 +23,7 @@ import ( "maunium.net/go/mautrix/id" ) -func (portal *Portal) doForwardBackfill(ctx context.Context, source *UserLogin, lastMessage *database.Message) { +func (portal *Portal) doForwardBackfill(ctx context.Context, source *UserLogin, lastMessage *database.Message, bundledData any) { log := zerolog.Ctx(ctx).With().Str("action", "forward backfill").Logger() ctx = log.WithContext(ctx) api, ok := source.Client.(BackfillingNetworkAPI) @@ -51,10 +51,14 @@ func (portal *Portal) doForwardBackfill(ctx context.Context, source *UserLogin, Forward: true, AnchorMessage: lastMessage, Count: limit, + BundledData: bundledData, }) if err != nil { log.Err(err).Msg("Failed to fetch messages for forward backfill") return + } else if resp == nil { + log.Debug().Msg("Didn't get backfill response") + return } else if len(resp.Messages) == 0 { log.Debug().Msg("No messages to backfill") return @@ -100,6 +104,10 @@ func (portal *Portal) DoBackwardsBackfill(ctx context.Context, source *UserLogin }) if err != nil { return fmt.Errorf("failed to fetch messages for backward backfill: %w", err) + } else if resp == nil { + log.Debug().Msg("Didn't get backfill response, marking task as done") + task.IsDone = true + return nil } log.Debug(). Str("new_cursor", string(resp.Cursor)). @@ -150,6 +158,9 @@ func (portal *Portal) doThreadBackfill(ctx context.Context, source *UserLogin, t if err != nil { log.Err(err).Msg("Failed to fetch messages for thread backfill") return + } else if resp == nil { + log.Debug().Msg("Didn't get backfill response") + return } else if len(resp.Messages) == 0 { log.Debug().Msg("No messages to backfill") return diff --git a/bridgev2/portalinternal.go b/bridgev2/portalinternal.go index f7fe658a..dcd9174b 100644 --- a/bridgev2/portalinternal.go +++ b/bridgev2/portalinternal.go @@ -261,8 +261,8 @@ func (portal *PortalInternals) LockedUpdateInfoFromGhost(ctx context.Context, gh (*Portal)(portal).lockedUpdateInfoFromGhost(ctx, ghost) } -func (portal *PortalInternals) CreateMatrixRoomInLoop(ctx context.Context, source *UserLogin, info *ChatInfo) error { - return (*Portal)(portal).createMatrixRoomInLoop(ctx, source, info) +func (portal *PortalInternals) CreateMatrixRoomInLoop(ctx context.Context, source *UserLogin, info *ChatInfo, backfillBundle any) error { + return (*Portal)(portal).createMatrixRoomInLoop(ctx, source, info, backfillBundle) } func (portal *PortalInternals) UnlockedDelete(ctx context.Context) error { @@ -273,8 +273,8 @@ func (portal *PortalInternals) UnlockedDeleteCache() { (*Portal)(portal).unlockedDeleteCache() } -func (portal *PortalInternals) DoForwardBackfill(ctx context.Context, source *UserLogin, lastMessage *database.Message) { - (*Portal)(portal).doForwardBackfill(ctx, source, lastMessage) +func (portal *PortalInternals) DoForwardBackfill(ctx context.Context, source *UserLogin, lastMessage *database.Message, bundledData any) { + (*Portal)(portal).doForwardBackfill(ctx, source, lastMessage, bundledData) } func (portal *PortalInternals) DoThreadBackfill(ctx context.Context, source *UserLogin, threadID networkid.MessageID) { diff --git a/bridgev2/simplevent/chat.go b/bridgev2/simplevent/chat.go index e7b13fef..c725141b 100644 --- a/bridgev2/simplevent/chat.go +++ b/bridgev2/simplevent/chat.go @@ -30,12 +30,14 @@ type ChatResync struct { LatestMessageTS time.Time CheckNeedsBackfillFunc func(ctx context.Context, latestMessage *database.Message) (bool, error) + BundledBackfillData any } var ( - _ bridgev2.RemoteChatResync = (*ChatResync)(nil) - _ bridgev2.RemoteChatResyncWithInfo = (*ChatResync)(nil) - _ bridgev2.RemoteChatResyncBackfill = (*ChatResync)(nil) + _ bridgev2.RemoteChatResync = (*ChatResync)(nil) + _ bridgev2.RemoteChatResyncWithInfo = (*ChatResync)(nil) + _ bridgev2.RemoteChatResyncBackfill = (*ChatResync)(nil) + _ bridgev2.RemoteChatResyncBackfillBundle = (*ChatResync)(nil) ) func (evt *ChatResync) CheckNeedsBackfill(ctx context.Context, latestMessage *database.Message) (bool, error) { @@ -48,6 +50,10 @@ func (evt *ChatResync) CheckNeedsBackfill(ctx context.Context, latestMessage *da } } +func (evt *ChatResync) GetBundledBackfillData() any { + return evt.BundledBackfillData +} + func (evt *ChatResync) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.ChatInfo, error) { if evt.GetChatInfoFunc != nil { return evt.GetChatInfoFunc(ctx, portal)