From 8115db9af61c674606607f38edad0d986f73e62d Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 17 Jun 2025 12:45:58 +0300 Subject: [PATCH] bridgev2/portal: return result of handling remote events --- bridgev2/portal.go | 360 ++++++++++++++++++++++++------------- bridgev2/portalbackfill.go | 22 ++- bridgev2/portalinternal.go | 88 ++++----- bridgev2/queue.go | 21 ++- 4 files changed, 316 insertions(+), 175 deletions(-) diff --git a/bridgev2/portal.go b/bridgev2/portal.go index 6dd5711f..5473291b 100644 --- a/bridgev2/portal.go +++ b/bridgev2/portal.go @@ -283,19 +283,21 @@ func (br *Bridge) GetExistingPortalByKey(ctx context.Context, key networkid.Port return br.loadPortal(ctx, db, err, nil) } -func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) { +func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) EventHandlingResult { if PortalEventBuffer == 0 { portal.eventsLock.Lock() defer portal.eventsLock.Unlock() portal.eventIdx++ - portal.handleSingleEventAsync(portal.eventIdx, evt) + return portal.handleSingleEventAsync(portal.eventIdx, evt) } else { select { case portal.events <- evt: + return EventHandlingResultQueued default: zerolog.Ctx(ctx).Error(). Str("portal_id", string(portal.ID)). Msg("Portal event channel is full") + return EventHandlingResultFailed } } } @@ -313,19 +315,23 @@ func (portal *Portal) eventLoop() { } } -func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) { +func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) (outerRes EventHandlingResult) { ctx := portal.getEventCtxWithLog(rawEvt, idx) if _, isCreate := rawEvt.(*portalCreateEvent); isCreate { - portal.handleSingleEvent(ctx, rawEvt, func() {}) + portal.handleSingleEvent(ctx, rawEvt, func(res EventHandlingResult) { + outerRes = res + }) } else if portal.Bridge.Config.AsyncEvents { - go portal.handleSingleEvent(ctx, rawEvt, func() {}) + outerRes = EventHandlingResultQueued + go portal.handleSingleEvent(ctx, rawEvt, func(res EventHandlingResult) {}) } else { log := zerolog.Ctx(ctx) doneCh := make(chan struct{}) var backgrounded atomic.Bool start := time.Now() var handleDuration time.Duration - go portal.handleSingleEvent(ctx, rawEvt, func() { + go portal.handleSingleEvent(ctx, rawEvt, func(res EventHandlingResult) { + outerRes = res handleDuration = time.Since(start) close(doneCh) if backgrounded.Load() { @@ -358,6 +364,7 @@ func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) { Msg("Event handling is taking too long, continuing in background") backgrounded.Store(true) } + return } func (portal *Portal) getEventCtxWithLog(rawEvt any, idx int) context.Context { @@ -404,10 +411,11 @@ func (portal *Portal) getEventCtxWithLog(rawEvt any, idx int) context.Context { return logWith.Logger().WithContext(portal.Bridge.BackgroundCtx) } -func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCallback func()) { +func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCallback func(res EventHandlingResult)) { log := zerolog.Ctx(ctx) + var res EventHandlingResult defer func() { - doneCallback() + doneCallback(res) if err := recover(); err != nil { logEvt := log.Error() if realErr, ok := err.(error); ok { @@ -432,9 +440,11 @@ func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCal case *portalMatrixEvent: portal.handleMatrixEvent(ctx, evt.sender, evt.evt) case *portalRemoteEvent: - portal.handleRemoteEvent(ctx, evt.source, evt.evtType, evt.evt) + res = portal.handleRemoteEvent(ctx, evt.source, evt.evtType, evt.evt) case *portalCreateEvent: - evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil)) + err := portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil) + res.Success = err == nil + evt.cb(err) default: panic(fmt.Errorf("illegal type %T in eventLoop", evt)) } @@ -627,7 +637,7 @@ func (portal *Portal) handleMatrixReceipts(ctx context.Context, evt *event.Event for userID, receipt := range readReceipts { sender, err := portal.Bridge.GetUserByMXID(ctx, userID) if err != nil { - // TODO log + zerolog.Ctx(ctx).Err(err).Msg("Failed to get user to handle read receipt") return } portal.handleMatrixReadReceipt(ctx, sender, evtID, receipt) @@ -1752,13 +1762,13 @@ func (portal *Portal) handleMatrixRedaction(ctx context.Context, sender *UserLog portal.sendSuccessStatus(ctx, evt, 0, "") } -func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) { +func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) (res EventHandlingResult) { log := zerolog.Ctx(ctx) if portal.MXID == "" { mcp, ok := evt.(RemoteEventThatMayCreatePortal) if !ok || !mcp.ShouldCreatePortal() { log.Debug().Msg("Dropping event as portal doesn't exist") - return + return EventHandlingResultIgnored } infoProvider, ok := mcp.(RemoteChatResyncWithInfo) var info *ChatInfo @@ -1777,8 +1787,7 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, err = portal.createMatrixRoomInLoop(ctx, source, info, bundle) if err != nil { log.Err(err).Msg("Failed to create portal to handle event") - // TODO error - return + return EventHandlingResultFailed } if evtType == RemoteEventChatResync { log.Debug().Msg("Not handling chat resync event further as portal was created by it") @@ -1786,7 +1795,7 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, if ok { postHandler.PostHandle(ctx, portal) } - return + return EventHandlingResultSuccess } } preHandler, ok := evt.(RemotePreHandler) @@ -1798,33 +1807,33 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, case RemoteEventUnknown: log.Debug().Msg("Ignoring remote event with type unknown") case RemoteEventMessage, RemoteEventMessageUpsert: - portal.handleRemoteMessage(ctx, source, evt.(RemoteMessage)) + res = portal.handleRemoteMessage(ctx, source, evt.(RemoteMessage)) case RemoteEventEdit: - portal.handleRemoteEdit(ctx, source, evt.(RemoteEdit)) + res = portal.handleRemoteEdit(ctx, source, evt.(RemoteEdit)) case RemoteEventReaction: - portal.handleRemoteReaction(ctx, source, evt.(RemoteReaction)) + res = portal.handleRemoteReaction(ctx, source, evt.(RemoteReaction)) case RemoteEventReactionRemove: - portal.handleRemoteReactionRemove(ctx, source, evt.(RemoteReactionRemove)) + res = portal.handleRemoteReactionRemove(ctx, source, evt.(RemoteReactionRemove)) case RemoteEventReactionSync: - portal.handleRemoteReactionSync(ctx, source, evt.(RemoteReactionSync)) + res = portal.handleRemoteReactionSync(ctx, source, evt.(RemoteReactionSync)) case RemoteEventMessageRemove: - portal.handleRemoteMessageRemove(ctx, source, evt.(RemoteMessageRemove)) + res = portal.handleRemoteMessageRemove(ctx, source, evt.(RemoteMessageRemove)) case RemoteEventReadReceipt: - portal.handleRemoteReadReceipt(ctx, source, evt.(RemoteReadReceipt)) + res = portal.handleRemoteReadReceipt(ctx, source, evt.(RemoteReadReceipt)) case RemoteEventMarkUnread: - portal.handleRemoteMarkUnread(ctx, source, evt.(RemoteMarkUnread)) + res = portal.handleRemoteMarkUnread(ctx, source, evt.(RemoteMarkUnread)) case RemoteEventDeliveryReceipt: - portal.handleRemoteDeliveryReceipt(ctx, source, evt.(RemoteDeliveryReceipt)) + res = portal.handleRemoteDeliveryReceipt(ctx, source, evt.(RemoteDeliveryReceipt)) case RemoteEventTyping: - portal.handleRemoteTyping(ctx, source, evt.(RemoteTyping)) + res = portal.handleRemoteTyping(ctx, source, evt.(RemoteTyping)) case RemoteEventChatInfoChange: - portal.handleRemoteChatInfoChange(ctx, source, evt.(RemoteChatInfoChange)) + res = portal.handleRemoteChatInfoChange(ctx, source, evt.(RemoteChatInfoChange)) case RemoteEventChatResync: - portal.handleRemoteChatResync(ctx, source, evt.(RemoteChatResync)) + res = portal.handleRemoteChatResync(ctx, source, evt.(RemoteChatResync)) case RemoteEventChatDelete: - portal.handleRemoteChatDelete(ctx, source, evt.(RemoteChatDelete)) + res = portal.handleRemoteChatDelete(ctx, source, evt.(RemoteChatDelete)) case RemoteEventBackfill: - portal.handleRemoteBackfill(ctx, source, evt.(RemoteBackfill)) + res = portal.handleRemoteBackfill(ctx, source, evt.(RemoteBackfill)) default: log.Warn().Msg("Got remote event with unknown type") } @@ -1832,9 +1841,10 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, if ok { postHandler.PostHandle(ctx, portal) } + return } -func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID) { +func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID, err error) { var ghost *Ghost if !sender.IsFromMe && sender.ForceDMUser && portal.OtherUserID != "" && sender.Sender != portal.OtherUserID { zerolog.Ctx(ctx).Warn(). @@ -1843,21 +1853,20 @@ func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventS Msg("Overriding event sender with primary other user in DM portal") // Ensure the ghost row exists anyway to prevent foreign key errors when saving messages // TODO it'd probably be better to override the sender in the saved message, but that's more effort - _, err := portal.Bridge.GetGhostByID(ctx, sender.Sender) + _, err = portal.Bridge.GetGhostByID(ctx, sender.Sender) if err != nil { zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to get ghost with original user ID") + return } sender.Sender = portal.OtherUserID } if sender.Sender != "" { - var err error ghost, err = portal.Bridge.GetGhostByID(ctx, sender.Sender) if err != nil { zerolog.Ctx(ctx).Err(err).Msg("Failed to get ghost for message sender") return - } else { - ghost.UpdateInfoIfNecessary(ctx, source, evtType) } + ghost.UpdateInfoIfNecessary(ctx, source, evtType) } if sender.IsFromMe { intent = source.User.DoublePuppet(ctx) @@ -1892,15 +1901,21 @@ func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventS return } -func (portal *Portal) GetIntentFor(ctx context.Context, sender EventSender, source *UserLogin, evtType RemoteEventType) MatrixAPI { - intent, _ := portal.getIntentAndUserMXIDFor(ctx, sender, source, nil, evtType) +func (portal *Portal) GetIntentFor(ctx context.Context, sender EventSender, source *UserLogin, evtType RemoteEventType) (MatrixAPI, bool) { + intent, _, err := portal.getIntentAndUserMXIDFor(ctx, sender, source, nil, evtType) + if err != nil { + return nil, false + } if intent == nil { // TODO this is very hacky - we should either insert an empty ghost row automatically // (and not fetch it at runtime) or make the message sender column nullable. portal.Bridge.GetGhostByID(ctx, "") intent = portal.Bridge.Bot + if intent == nil { + panic(fmt.Errorf("bridge bot is nil")) + } } - return intent + return intent, true } func (portal *Portal) getRelationMeta(ctx context.Context, currentMsg networkid.MessageID, replyToPtr *networkid.MessageOptionalPartID, threadRootPtr *networkid.MessageID, isBatchSend bool) (replyTo, threadRoot, prevThreadEvent *database.Message) { @@ -1982,7 +1997,7 @@ func (portal *Portal) sendConvertedMessage( ts time.Time, streamOrder int64, logContext func(*zerolog.Event) *zerolog.Event, -) []*database.Message { +) ([]*database.Message, EventHandlingResult) { if logContext == nil { logContext = func(e *zerolog.Event) *zerolog.Event { return e @@ -1991,6 +2006,7 @@ func (portal *Portal) sendConvertedMessage( log := zerolog.Ctx(ctx) replyTo, threadRoot, prevThreadEvent := portal.getRelationMeta(ctx, id, converted.ReplyTo, converted.ThreadRoot, false) output := make([]*database.Message, 0, len(converted.Parts)) + allSuccess := true for i, part := range converted.Parts { portal.applyRelationMeta(ctx, part.Content, replyTo, threadRoot, prevThreadEvent) dbMessage := &database.Message{ @@ -2023,6 +2039,7 @@ func (portal *Portal) sendConvertedMessage( }) if err != nil { logContext(log.Err(err)).Str("part_id", string(part.ID)).Msg("Failed to send message part to Matrix") + allSuccess = false continue } logContext(log.Debug()). @@ -2034,12 +2051,13 @@ func (portal *Portal) sendConvertedMessage( err := portal.Bridge.DB.Message.Insert(ctx, dbMessage) if err != nil { logContext(log.Err(err)).Str("part_id", string(part.ID)).Msg("Failed to save message part to database") + allSuccess = false } if converted.Disappear.Type != database.DisappearingTypeNone && !dbMessage.HasFakeMXID() { if converted.Disappear.Type == database.DisappearingTypeAfterSend && converted.Disappear.DisappearAt.IsZero() { converted.Disappear.DisappearAt = dbMessage.Timestamp.Add(converted.Disappear.Timer) } - go portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{ + portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{ RoomID: portal.MXID, EventID: dbMessage.MXID, DisappearingSetting: converted.Disappear, @@ -2050,7 +2068,10 @@ func (portal *Portal) sendConvertedMessage( } output = append(output, dbMessage) } - return output + if !allSuccess { + return output, EventHandlingResultFailed + } + return output, EventHandlingResultSuccess } func (portal *Portal) checkPendingMessage(ctx context.Context, evt RemoteMessage) (bool, *database.Message) { @@ -2110,21 +2131,24 @@ func (portal *Portal) checkPendingMessage(ctx context.Context, evt RemoteMessage return true, pending.db } -func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) bool { +func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) (handleRes EventHandlingResult, continueHandling bool) { log := zerolog.Ctx(ctx) - intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageUpsert) - if intent == nil { - return false + intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageUpsert) + if !ok { + return } res, err := evt.HandleExisting(ctx, portal, intent, existing) if err != nil { log.Err(err).Msg("Failed to handle existing message in upsert event after receiving remote echo") + } else { + handleRes = EventHandlingResultSuccess } if res.SaveParts { for _, part := range existing { err = portal.Bridge.DB.Message.Update(ctx, part) if err != nil { log.Err(err).Str("part_id", string(part.PartID)).Msg("Failed to update message part in database") + handleRes = EventHandlingResultFailed } } } @@ -2136,19 +2160,25 @@ func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin, Str("action", "handle remote subevent"). Stringer("bridge_evt_type", subType). Logger() - portal.handleRemoteEvent(log.WithContext(ctx), source, subType, subEvt) + subRes := portal.handleRemoteEvent(log.WithContext(ctx), source, subType, subEvt) + if !subRes.Success { + handleRes.Success = false + } } } - return res.ContinueMessageHandling + continueHandling = res.ContinueMessageHandling + return } -func (portal *Portal) handleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) { +func (portal *Portal) handleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) (res EventHandlingResult) { log := zerolog.Ctx(ctx) upsertEvt, isUpsert := evt.(RemoteMessageUpsert) isUpsert = isUpsert && evt.GetType() == RemoteEventMessageUpsert if wasPending, dbMessage := portal.checkPendingMessage(ctx, evt); wasPending { if isUpsert && dbMessage != nil { - portal.handleRemoteUpsert(ctx, source, upsertEvt, []*database.Message{dbMessage}) + res, _ = portal.handleRemoteUpsert(ctx, source, upsertEvt, []*database.Message{dbMessage}) + } else { + res = EventHandlingResultIgnored } return } @@ -2157,35 +2187,42 @@ func (portal *Portal) handleRemoteMessage(ctx context.Context, source *UserLogin log.Err(err).Msg("Failed to check if message is a duplicate") } else if len(existing) > 0 { if isUpsert { - if portal.handleRemoteUpsert(ctx, source, upsertEvt, existing) { + var continueHandling bool + res, continueHandling = portal.handleRemoteUpsert(ctx, source, upsertEvt, existing) + if continueHandling { log.Debug().Msg("Upsert handler said to continue message handling normally") } else { - return + return res } } else { log.Debug().Stringer("existing_mxid", existing[0].MXID).Msg("Ignoring duplicate message") - return + return EventHandlingResultIgnored } } - intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessage) - if intent == nil { - return + intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessage) + if !ok { + return EventHandlingResultFailed } ts := getEventTS(evt) converted, err := evt.ConvertMessage(ctx, portal, intent) if err != nil { if errors.Is(err, ErrIgnoringRemoteEvent) { log.Debug().Err(err).Msg("Remote message handling was cancelled by convert function") + return EventHandlingResultIgnored } else { log.Err(err).Msg("Failed to convert remote message") portal.sendRemoteErrorNotice(ctx, intent, err, ts, "message") + return EventHandlingResultFailed } - return } - portal.sendConvertedMessage(ctx, evt.GetID(), intent, evt.GetSender().Sender, converted, ts, getStreamOrder(evt), nil) + _, res = portal.sendConvertedMessage(ctx, evt.GetID(), intent, evt.GetSender().Sender, converted, ts, getStreamOrder(evt), nil) if portal.currentlyTypingGhosts.Pop(intent.GetMXID()) { - intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0) + err = intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0) + if err != nil { + log.Warn().Err(err).Msg("Failed to send stop typing event after bridging message") + } } + return } func (portal *Portal) sendRemoteErrorNotice(ctx context.Context, intent MatrixAPI, err error, ts time.Time, evtTypeName string) { @@ -2208,7 +2245,7 @@ func (portal *Portal) sendRemoteErrorNotice(ctx context.Context, intent MatrixAP } } -func (portal *Portal) handleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) { +func (portal *Portal) handleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) EventHandlingResult { log := zerolog.Ctx(ctx) var existing []*database.Message if bundledEvt, ok := evt.(RemoteEventWithBundledParts); ok { @@ -2220,37 +2257,41 @@ func (portal *Portal) handleRemoteEdit(ctx context.Context, source *UserLogin, e existing, err = portal.Bridge.DB.Message.GetAllPartsByID(ctx, portal.Receiver, targetID) if err != nil { log.Err(err).Msg("Failed to get edit target message") - return + return EventHandlingResultFailed } } if existing == nil { log.Warn().Msg("Edit target message not found") - return + return EventHandlingResultIgnored } - intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventEdit) - if intent == nil { - return + intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventEdit) + if !ok { + return EventHandlingResultFailed } else if intent.GetMXID() != existing[0].SenderMXID { log.Warn(). Stringer("edit_sender_mxid", intent.GetMXID()). Stringer("original_sender_mxid", existing[0].SenderMXID). Msg("Not bridging edit: sender doesn't match original message sender") - return + return EventHandlingResultIgnored } ts := getEventTS(evt) converted, err := evt.ConvertEdit(ctx, portal, intent, existing) if errors.Is(err, ErrIgnoringRemoteEvent) { log.Debug().Err(err).Msg("Remote edit handling was cancelled by convert function") - return + return EventHandlingResultIgnored } else if err != nil { log.Err(err).Msg("Failed to convert remote edit") portal.sendRemoteErrorNotice(ctx, intent, err, ts, "edit") - return + return EventHandlingResultFailed } - portal.sendConvertedEdit(ctx, existing[0].ID, evt.GetSender().Sender, converted, intent, ts, getStreamOrder(evt)) + res := portal.sendConvertedEdit(ctx, existing[0].ID, evt.GetSender().Sender, converted, intent, ts, getStreamOrder(evt)) if portal.currentlyTypingGhosts.Pop(intent.GetMXID()) { - intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0) + err = intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0) + if err != nil { + log.Warn().Err(err).Msg("Failed to send stop typing event after bridging edit") + } } + return res } func (portal *Portal) sendConvertedEdit( @@ -2261,8 +2302,9 @@ func (portal *Portal) sendConvertedEdit( intent MatrixAPI, ts time.Time, streamOrder int64, -) { +) EventHandlingResult { log := zerolog.Ctx(ctx) + allSuccess := true for i, part := range converted.ModifiedParts { if part.Content.Mentions == nil { part.Content.Mentions = &event.Mentions{} @@ -2298,6 +2340,7 @@ func (portal *Portal) sendConvertedEdit( }) if err != nil { log.Err(err).Stringer("part_mxid", part.Part.MXID).Msg("Failed to edit message part") + allSuccess = false continue } else { log.Debug(). @@ -2312,6 +2355,7 @@ func (portal *Portal) sendConvertedEdit( err := portal.Bridge.DB.Message.Update(ctx, part.Part) if err != nil { log.Err(err).Int64("part_rowid", part.Part.RowID).Msg("Failed to update message part in database") + allSuccess = false } } for _, part := range converted.DeletedParts { @@ -2325,6 +2369,7 @@ func (portal *Portal) sendConvertedEdit( }) if err != nil { log.Err(err).Stringer("part_mxid", part.MXID).Msg("Failed to redact message part deleted in edit") + allSuccess = false } else { log.Debug(). Stringer("redaction_event_id", resp.EventID). @@ -2335,11 +2380,19 @@ func (portal *Portal) sendConvertedEdit( err = portal.Bridge.DB.Message.Delete(ctx, part.RowID) if err != nil { log.Err(err).Int64("part_rowid", part.RowID).Msg("Failed to delete message part from database") + allSuccess = false } } if converted.AddedParts != nil { - portal.sendConvertedMessage(ctx, targetID, intent, senderID, converted.AddedParts, ts, streamOrder, nil) + _, res := portal.sendConvertedMessage(ctx, targetID, intent, senderID, converted.AddedParts, ts, streamOrder, nil) + if !res.Success { + allSuccess = false + } } + if !allSuccess { + return EventHandlingResultFailed + } + return EventHandlingResultSuccess } func (portal *Portal) getTargetMessagePart(ctx context.Context, evt RemoteEventWithTargetMessage) (*database.Message, error) { @@ -2372,17 +2425,17 @@ func getStreamOrder(evt RemoteEvent) int64 { return 0 } -func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) { +func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) EventHandlingResult { log := zerolog.Ctx(ctx) eventTS := getEventTS(evt) targetMessage, err := portal.getTargetMessagePart(ctx, evt) if err != nil { log.Err(err).Msg("Failed to get target message for reaction") - return + return EventHandlingResultFailed } else if targetMessage == nil { // TODO use deterministic event ID as target if applicable? log.Warn().Msg("Target message for reaction not found") - return + return EventHandlingResultIgnored } var existingReactions []*database.Reaction if partTargeter, ok := evt.(RemoteEventWithTargetPart); ok { @@ -2390,6 +2443,10 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User } else { existingReactions, err = portal.Bridge.DB.Reaction.GetAllToMessage(ctx, portal.Receiver, evt.GetTargetMessage()) } + if err != nil { + log.Err(err).Msg("Failed to get existing reactions for reaction sync") + return EventHandlingResultFailed + } existing := make(map[networkid.UserID]map[networkid.EmojiID]*database.Reaction) for _, existingReaction := range existingReactions { if existing[existingReaction.SenderID] == nil { @@ -2398,9 +2455,13 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User existing[existingReaction.SenderID][existingReaction.EmojiID] = existingReaction } - doAddReaction := func(new *BackfillReaction, intent MatrixAPI) MatrixAPI { + doAddReaction := func(new *BackfillReaction, intent MatrixAPI) { if intent == nil { - intent = portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync) + var ok bool + intent, ok = portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync) + if !ok { + return + } } portal.sendConvertedReaction( ctx, new.Sender.Sender, intent, targetMessage, new.EmojiID, new.Emoji, @@ -2411,7 +2472,6 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User Time("reaction_ts", new.Timestamp) }, ) - return intent } doRemoveReaction := func(old *database.Reaction, intent MatrixAPI, deleteRow bool) { if intent == nil && old.SenderMXID != "" { @@ -2445,7 +2505,10 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User } } doOverwriteReaction := func(new *BackfillReaction, old *database.Reaction) { - intent := portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync) + intent, ok := portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync) + if !ok { + return + } doRemoveReaction(old, intent, false) doAddReaction(new, intent) } @@ -2496,30 +2559,34 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User } } } + return EventHandlingResultSuccess } -func (portal *Portal) handleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) { +func (portal *Portal) handleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) EventHandlingResult { log := zerolog.Ctx(ctx) targetMessage, err := portal.getTargetMessagePart(ctx, evt) if err != nil { log.Err(err).Msg("Failed to get target message for reaction") - return + return EventHandlingResultFailed } else if targetMessage == nil { // TODO use deterministic event ID as target if applicable? log.Warn().Msg("Target message for reaction not found") - return + return EventHandlingResultIgnored } emoji, emojiID := evt.GetReactionEmoji() existingReaction, err := portal.Bridge.DB.Reaction.GetByID(ctx, portal.Receiver, targetMessage.ID, targetMessage.PartID, evt.GetSender().Sender, emojiID) if err != nil { log.Err(err).Msg("Failed to check if reaction is a duplicate") - return + return EventHandlingResultFailed } else if existingReaction != nil && (emojiID != "" || existingReaction.Emoji == emoji) { log.Debug().Msg("Ignoring duplicate reaction") - return + return EventHandlingResultIgnored } ts := getEventTS(evt) - intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReaction) + intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReaction) + if !ok { + return EventHandlingResultFailed + } var extra map[string]any if extraContentProvider, ok := evt.(RemoteReactionWithExtraContent); ok { extra = extraContentProvider.GetReactionExtraContent() @@ -2538,14 +2605,14 @@ func (portal *Portal) handleRemoteReaction(ctx context.Context, source *UserLogi log.Err(err).Msg("Failed to redact old reaction") } } - portal.sendConvertedReaction(ctx, evt.GetSender().Sender, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extra, nil) + return portal.sendConvertedReaction(ctx, evt.GetSender().Sender, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extra, nil) } func (portal *Portal) sendConvertedReaction( ctx context.Context, senderID networkid.UserID, intent MatrixAPI, targetMessage *database.Message, emojiID networkid.EmojiID, emoji string, ts time.Time, dbMetadata any, extraContent map[string]any, logContext func(*zerolog.Event) *zerolog.Event, -) { +) EventHandlingResult { if logContext == nil { logContext = func(e *zerolog.Event) *zerolog.Event { return e @@ -2580,7 +2647,7 @@ func (portal *Portal) sendConvertedReaction( }) if err != nil { logContext(log.Err(err)).Msg("Failed to send reaction to Matrix") - return + return EventHandlingResultFailed } logContext(log.Debug()). Stringer("event_id", resp.EventID). @@ -2589,7 +2656,9 @@ func (portal *Portal) sendConvertedReaction( err = portal.Bridge.DB.Reaction.Upsert(ctx, dbReaction) if err != nil { logContext(log.Err(err)).Msg("Failed to save reaction to database") + return EventHandlingResultFailed } + return EventHandlingResultSuccess } func (portal *Portal) getIntentForMXID(ctx context.Context, userID id.UserID) (MatrixAPI, error) { @@ -2608,22 +2677,26 @@ func (portal *Portal) getIntentForMXID(ctx context.Context, userID id.UserID) (M } } -func (portal *Portal) handleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) { +func (portal *Portal) handleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) EventHandlingResult { log := zerolog.Ctx(ctx) targetReaction, err := portal.getTargetReaction(ctx, evt) if err != nil { log.Err(err).Msg("Failed to get target reaction for removal") - return + return EventHandlingResultFailed } else if targetReaction == nil { log.Warn().Msg("Target reaction not found") - return + return EventHandlingResultIgnored } intent, err := portal.getIntentForMXID(ctx, targetReaction.SenderMXID) if err != nil { log.Err(err).Stringer("sender_mxid", targetReaction.SenderMXID).Msg("Failed to get intent for removing reaction") } if intent == nil { - intent = portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReactionRemove) + var ok bool + intent, ok = portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReactionRemove) + if !ok { + return EventHandlingResultFailed + } } ts := getEventTS(evt) _, err = intent.SendMessage(ctx, portal.MXID, event.EventRedaction, &event.Content{ @@ -2633,22 +2706,24 @@ func (portal *Portal) handleRemoteReactionRemove(ctx context.Context, source *Us }, &MatrixSendExtra{Timestamp: ts, ReactionMeta: targetReaction}) if err != nil { log.Err(err).Stringer("reaction_mxid", targetReaction.MXID).Msg("Failed to redact reaction") + return EventHandlingResultFailed } err = portal.Bridge.DB.Reaction.Delete(ctx, targetReaction) if err != nil { log.Err(err).Msg("Failed to delete target reaction from database") } + return EventHandlingResultSuccess } -func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) { +func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) EventHandlingResult { log := zerolog.Ctx(ctx) targetParts, err := portal.Bridge.DB.Message.GetAllPartsByID(ctx, portal.Receiver, evt.GetTargetMessage()) if err != nil { log.Err(err).Msg("Failed to get target message for removal") - return + return EventHandlingResultFailed } else if len(targetParts) == 0 { log.Debug().Msg("Target message not found") - return + return EventHandlingResultIgnored } onlyForMeProvider, ok := evt.(RemoteDeleteOnlyForMe) onlyForMe := ok && onlyForMeProvider.DeleteOnlyForMe() @@ -2656,7 +2731,10 @@ func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *Use // TODO check if there are other user logins before deleting } - intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageRemove) + intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageRemove) + if !ok { + return EventHandlingResultFailed + } if intent == portal.Bridge.Bot && len(targetParts) > 0 { senderIntent, err := portal.getIntentForMXID(ctx, targetParts[0].SenderMXID) if err != nil { @@ -2665,15 +2743,17 @@ func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *Use intent = senderIntent } } - portal.redactMessageParts(ctx, targetParts, intent, getEventTS(evt)) + res := portal.redactMessageParts(ctx, targetParts, intent, getEventTS(evt)) err = portal.Bridge.DB.Message.DeleteAllParts(ctx, portal.Receiver, evt.GetTargetMessage()) if err != nil { log.Err(err).Msg("Failed to delete target message from database") } + return res } -func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) { +func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) EventHandlingResult { log := zerolog.Ctx(ctx) + var anyFailed bool for _, part := range parts { if part.HasFakeMXID() { continue @@ -2685,6 +2765,7 @@ func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database. }, &MatrixSendExtra{Timestamp: ts, MessageMeta: part}) if err != nil { log.Err(err).Stringer("part_mxid", part.MXID).Msg("Failed to redact message part") + anyFailed = true } else { log.Debug(). Stringer("redaction_event_id", resp.EventID). @@ -2693,9 +2774,13 @@ func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database. Msg("Sent redaction of message part to Matrix") } } + if anyFailed { + return EventHandlingResultFailed + } + return EventHandlingResultSuccess } -func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) { +func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) EventHandlingResult { log := zerolog.Ctx(ctx) var err error var lastTarget *database.Message @@ -2705,7 +2790,7 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL if err != nil { log.Err(err).Str("last_target_id", string(lastTargetID)). Msg("Failed to get last target message for read receipt") - return + return EventHandlingResultFailed } else if lastTarget == nil { log.Debug().Str("last_target_id", string(lastTargetID)). Msg("Last target message not found") @@ -2724,7 +2809,7 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL if err != nil { log.Err(err).Str("target_id", string(targetID)). Msg("Failed to get target message for read receipt") - return + return EventHandlingResultFailed } else if target != nil && !target.HasFakeMXID() && (lastTarget == nil || target.Timestamp.After(lastTarget.Timestamp)) { lastTarget = target } @@ -2737,14 +2822,17 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL } } sender := evt.GetSender() - intent := portal.GetIntentFor(ctx, sender, source, RemoteEventReadReceipt) + intent, ok := portal.GetIntentFor(ctx, sender, source, RemoteEventReadReceipt) + if !ok { + return EventHandlingResultFailed + } var addTargetLog func(evt *zerolog.Event) *zerolog.Event if lastTarget == nil { sevt, evtOK := evt.(RemoteReadReceiptWithStreamOrder) soIntent, soIntentOK := intent.(StreamOrderReadingMatrixAPI) if !evtOK || !soIntentOK || sevt.GetReadUpToStreamOrder() == 0 { log.Warn().Msg("No target message found for read receipt") - return + return EventHandlingResultIgnored } targetStreamOrder := sevt.GetReadUpToStreamOrder() addTargetLog = func(evt *zerolog.Event) *zerolog.Event { @@ -2759,40 +2847,47 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL } if err != nil { addTargetLog(log.Err(err)).Msg("Failed to bridge read receipt") + return EventHandlingResultFailed } else { addTargetLog(log.Debug()).Msg("Bridged read receipt") } if sender.IsFromMe { portal.Bridge.DisappearLoop.StartAll(ctx, portal.MXID) } + return EventHandlingResultSuccess } -func (portal *Portal) handleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) { +func (portal *Portal) handleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) EventHandlingResult { if !evt.GetSender().IsFromMe { zerolog.Ctx(ctx).Warn().Msg("Ignoring mark unread event from non-self user") - return + return EventHandlingResultIgnored } dp := source.User.DoublePuppet(ctx) if dp == nil { - return + return EventHandlingResultIgnored } err := dp.MarkUnread(ctx, portal.MXID, evt.GetUnread()) if err != nil { zerolog.Ctx(ctx).Err(err).Msg("Failed to bridge mark unread event") + return EventHandlingResultFailed } + return EventHandlingResultSuccess } -func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) { +func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) EventHandlingResult { if portal.RoomType != database.RoomTypeDM || evt.GetSender().Sender != portal.OtherUserID { - return + return EventHandlingResultIgnored + } + intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventDeliveryReceipt) + if !ok { + return EventHandlingResultFailed } - intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventDeliveryReceipt) log := zerolog.Ctx(ctx) for _, target := range evt.GetReceiptTargets() { targetParts, err := portal.Bridge.DB.Message.GetAllPartsByID(ctx, portal.Receiver, target) if err != nil { log.Err(err).Str("target_id", string(target)).Msg("Failed to get target message for delivery receipt") - continue + return EventHandlingResultFailed } else if len(targetParts) == 0 { continue } else if _, sentByGhost := portal.Bridge.Matrix.ParseGhostMXID(targetParts[0].SenderMXID); sentByGhost { @@ -2811,36 +2906,43 @@ func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *U }) } } + return EventHandlingResultSuccess } -func (portal *Portal) handleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) { +func (portal *Portal) handleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) EventHandlingResult { var typingType TypingType if typedEvt, ok := evt.(RemoteTypingWithType); ok { typingType = typedEvt.GetTypingType() } - intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventTyping) + intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventTyping) + if !ok { + return EventHandlingResultFailed + } timeout := evt.GetTimeout() err := intent.MarkTyping(ctx, portal.MXID, typingType, timeout) if err != nil { zerolog.Ctx(ctx).Err(err).Msg("Failed to bridge typing event") + return EventHandlingResultFailed } if timeout == 0 { portal.currentlyTypingGhosts.Remove(intent.GetMXID()) } else { portal.currentlyTypingGhosts.Add(intent.GetMXID()) } + return EventHandlingResultSuccess } -func (portal *Portal) handleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) { +func (portal *Portal) handleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) EventHandlingResult { info, err := evt.GetChatInfoChange(ctx) if err != nil { zerolog.Ctx(ctx).Err(err).Msg("Failed to get chat info change") - return + return EventHandlingResultFailed } portal.ProcessChatInfoChange(ctx, evt.GetSender(), source, info, getEventTS(evt)) + return EventHandlingResultSuccess } -func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) { +func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) EventHandlingResult { log := zerolog.Ctx(ctx) infoProvider, ok := evt.(RemoteChatResyncWithInfo) if ok { @@ -2869,15 +2971,16 @@ func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLo portal.doForwardBackfill(ctx, source, latestMessage, bundle) } } + return EventHandlingResultSuccess } -func (portal *Portal) handleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) { +func (portal *Portal) handleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) EventHandlingResult { log := zerolog.Ctx(ctx) if portal.Receiver == "" && evt.DeleteOnlyForMe() { logins, err := portal.Bridge.DB.UserPortal.GetAllInPortal(ctx, portal.PortalKey) if err != nil { log.Err(err).Msg("Failed to check if portal has other logins") - return + return EventHandlingResultFailed } var ownUP *database.UserPortal logins = slices.DeleteFunc(logins, func(up *database.UserPortal) bool { @@ -2907,31 +3010,35 @@ func (portal *Portal) handleRemoteChatDelete(ctx context.Context, source *UserLo ) if err != nil { log.Err(err).Msg("Failed to send leave state event for user after remote chat delete") + return EventHandlingResultFailed } else { log.Debug().Msg("Sent leave state event for user after remote chat delete") + return EventHandlingResultSuccess } - return } } err := portal.Delete(ctx) if err != nil { log.Err(err).Msg("Failed to delete portal from database") - return + return EventHandlingResultFailed } err = portal.Bridge.Bot.DeleteRoom(ctx, portal.MXID, false) if err != nil { log.Err(err).Msg("Failed to delete Matrix room") + return EventHandlingResultFailed } else { log.Info().Msg("Deleted room after remote chat delete event") + return EventHandlingResultSuccess } } -func (portal *Portal) handleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) { +func (portal *Portal) handleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) (res EventHandlingResult) { //data, err := backfill.GetBackfillData(ctx, portal) //if err != nil { // zerolog.Ctx(ctx).Err(err).Msg("Failed to get backfill data") // return //} + return } type ChatInfoChange struct { @@ -2944,7 +3051,10 @@ type ChatInfoChange struct { } func (portal *Portal) ProcessChatInfoChange(ctx context.Context, sender EventSender, source *UserLogin, change *ChatInfoChange, ts time.Time) { - intent := portal.GetIntentFor(ctx, sender, source, RemoteEventChatInfoChange) + intent, ok := portal.GetIntentFor(ctx, sender, source, RemoteEventChatInfoChange) + if !ok { + return + } if change.ChatInfo != nil { portal.UpdateInfo(ctx, change.ChatInfo, source, intent, ts) } @@ -3339,7 +3449,10 @@ func (portal *Portal) getInitialMemberList(ctx context.Context, members *ChatMem ghost.UpdateInfo(ctx, member.UserInfo) } } - intent, extraUserID := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0) + intent, extraUserID, err := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0) + if err != nil { + return nil, nil, err + } if extraUserID != "" { invite = append(invite, extraUserID) if member.PowerLevel != nil { @@ -3535,7 +3648,10 @@ func (portal *Portal) syncParticipants( ghost.UpdateInfo(ctx, member.UserInfo) } } - intent, extraUserID := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0) + intent, extraUserID, err := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0) + if err != nil { + return err + } if intent != nil { syncIntent(intent, member) } diff --git a/bridgev2/portalbackfill.go b/bridgev2/portalbackfill.go index 3953a043..74b75df2 100644 --- a/bridgev2/portalbackfill.go +++ b/bridgev2/portalbackfill.go @@ -323,7 +323,10 @@ func (portal *Portal) compileBatchMessage(ctx context.Context, source *UserLogin if len(msg.Parts) == 0 { return } - intent := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage) + intent, ok := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage) + if !ok { + return + } replyTo, threadRoot, prevThreadEvent := portal.getRelationMeta(ctx, msg.ID, msg.ReplyTo, msg.ThreadRoot, true) if threadRoot != nil && out.PrevThreadEvents[*msg.ThreadRoot] != "" { prevThreadEvent.MXID = out.PrevThreadEvents[*msg.ThreadRoot] @@ -387,7 +390,10 @@ func (portal *Portal) compileBatchMessage(ctx context.Context, source *UserLogin } slices.Sort(partIDs) for _, reaction := range msg.Reactions { - reactionIntent := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReactionRemove) + reactionIntent, ok := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReactionRemove) + if !ok { + continue + } if reaction.TargetPart == nil { reaction.TargetPart = &partIDs[0] } @@ -513,8 +519,11 @@ func (portal *Portal) sendBatch(ctx context.Context, source *UserLogin, messages func (portal *Portal) sendLegacyBackfill(ctx context.Context, source *UserLogin, messages []*BackfillMessage, markRead bool) { var lastPart id.EventID for _, msg := range messages { - intent := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage) - dbMessages := portal.sendConvertedMessage(ctx, msg.ID, intent, msg.Sender.Sender, msg.ConvertedMessage, msg.Timestamp, msg.StreamOrder, func(z *zerolog.Event) *zerolog.Event { + intent, ok := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage) + if !ok { + continue + } + dbMessages, _ := portal.sendConvertedMessage(ctx, msg.ID, intent, msg.Sender.Sender, msg.ConvertedMessage, msg.Timestamp, msg.StreamOrder, func(z *zerolog.Event) *zerolog.Event { return z. Str("message_id", string(msg.ID)). Any("sender_id", msg.Sender). @@ -523,7 +532,10 @@ func (portal *Portal) sendLegacyBackfill(ctx context.Context, source *UserLogin, if len(dbMessages) > 0 { lastPart = dbMessages[len(dbMessages)-1].MXID for _, reaction := range msg.Reactions { - reactionIntent := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReaction) + reactionIntent, ok := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReaction) + if !ok { + continue + } targetPart := dbMessages[0] if reaction.TargetPart != nil { targetPartIdx := slices.IndexFunc(dbMessages, func(dbMsg *database.Message) bool { diff --git a/bridgev2/portalinternal.go b/bridgev2/portalinternal.go index bde0b170..2b25f0cf 100644 --- a/bridgev2/portalinternal.go +++ b/bridgev2/portalinternal.go @@ -29,23 +29,23 @@ func (portal *PortalInternals) UpdateLogger() { (*Portal)(portal).updateLogger() } -func (portal *PortalInternals) QueueEvent(ctx context.Context, evt portalEvent) { - (*Portal)(portal).queueEvent(ctx, evt) +func (portal *PortalInternals) QueueEvent(ctx context.Context, evt portalEvent) EventHandlingResult { + return (*Portal)(portal).queueEvent(ctx, evt) } func (portal *PortalInternals) EventLoop() { (*Portal)(portal).eventLoop() } -func (portal *PortalInternals) HandleSingleEventAsync(idx int, rawEvt any) { - (*Portal)(portal).handleSingleEventAsync(idx, rawEvt) +func (portal *PortalInternals) HandleSingleEventAsync(idx int, rawEvt any) (outerRes EventHandlingResult) { + return (*Portal)(portal).handleSingleEventAsync(idx, rawEvt) } func (portal *PortalInternals) GetEventCtxWithLog(rawEvt any, idx int) context.Context { return (*Portal)(portal).getEventCtxWithLog(rawEvt, idx) } -func (portal *PortalInternals) HandleSingleEvent(ctx context.Context, rawEvt any, doneCallback func()) { +func (portal *PortalInternals) HandleSingleEvent(ctx context.Context, rawEvt any, doneCallback func(EventHandlingResult)) { (*Portal)(portal).handleSingleEvent(ctx, rawEvt, doneCallback) } @@ -129,11 +129,11 @@ func (portal *PortalInternals) HandleMatrixRedaction(ctx context.Context, sender (*Portal)(portal).handleMatrixRedaction(ctx, sender, origSender, evt) } -func (portal *PortalInternals) HandleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) { - (*Portal)(portal).handleRemoteEvent(ctx, source, evtType, evt) +func (portal *PortalInternals) HandleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) (res EventHandlingResult) { + return (*Portal)(portal).handleRemoteEvent(ctx, source, evtType, evt) } -func (portal *PortalInternals) GetIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID) { +func (portal *PortalInternals) GetIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID, err error) { return (*Portal)(portal).getIntentAndUserMXIDFor(ctx, sender, source, otherLogins, evtType) } @@ -145,7 +145,7 @@ func (portal *PortalInternals) ApplyRelationMeta(ctx context.Context, content *e (*Portal)(portal).applyRelationMeta(ctx, content, replyTo, threadRoot, prevThreadEvent) } -func (portal *PortalInternals) SendConvertedMessage(ctx context.Context, id networkid.MessageID, intent MatrixAPI, senderID networkid.UserID, converted *ConvertedMessage, ts time.Time, streamOrder int64, logContext func(*zerolog.Event) *zerolog.Event) []*database.Message { +func (portal *PortalInternals) SendConvertedMessage(ctx context.Context, id networkid.MessageID, intent MatrixAPI, senderID networkid.UserID, converted *ConvertedMessage, ts time.Time, streamOrder int64, logContext func(*zerolog.Event) *zerolog.Event) ([]*database.Message, EventHandlingResult) { return (*Portal)(portal).sendConvertedMessage(ctx, id, intent, senderID, converted, ts, streamOrder, logContext) } @@ -153,24 +153,24 @@ func (portal *PortalInternals) CheckPendingMessage(ctx context.Context, evt Remo return (*Portal)(portal).checkPendingMessage(ctx, evt) } -func (portal *PortalInternals) HandleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) bool { +func (portal *PortalInternals) HandleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) (handleRes EventHandlingResult, continueHandling bool) { return (*Portal)(portal).handleRemoteUpsert(ctx, source, evt, existing) } -func (portal *PortalInternals) HandleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) { - (*Portal)(portal).handleRemoteMessage(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) (res EventHandlingResult) { + return (*Portal)(portal).handleRemoteMessage(ctx, source, evt) } func (portal *PortalInternals) SendRemoteErrorNotice(ctx context.Context, intent MatrixAPI, err error, ts time.Time, evtTypeName string) { (*Portal)(portal).sendRemoteErrorNotice(ctx, intent, err, ts, evtTypeName) } -func (portal *PortalInternals) HandleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) { - (*Portal)(portal).handleRemoteEdit(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) EventHandlingResult { + return (*Portal)(portal).handleRemoteEdit(ctx, source, evt) } -func (portal *PortalInternals) SendConvertedEdit(ctx context.Context, targetID networkid.MessageID, senderID networkid.UserID, converted *ConvertedEdit, intent MatrixAPI, ts time.Time, streamOrder int64) { - (*Portal)(portal).sendConvertedEdit(ctx, targetID, senderID, converted, intent, ts, streamOrder) +func (portal *PortalInternals) SendConvertedEdit(ctx context.Context, targetID networkid.MessageID, senderID networkid.UserID, converted *ConvertedEdit, intent MatrixAPI, ts time.Time, streamOrder int64) EventHandlingResult { + return (*Portal)(portal).sendConvertedEdit(ctx, targetID, senderID, converted, intent, ts, streamOrder) } func (portal *PortalInternals) GetTargetMessagePart(ctx context.Context, evt RemoteEventWithTargetMessage) (*database.Message, error) { @@ -181,64 +181,64 @@ func (portal *PortalInternals) GetTargetReaction(ctx context.Context, evt Remote return (*Portal)(portal).getTargetReaction(ctx, evt) } -func (portal *PortalInternals) HandleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) { - (*Portal)(portal).handleRemoteReactionSync(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) EventHandlingResult { + return (*Portal)(portal).handleRemoteReactionSync(ctx, source, evt) } -func (portal *PortalInternals) HandleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) { - (*Portal)(portal).handleRemoteReaction(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) EventHandlingResult { + return (*Portal)(portal).handleRemoteReaction(ctx, source, evt) } -func (portal *PortalInternals) SendConvertedReaction(ctx context.Context, senderID networkid.UserID, intent MatrixAPI, targetMessage *database.Message, emojiID networkid.EmojiID, emoji string, ts time.Time, dbMetadata any, extraContent map[string]any, logContext func(*zerolog.Event) *zerolog.Event) { - (*Portal)(portal).sendConvertedReaction(ctx, senderID, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extraContent, logContext) +func (portal *PortalInternals) SendConvertedReaction(ctx context.Context, senderID networkid.UserID, intent MatrixAPI, targetMessage *database.Message, emojiID networkid.EmojiID, emoji string, ts time.Time, dbMetadata any, extraContent map[string]any, logContext func(*zerolog.Event) *zerolog.Event) EventHandlingResult { + return (*Portal)(portal).sendConvertedReaction(ctx, senderID, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extraContent, logContext) } func (portal *PortalInternals) GetIntentForMXID(ctx context.Context, userID id.UserID) (MatrixAPI, error) { return (*Portal)(portal).getIntentForMXID(ctx, userID) } -func (portal *PortalInternals) HandleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) { - (*Portal)(portal).handleRemoteReactionRemove(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) EventHandlingResult { + return (*Portal)(portal).handleRemoteReactionRemove(ctx, source, evt) } -func (portal *PortalInternals) HandleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) { - (*Portal)(portal).handleRemoteMessageRemove(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) EventHandlingResult { + return (*Portal)(portal).handleRemoteMessageRemove(ctx, source, evt) } -func (portal *PortalInternals) RedactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) { - (*Portal)(portal).redactMessageParts(ctx, parts, intent, ts) +func (portal *PortalInternals) RedactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) EventHandlingResult { + return (*Portal)(portal).redactMessageParts(ctx, parts, intent, ts) } -func (portal *PortalInternals) HandleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) { - (*Portal)(portal).handleRemoteReadReceipt(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) EventHandlingResult { + return (*Portal)(portal).handleRemoteReadReceipt(ctx, source, evt) } -func (portal *PortalInternals) HandleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) { - (*Portal)(portal).handleRemoteMarkUnread(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) EventHandlingResult { + return (*Portal)(portal).handleRemoteMarkUnread(ctx, source, evt) } -func (portal *PortalInternals) HandleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) { - (*Portal)(portal).handleRemoteDeliveryReceipt(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) EventHandlingResult { + return (*Portal)(portal).handleRemoteDeliveryReceipt(ctx, source, evt) } -func (portal *PortalInternals) HandleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) { - (*Portal)(portal).handleRemoteTyping(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) EventHandlingResult { + return (*Portal)(portal).handleRemoteTyping(ctx, source, evt) } -func (portal *PortalInternals) HandleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) { - (*Portal)(portal).handleRemoteChatInfoChange(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) EventHandlingResult { + return (*Portal)(portal).handleRemoteChatInfoChange(ctx, source, evt) } -func (portal *PortalInternals) HandleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) { - (*Portal)(portal).handleRemoteChatResync(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) EventHandlingResult { + return (*Portal)(portal).handleRemoteChatResync(ctx, source, evt) } -func (portal *PortalInternals) HandleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) { - (*Portal)(portal).handleRemoteChatDelete(ctx, source, evt) +func (portal *PortalInternals) HandleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) EventHandlingResult { + return (*Portal)(portal).handleRemoteChatDelete(ctx, source, evt) } -func (portal *PortalInternals) HandleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) { - (*Portal)(portal).handleRemoteBackfill(ctx, source, backfill) +func (portal *PortalInternals) HandleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) (res EventHandlingResult) { + return (*Portal)(portal).handleRemoteBackfill(ctx, source, backfill) } func (portal *PortalInternals) UpdateName(ctx context.Context, name string, sender MatrixAPI, ts time.Time) bool { diff --git a/bridgev2/queue.go b/bridgev2/queue.go index 74424290..22663222 100644 --- a/bridgev2/queue.go +++ b/bridgev2/queue.go @@ -151,11 +151,24 @@ func (br *Bridge) QueueMatrixEvent(ctx context.Context, evt *event.Event) { } } -func (ul *UserLogin) QueueRemoteEvent(evt RemoteEvent) { - ul.Bridge.QueueRemoteEvent(ul, evt) +type EventHandlingResult struct { + Success bool + Ignored bool + Queued bool } -func (br *Bridge) QueueRemoteEvent(login *UserLogin, evt RemoteEvent) { +var ( + EventHandlingResultFailed = EventHandlingResult{} + EventHandlingResultQueued = EventHandlingResult{Queued: true} + EventHandlingResultSuccess = EventHandlingResult{Success: true} + EventHandlingResultIgnored = EventHandlingResult{Success: true, Ignored: true} +) + +func (ul *UserLogin) QueueRemoteEvent(evt RemoteEvent) EventHandlingResult { + return ul.Bridge.QueueRemoteEvent(ul, evt) +} + +func (br *Bridge) QueueRemoteEvent(login *UserLogin, evt RemoteEvent) (res EventHandlingResult) { log := login.Log ctx := log.WithContext(br.BackgroundCtx) maybeUncertain, ok := evt.(RemoteEventWithUncertainPortalReceiver) @@ -182,7 +195,7 @@ func (br *Bridge) QueueRemoteEvent(login *UserLogin, evt RemoteEvent) { } // TODO put this in a better place, and maybe cache to avoid constant db queries login.MarkInPortal(ctx, portal) - portal.queueEvent(ctx, &portalRemoteEvent{ + return portal.queueEvent(ctx, &portalRemoteEvent{ evt: evt, source: login, })