bridgev2/portal: return result of handling remote events (#389)
Some checks failed
Go / Lint (latest) (push) Has been cancelled
Go / Build (old, libolm) (push) Has been cancelled
Go / Build (latest, libolm) (push) Has been cancelled
Go / Build (old, goolm) (push) Has been cancelled
Go / Build (latest, goolm) (push) Has been cancelled

This commit is contained in:
Tulir Asokan 2025-06-17 22:08:29 +05:30 committed by GitHub
commit 26da46dbbf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 318 additions and 175 deletions

View file

@ -283,19 +283,21 @@ func (br *Bridge) GetExistingPortalByKey(ctx context.Context, key networkid.Port
return br.loadPortal(ctx, db, err, nil)
}
func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) {
func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) EventHandlingResult {
if PortalEventBuffer == 0 {
portal.eventsLock.Lock()
defer portal.eventsLock.Unlock()
portal.eventIdx++
portal.handleSingleEventAsync(portal.eventIdx, evt)
return portal.handleSingleEventAsync(portal.eventIdx, evt)
} else {
select {
case portal.events <- evt:
return EventHandlingResultQueued
default:
zerolog.Ctx(ctx).Error().
Str("portal_id", string(portal.ID)).
Msg("Portal event channel is full")
return EventHandlingResultFailed
}
}
}
@ -313,19 +315,25 @@ func (portal *Portal) eventLoop() {
}
}
func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) {
func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) (outerRes EventHandlingResult) {
ctx := portal.getEventCtxWithLog(rawEvt, idx)
if _, isCreate := rawEvt.(*portalCreateEvent); isCreate {
portal.handleSingleEvent(ctx, rawEvt, func() {})
portal.handleSingleEvent(ctx, rawEvt, func(res EventHandlingResult) {
outerRes = res
})
} else if portal.Bridge.Config.AsyncEvents {
go portal.handleSingleEvent(ctx, rawEvt, func() {})
outerRes = EventHandlingResultQueued
go portal.handleSingleEvent(ctx, rawEvt, func(res EventHandlingResult) {})
} else {
log := zerolog.Ctx(ctx)
doneCh := make(chan struct{})
var backgrounded atomic.Bool
start := time.Now()
var handleDuration time.Duration
go portal.handleSingleEvent(ctx, rawEvt, func() {
// Note: this will not set the success flag if the handler times out
outerRes = EventHandlingResult{Queued: true}
go portal.handleSingleEvent(ctx, rawEvt, func(res EventHandlingResult) {
outerRes = res
handleDuration = time.Since(start)
close(doneCh)
if backgrounded.Load() {
@ -358,6 +366,7 @@ func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) {
Msg("Event handling is taking too long, continuing in background")
backgrounded.Store(true)
}
return
}
func (portal *Portal) getEventCtxWithLog(rawEvt any, idx int) context.Context {
@ -404,10 +413,11 @@ func (portal *Portal) getEventCtxWithLog(rawEvt any, idx int) context.Context {
return logWith.Logger().WithContext(portal.Bridge.BackgroundCtx)
}
func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCallback func()) {
func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCallback func(res EventHandlingResult)) {
log := zerolog.Ctx(ctx)
var res EventHandlingResult
defer func() {
doneCallback()
doneCallback(res)
if err := recover(); err != nil {
logEvt := log.Error()
if realErr, ok := err.(error); ok {
@ -432,9 +442,11 @@ func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCal
case *portalMatrixEvent:
portal.handleMatrixEvent(ctx, evt.sender, evt.evt)
case *portalRemoteEvent:
portal.handleRemoteEvent(ctx, evt.source, evt.evtType, evt.evt)
res = portal.handleRemoteEvent(ctx, evt.source, evt.evtType, evt.evt)
case *portalCreateEvent:
evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil))
err := portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil)
res.Success = err == nil
evt.cb(err)
default:
panic(fmt.Errorf("illegal type %T in eventLoop", evt))
}
@ -627,7 +639,7 @@ func (portal *Portal) handleMatrixReceipts(ctx context.Context, evt *event.Event
for userID, receipt := range readReceipts {
sender, err := portal.Bridge.GetUserByMXID(ctx, userID)
if err != nil {
// TODO log
zerolog.Ctx(ctx).Err(err).Msg("Failed to get user to handle read receipt")
return
}
portal.handleMatrixReadReceipt(ctx, sender, evtID, receipt)
@ -1752,13 +1764,13 @@ func (portal *Portal) handleMatrixRedaction(ctx context.Context, sender *UserLog
portal.sendSuccessStatus(ctx, evt, 0, "")
}
func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) {
func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) (res EventHandlingResult) {
log := zerolog.Ctx(ctx)
if portal.MXID == "" {
mcp, ok := evt.(RemoteEventThatMayCreatePortal)
if !ok || !mcp.ShouldCreatePortal() {
log.Debug().Msg("Dropping event as portal doesn't exist")
return
return EventHandlingResultIgnored
}
infoProvider, ok := mcp.(RemoteChatResyncWithInfo)
var info *ChatInfo
@ -1777,8 +1789,7 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin,
err = portal.createMatrixRoomInLoop(ctx, source, info, bundle)
if err != nil {
log.Err(err).Msg("Failed to create portal to handle event")
// TODO error
return
return EventHandlingResultFailed
}
if evtType == RemoteEventChatResync {
log.Debug().Msg("Not handling chat resync event further as portal was created by it")
@ -1786,7 +1797,7 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin,
if ok {
postHandler.PostHandle(ctx, portal)
}
return
return EventHandlingResultSuccess
}
}
preHandler, ok := evt.(RemotePreHandler)
@ -1798,33 +1809,33 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin,
case RemoteEventUnknown:
log.Debug().Msg("Ignoring remote event with type unknown")
case RemoteEventMessage, RemoteEventMessageUpsert:
portal.handleRemoteMessage(ctx, source, evt.(RemoteMessage))
res = portal.handleRemoteMessage(ctx, source, evt.(RemoteMessage))
case RemoteEventEdit:
portal.handleRemoteEdit(ctx, source, evt.(RemoteEdit))
res = portal.handleRemoteEdit(ctx, source, evt.(RemoteEdit))
case RemoteEventReaction:
portal.handleRemoteReaction(ctx, source, evt.(RemoteReaction))
res = portal.handleRemoteReaction(ctx, source, evt.(RemoteReaction))
case RemoteEventReactionRemove:
portal.handleRemoteReactionRemove(ctx, source, evt.(RemoteReactionRemove))
res = portal.handleRemoteReactionRemove(ctx, source, evt.(RemoteReactionRemove))
case RemoteEventReactionSync:
portal.handleRemoteReactionSync(ctx, source, evt.(RemoteReactionSync))
res = portal.handleRemoteReactionSync(ctx, source, evt.(RemoteReactionSync))
case RemoteEventMessageRemove:
portal.handleRemoteMessageRemove(ctx, source, evt.(RemoteMessageRemove))
res = portal.handleRemoteMessageRemove(ctx, source, evt.(RemoteMessageRemove))
case RemoteEventReadReceipt:
portal.handleRemoteReadReceipt(ctx, source, evt.(RemoteReadReceipt))
res = portal.handleRemoteReadReceipt(ctx, source, evt.(RemoteReadReceipt))
case RemoteEventMarkUnread:
portal.handleRemoteMarkUnread(ctx, source, evt.(RemoteMarkUnread))
res = portal.handleRemoteMarkUnread(ctx, source, evt.(RemoteMarkUnread))
case RemoteEventDeliveryReceipt:
portal.handleRemoteDeliveryReceipt(ctx, source, evt.(RemoteDeliveryReceipt))
res = portal.handleRemoteDeliveryReceipt(ctx, source, evt.(RemoteDeliveryReceipt))
case RemoteEventTyping:
portal.handleRemoteTyping(ctx, source, evt.(RemoteTyping))
res = portal.handleRemoteTyping(ctx, source, evt.(RemoteTyping))
case RemoteEventChatInfoChange:
portal.handleRemoteChatInfoChange(ctx, source, evt.(RemoteChatInfoChange))
res = portal.handleRemoteChatInfoChange(ctx, source, evt.(RemoteChatInfoChange))
case RemoteEventChatResync:
portal.handleRemoteChatResync(ctx, source, evt.(RemoteChatResync))
res = portal.handleRemoteChatResync(ctx, source, evt.(RemoteChatResync))
case RemoteEventChatDelete:
portal.handleRemoteChatDelete(ctx, source, evt.(RemoteChatDelete))
res = portal.handleRemoteChatDelete(ctx, source, evt.(RemoteChatDelete))
case RemoteEventBackfill:
portal.handleRemoteBackfill(ctx, source, evt.(RemoteBackfill))
res = portal.handleRemoteBackfill(ctx, source, evt.(RemoteBackfill))
default:
log.Warn().Msg("Got remote event with unknown type")
}
@ -1832,9 +1843,10 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin,
if ok {
postHandler.PostHandle(ctx, portal)
}
return
}
func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID) {
func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID, err error) {
var ghost *Ghost
if !sender.IsFromMe && sender.ForceDMUser && portal.OtherUserID != "" && sender.Sender != portal.OtherUserID {
zerolog.Ctx(ctx).Warn().
@ -1843,21 +1855,20 @@ func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventS
Msg("Overriding event sender with primary other user in DM portal")
// Ensure the ghost row exists anyway to prevent foreign key errors when saving messages
// TODO it'd probably be better to override the sender in the saved message, but that's more effort
_, err := portal.Bridge.GetGhostByID(ctx, sender.Sender)
_, err = portal.Bridge.GetGhostByID(ctx, sender.Sender)
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to get ghost with original user ID")
return
}
sender.Sender = portal.OtherUserID
}
if sender.Sender != "" {
var err error
ghost, err = portal.Bridge.GetGhostByID(ctx, sender.Sender)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get ghost for message sender")
return
} else {
ghost.UpdateInfoIfNecessary(ctx, source, evtType)
}
ghost.UpdateInfoIfNecessary(ctx, source, evtType)
}
if sender.IsFromMe {
intent = source.User.DoublePuppet(ctx)
@ -1892,15 +1903,21 @@ func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventS
return
}
func (portal *Portal) GetIntentFor(ctx context.Context, sender EventSender, source *UserLogin, evtType RemoteEventType) MatrixAPI {
intent, _ := portal.getIntentAndUserMXIDFor(ctx, sender, source, nil, evtType)
func (portal *Portal) GetIntentFor(ctx context.Context, sender EventSender, source *UserLogin, evtType RemoteEventType) (MatrixAPI, bool) {
intent, _, err := portal.getIntentAndUserMXIDFor(ctx, sender, source, nil, evtType)
if err != nil {
return nil, false
}
if intent == nil {
// TODO this is very hacky - we should either insert an empty ghost row automatically
// (and not fetch it at runtime) or make the message sender column nullable.
portal.Bridge.GetGhostByID(ctx, "")
intent = portal.Bridge.Bot
if intent == nil {
panic(fmt.Errorf("bridge bot is nil"))
}
}
return intent
return intent, true
}
func (portal *Portal) getRelationMeta(ctx context.Context, currentMsg networkid.MessageID, replyToPtr *networkid.MessageOptionalPartID, threadRootPtr *networkid.MessageID, isBatchSend bool) (replyTo, threadRoot, prevThreadEvent *database.Message) {
@ -1982,7 +1999,7 @@ func (portal *Portal) sendConvertedMessage(
ts time.Time,
streamOrder int64,
logContext func(*zerolog.Event) *zerolog.Event,
) []*database.Message {
) ([]*database.Message, EventHandlingResult) {
if logContext == nil {
logContext = func(e *zerolog.Event) *zerolog.Event {
return e
@ -1991,6 +2008,7 @@ func (portal *Portal) sendConvertedMessage(
log := zerolog.Ctx(ctx)
replyTo, threadRoot, prevThreadEvent := portal.getRelationMeta(ctx, id, converted.ReplyTo, converted.ThreadRoot, false)
output := make([]*database.Message, 0, len(converted.Parts))
allSuccess := true
for i, part := range converted.Parts {
portal.applyRelationMeta(ctx, part.Content, replyTo, threadRoot, prevThreadEvent)
dbMessage := &database.Message{
@ -2023,6 +2041,7 @@ func (portal *Portal) sendConvertedMessage(
})
if err != nil {
logContext(log.Err(err)).Str("part_id", string(part.ID)).Msg("Failed to send message part to Matrix")
allSuccess = false
continue
}
logContext(log.Debug()).
@ -2034,12 +2053,13 @@ func (portal *Portal) sendConvertedMessage(
err := portal.Bridge.DB.Message.Insert(ctx, dbMessage)
if err != nil {
logContext(log.Err(err)).Str("part_id", string(part.ID)).Msg("Failed to save message part to database")
allSuccess = false
}
if converted.Disappear.Type != database.DisappearingTypeNone && !dbMessage.HasFakeMXID() {
if converted.Disappear.Type == database.DisappearingTypeAfterSend && converted.Disappear.DisappearAt.IsZero() {
converted.Disappear.DisappearAt = dbMessage.Timestamp.Add(converted.Disappear.Timer)
}
go portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{
portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{
RoomID: portal.MXID,
EventID: dbMessage.MXID,
DisappearingSetting: converted.Disappear,
@ -2050,7 +2070,10 @@ func (portal *Portal) sendConvertedMessage(
}
output = append(output, dbMessage)
}
return output
if !allSuccess {
return output, EventHandlingResultFailed
}
return output, EventHandlingResultSuccess
}
func (portal *Portal) checkPendingMessage(ctx context.Context, evt RemoteMessage) (bool, *database.Message) {
@ -2110,21 +2133,24 @@ func (portal *Portal) checkPendingMessage(ctx context.Context, evt RemoteMessage
return true, pending.db
}
func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) bool {
func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) (handleRes EventHandlingResult, continueHandling bool) {
log := zerolog.Ctx(ctx)
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageUpsert)
if intent == nil {
return false
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageUpsert)
if !ok {
return
}
res, err := evt.HandleExisting(ctx, portal, intent, existing)
if err != nil {
log.Err(err).Msg("Failed to handle existing message in upsert event after receiving remote echo")
} else {
handleRes = EventHandlingResultSuccess
}
if res.SaveParts {
for _, part := range existing {
err = portal.Bridge.DB.Message.Update(ctx, part)
if err != nil {
log.Err(err).Str("part_id", string(part.PartID)).Msg("Failed to update message part in database")
handleRes = EventHandlingResultFailed
}
}
}
@ -2136,19 +2162,25 @@ func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin,
Str("action", "handle remote subevent").
Stringer("bridge_evt_type", subType).
Logger()
portal.handleRemoteEvent(log.WithContext(ctx), source, subType, subEvt)
subRes := portal.handleRemoteEvent(log.WithContext(ctx), source, subType, subEvt)
if !subRes.Success {
handleRes.Success = false
}
}
}
return res.ContinueMessageHandling
continueHandling = res.ContinueMessageHandling
return
}
func (portal *Portal) handleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) {
func (portal *Portal) handleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) (res EventHandlingResult) {
log := zerolog.Ctx(ctx)
upsertEvt, isUpsert := evt.(RemoteMessageUpsert)
isUpsert = isUpsert && evt.GetType() == RemoteEventMessageUpsert
if wasPending, dbMessage := portal.checkPendingMessage(ctx, evt); wasPending {
if isUpsert && dbMessage != nil {
portal.handleRemoteUpsert(ctx, source, upsertEvt, []*database.Message{dbMessage})
res, _ = portal.handleRemoteUpsert(ctx, source, upsertEvt, []*database.Message{dbMessage})
} else {
res = EventHandlingResultIgnored
}
return
}
@ -2157,35 +2189,42 @@ func (portal *Portal) handleRemoteMessage(ctx context.Context, source *UserLogin
log.Err(err).Msg("Failed to check if message is a duplicate")
} else if len(existing) > 0 {
if isUpsert {
if portal.handleRemoteUpsert(ctx, source, upsertEvt, existing) {
var continueHandling bool
res, continueHandling = portal.handleRemoteUpsert(ctx, source, upsertEvt, existing)
if continueHandling {
log.Debug().Msg("Upsert handler said to continue message handling normally")
} else {
return
return res
}
} else {
log.Debug().Stringer("existing_mxid", existing[0].MXID).Msg("Ignoring duplicate message")
return
return EventHandlingResultIgnored
}
}
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessage)
if intent == nil {
return
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessage)
if !ok {
return EventHandlingResultFailed
}
ts := getEventTS(evt)
converted, err := evt.ConvertMessage(ctx, portal, intent)
if err != nil {
if errors.Is(err, ErrIgnoringRemoteEvent) {
log.Debug().Err(err).Msg("Remote message handling was cancelled by convert function")
return EventHandlingResultIgnored
} else {
log.Err(err).Msg("Failed to convert remote message")
portal.sendRemoteErrorNotice(ctx, intent, err, ts, "message")
return EventHandlingResultFailed
}
return
}
portal.sendConvertedMessage(ctx, evt.GetID(), intent, evt.GetSender().Sender, converted, ts, getStreamOrder(evt), nil)
_, res = portal.sendConvertedMessage(ctx, evt.GetID(), intent, evt.GetSender().Sender, converted, ts, getStreamOrder(evt), nil)
if portal.currentlyTypingGhosts.Pop(intent.GetMXID()) {
intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0)
err = intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0)
if err != nil {
log.Warn().Err(err).Msg("Failed to send stop typing event after bridging message")
}
}
return
}
func (portal *Portal) sendRemoteErrorNotice(ctx context.Context, intent MatrixAPI, err error, ts time.Time, evtTypeName string) {
@ -2208,7 +2247,7 @@ func (portal *Portal) sendRemoteErrorNotice(ctx context.Context, intent MatrixAP
}
}
func (portal *Portal) handleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) {
func (portal *Portal) handleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) EventHandlingResult {
log := zerolog.Ctx(ctx)
var existing []*database.Message
if bundledEvt, ok := evt.(RemoteEventWithBundledParts); ok {
@ -2220,37 +2259,41 @@ func (portal *Portal) handleRemoteEdit(ctx context.Context, source *UserLogin, e
existing, err = portal.Bridge.DB.Message.GetAllPartsByID(ctx, portal.Receiver, targetID)
if err != nil {
log.Err(err).Msg("Failed to get edit target message")
return
return EventHandlingResultFailed
}
}
if existing == nil {
log.Warn().Msg("Edit target message not found")
return
return EventHandlingResultIgnored
}
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventEdit)
if intent == nil {
return
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventEdit)
if !ok {
return EventHandlingResultFailed
} else if intent.GetMXID() != existing[0].SenderMXID {
log.Warn().
Stringer("edit_sender_mxid", intent.GetMXID()).
Stringer("original_sender_mxid", existing[0].SenderMXID).
Msg("Not bridging edit: sender doesn't match original message sender")
return
return EventHandlingResultIgnored
}
ts := getEventTS(evt)
converted, err := evt.ConvertEdit(ctx, portal, intent, existing)
if errors.Is(err, ErrIgnoringRemoteEvent) {
log.Debug().Err(err).Msg("Remote edit handling was cancelled by convert function")
return
return EventHandlingResultIgnored
} else if err != nil {
log.Err(err).Msg("Failed to convert remote edit")
portal.sendRemoteErrorNotice(ctx, intent, err, ts, "edit")
return
return EventHandlingResultFailed
}
portal.sendConvertedEdit(ctx, existing[0].ID, evt.GetSender().Sender, converted, intent, ts, getStreamOrder(evt))
res := portal.sendConvertedEdit(ctx, existing[0].ID, evt.GetSender().Sender, converted, intent, ts, getStreamOrder(evt))
if portal.currentlyTypingGhosts.Pop(intent.GetMXID()) {
intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0)
err = intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0)
if err != nil {
log.Warn().Err(err).Msg("Failed to send stop typing event after bridging edit")
}
}
return res
}
func (portal *Portal) sendConvertedEdit(
@ -2261,8 +2304,9 @@ func (portal *Portal) sendConvertedEdit(
intent MatrixAPI,
ts time.Time,
streamOrder int64,
) {
) EventHandlingResult {
log := zerolog.Ctx(ctx)
allSuccess := true
for i, part := range converted.ModifiedParts {
if part.Content.Mentions == nil {
part.Content.Mentions = &event.Mentions{}
@ -2298,6 +2342,7 @@ func (portal *Portal) sendConvertedEdit(
})
if err != nil {
log.Err(err).Stringer("part_mxid", part.Part.MXID).Msg("Failed to edit message part")
allSuccess = false
continue
} else {
log.Debug().
@ -2312,6 +2357,7 @@ func (portal *Portal) sendConvertedEdit(
err := portal.Bridge.DB.Message.Update(ctx, part.Part)
if err != nil {
log.Err(err).Int64("part_rowid", part.Part.RowID).Msg("Failed to update message part in database")
allSuccess = false
}
}
for _, part := range converted.DeletedParts {
@ -2325,6 +2371,7 @@ func (portal *Portal) sendConvertedEdit(
})
if err != nil {
log.Err(err).Stringer("part_mxid", part.MXID).Msg("Failed to redact message part deleted in edit")
allSuccess = false
} else {
log.Debug().
Stringer("redaction_event_id", resp.EventID).
@ -2335,11 +2382,19 @@ func (portal *Portal) sendConvertedEdit(
err = portal.Bridge.DB.Message.Delete(ctx, part.RowID)
if err != nil {
log.Err(err).Int64("part_rowid", part.RowID).Msg("Failed to delete message part from database")
allSuccess = false
}
}
if converted.AddedParts != nil {
portal.sendConvertedMessage(ctx, targetID, intent, senderID, converted.AddedParts, ts, streamOrder, nil)
_, res := portal.sendConvertedMessage(ctx, targetID, intent, senderID, converted.AddedParts, ts, streamOrder, nil)
if !res.Success {
allSuccess = false
}
}
if !allSuccess {
return EventHandlingResultFailed
}
return EventHandlingResultSuccess
}
func (portal *Portal) getTargetMessagePart(ctx context.Context, evt RemoteEventWithTargetMessage) (*database.Message, error) {
@ -2372,17 +2427,17 @@ func getStreamOrder(evt RemoteEvent) int64 {
return 0
}
func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) {
func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) EventHandlingResult {
log := zerolog.Ctx(ctx)
eventTS := getEventTS(evt)
targetMessage, err := portal.getTargetMessagePart(ctx, evt)
if err != nil {
log.Err(err).Msg("Failed to get target message for reaction")
return
return EventHandlingResultFailed
} else if targetMessage == nil {
// TODO use deterministic event ID as target if applicable?
log.Warn().Msg("Target message for reaction not found")
return
return EventHandlingResultIgnored
}
var existingReactions []*database.Reaction
if partTargeter, ok := evt.(RemoteEventWithTargetPart); ok {
@ -2390,6 +2445,10 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
} else {
existingReactions, err = portal.Bridge.DB.Reaction.GetAllToMessage(ctx, portal.Receiver, evt.GetTargetMessage())
}
if err != nil {
log.Err(err).Msg("Failed to get existing reactions for reaction sync")
return EventHandlingResultFailed
}
existing := make(map[networkid.UserID]map[networkid.EmojiID]*database.Reaction)
for _, existingReaction := range existingReactions {
if existing[existingReaction.SenderID] == nil {
@ -2398,9 +2457,13 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
existing[existingReaction.SenderID][existingReaction.EmojiID] = existingReaction
}
doAddReaction := func(new *BackfillReaction, intent MatrixAPI) MatrixAPI {
doAddReaction := func(new *BackfillReaction, intent MatrixAPI) {
if intent == nil {
intent = portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync)
var ok bool
intent, ok = portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync)
if !ok {
return
}
}
portal.sendConvertedReaction(
ctx, new.Sender.Sender, intent, targetMessage, new.EmojiID, new.Emoji,
@ -2411,7 +2474,6 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
Time("reaction_ts", new.Timestamp)
},
)
return intent
}
doRemoveReaction := func(old *database.Reaction, intent MatrixAPI, deleteRow bool) {
if intent == nil && old.SenderMXID != "" {
@ -2445,7 +2507,10 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
}
}
doOverwriteReaction := func(new *BackfillReaction, old *database.Reaction) {
intent := portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync)
intent, ok := portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync)
if !ok {
return
}
doRemoveReaction(old, intent, false)
doAddReaction(new, intent)
}
@ -2496,30 +2561,34 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
}
}
}
return EventHandlingResultSuccess
}
func (portal *Portal) handleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) {
func (portal *Portal) handleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) EventHandlingResult {
log := zerolog.Ctx(ctx)
targetMessage, err := portal.getTargetMessagePart(ctx, evt)
if err != nil {
log.Err(err).Msg("Failed to get target message for reaction")
return
return EventHandlingResultFailed
} else if targetMessage == nil {
// TODO use deterministic event ID as target if applicable?
log.Warn().Msg("Target message for reaction not found")
return
return EventHandlingResultIgnored
}
emoji, emojiID := evt.GetReactionEmoji()
existingReaction, err := portal.Bridge.DB.Reaction.GetByID(ctx, portal.Receiver, targetMessage.ID, targetMessage.PartID, evt.GetSender().Sender, emojiID)
if err != nil {
log.Err(err).Msg("Failed to check if reaction is a duplicate")
return
return EventHandlingResultFailed
} else if existingReaction != nil && (emojiID != "" || existingReaction.Emoji == emoji) {
log.Debug().Msg("Ignoring duplicate reaction")
return
return EventHandlingResultIgnored
}
ts := getEventTS(evt)
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReaction)
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReaction)
if !ok {
return EventHandlingResultFailed
}
var extra map[string]any
if extraContentProvider, ok := evt.(RemoteReactionWithExtraContent); ok {
extra = extraContentProvider.GetReactionExtraContent()
@ -2538,14 +2607,14 @@ func (portal *Portal) handleRemoteReaction(ctx context.Context, source *UserLogi
log.Err(err).Msg("Failed to redact old reaction")
}
}
portal.sendConvertedReaction(ctx, evt.GetSender().Sender, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extra, nil)
return portal.sendConvertedReaction(ctx, evt.GetSender().Sender, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extra, nil)
}
func (portal *Portal) sendConvertedReaction(
ctx context.Context, senderID networkid.UserID, intent MatrixAPI, targetMessage *database.Message,
emojiID networkid.EmojiID, emoji string, ts time.Time, dbMetadata any, extraContent map[string]any,
logContext func(*zerolog.Event) *zerolog.Event,
) {
) EventHandlingResult {
if logContext == nil {
logContext = func(e *zerolog.Event) *zerolog.Event {
return e
@ -2580,7 +2649,7 @@ func (portal *Portal) sendConvertedReaction(
})
if err != nil {
logContext(log.Err(err)).Msg("Failed to send reaction to Matrix")
return
return EventHandlingResultFailed
}
logContext(log.Debug()).
Stringer("event_id", resp.EventID).
@ -2589,7 +2658,9 @@ func (portal *Portal) sendConvertedReaction(
err = portal.Bridge.DB.Reaction.Upsert(ctx, dbReaction)
if err != nil {
logContext(log.Err(err)).Msg("Failed to save reaction to database")
return EventHandlingResultFailed
}
return EventHandlingResultSuccess
}
func (portal *Portal) getIntentForMXID(ctx context.Context, userID id.UserID) (MatrixAPI, error) {
@ -2608,22 +2679,26 @@ func (portal *Portal) getIntentForMXID(ctx context.Context, userID id.UserID) (M
}
}
func (portal *Portal) handleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) {
func (portal *Portal) handleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) EventHandlingResult {
log := zerolog.Ctx(ctx)
targetReaction, err := portal.getTargetReaction(ctx, evt)
if err != nil {
log.Err(err).Msg("Failed to get target reaction for removal")
return
return EventHandlingResultFailed
} else if targetReaction == nil {
log.Warn().Msg("Target reaction not found")
return
return EventHandlingResultIgnored
}
intent, err := portal.getIntentForMXID(ctx, targetReaction.SenderMXID)
if err != nil {
log.Err(err).Stringer("sender_mxid", targetReaction.SenderMXID).Msg("Failed to get intent for removing reaction")
}
if intent == nil {
intent = portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReactionRemove)
var ok bool
intent, ok = portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReactionRemove)
if !ok {
return EventHandlingResultFailed
}
}
ts := getEventTS(evt)
_, err = intent.SendMessage(ctx, portal.MXID, event.EventRedaction, &event.Content{
@ -2633,22 +2708,24 @@ func (portal *Portal) handleRemoteReactionRemove(ctx context.Context, source *Us
}, &MatrixSendExtra{Timestamp: ts, ReactionMeta: targetReaction})
if err != nil {
log.Err(err).Stringer("reaction_mxid", targetReaction.MXID).Msg("Failed to redact reaction")
return EventHandlingResultFailed
}
err = portal.Bridge.DB.Reaction.Delete(ctx, targetReaction)
if err != nil {
log.Err(err).Msg("Failed to delete target reaction from database")
}
return EventHandlingResultSuccess
}
func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) {
func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) EventHandlingResult {
log := zerolog.Ctx(ctx)
targetParts, err := portal.Bridge.DB.Message.GetAllPartsByID(ctx, portal.Receiver, evt.GetTargetMessage())
if err != nil {
log.Err(err).Msg("Failed to get target message for removal")
return
return EventHandlingResultFailed
} else if len(targetParts) == 0 {
log.Debug().Msg("Target message not found")
return
return EventHandlingResultIgnored
}
onlyForMeProvider, ok := evt.(RemoteDeleteOnlyForMe)
onlyForMe := ok && onlyForMeProvider.DeleteOnlyForMe()
@ -2656,7 +2733,10 @@ func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *Use
// TODO check if there are other user logins before deleting
}
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageRemove)
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageRemove)
if !ok {
return EventHandlingResultFailed
}
if intent == portal.Bridge.Bot && len(targetParts) > 0 {
senderIntent, err := portal.getIntentForMXID(ctx, targetParts[0].SenderMXID)
if err != nil {
@ -2665,15 +2745,17 @@ func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *Use
intent = senderIntent
}
}
portal.redactMessageParts(ctx, targetParts, intent, getEventTS(evt))
res := portal.redactMessageParts(ctx, targetParts, intent, getEventTS(evt))
err = portal.Bridge.DB.Message.DeleteAllParts(ctx, portal.Receiver, evt.GetTargetMessage())
if err != nil {
log.Err(err).Msg("Failed to delete target message from database")
}
return res
}
func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) {
func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) EventHandlingResult {
log := zerolog.Ctx(ctx)
var anyFailed bool
for _, part := range parts {
if part.HasFakeMXID() {
continue
@ -2685,6 +2767,7 @@ func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.
}, &MatrixSendExtra{Timestamp: ts, MessageMeta: part})
if err != nil {
log.Err(err).Stringer("part_mxid", part.MXID).Msg("Failed to redact message part")
anyFailed = true
} else {
log.Debug().
Stringer("redaction_event_id", resp.EventID).
@ -2693,9 +2776,13 @@ func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.
Msg("Sent redaction of message part to Matrix")
}
}
if anyFailed {
return EventHandlingResultFailed
}
return EventHandlingResultSuccess
}
func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) {
func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) EventHandlingResult {
log := zerolog.Ctx(ctx)
var err error
var lastTarget *database.Message
@ -2705,7 +2792,7 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
if err != nil {
log.Err(err).Str("last_target_id", string(lastTargetID)).
Msg("Failed to get last target message for read receipt")
return
return EventHandlingResultFailed
} else if lastTarget == nil {
log.Debug().Str("last_target_id", string(lastTargetID)).
Msg("Last target message not found")
@ -2724,7 +2811,7 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
if err != nil {
log.Err(err).Str("target_id", string(targetID)).
Msg("Failed to get target message for read receipt")
return
return EventHandlingResultFailed
} else if target != nil && !target.HasFakeMXID() && (lastTarget == nil || target.Timestamp.After(lastTarget.Timestamp)) {
lastTarget = target
}
@ -2737,14 +2824,17 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
}
}
sender := evt.GetSender()
intent := portal.GetIntentFor(ctx, sender, source, RemoteEventReadReceipt)
intent, ok := portal.GetIntentFor(ctx, sender, source, RemoteEventReadReceipt)
if !ok {
return EventHandlingResultFailed
}
var addTargetLog func(evt *zerolog.Event) *zerolog.Event
if lastTarget == nil {
sevt, evtOK := evt.(RemoteReadReceiptWithStreamOrder)
soIntent, soIntentOK := intent.(StreamOrderReadingMatrixAPI)
if !evtOK || !soIntentOK || sevt.GetReadUpToStreamOrder() == 0 {
log.Warn().Msg("No target message found for read receipt")
return
return EventHandlingResultIgnored
}
targetStreamOrder := sevt.GetReadUpToStreamOrder()
addTargetLog = func(evt *zerolog.Event) *zerolog.Event {
@ -2759,40 +2849,47 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
}
if err != nil {
addTargetLog(log.Err(err)).Msg("Failed to bridge read receipt")
return EventHandlingResultFailed
} else {
addTargetLog(log.Debug()).Msg("Bridged read receipt")
}
if sender.IsFromMe {
portal.Bridge.DisappearLoop.StartAll(ctx, portal.MXID)
}
return EventHandlingResultSuccess
}
func (portal *Portal) handleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) {
func (portal *Portal) handleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) EventHandlingResult {
if !evt.GetSender().IsFromMe {
zerolog.Ctx(ctx).Warn().Msg("Ignoring mark unread event from non-self user")
return
return EventHandlingResultIgnored
}
dp := source.User.DoublePuppet(ctx)
if dp == nil {
return
return EventHandlingResultIgnored
}
err := dp.MarkUnread(ctx, portal.MXID, evt.GetUnread())
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to bridge mark unread event")
return EventHandlingResultFailed
}
return EventHandlingResultSuccess
}
func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) {
func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) EventHandlingResult {
if portal.RoomType != database.RoomTypeDM || evt.GetSender().Sender != portal.OtherUserID {
return
return EventHandlingResultIgnored
}
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventDeliveryReceipt)
if !ok {
return EventHandlingResultFailed
}
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventDeliveryReceipt)
log := zerolog.Ctx(ctx)
for _, target := range evt.GetReceiptTargets() {
targetParts, err := portal.Bridge.DB.Message.GetAllPartsByID(ctx, portal.Receiver, target)
if err != nil {
log.Err(err).Str("target_id", string(target)).Msg("Failed to get target message for delivery receipt")
continue
return EventHandlingResultFailed
} else if len(targetParts) == 0 {
continue
} else if _, sentByGhost := portal.Bridge.Matrix.ParseGhostMXID(targetParts[0].SenderMXID); sentByGhost {
@ -2811,36 +2908,43 @@ func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *U
})
}
}
return EventHandlingResultSuccess
}
func (portal *Portal) handleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) {
func (portal *Portal) handleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) EventHandlingResult {
var typingType TypingType
if typedEvt, ok := evt.(RemoteTypingWithType); ok {
typingType = typedEvt.GetTypingType()
}
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventTyping)
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventTyping)
if !ok {
return EventHandlingResultFailed
}
timeout := evt.GetTimeout()
err := intent.MarkTyping(ctx, portal.MXID, typingType, timeout)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to bridge typing event")
return EventHandlingResultFailed
}
if timeout == 0 {
portal.currentlyTypingGhosts.Remove(intent.GetMXID())
} else {
portal.currentlyTypingGhosts.Add(intent.GetMXID())
}
return EventHandlingResultSuccess
}
func (portal *Portal) handleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) {
func (portal *Portal) handleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) EventHandlingResult {
info, err := evt.GetChatInfoChange(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get chat info change")
return
return EventHandlingResultFailed
}
portal.ProcessChatInfoChange(ctx, evt.GetSender(), source, info, getEventTS(evt))
return EventHandlingResultSuccess
}
func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) {
func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) EventHandlingResult {
log := zerolog.Ctx(ctx)
infoProvider, ok := evt.(RemoteChatResyncWithInfo)
if ok {
@ -2869,15 +2973,16 @@ func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLo
portal.doForwardBackfill(ctx, source, latestMessage, bundle)
}
}
return EventHandlingResultSuccess
}
func (portal *Portal) handleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) {
func (portal *Portal) handleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) EventHandlingResult {
log := zerolog.Ctx(ctx)
if portal.Receiver == "" && evt.DeleteOnlyForMe() {
logins, err := portal.Bridge.DB.UserPortal.GetAllInPortal(ctx, portal.PortalKey)
if err != nil {
log.Err(err).Msg("Failed to check if portal has other logins")
return
return EventHandlingResultFailed
}
var ownUP *database.UserPortal
logins = slices.DeleteFunc(logins, func(up *database.UserPortal) bool {
@ -2907,31 +3012,35 @@ func (portal *Portal) handleRemoteChatDelete(ctx context.Context, source *UserLo
)
if err != nil {
log.Err(err).Msg("Failed to send leave state event for user after remote chat delete")
return EventHandlingResultFailed
} else {
log.Debug().Msg("Sent leave state event for user after remote chat delete")
return EventHandlingResultSuccess
}
return
}
}
err := portal.Delete(ctx)
if err != nil {
log.Err(err).Msg("Failed to delete portal from database")
return
return EventHandlingResultFailed
}
err = portal.Bridge.Bot.DeleteRoom(ctx, portal.MXID, false)
if err != nil {
log.Err(err).Msg("Failed to delete Matrix room")
return EventHandlingResultFailed
} else {
log.Info().Msg("Deleted room after remote chat delete event")
return EventHandlingResultSuccess
}
}
func (portal *Portal) handleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) {
func (portal *Portal) handleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) (res EventHandlingResult) {
//data, err := backfill.GetBackfillData(ctx, portal)
//if err != nil {
// zerolog.Ctx(ctx).Err(err).Msg("Failed to get backfill data")
// return
//}
return
}
type ChatInfoChange struct {
@ -2944,7 +3053,10 @@ type ChatInfoChange struct {
}
func (portal *Portal) ProcessChatInfoChange(ctx context.Context, sender EventSender, source *UserLogin, change *ChatInfoChange, ts time.Time) {
intent := portal.GetIntentFor(ctx, sender, source, RemoteEventChatInfoChange)
intent, ok := portal.GetIntentFor(ctx, sender, source, RemoteEventChatInfoChange)
if !ok {
return
}
if change.ChatInfo != nil {
portal.UpdateInfo(ctx, change.ChatInfo, source, intent, ts)
}
@ -3339,7 +3451,10 @@ func (portal *Portal) getInitialMemberList(ctx context.Context, members *ChatMem
ghost.UpdateInfo(ctx, member.UserInfo)
}
}
intent, extraUserID := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0)
intent, extraUserID, err := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0)
if err != nil {
return nil, nil, err
}
if extraUserID != "" {
invite = append(invite, extraUserID)
if member.PowerLevel != nil {
@ -3535,7 +3650,10 @@ func (portal *Portal) syncParticipants(
ghost.UpdateInfo(ctx, member.UserInfo)
}
}
intent, extraUserID := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0)
intent, extraUserID, err := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0)
if err != nil {
return err
}
if intent != nil {
syncIntent(intent, member)
}

View file

@ -323,7 +323,10 @@ func (portal *Portal) compileBatchMessage(ctx context.Context, source *UserLogin
if len(msg.Parts) == 0 {
return
}
intent := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage)
intent, ok := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage)
if !ok {
return
}
replyTo, threadRoot, prevThreadEvent := portal.getRelationMeta(ctx, msg.ID, msg.ReplyTo, msg.ThreadRoot, true)
if threadRoot != nil && out.PrevThreadEvents[*msg.ThreadRoot] != "" {
prevThreadEvent.MXID = out.PrevThreadEvents[*msg.ThreadRoot]
@ -387,7 +390,10 @@ func (portal *Portal) compileBatchMessage(ctx context.Context, source *UserLogin
}
slices.Sort(partIDs)
for _, reaction := range msg.Reactions {
reactionIntent := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReactionRemove)
reactionIntent, ok := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReactionRemove)
if !ok {
continue
}
if reaction.TargetPart == nil {
reaction.TargetPart = &partIDs[0]
}
@ -513,8 +519,11 @@ func (portal *Portal) sendBatch(ctx context.Context, source *UserLogin, messages
func (portal *Portal) sendLegacyBackfill(ctx context.Context, source *UserLogin, messages []*BackfillMessage, markRead bool) {
var lastPart id.EventID
for _, msg := range messages {
intent := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage)
dbMessages := portal.sendConvertedMessage(ctx, msg.ID, intent, msg.Sender.Sender, msg.ConvertedMessage, msg.Timestamp, msg.StreamOrder, func(z *zerolog.Event) *zerolog.Event {
intent, ok := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage)
if !ok {
continue
}
dbMessages, _ := portal.sendConvertedMessage(ctx, msg.ID, intent, msg.Sender.Sender, msg.ConvertedMessage, msg.Timestamp, msg.StreamOrder, func(z *zerolog.Event) *zerolog.Event {
return z.
Str("message_id", string(msg.ID)).
Any("sender_id", msg.Sender).
@ -523,7 +532,10 @@ func (portal *Portal) sendLegacyBackfill(ctx context.Context, source *UserLogin,
if len(dbMessages) > 0 {
lastPart = dbMessages[len(dbMessages)-1].MXID
for _, reaction := range msg.Reactions {
reactionIntent := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReaction)
reactionIntent, ok := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReaction)
if !ok {
continue
}
targetPart := dbMessages[0]
if reaction.TargetPart != nil {
targetPartIdx := slices.IndexFunc(dbMessages, func(dbMsg *database.Message) bool {

View file

@ -29,23 +29,23 @@ func (portal *PortalInternals) UpdateLogger() {
(*Portal)(portal).updateLogger()
}
func (portal *PortalInternals) QueueEvent(ctx context.Context, evt portalEvent) {
(*Portal)(portal).queueEvent(ctx, evt)
func (portal *PortalInternals) QueueEvent(ctx context.Context, evt portalEvent) EventHandlingResult {
return (*Portal)(portal).queueEvent(ctx, evt)
}
func (portal *PortalInternals) EventLoop() {
(*Portal)(portal).eventLoop()
}
func (portal *PortalInternals) HandleSingleEventAsync(idx int, rawEvt any) {
(*Portal)(portal).handleSingleEventAsync(idx, rawEvt)
func (portal *PortalInternals) HandleSingleEventAsync(idx int, rawEvt any) (outerRes EventHandlingResult) {
return (*Portal)(portal).handleSingleEventAsync(idx, rawEvt)
}
func (portal *PortalInternals) GetEventCtxWithLog(rawEvt any, idx int) context.Context {
return (*Portal)(portal).getEventCtxWithLog(rawEvt, idx)
}
func (portal *PortalInternals) HandleSingleEvent(ctx context.Context, rawEvt any, doneCallback func()) {
func (portal *PortalInternals) HandleSingleEvent(ctx context.Context, rawEvt any, doneCallback func(EventHandlingResult)) {
(*Portal)(portal).handleSingleEvent(ctx, rawEvt, doneCallback)
}
@ -129,11 +129,11 @@ func (portal *PortalInternals) HandleMatrixRedaction(ctx context.Context, sender
(*Portal)(portal).handleMatrixRedaction(ctx, sender, origSender, evt)
}
func (portal *PortalInternals) HandleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) {
(*Portal)(portal).handleRemoteEvent(ctx, source, evtType, evt)
func (portal *PortalInternals) HandleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) (res EventHandlingResult) {
return (*Portal)(portal).handleRemoteEvent(ctx, source, evtType, evt)
}
func (portal *PortalInternals) GetIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID) {
func (portal *PortalInternals) GetIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID, err error) {
return (*Portal)(portal).getIntentAndUserMXIDFor(ctx, sender, source, otherLogins, evtType)
}
@ -145,7 +145,7 @@ func (portal *PortalInternals) ApplyRelationMeta(ctx context.Context, content *e
(*Portal)(portal).applyRelationMeta(ctx, content, replyTo, threadRoot, prevThreadEvent)
}
func (portal *PortalInternals) SendConvertedMessage(ctx context.Context, id networkid.MessageID, intent MatrixAPI, senderID networkid.UserID, converted *ConvertedMessage, ts time.Time, streamOrder int64, logContext func(*zerolog.Event) *zerolog.Event) []*database.Message {
func (portal *PortalInternals) SendConvertedMessage(ctx context.Context, id networkid.MessageID, intent MatrixAPI, senderID networkid.UserID, converted *ConvertedMessage, ts time.Time, streamOrder int64, logContext func(*zerolog.Event) *zerolog.Event) ([]*database.Message, EventHandlingResult) {
return (*Portal)(portal).sendConvertedMessage(ctx, id, intent, senderID, converted, ts, streamOrder, logContext)
}
@ -153,24 +153,24 @@ func (portal *PortalInternals) CheckPendingMessage(ctx context.Context, evt Remo
return (*Portal)(portal).checkPendingMessage(ctx, evt)
}
func (portal *PortalInternals) HandleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) bool {
func (portal *PortalInternals) HandleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) (handleRes EventHandlingResult, continueHandling bool) {
return (*Portal)(portal).handleRemoteUpsert(ctx, source, evt, existing)
}
func (portal *PortalInternals) HandleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) {
(*Portal)(portal).handleRemoteMessage(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) (res EventHandlingResult) {
return (*Portal)(portal).handleRemoteMessage(ctx, source, evt)
}
func (portal *PortalInternals) SendRemoteErrorNotice(ctx context.Context, intent MatrixAPI, err error, ts time.Time, evtTypeName string) {
(*Portal)(portal).sendRemoteErrorNotice(ctx, intent, err, ts, evtTypeName)
}
func (portal *PortalInternals) HandleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) {
(*Portal)(portal).handleRemoteEdit(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) EventHandlingResult {
return (*Portal)(portal).handleRemoteEdit(ctx, source, evt)
}
func (portal *PortalInternals) SendConvertedEdit(ctx context.Context, targetID networkid.MessageID, senderID networkid.UserID, converted *ConvertedEdit, intent MatrixAPI, ts time.Time, streamOrder int64) {
(*Portal)(portal).sendConvertedEdit(ctx, targetID, senderID, converted, intent, ts, streamOrder)
func (portal *PortalInternals) SendConvertedEdit(ctx context.Context, targetID networkid.MessageID, senderID networkid.UserID, converted *ConvertedEdit, intent MatrixAPI, ts time.Time, streamOrder int64) EventHandlingResult {
return (*Portal)(portal).sendConvertedEdit(ctx, targetID, senderID, converted, intent, ts, streamOrder)
}
func (portal *PortalInternals) GetTargetMessagePart(ctx context.Context, evt RemoteEventWithTargetMessage) (*database.Message, error) {
@ -181,64 +181,64 @@ func (portal *PortalInternals) GetTargetReaction(ctx context.Context, evt Remote
return (*Portal)(portal).getTargetReaction(ctx, evt)
}
func (portal *PortalInternals) HandleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) {
(*Portal)(portal).handleRemoteReactionSync(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) EventHandlingResult {
return (*Portal)(portal).handleRemoteReactionSync(ctx, source, evt)
}
func (portal *PortalInternals) HandleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) {
(*Portal)(portal).handleRemoteReaction(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) EventHandlingResult {
return (*Portal)(portal).handleRemoteReaction(ctx, source, evt)
}
func (portal *PortalInternals) SendConvertedReaction(ctx context.Context, senderID networkid.UserID, intent MatrixAPI, targetMessage *database.Message, emojiID networkid.EmojiID, emoji string, ts time.Time, dbMetadata any, extraContent map[string]any, logContext func(*zerolog.Event) *zerolog.Event) {
(*Portal)(portal).sendConvertedReaction(ctx, senderID, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extraContent, logContext)
func (portal *PortalInternals) SendConvertedReaction(ctx context.Context, senderID networkid.UserID, intent MatrixAPI, targetMessage *database.Message, emojiID networkid.EmojiID, emoji string, ts time.Time, dbMetadata any, extraContent map[string]any, logContext func(*zerolog.Event) *zerolog.Event) EventHandlingResult {
return (*Portal)(portal).sendConvertedReaction(ctx, senderID, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extraContent, logContext)
}
func (portal *PortalInternals) GetIntentForMXID(ctx context.Context, userID id.UserID) (MatrixAPI, error) {
return (*Portal)(portal).getIntentForMXID(ctx, userID)
}
func (portal *PortalInternals) HandleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) {
(*Portal)(portal).handleRemoteReactionRemove(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) EventHandlingResult {
return (*Portal)(portal).handleRemoteReactionRemove(ctx, source, evt)
}
func (portal *PortalInternals) HandleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) {
(*Portal)(portal).handleRemoteMessageRemove(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) EventHandlingResult {
return (*Portal)(portal).handleRemoteMessageRemove(ctx, source, evt)
}
func (portal *PortalInternals) RedactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) {
(*Portal)(portal).redactMessageParts(ctx, parts, intent, ts)
func (portal *PortalInternals) RedactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) EventHandlingResult {
return (*Portal)(portal).redactMessageParts(ctx, parts, intent, ts)
}
func (portal *PortalInternals) HandleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) {
(*Portal)(portal).handleRemoteReadReceipt(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) EventHandlingResult {
return (*Portal)(portal).handleRemoteReadReceipt(ctx, source, evt)
}
func (portal *PortalInternals) HandleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) {
(*Portal)(portal).handleRemoteMarkUnread(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) EventHandlingResult {
return (*Portal)(portal).handleRemoteMarkUnread(ctx, source, evt)
}
func (portal *PortalInternals) HandleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) {
(*Portal)(portal).handleRemoteDeliveryReceipt(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) EventHandlingResult {
return (*Portal)(portal).handleRemoteDeliveryReceipt(ctx, source, evt)
}
func (portal *PortalInternals) HandleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) {
(*Portal)(portal).handleRemoteTyping(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) EventHandlingResult {
return (*Portal)(portal).handleRemoteTyping(ctx, source, evt)
}
func (portal *PortalInternals) HandleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) {
(*Portal)(portal).handleRemoteChatInfoChange(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) EventHandlingResult {
return (*Portal)(portal).handleRemoteChatInfoChange(ctx, source, evt)
}
func (portal *PortalInternals) HandleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) {
(*Portal)(portal).handleRemoteChatResync(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) EventHandlingResult {
return (*Portal)(portal).handleRemoteChatResync(ctx, source, evt)
}
func (portal *PortalInternals) HandleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) {
(*Portal)(portal).handleRemoteChatDelete(ctx, source, evt)
func (portal *PortalInternals) HandleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) EventHandlingResult {
return (*Portal)(portal).handleRemoteChatDelete(ctx, source, evt)
}
func (portal *PortalInternals) HandleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) {
(*Portal)(portal).handleRemoteBackfill(ctx, source, backfill)
func (portal *PortalInternals) HandleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) (res EventHandlingResult) {
return (*Portal)(portal).handleRemoteBackfill(ctx, source, backfill)
}
func (portal *PortalInternals) UpdateName(ctx context.Context, name string, sender MatrixAPI, ts time.Time) bool {

View file

@ -151,11 +151,24 @@ func (br *Bridge) QueueMatrixEvent(ctx context.Context, evt *event.Event) {
}
}
func (ul *UserLogin) QueueRemoteEvent(evt RemoteEvent) {
ul.Bridge.QueueRemoteEvent(ul, evt)
type EventHandlingResult struct {
Success bool
Ignored bool
Queued bool
}
func (br *Bridge) QueueRemoteEvent(login *UserLogin, evt RemoteEvent) {
var (
EventHandlingResultFailed = EventHandlingResult{}
EventHandlingResultQueued = EventHandlingResult{Success: true, Queued: true}
EventHandlingResultSuccess = EventHandlingResult{Success: true}
EventHandlingResultIgnored = EventHandlingResult{Success: true, Ignored: true}
)
func (ul *UserLogin) QueueRemoteEvent(evt RemoteEvent) EventHandlingResult {
return ul.Bridge.QueueRemoteEvent(ul, evt)
}
func (br *Bridge) QueueRemoteEvent(login *UserLogin, evt RemoteEvent) (res EventHandlingResult) {
log := login.Log
ctx := log.WithContext(br.BackgroundCtx)
maybeUncertain, ok := evt.(RemoteEventWithUncertainPortalReceiver)
@ -182,7 +195,7 @@ func (br *Bridge) QueueRemoteEvent(login *UserLogin, evt RemoteEvent) {
}
// TODO put this in a better place, and maybe cache to avoid constant db queries
login.MarkInPortal(ctx, portal)
portal.queueEvent(ctx, &portalRemoteEvent{
return portal.queueEvent(ctx, &portalRemoteEvent{
evt: evt,
source: login,
})