mirror of
https://mau.dev/mautrix/go.git
synced 2026-03-14 14:25:53 +01:00
bridgev2/backfill: allow resync events to have bundled backfill data as an optimization
The network connector can provide arbitrary data in RemoteChatResync events, which is passed to FetchMessages if the event triggers a backfill. The network connector can then read the data and avoid refetching those bundled messages.
This commit is contained in:
parent
41f0abd38a
commit
091a18d448
5 changed files with 56 additions and 18 deletions
|
|
@ -372,6 +372,12 @@ type FetchMessagesParams struct {
|
|||
// without any side effects, but the network connector should aim for this number.
|
||||
Count int
|
||||
|
||||
// When a forward backfill is triggered by a [RemoteChatResyncBackfillBundle], this will contain
|
||||
// the bundled data returned by the event. It can be used as an optimization to avoid fetching
|
||||
// messages that were already provided by the remote network, while still supporting fetching
|
||||
// more messages if the limit is higher.
|
||||
BundledData any
|
||||
|
||||
// When the messages are being fetched for a queued backfill, this is the task object.
|
||||
Task *database.BackfillTask
|
||||
}
|
||||
|
|
@ -797,6 +803,16 @@ type RemoteChatResyncBackfill interface {
|
|||
CheckNeedsBackfill(ctx context.Context, latestMessage *database.Message) (bool, error)
|
||||
}
|
||||
|
||||
type RemoteChatResyncBackfillBundle interface {
|
||||
RemoteChatResyncBackfill
|
||||
GetBundledBackfillData() any
|
||||
}
|
||||
|
||||
type RemoteBackfill interface {
|
||||
RemoteEvent
|
||||
GetBackfillData(ctx context.Context, portal *Portal) (*FetchMessagesResponse, error)
|
||||
}
|
||||
|
||||
type RemoteChatDelete interface {
|
||||
RemoteEvent
|
||||
DeleteOnlyForMe() bool
|
||||
|
|
@ -931,11 +947,6 @@ type RemoteTyping interface {
|
|||
GetTimeout() time.Duration
|
||||
}
|
||||
|
||||
type RemoteBackfill interface {
|
||||
RemoteEvent
|
||||
GetBackfillData(ctx context.Context, portal *Portal) (*FetchMessagesResponse, error)
|
||||
}
|
||||
|
||||
type TypingType int
|
||||
|
||||
const (
|
||||
|
|
|
|||
|
|
@ -302,7 +302,7 @@ func (portal *Portal) handleCreateEvent(evt *portalCreateEvent) {
|
|||
evt.cb(fmt.Errorf("portal creation panicked"))
|
||||
}
|
||||
}()
|
||||
evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info))
|
||||
evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil))
|
||||
}
|
||||
|
||||
func (portal *Portal) FindPreferredLogin(ctx context.Context, user *User, allowRelay bool) (*UserLogin, *database.UserPortal, error) {
|
||||
|
|
@ -1329,7 +1329,12 @@ func (portal *Portal) handleRemoteEvent(source *UserLogin, evt RemoteEvent) {
|
|||
log.Err(err).Msg("Failed to get chat info for portal creation from chat resync event")
|
||||
}
|
||||
}
|
||||
err = portal.createMatrixRoomInLoop(ctx, source, info)
|
||||
bundleProvider, ok := evt.(RemoteChatResyncBackfillBundle)
|
||||
var bundle any
|
||||
if ok {
|
||||
bundle = bundleProvider.GetBundledBackfillData()
|
||||
}
|
||||
err = portal.createMatrixRoomInLoop(ctx, source, info, bundle)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to create portal to handle event")
|
||||
// TODO error
|
||||
|
|
@ -2279,7 +2284,12 @@ func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLo
|
|||
} else if needsBackfill, err := backfillChecker.CheckNeedsBackfill(ctx, latestMessage); err != nil {
|
||||
log.Err(err).Msg("Failed to check if backfill is needed")
|
||||
} else if needsBackfill {
|
||||
portal.doForwardBackfill(ctx, source, latestMessage)
|
||||
bundleProvider, ok := evt.(RemoteChatResyncBackfillBundle)
|
||||
var bundle any
|
||||
if ok {
|
||||
bundle = bundleProvider.GetBundledBackfillData()
|
||||
}
|
||||
portal.doForwardBackfill(ctx, source, latestMessage, bundle)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3077,7 +3087,7 @@ func (portal *Portal) CreateMatrixRoom(ctx context.Context, source *UserLogin, i
|
|||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) createMatrixRoomInLoop(ctx context.Context, source *UserLogin, info *ChatInfo) error {
|
||||
func (portal *Portal) createMatrixRoomInLoop(ctx context.Context, source *UserLogin, info *ChatInfo, backfillBundle any) error {
|
||||
portal.roomCreateLock.Lock()
|
||||
defer portal.roomCreateLock.Unlock()
|
||||
if portal.MXID != "" {
|
||||
|
|
@ -3259,7 +3269,7 @@ func (portal *Portal) createMatrixRoomInLoop(ctx context.Context, source *UserLo
|
|||
}
|
||||
}
|
||||
}
|
||||
portal.doForwardBackfill(ctx, source, nil)
|
||||
portal.doForwardBackfill(ctx, source, nil, backfillBundle)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import (
|
|||
"maunium.net/go/mautrix/id"
|
||||
)
|
||||
|
||||
func (portal *Portal) doForwardBackfill(ctx context.Context, source *UserLogin, lastMessage *database.Message) {
|
||||
func (portal *Portal) doForwardBackfill(ctx context.Context, source *UserLogin, lastMessage *database.Message, bundledData any) {
|
||||
log := zerolog.Ctx(ctx).With().Str("action", "forward backfill").Logger()
|
||||
ctx = log.WithContext(ctx)
|
||||
api, ok := source.Client.(BackfillingNetworkAPI)
|
||||
|
|
@ -51,10 +51,14 @@ func (portal *Portal) doForwardBackfill(ctx context.Context, source *UserLogin,
|
|||
Forward: true,
|
||||
AnchorMessage: lastMessage,
|
||||
Count: limit,
|
||||
BundledData: bundledData,
|
||||
})
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to fetch messages for forward backfill")
|
||||
return
|
||||
} else if resp == nil {
|
||||
log.Debug().Msg("Didn't get backfill response")
|
||||
return
|
||||
} else if len(resp.Messages) == 0 {
|
||||
log.Debug().Msg("No messages to backfill")
|
||||
return
|
||||
|
|
@ -100,6 +104,10 @@ func (portal *Portal) DoBackwardsBackfill(ctx context.Context, source *UserLogin
|
|||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch messages for backward backfill: %w", err)
|
||||
} else if resp == nil {
|
||||
log.Debug().Msg("Didn't get backfill response, marking task as done")
|
||||
task.IsDone = true
|
||||
return nil
|
||||
}
|
||||
log.Debug().
|
||||
Str("new_cursor", string(resp.Cursor)).
|
||||
|
|
@ -150,6 +158,9 @@ func (portal *Portal) doThreadBackfill(ctx context.Context, source *UserLogin, t
|
|||
if err != nil {
|
||||
log.Err(err).Msg("Failed to fetch messages for thread backfill")
|
||||
return
|
||||
} else if resp == nil {
|
||||
log.Debug().Msg("Didn't get backfill response")
|
||||
return
|
||||
} else if len(resp.Messages) == 0 {
|
||||
log.Debug().Msg("No messages to backfill")
|
||||
return
|
||||
|
|
|
|||
|
|
@ -261,8 +261,8 @@ func (portal *PortalInternals) LockedUpdateInfoFromGhost(ctx context.Context, gh
|
|||
(*Portal)(portal).lockedUpdateInfoFromGhost(ctx, ghost)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) CreateMatrixRoomInLoop(ctx context.Context, source *UserLogin, info *ChatInfo) error {
|
||||
return (*Portal)(portal).createMatrixRoomInLoop(ctx, source, info)
|
||||
func (portal *PortalInternals) CreateMatrixRoomInLoop(ctx context.Context, source *UserLogin, info *ChatInfo, backfillBundle any) error {
|
||||
return (*Portal)(portal).createMatrixRoomInLoop(ctx, source, info, backfillBundle)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) UnlockedDelete(ctx context.Context) error {
|
||||
|
|
@ -273,8 +273,8 @@ func (portal *PortalInternals) UnlockedDeleteCache() {
|
|||
(*Portal)(portal).unlockedDeleteCache()
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) DoForwardBackfill(ctx context.Context, source *UserLogin, lastMessage *database.Message) {
|
||||
(*Portal)(portal).doForwardBackfill(ctx, source, lastMessage)
|
||||
func (portal *PortalInternals) DoForwardBackfill(ctx context.Context, source *UserLogin, lastMessage *database.Message, bundledData any) {
|
||||
(*Portal)(portal).doForwardBackfill(ctx, source, lastMessage, bundledData)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) DoThreadBackfill(ctx context.Context, source *UserLogin, threadID networkid.MessageID) {
|
||||
|
|
|
|||
|
|
@ -30,12 +30,14 @@ type ChatResync struct {
|
|||
|
||||
LatestMessageTS time.Time
|
||||
CheckNeedsBackfillFunc func(ctx context.Context, latestMessage *database.Message) (bool, error)
|
||||
BundledBackfillData any
|
||||
}
|
||||
|
||||
var (
|
||||
_ bridgev2.RemoteChatResync = (*ChatResync)(nil)
|
||||
_ bridgev2.RemoteChatResyncWithInfo = (*ChatResync)(nil)
|
||||
_ bridgev2.RemoteChatResyncBackfill = (*ChatResync)(nil)
|
||||
_ bridgev2.RemoteChatResync = (*ChatResync)(nil)
|
||||
_ bridgev2.RemoteChatResyncWithInfo = (*ChatResync)(nil)
|
||||
_ bridgev2.RemoteChatResyncBackfill = (*ChatResync)(nil)
|
||||
_ bridgev2.RemoteChatResyncBackfillBundle = (*ChatResync)(nil)
|
||||
)
|
||||
|
||||
func (evt *ChatResync) CheckNeedsBackfill(ctx context.Context, latestMessage *database.Message) (bool, error) {
|
||||
|
|
@ -48,6 +50,10 @@ func (evt *ChatResync) CheckNeedsBackfill(ctx context.Context, latestMessage *da
|
|||
}
|
||||
}
|
||||
|
||||
func (evt *ChatResync) GetBundledBackfillData() any {
|
||||
return evt.BundledBackfillData
|
||||
}
|
||||
|
||||
func (evt *ChatResync) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.ChatInfo, error) {
|
||||
if evt.GetChatInfoFunc != nil {
|
||||
return evt.GetChatInfoFunc(ctx, portal)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue