mirror of
https://mau.dev/mautrix/go.git
synced 2026-03-14 14:25:53 +01:00
bridgev2/portal: return result of handling remote events
This commit is contained in:
parent
1878700a9d
commit
8115db9af6
4 changed files with 316 additions and 175 deletions
|
|
@ -283,19 +283,21 @@ func (br *Bridge) GetExistingPortalByKey(ctx context.Context, key networkid.Port
|
|||
return br.loadPortal(ctx, db, err, nil)
|
||||
}
|
||||
|
||||
func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) {
|
||||
func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) EventHandlingResult {
|
||||
if PortalEventBuffer == 0 {
|
||||
portal.eventsLock.Lock()
|
||||
defer portal.eventsLock.Unlock()
|
||||
portal.eventIdx++
|
||||
portal.handleSingleEventAsync(portal.eventIdx, evt)
|
||||
return portal.handleSingleEventAsync(portal.eventIdx, evt)
|
||||
} else {
|
||||
select {
|
||||
case portal.events <- evt:
|
||||
return EventHandlingResultQueued
|
||||
default:
|
||||
zerolog.Ctx(ctx).Error().
|
||||
Str("portal_id", string(portal.ID)).
|
||||
Msg("Portal event channel is full")
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -313,19 +315,23 @@ func (portal *Portal) eventLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) {
|
||||
func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) (outerRes EventHandlingResult) {
|
||||
ctx := portal.getEventCtxWithLog(rawEvt, idx)
|
||||
if _, isCreate := rawEvt.(*portalCreateEvent); isCreate {
|
||||
portal.handleSingleEvent(ctx, rawEvt, func() {})
|
||||
portal.handleSingleEvent(ctx, rawEvt, func(res EventHandlingResult) {
|
||||
outerRes = res
|
||||
})
|
||||
} else if portal.Bridge.Config.AsyncEvents {
|
||||
go portal.handleSingleEvent(ctx, rawEvt, func() {})
|
||||
outerRes = EventHandlingResultQueued
|
||||
go portal.handleSingleEvent(ctx, rawEvt, func(res EventHandlingResult) {})
|
||||
} else {
|
||||
log := zerolog.Ctx(ctx)
|
||||
doneCh := make(chan struct{})
|
||||
var backgrounded atomic.Bool
|
||||
start := time.Now()
|
||||
var handleDuration time.Duration
|
||||
go portal.handleSingleEvent(ctx, rawEvt, func() {
|
||||
go portal.handleSingleEvent(ctx, rawEvt, func(res EventHandlingResult) {
|
||||
outerRes = res
|
||||
handleDuration = time.Since(start)
|
||||
close(doneCh)
|
||||
if backgrounded.Load() {
|
||||
|
|
@ -358,6 +364,7 @@ func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) {
|
|||
Msg("Event handling is taking too long, continuing in background")
|
||||
backgrounded.Store(true)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (portal *Portal) getEventCtxWithLog(rawEvt any, idx int) context.Context {
|
||||
|
|
@ -404,10 +411,11 @@ func (portal *Portal) getEventCtxWithLog(rawEvt any, idx int) context.Context {
|
|||
return logWith.Logger().WithContext(portal.Bridge.BackgroundCtx)
|
||||
}
|
||||
|
||||
func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCallback func()) {
|
||||
func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCallback func(res EventHandlingResult)) {
|
||||
log := zerolog.Ctx(ctx)
|
||||
var res EventHandlingResult
|
||||
defer func() {
|
||||
doneCallback()
|
||||
doneCallback(res)
|
||||
if err := recover(); err != nil {
|
||||
logEvt := log.Error()
|
||||
if realErr, ok := err.(error); ok {
|
||||
|
|
@ -432,9 +440,11 @@ func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCal
|
|||
case *portalMatrixEvent:
|
||||
portal.handleMatrixEvent(ctx, evt.sender, evt.evt)
|
||||
case *portalRemoteEvent:
|
||||
portal.handleRemoteEvent(ctx, evt.source, evt.evtType, evt.evt)
|
||||
res = portal.handleRemoteEvent(ctx, evt.source, evt.evtType, evt.evt)
|
||||
case *portalCreateEvent:
|
||||
evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil))
|
||||
err := portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil)
|
||||
res.Success = err == nil
|
||||
evt.cb(err)
|
||||
default:
|
||||
panic(fmt.Errorf("illegal type %T in eventLoop", evt))
|
||||
}
|
||||
|
|
@ -627,7 +637,7 @@ func (portal *Portal) handleMatrixReceipts(ctx context.Context, evt *event.Event
|
|||
for userID, receipt := range readReceipts {
|
||||
sender, err := portal.Bridge.GetUserByMXID(ctx, userID)
|
||||
if err != nil {
|
||||
// TODO log
|
||||
zerolog.Ctx(ctx).Err(err).Msg("Failed to get user to handle read receipt")
|
||||
return
|
||||
}
|
||||
portal.handleMatrixReadReceipt(ctx, sender, evtID, receipt)
|
||||
|
|
@ -1752,13 +1762,13 @@ func (portal *Portal) handleMatrixRedaction(ctx context.Context, sender *UserLog
|
|||
portal.sendSuccessStatus(ctx, evt, 0, "")
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) {
|
||||
func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) (res EventHandlingResult) {
|
||||
log := zerolog.Ctx(ctx)
|
||||
if portal.MXID == "" {
|
||||
mcp, ok := evt.(RemoteEventThatMayCreatePortal)
|
||||
if !ok || !mcp.ShouldCreatePortal() {
|
||||
log.Debug().Msg("Dropping event as portal doesn't exist")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
infoProvider, ok := mcp.(RemoteChatResyncWithInfo)
|
||||
var info *ChatInfo
|
||||
|
|
@ -1777,8 +1787,7 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin,
|
|||
err = portal.createMatrixRoomInLoop(ctx, source, info, bundle)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to create portal to handle event")
|
||||
// TODO error
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
if evtType == RemoteEventChatResync {
|
||||
log.Debug().Msg("Not handling chat resync event further as portal was created by it")
|
||||
|
|
@ -1786,7 +1795,7 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin,
|
|||
if ok {
|
||||
postHandler.PostHandle(ctx, portal)
|
||||
}
|
||||
return
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
}
|
||||
preHandler, ok := evt.(RemotePreHandler)
|
||||
|
|
@ -1798,33 +1807,33 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin,
|
|||
case RemoteEventUnknown:
|
||||
log.Debug().Msg("Ignoring remote event with type unknown")
|
||||
case RemoteEventMessage, RemoteEventMessageUpsert:
|
||||
portal.handleRemoteMessage(ctx, source, evt.(RemoteMessage))
|
||||
res = portal.handleRemoteMessage(ctx, source, evt.(RemoteMessage))
|
||||
case RemoteEventEdit:
|
||||
portal.handleRemoteEdit(ctx, source, evt.(RemoteEdit))
|
||||
res = portal.handleRemoteEdit(ctx, source, evt.(RemoteEdit))
|
||||
case RemoteEventReaction:
|
||||
portal.handleRemoteReaction(ctx, source, evt.(RemoteReaction))
|
||||
res = portal.handleRemoteReaction(ctx, source, evt.(RemoteReaction))
|
||||
case RemoteEventReactionRemove:
|
||||
portal.handleRemoteReactionRemove(ctx, source, evt.(RemoteReactionRemove))
|
||||
res = portal.handleRemoteReactionRemove(ctx, source, evt.(RemoteReactionRemove))
|
||||
case RemoteEventReactionSync:
|
||||
portal.handleRemoteReactionSync(ctx, source, evt.(RemoteReactionSync))
|
||||
res = portal.handleRemoteReactionSync(ctx, source, evt.(RemoteReactionSync))
|
||||
case RemoteEventMessageRemove:
|
||||
portal.handleRemoteMessageRemove(ctx, source, evt.(RemoteMessageRemove))
|
||||
res = portal.handleRemoteMessageRemove(ctx, source, evt.(RemoteMessageRemove))
|
||||
case RemoteEventReadReceipt:
|
||||
portal.handleRemoteReadReceipt(ctx, source, evt.(RemoteReadReceipt))
|
||||
res = portal.handleRemoteReadReceipt(ctx, source, evt.(RemoteReadReceipt))
|
||||
case RemoteEventMarkUnread:
|
||||
portal.handleRemoteMarkUnread(ctx, source, evt.(RemoteMarkUnread))
|
||||
res = portal.handleRemoteMarkUnread(ctx, source, evt.(RemoteMarkUnread))
|
||||
case RemoteEventDeliveryReceipt:
|
||||
portal.handleRemoteDeliveryReceipt(ctx, source, evt.(RemoteDeliveryReceipt))
|
||||
res = portal.handleRemoteDeliveryReceipt(ctx, source, evt.(RemoteDeliveryReceipt))
|
||||
case RemoteEventTyping:
|
||||
portal.handleRemoteTyping(ctx, source, evt.(RemoteTyping))
|
||||
res = portal.handleRemoteTyping(ctx, source, evt.(RemoteTyping))
|
||||
case RemoteEventChatInfoChange:
|
||||
portal.handleRemoteChatInfoChange(ctx, source, evt.(RemoteChatInfoChange))
|
||||
res = portal.handleRemoteChatInfoChange(ctx, source, evt.(RemoteChatInfoChange))
|
||||
case RemoteEventChatResync:
|
||||
portal.handleRemoteChatResync(ctx, source, evt.(RemoteChatResync))
|
||||
res = portal.handleRemoteChatResync(ctx, source, evt.(RemoteChatResync))
|
||||
case RemoteEventChatDelete:
|
||||
portal.handleRemoteChatDelete(ctx, source, evt.(RemoteChatDelete))
|
||||
res = portal.handleRemoteChatDelete(ctx, source, evt.(RemoteChatDelete))
|
||||
case RemoteEventBackfill:
|
||||
portal.handleRemoteBackfill(ctx, source, evt.(RemoteBackfill))
|
||||
res = portal.handleRemoteBackfill(ctx, source, evt.(RemoteBackfill))
|
||||
default:
|
||||
log.Warn().Msg("Got remote event with unknown type")
|
||||
}
|
||||
|
|
@ -1832,9 +1841,10 @@ func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin,
|
|||
if ok {
|
||||
postHandler.PostHandle(ctx, portal)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID) {
|
||||
func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID, err error) {
|
||||
var ghost *Ghost
|
||||
if !sender.IsFromMe && sender.ForceDMUser && portal.OtherUserID != "" && sender.Sender != portal.OtherUserID {
|
||||
zerolog.Ctx(ctx).Warn().
|
||||
|
|
@ -1843,21 +1853,20 @@ func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventS
|
|||
Msg("Overriding event sender with primary other user in DM portal")
|
||||
// Ensure the ghost row exists anyway to prevent foreign key errors when saving messages
|
||||
// TODO it'd probably be better to override the sender in the saved message, but that's more effort
|
||||
_, err := portal.Bridge.GetGhostByID(ctx, sender.Sender)
|
||||
_, err = portal.Bridge.GetGhostByID(ctx, sender.Sender)
|
||||
if err != nil {
|
||||
zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to get ghost with original user ID")
|
||||
return
|
||||
}
|
||||
sender.Sender = portal.OtherUserID
|
||||
}
|
||||
if sender.Sender != "" {
|
||||
var err error
|
||||
ghost, err = portal.Bridge.GetGhostByID(ctx, sender.Sender)
|
||||
if err != nil {
|
||||
zerolog.Ctx(ctx).Err(err).Msg("Failed to get ghost for message sender")
|
||||
return
|
||||
} else {
|
||||
ghost.UpdateInfoIfNecessary(ctx, source, evtType)
|
||||
}
|
||||
ghost.UpdateInfoIfNecessary(ctx, source, evtType)
|
||||
}
|
||||
if sender.IsFromMe {
|
||||
intent = source.User.DoublePuppet(ctx)
|
||||
|
|
@ -1892,15 +1901,21 @@ func (portal *Portal) getIntentAndUserMXIDFor(ctx context.Context, sender EventS
|
|||
return
|
||||
}
|
||||
|
||||
func (portal *Portal) GetIntentFor(ctx context.Context, sender EventSender, source *UserLogin, evtType RemoteEventType) MatrixAPI {
|
||||
intent, _ := portal.getIntentAndUserMXIDFor(ctx, sender, source, nil, evtType)
|
||||
func (portal *Portal) GetIntentFor(ctx context.Context, sender EventSender, source *UserLogin, evtType RemoteEventType) (MatrixAPI, bool) {
|
||||
intent, _, err := portal.getIntentAndUserMXIDFor(ctx, sender, source, nil, evtType)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
if intent == nil {
|
||||
// TODO this is very hacky - we should either insert an empty ghost row automatically
|
||||
// (and not fetch it at runtime) or make the message sender column nullable.
|
||||
portal.Bridge.GetGhostByID(ctx, "")
|
||||
intent = portal.Bridge.Bot
|
||||
if intent == nil {
|
||||
panic(fmt.Errorf("bridge bot is nil"))
|
||||
}
|
||||
}
|
||||
return intent
|
||||
return intent, true
|
||||
}
|
||||
|
||||
func (portal *Portal) getRelationMeta(ctx context.Context, currentMsg networkid.MessageID, replyToPtr *networkid.MessageOptionalPartID, threadRootPtr *networkid.MessageID, isBatchSend bool) (replyTo, threadRoot, prevThreadEvent *database.Message) {
|
||||
|
|
@ -1982,7 +1997,7 @@ func (portal *Portal) sendConvertedMessage(
|
|||
ts time.Time,
|
||||
streamOrder int64,
|
||||
logContext func(*zerolog.Event) *zerolog.Event,
|
||||
) []*database.Message {
|
||||
) ([]*database.Message, EventHandlingResult) {
|
||||
if logContext == nil {
|
||||
logContext = func(e *zerolog.Event) *zerolog.Event {
|
||||
return e
|
||||
|
|
@ -1991,6 +2006,7 @@ func (portal *Portal) sendConvertedMessage(
|
|||
log := zerolog.Ctx(ctx)
|
||||
replyTo, threadRoot, prevThreadEvent := portal.getRelationMeta(ctx, id, converted.ReplyTo, converted.ThreadRoot, false)
|
||||
output := make([]*database.Message, 0, len(converted.Parts))
|
||||
allSuccess := true
|
||||
for i, part := range converted.Parts {
|
||||
portal.applyRelationMeta(ctx, part.Content, replyTo, threadRoot, prevThreadEvent)
|
||||
dbMessage := &database.Message{
|
||||
|
|
@ -2023,6 +2039,7 @@ func (portal *Portal) sendConvertedMessage(
|
|||
})
|
||||
if err != nil {
|
||||
logContext(log.Err(err)).Str("part_id", string(part.ID)).Msg("Failed to send message part to Matrix")
|
||||
allSuccess = false
|
||||
continue
|
||||
}
|
||||
logContext(log.Debug()).
|
||||
|
|
@ -2034,12 +2051,13 @@ func (portal *Portal) sendConvertedMessage(
|
|||
err := portal.Bridge.DB.Message.Insert(ctx, dbMessage)
|
||||
if err != nil {
|
||||
logContext(log.Err(err)).Str("part_id", string(part.ID)).Msg("Failed to save message part to database")
|
||||
allSuccess = false
|
||||
}
|
||||
if converted.Disappear.Type != database.DisappearingTypeNone && !dbMessage.HasFakeMXID() {
|
||||
if converted.Disappear.Type == database.DisappearingTypeAfterSend && converted.Disappear.DisappearAt.IsZero() {
|
||||
converted.Disappear.DisappearAt = dbMessage.Timestamp.Add(converted.Disappear.Timer)
|
||||
}
|
||||
go portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{
|
||||
portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{
|
||||
RoomID: portal.MXID,
|
||||
EventID: dbMessage.MXID,
|
||||
DisappearingSetting: converted.Disappear,
|
||||
|
|
@ -2050,7 +2068,10 @@ func (portal *Portal) sendConvertedMessage(
|
|||
}
|
||||
output = append(output, dbMessage)
|
||||
}
|
||||
return output
|
||||
if !allSuccess {
|
||||
return output, EventHandlingResultFailed
|
||||
}
|
||||
return output, EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) checkPendingMessage(ctx context.Context, evt RemoteMessage) (bool, *database.Message) {
|
||||
|
|
@ -2110,21 +2131,24 @@ func (portal *Portal) checkPendingMessage(ctx context.Context, evt RemoteMessage
|
|||
return true, pending.db
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) bool {
|
||||
func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) (handleRes EventHandlingResult, continueHandling bool) {
|
||||
log := zerolog.Ctx(ctx)
|
||||
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageUpsert)
|
||||
if intent == nil {
|
||||
return false
|
||||
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageUpsert)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
res, err := evt.HandleExisting(ctx, portal, intent, existing)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to handle existing message in upsert event after receiving remote echo")
|
||||
} else {
|
||||
handleRes = EventHandlingResultSuccess
|
||||
}
|
||||
if res.SaveParts {
|
||||
for _, part := range existing {
|
||||
err = portal.Bridge.DB.Message.Update(ctx, part)
|
||||
if err != nil {
|
||||
log.Err(err).Str("part_id", string(part.PartID)).Msg("Failed to update message part in database")
|
||||
handleRes = EventHandlingResultFailed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2136,19 +2160,25 @@ func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin,
|
|||
Str("action", "handle remote subevent").
|
||||
Stringer("bridge_evt_type", subType).
|
||||
Logger()
|
||||
portal.handleRemoteEvent(log.WithContext(ctx), source, subType, subEvt)
|
||||
subRes := portal.handleRemoteEvent(log.WithContext(ctx), source, subType, subEvt)
|
||||
if !subRes.Success {
|
||||
handleRes.Success = false
|
||||
}
|
||||
}
|
||||
}
|
||||
return res.ContinueMessageHandling
|
||||
continueHandling = res.ContinueMessageHandling
|
||||
return
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) {
|
||||
func (portal *Portal) handleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) (res EventHandlingResult) {
|
||||
log := zerolog.Ctx(ctx)
|
||||
upsertEvt, isUpsert := evt.(RemoteMessageUpsert)
|
||||
isUpsert = isUpsert && evt.GetType() == RemoteEventMessageUpsert
|
||||
if wasPending, dbMessage := portal.checkPendingMessage(ctx, evt); wasPending {
|
||||
if isUpsert && dbMessage != nil {
|
||||
portal.handleRemoteUpsert(ctx, source, upsertEvt, []*database.Message{dbMessage})
|
||||
res, _ = portal.handleRemoteUpsert(ctx, source, upsertEvt, []*database.Message{dbMessage})
|
||||
} else {
|
||||
res = EventHandlingResultIgnored
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
@ -2157,35 +2187,42 @@ func (portal *Portal) handleRemoteMessage(ctx context.Context, source *UserLogin
|
|||
log.Err(err).Msg("Failed to check if message is a duplicate")
|
||||
} else if len(existing) > 0 {
|
||||
if isUpsert {
|
||||
if portal.handleRemoteUpsert(ctx, source, upsertEvt, existing) {
|
||||
var continueHandling bool
|
||||
res, continueHandling = portal.handleRemoteUpsert(ctx, source, upsertEvt, existing)
|
||||
if continueHandling {
|
||||
log.Debug().Msg("Upsert handler said to continue message handling normally")
|
||||
} else {
|
||||
return
|
||||
return res
|
||||
}
|
||||
} else {
|
||||
log.Debug().Stringer("existing_mxid", existing[0].MXID).Msg("Ignoring duplicate message")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
}
|
||||
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessage)
|
||||
if intent == nil {
|
||||
return
|
||||
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessage)
|
||||
if !ok {
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
ts := getEventTS(evt)
|
||||
converted, err := evt.ConvertMessage(ctx, portal, intent)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrIgnoringRemoteEvent) {
|
||||
log.Debug().Err(err).Msg("Remote message handling was cancelled by convert function")
|
||||
return EventHandlingResultIgnored
|
||||
} else {
|
||||
log.Err(err).Msg("Failed to convert remote message")
|
||||
portal.sendRemoteErrorNotice(ctx, intent, err, ts, "message")
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
return
|
||||
}
|
||||
portal.sendConvertedMessage(ctx, evt.GetID(), intent, evt.GetSender().Sender, converted, ts, getStreamOrder(evt), nil)
|
||||
_, res = portal.sendConvertedMessage(ctx, evt.GetID(), intent, evt.GetSender().Sender, converted, ts, getStreamOrder(evt), nil)
|
||||
if portal.currentlyTypingGhosts.Pop(intent.GetMXID()) {
|
||||
intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0)
|
||||
err = intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("Failed to send stop typing event after bridging message")
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (portal *Portal) sendRemoteErrorNotice(ctx context.Context, intent MatrixAPI, err error, ts time.Time, evtTypeName string) {
|
||||
|
|
@ -2208,7 +2245,7 @@ func (portal *Portal) sendRemoteErrorNotice(ctx context.Context, intent MatrixAP
|
|||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) {
|
||||
func (portal *Portal) handleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
var existing []*database.Message
|
||||
if bundledEvt, ok := evt.(RemoteEventWithBundledParts); ok {
|
||||
|
|
@ -2220,37 +2257,41 @@ func (portal *Portal) handleRemoteEdit(ctx context.Context, source *UserLogin, e
|
|||
existing, err = portal.Bridge.DB.Message.GetAllPartsByID(ctx, portal.Receiver, targetID)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to get edit target message")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
}
|
||||
if existing == nil {
|
||||
log.Warn().Msg("Edit target message not found")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventEdit)
|
||||
if intent == nil {
|
||||
return
|
||||
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventEdit)
|
||||
if !ok {
|
||||
return EventHandlingResultFailed
|
||||
} else if intent.GetMXID() != existing[0].SenderMXID {
|
||||
log.Warn().
|
||||
Stringer("edit_sender_mxid", intent.GetMXID()).
|
||||
Stringer("original_sender_mxid", existing[0].SenderMXID).
|
||||
Msg("Not bridging edit: sender doesn't match original message sender")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
ts := getEventTS(evt)
|
||||
converted, err := evt.ConvertEdit(ctx, portal, intent, existing)
|
||||
if errors.Is(err, ErrIgnoringRemoteEvent) {
|
||||
log.Debug().Err(err).Msg("Remote edit handling was cancelled by convert function")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
} else if err != nil {
|
||||
log.Err(err).Msg("Failed to convert remote edit")
|
||||
portal.sendRemoteErrorNotice(ctx, intent, err, ts, "edit")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
portal.sendConvertedEdit(ctx, existing[0].ID, evt.GetSender().Sender, converted, intent, ts, getStreamOrder(evt))
|
||||
res := portal.sendConvertedEdit(ctx, existing[0].ID, evt.GetSender().Sender, converted, intent, ts, getStreamOrder(evt))
|
||||
if portal.currentlyTypingGhosts.Pop(intent.GetMXID()) {
|
||||
intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0)
|
||||
err = intent.MarkTyping(ctx, portal.MXID, TypingTypeText, 0)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("Failed to send stop typing event after bridging edit")
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (portal *Portal) sendConvertedEdit(
|
||||
|
|
@ -2261,8 +2302,9 @@ func (portal *Portal) sendConvertedEdit(
|
|||
intent MatrixAPI,
|
||||
ts time.Time,
|
||||
streamOrder int64,
|
||||
) {
|
||||
) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
allSuccess := true
|
||||
for i, part := range converted.ModifiedParts {
|
||||
if part.Content.Mentions == nil {
|
||||
part.Content.Mentions = &event.Mentions{}
|
||||
|
|
@ -2298,6 +2340,7 @@ func (portal *Portal) sendConvertedEdit(
|
|||
})
|
||||
if err != nil {
|
||||
log.Err(err).Stringer("part_mxid", part.Part.MXID).Msg("Failed to edit message part")
|
||||
allSuccess = false
|
||||
continue
|
||||
} else {
|
||||
log.Debug().
|
||||
|
|
@ -2312,6 +2355,7 @@ func (portal *Portal) sendConvertedEdit(
|
|||
err := portal.Bridge.DB.Message.Update(ctx, part.Part)
|
||||
if err != nil {
|
||||
log.Err(err).Int64("part_rowid", part.Part.RowID).Msg("Failed to update message part in database")
|
||||
allSuccess = false
|
||||
}
|
||||
}
|
||||
for _, part := range converted.DeletedParts {
|
||||
|
|
@ -2325,6 +2369,7 @@ func (portal *Portal) sendConvertedEdit(
|
|||
})
|
||||
if err != nil {
|
||||
log.Err(err).Stringer("part_mxid", part.MXID).Msg("Failed to redact message part deleted in edit")
|
||||
allSuccess = false
|
||||
} else {
|
||||
log.Debug().
|
||||
Stringer("redaction_event_id", resp.EventID).
|
||||
|
|
@ -2335,11 +2380,19 @@ func (portal *Portal) sendConvertedEdit(
|
|||
err = portal.Bridge.DB.Message.Delete(ctx, part.RowID)
|
||||
if err != nil {
|
||||
log.Err(err).Int64("part_rowid", part.RowID).Msg("Failed to delete message part from database")
|
||||
allSuccess = false
|
||||
}
|
||||
}
|
||||
if converted.AddedParts != nil {
|
||||
portal.sendConvertedMessage(ctx, targetID, intent, senderID, converted.AddedParts, ts, streamOrder, nil)
|
||||
_, res := portal.sendConvertedMessage(ctx, targetID, intent, senderID, converted.AddedParts, ts, streamOrder, nil)
|
||||
if !res.Success {
|
||||
allSuccess = false
|
||||
}
|
||||
}
|
||||
if !allSuccess {
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) getTargetMessagePart(ctx context.Context, evt RemoteEventWithTargetMessage) (*database.Message, error) {
|
||||
|
|
@ -2372,17 +2425,17 @@ func getStreamOrder(evt RemoteEvent) int64 {
|
|||
return 0
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) {
|
||||
func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
eventTS := getEventTS(evt)
|
||||
targetMessage, err := portal.getTargetMessagePart(ctx, evt)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to get target message for reaction")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
} else if targetMessage == nil {
|
||||
// TODO use deterministic event ID as target if applicable?
|
||||
log.Warn().Msg("Target message for reaction not found")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
var existingReactions []*database.Reaction
|
||||
if partTargeter, ok := evt.(RemoteEventWithTargetPart); ok {
|
||||
|
|
@ -2390,6 +2443,10 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
|
|||
} else {
|
||||
existingReactions, err = portal.Bridge.DB.Reaction.GetAllToMessage(ctx, portal.Receiver, evt.GetTargetMessage())
|
||||
}
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to get existing reactions for reaction sync")
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
existing := make(map[networkid.UserID]map[networkid.EmojiID]*database.Reaction)
|
||||
for _, existingReaction := range existingReactions {
|
||||
if existing[existingReaction.SenderID] == nil {
|
||||
|
|
@ -2398,9 +2455,13 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
|
|||
existing[existingReaction.SenderID][existingReaction.EmojiID] = existingReaction
|
||||
}
|
||||
|
||||
doAddReaction := func(new *BackfillReaction, intent MatrixAPI) MatrixAPI {
|
||||
doAddReaction := func(new *BackfillReaction, intent MatrixAPI) {
|
||||
if intent == nil {
|
||||
intent = portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync)
|
||||
var ok bool
|
||||
intent, ok = portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
portal.sendConvertedReaction(
|
||||
ctx, new.Sender.Sender, intent, targetMessage, new.EmojiID, new.Emoji,
|
||||
|
|
@ -2411,7 +2472,6 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
|
|||
Time("reaction_ts", new.Timestamp)
|
||||
},
|
||||
)
|
||||
return intent
|
||||
}
|
||||
doRemoveReaction := func(old *database.Reaction, intent MatrixAPI, deleteRow bool) {
|
||||
if intent == nil && old.SenderMXID != "" {
|
||||
|
|
@ -2445,7 +2505,10 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
|
|||
}
|
||||
}
|
||||
doOverwriteReaction := func(new *BackfillReaction, old *database.Reaction) {
|
||||
intent := portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync)
|
||||
intent, ok := portal.GetIntentFor(ctx, new.Sender, source, RemoteEventReactionSync)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
doRemoveReaction(old, intent, false)
|
||||
doAddReaction(new, intent)
|
||||
}
|
||||
|
|
@ -2496,30 +2559,34 @@ func (portal *Portal) handleRemoteReactionSync(ctx context.Context, source *User
|
|||
}
|
||||
}
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) {
|
||||
func (portal *Portal) handleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
targetMessage, err := portal.getTargetMessagePart(ctx, evt)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to get target message for reaction")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
} else if targetMessage == nil {
|
||||
// TODO use deterministic event ID as target if applicable?
|
||||
log.Warn().Msg("Target message for reaction not found")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
emoji, emojiID := evt.GetReactionEmoji()
|
||||
existingReaction, err := portal.Bridge.DB.Reaction.GetByID(ctx, portal.Receiver, targetMessage.ID, targetMessage.PartID, evt.GetSender().Sender, emojiID)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to check if reaction is a duplicate")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
} else if existingReaction != nil && (emojiID != "" || existingReaction.Emoji == emoji) {
|
||||
log.Debug().Msg("Ignoring duplicate reaction")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
ts := getEventTS(evt)
|
||||
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReaction)
|
||||
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReaction)
|
||||
if !ok {
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
var extra map[string]any
|
||||
if extraContentProvider, ok := evt.(RemoteReactionWithExtraContent); ok {
|
||||
extra = extraContentProvider.GetReactionExtraContent()
|
||||
|
|
@ -2538,14 +2605,14 @@ func (portal *Portal) handleRemoteReaction(ctx context.Context, source *UserLogi
|
|||
log.Err(err).Msg("Failed to redact old reaction")
|
||||
}
|
||||
}
|
||||
portal.sendConvertedReaction(ctx, evt.GetSender().Sender, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extra, nil)
|
||||
return portal.sendConvertedReaction(ctx, evt.GetSender().Sender, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extra, nil)
|
||||
}
|
||||
|
||||
func (portal *Portal) sendConvertedReaction(
|
||||
ctx context.Context, senderID networkid.UserID, intent MatrixAPI, targetMessage *database.Message,
|
||||
emojiID networkid.EmojiID, emoji string, ts time.Time, dbMetadata any, extraContent map[string]any,
|
||||
logContext func(*zerolog.Event) *zerolog.Event,
|
||||
) {
|
||||
) EventHandlingResult {
|
||||
if logContext == nil {
|
||||
logContext = func(e *zerolog.Event) *zerolog.Event {
|
||||
return e
|
||||
|
|
@ -2580,7 +2647,7 @@ func (portal *Portal) sendConvertedReaction(
|
|||
})
|
||||
if err != nil {
|
||||
logContext(log.Err(err)).Msg("Failed to send reaction to Matrix")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
logContext(log.Debug()).
|
||||
Stringer("event_id", resp.EventID).
|
||||
|
|
@ -2589,7 +2656,9 @@ func (portal *Portal) sendConvertedReaction(
|
|||
err = portal.Bridge.DB.Reaction.Upsert(ctx, dbReaction)
|
||||
if err != nil {
|
||||
logContext(log.Err(err)).Msg("Failed to save reaction to database")
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) getIntentForMXID(ctx context.Context, userID id.UserID) (MatrixAPI, error) {
|
||||
|
|
@ -2608,22 +2677,26 @@ func (portal *Portal) getIntentForMXID(ctx context.Context, userID id.UserID) (M
|
|||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) {
|
||||
func (portal *Portal) handleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
targetReaction, err := portal.getTargetReaction(ctx, evt)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to get target reaction for removal")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
} else if targetReaction == nil {
|
||||
log.Warn().Msg("Target reaction not found")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
intent, err := portal.getIntentForMXID(ctx, targetReaction.SenderMXID)
|
||||
if err != nil {
|
||||
log.Err(err).Stringer("sender_mxid", targetReaction.SenderMXID).Msg("Failed to get intent for removing reaction")
|
||||
}
|
||||
if intent == nil {
|
||||
intent = portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReactionRemove)
|
||||
var ok bool
|
||||
intent, ok = portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventReactionRemove)
|
||||
if !ok {
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
}
|
||||
ts := getEventTS(evt)
|
||||
_, err = intent.SendMessage(ctx, portal.MXID, event.EventRedaction, &event.Content{
|
||||
|
|
@ -2633,22 +2706,24 @@ func (portal *Portal) handleRemoteReactionRemove(ctx context.Context, source *Us
|
|||
}, &MatrixSendExtra{Timestamp: ts, ReactionMeta: targetReaction})
|
||||
if err != nil {
|
||||
log.Err(err).Stringer("reaction_mxid", targetReaction.MXID).Msg("Failed to redact reaction")
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
err = portal.Bridge.DB.Reaction.Delete(ctx, targetReaction)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to delete target reaction from database")
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) {
|
||||
func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
targetParts, err := portal.Bridge.DB.Message.GetAllPartsByID(ctx, portal.Receiver, evt.GetTargetMessage())
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to get target message for removal")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
} else if len(targetParts) == 0 {
|
||||
log.Debug().Msg("Target message not found")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
onlyForMeProvider, ok := evt.(RemoteDeleteOnlyForMe)
|
||||
onlyForMe := ok && onlyForMeProvider.DeleteOnlyForMe()
|
||||
|
|
@ -2656,7 +2731,10 @@ func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *Use
|
|||
// TODO check if there are other user logins before deleting
|
||||
}
|
||||
|
||||
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageRemove)
|
||||
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventMessageRemove)
|
||||
if !ok {
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
if intent == portal.Bridge.Bot && len(targetParts) > 0 {
|
||||
senderIntent, err := portal.getIntentForMXID(ctx, targetParts[0].SenderMXID)
|
||||
if err != nil {
|
||||
|
|
@ -2665,15 +2743,17 @@ func (portal *Portal) handleRemoteMessageRemove(ctx context.Context, source *Use
|
|||
intent = senderIntent
|
||||
}
|
||||
}
|
||||
portal.redactMessageParts(ctx, targetParts, intent, getEventTS(evt))
|
||||
res := portal.redactMessageParts(ctx, targetParts, intent, getEventTS(evt))
|
||||
err = portal.Bridge.DB.Message.DeleteAllParts(ctx, portal.Receiver, evt.GetTargetMessage())
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to delete target message from database")
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) {
|
||||
func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
var anyFailed bool
|
||||
for _, part := range parts {
|
||||
if part.HasFakeMXID() {
|
||||
continue
|
||||
|
|
@ -2685,6 +2765,7 @@ func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.
|
|||
}, &MatrixSendExtra{Timestamp: ts, MessageMeta: part})
|
||||
if err != nil {
|
||||
log.Err(err).Stringer("part_mxid", part.MXID).Msg("Failed to redact message part")
|
||||
anyFailed = true
|
||||
} else {
|
||||
log.Debug().
|
||||
Stringer("redaction_event_id", resp.EventID).
|
||||
|
|
@ -2693,9 +2774,13 @@ func (portal *Portal) redactMessageParts(ctx context.Context, parts []*database.
|
|||
Msg("Sent redaction of message part to Matrix")
|
||||
}
|
||||
}
|
||||
if anyFailed {
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) {
|
||||
func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
var err error
|
||||
var lastTarget *database.Message
|
||||
|
|
@ -2705,7 +2790,7 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
|
|||
if err != nil {
|
||||
log.Err(err).Str("last_target_id", string(lastTargetID)).
|
||||
Msg("Failed to get last target message for read receipt")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
} else if lastTarget == nil {
|
||||
log.Debug().Str("last_target_id", string(lastTargetID)).
|
||||
Msg("Last target message not found")
|
||||
|
|
@ -2724,7 +2809,7 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
|
|||
if err != nil {
|
||||
log.Err(err).Str("target_id", string(targetID)).
|
||||
Msg("Failed to get target message for read receipt")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
} else if target != nil && !target.HasFakeMXID() && (lastTarget == nil || target.Timestamp.After(lastTarget.Timestamp)) {
|
||||
lastTarget = target
|
||||
}
|
||||
|
|
@ -2737,14 +2822,17 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
|
|||
}
|
||||
}
|
||||
sender := evt.GetSender()
|
||||
intent := portal.GetIntentFor(ctx, sender, source, RemoteEventReadReceipt)
|
||||
intent, ok := portal.GetIntentFor(ctx, sender, source, RemoteEventReadReceipt)
|
||||
if !ok {
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
var addTargetLog func(evt *zerolog.Event) *zerolog.Event
|
||||
if lastTarget == nil {
|
||||
sevt, evtOK := evt.(RemoteReadReceiptWithStreamOrder)
|
||||
soIntent, soIntentOK := intent.(StreamOrderReadingMatrixAPI)
|
||||
if !evtOK || !soIntentOK || sevt.GetReadUpToStreamOrder() == 0 {
|
||||
log.Warn().Msg("No target message found for read receipt")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
targetStreamOrder := sevt.GetReadUpToStreamOrder()
|
||||
addTargetLog = func(evt *zerolog.Event) *zerolog.Event {
|
||||
|
|
@ -2759,40 +2847,47 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
|
|||
}
|
||||
if err != nil {
|
||||
addTargetLog(log.Err(err)).Msg("Failed to bridge read receipt")
|
||||
return EventHandlingResultFailed
|
||||
} else {
|
||||
addTargetLog(log.Debug()).Msg("Bridged read receipt")
|
||||
}
|
||||
if sender.IsFromMe {
|
||||
portal.Bridge.DisappearLoop.StartAll(ctx, portal.MXID)
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) {
|
||||
func (portal *Portal) handleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) EventHandlingResult {
|
||||
if !evt.GetSender().IsFromMe {
|
||||
zerolog.Ctx(ctx).Warn().Msg("Ignoring mark unread event from non-self user")
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
dp := source.User.DoublePuppet(ctx)
|
||||
if dp == nil {
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
err := dp.MarkUnread(ctx, portal.MXID, evt.GetUnread())
|
||||
if err != nil {
|
||||
zerolog.Ctx(ctx).Err(err).Msg("Failed to bridge mark unread event")
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) {
|
||||
func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) EventHandlingResult {
|
||||
if portal.RoomType != database.RoomTypeDM || evt.GetSender().Sender != portal.OtherUserID {
|
||||
return
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventDeliveryReceipt)
|
||||
if !ok {
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventDeliveryReceipt)
|
||||
log := zerolog.Ctx(ctx)
|
||||
for _, target := range evt.GetReceiptTargets() {
|
||||
targetParts, err := portal.Bridge.DB.Message.GetAllPartsByID(ctx, portal.Receiver, target)
|
||||
if err != nil {
|
||||
log.Err(err).Str("target_id", string(target)).Msg("Failed to get target message for delivery receipt")
|
||||
continue
|
||||
return EventHandlingResultFailed
|
||||
} else if len(targetParts) == 0 {
|
||||
continue
|
||||
} else if _, sentByGhost := portal.Bridge.Matrix.ParseGhostMXID(targetParts[0].SenderMXID); sentByGhost {
|
||||
|
|
@ -2811,36 +2906,43 @@ func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *U
|
|||
})
|
||||
}
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) {
|
||||
func (portal *Portal) handleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) EventHandlingResult {
|
||||
var typingType TypingType
|
||||
if typedEvt, ok := evt.(RemoteTypingWithType); ok {
|
||||
typingType = typedEvt.GetTypingType()
|
||||
}
|
||||
intent := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventTyping)
|
||||
intent, ok := portal.GetIntentFor(ctx, evt.GetSender(), source, RemoteEventTyping)
|
||||
if !ok {
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
timeout := evt.GetTimeout()
|
||||
err := intent.MarkTyping(ctx, portal.MXID, typingType, timeout)
|
||||
if err != nil {
|
||||
zerolog.Ctx(ctx).Err(err).Msg("Failed to bridge typing event")
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
if timeout == 0 {
|
||||
portal.currentlyTypingGhosts.Remove(intent.GetMXID())
|
||||
} else {
|
||||
portal.currentlyTypingGhosts.Add(intent.GetMXID())
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) {
|
||||
func (portal *Portal) handleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) EventHandlingResult {
|
||||
info, err := evt.GetChatInfoChange(ctx)
|
||||
if err != nil {
|
||||
zerolog.Ctx(ctx).Err(err).Msg("Failed to get chat info change")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
portal.ProcessChatInfoChange(ctx, evt.GetSender(), source, info, getEventTS(evt))
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) {
|
||||
func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
infoProvider, ok := evt.(RemoteChatResyncWithInfo)
|
||||
if ok {
|
||||
|
|
@ -2869,15 +2971,16 @@ func (portal *Portal) handleRemoteChatResync(ctx context.Context, source *UserLo
|
|||
portal.doForwardBackfill(ctx, source, latestMessage, bundle)
|
||||
}
|
||||
}
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) {
|
||||
func (portal *Portal) handleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) EventHandlingResult {
|
||||
log := zerolog.Ctx(ctx)
|
||||
if portal.Receiver == "" && evt.DeleteOnlyForMe() {
|
||||
logins, err := portal.Bridge.DB.UserPortal.GetAllInPortal(ctx, portal.PortalKey)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to check if portal has other logins")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
var ownUP *database.UserPortal
|
||||
logins = slices.DeleteFunc(logins, func(up *database.UserPortal) bool {
|
||||
|
|
@ -2907,31 +3010,35 @@ func (portal *Portal) handleRemoteChatDelete(ctx context.Context, source *UserLo
|
|||
)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to send leave state event for user after remote chat delete")
|
||||
return EventHandlingResultFailed
|
||||
} else {
|
||||
log.Debug().Msg("Sent leave state event for user after remote chat delete")
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
err := portal.Delete(ctx)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to delete portal from database")
|
||||
return
|
||||
return EventHandlingResultFailed
|
||||
}
|
||||
err = portal.Bridge.Bot.DeleteRoom(ctx, portal.MXID, false)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to delete Matrix room")
|
||||
return EventHandlingResultFailed
|
||||
} else {
|
||||
log.Info().Msg("Deleted room after remote chat delete event")
|
||||
return EventHandlingResultSuccess
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) {
|
||||
func (portal *Portal) handleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) (res EventHandlingResult) {
|
||||
//data, err := backfill.GetBackfillData(ctx, portal)
|
||||
//if err != nil {
|
||||
// zerolog.Ctx(ctx).Err(err).Msg("Failed to get backfill data")
|
||||
// return
|
||||
//}
|
||||
return
|
||||
}
|
||||
|
||||
type ChatInfoChange struct {
|
||||
|
|
@ -2944,7 +3051,10 @@ type ChatInfoChange struct {
|
|||
}
|
||||
|
||||
func (portal *Portal) ProcessChatInfoChange(ctx context.Context, sender EventSender, source *UserLogin, change *ChatInfoChange, ts time.Time) {
|
||||
intent := portal.GetIntentFor(ctx, sender, source, RemoteEventChatInfoChange)
|
||||
intent, ok := portal.GetIntentFor(ctx, sender, source, RemoteEventChatInfoChange)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if change.ChatInfo != nil {
|
||||
portal.UpdateInfo(ctx, change.ChatInfo, source, intent, ts)
|
||||
}
|
||||
|
|
@ -3339,7 +3449,10 @@ func (portal *Portal) getInitialMemberList(ctx context.Context, members *ChatMem
|
|||
ghost.UpdateInfo(ctx, member.UserInfo)
|
||||
}
|
||||
}
|
||||
intent, extraUserID := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0)
|
||||
intent, extraUserID, err := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if extraUserID != "" {
|
||||
invite = append(invite, extraUserID)
|
||||
if member.PowerLevel != nil {
|
||||
|
|
@ -3535,7 +3648,10 @@ func (portal *Portal) syncParticipants(
|
|||
ghost.UpdateInfo(ctx, member.UserInfo)
|
||||
}
|
||||
}
|
||||
intent, extraUserID := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0)
|
||||
intent, extraUserID, err := portal.getIntentAndUserMXIDFor(ctx, member.EventSender, source, loginsInPortal, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if intent != nil {
|
||||
syncIntent(intent, member)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -323,7 +323,10 @@ func (portal *Portal) compileBatchMessage(ctx context.Context, source *UserLogin
|
|||
if len(msg.Parts) == 0 {
|
||||
return
|
||||
}
|
||||
intent := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage)
|
||||
intent, ok := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
replyTo, threadRoot, prevThreadEvent := portal.getRelationMeta(ctx, msg.ID, msg.ReplyTo, msg.ThreadRoot, true)
|
||||
if threadRoot != nil && out.PrevThreadEvents[*msg.ThreadRoot] != "" {
|
||||
prevThreadEvent.MXID = out.PrevThreadEvents[*msg.ThreadRoot]
|
||||
|
|
@ -387,7 +390,10 @@ func (portal *Portal) compileBatchMessage(ctx context.Context, source *UserLogin
|
|||
}
|
||||
slices.Sort(partIDs)
|
||||
for _, reaction := range msg.Reactions {
|
||||
reactionIntent := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReactionRemove)
|
||||
reactionIntent, ok := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReactionRemove)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if reaction.TargetPart == nil {
|
||||
reaction.TargetPart = &partIDs[0]
|
||||
}
|
||||
|
|
@ -513,8 +519,11 @@ func (portal *Portal) sendBatch(ctx context.Context, source *UserLogin, messages
|
|||
func (portal *Portal) sendLegacyBackfill(ctx context.Context, source *UserLogin, messages []*BackfillMessage, markRead bool) {
|
||||
var lastPart id.EventID
|
||||
for _, msg := range messages {
|
||||
intent := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage)
|
||||
dbMessages := portal.sendConvertedMessage(ctx, msg.ID, intent, msg.Sender.Sender, msg.ConvertedMessage, msg.Timestamp, msg.StreamOrder, func(z *zerolog.Event) *zerolog.Event {
|
||||
intent, ok := portal.GetIntentFor(ctx, msg.Sender, source, RemoteEventMessage)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
dbMessages, _ := portal.sendConvertedMessage(ctx, msg.ID, intent, msg.Sender.Sender, msg.ConvertedMessage, msg.Timestamp, msg.StreamOrder, func(z *zerolog.Event) *zerolog.Event {
|
||||
return z.
|
||||
Str("message_id", string(msg.ID)).
|
||||
Any("sender_id", msg.Sender).
|
||||
|
|
@ -523,7 +532,10 @@ func (portal *Portal) sendLegacyBackfill(ctx context.Context, source *UserLogin,
|
|||
if len(dbMessages) > 0 {
|
||||
lastPart = dbMessages[len(dbMessages)-1].MXID
|
||||
for _, reaction := range msg.Reactions {
|
||||
reactionIntent := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReaction)
|
||||
reactionIntent, ok := portal.GetIntentFor(ctx, reaction.Sender, source, RemoteEventReaction)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
targetPart := dbMessages[0]
|
||||
if reaction.TargetPart != nil {
|
||||
targetPartIdx := slices.IndexFunc(dbMessages, func(dbMsg *database.Message) bool {
|
||||
|
|
|
|||
|
|
@ -29,23 +29,23 @@ func (portal *PortalInternals) UpdateLogger() {
|
|||
(*Portal)(portal).updateLogger()
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) QueueEvent(ctx context.Context, evt portalEvent) {
|
||||
(*Portal)(portal).queueEvent(ctx, evt)
|
||||
func (portal *PortalInternals) QueueEvent(ctx context.Context, evt portalEvent) EventHandlingResult {
|
||||
return (*Portal)(portal).queueEvent(ctx, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) EventLoop() {
|
||||
(*Portal)(portal).eventLoop()
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleSingleEventAsync(idx int, rawEvt any) {
|
||||
(*Portal)(portal).handleSingleEventAsync(idx, rawEvt)
|
||||
func (portal *PortalInternals) HandleSingleEventAsync(idx int, rawEvt any) (outerRes EventHandlingResult) {
|
||||
return (*Portal)(portal).handleSingleEventAsync(idx, rawEvt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) GetEventCtxWithLog(rawEvt any, idx int) context.Context {
|
||||
return (*Portal)(portal).getEventCtxWithLog(rawEvt, idx)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleSingleEvent(ctx context.Context, rawEvt any, doneCallback func()) {
|
||||
func (portal *PortalInternals) HandleSingleEvent(ctx context.Context, rawEvt any, doneCallback func(EventHandlingResult)) {
|
||||
(*Portal)(portal).handleSingleEvent(ctx, rawEvt, doneCallback)
|
||||
}
|
||||
|
||||
|
|
@ -129,11 +129,11 @@ func (portal *PortalInternals) HandleMatrixRedaction(ctx context.Context, sender
|
|||
(*Portal)(portal).handleMatrixRedaction(ctx, sender, origSender, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) {
|
||||
(*Portal)(portal).handleRemoteEvent(ctx, source, evtType, evt)
|
||||
func (portal *PortalInternals) HandleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) (res EventHandlingResult) {
|
||||
return (*Portal)(portal).handleRemoteEvent(ctx, source, evtType, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) GetIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID) {
|
||||
func (portal *PortalInternals) GetIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID, err error) {
|
||||
return (*Portal)(portal).getIntentAndUserMXIDFor(ctx, sender, source, otherLogins, evtType)
|
||||
}
|
||||
|
||||
|
|
@ -145,7 +145,7 @@ func (portal *PortalInternals) ApplyRelationMeta(ctx context.Context, content *e
|
|||
(*Portal)(portal).applyRelationMeta(ctx, content, replyTo, threadRoot, prevThreadEvent)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) SendConvertedMessage(ctx context.Context, id networkid.MessageID, intent MatrixAPI, senderID networkid.UserID, converted *ConvertedMessage, ts time.Time, streamOrder int64, logContext func(*zerolog.Event) *zerolog.Event) []*database.Message {
|
||||
func (portal *PortalInternals) SendConvertedMessage(ctx context.Context, id networkid.MessageID, intent MatrixAPI, senderID networkid.UserID, converted *ConvertedMessage, ts time.Time, streamOrder int64, logContext func(*zerolog.Event) *zerolog.Event) ([]*database.Message, EventHandlingResult) {
|
||||
return (*Portal)(portal).sendConvertedMessage(ctx, id, intent, senderID, converted, ts, streamOrder, logContext)
|
||||
}
|
||||
|
||||
|
|
@ -153,24 +153,24 @@ func (portal *PortalInternals) CheckPendingMessage(ctx context.Context, evt Remo
|
|||
return (*Portal)(portal).checkPendingMessage(ctx, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) bool {
|
||||
func (portal *PortalInternals) HandleRemoteUpsert(ctx context.Context, source *UserLogin, evt RemoteMessageUpsert, existing []*database.Message) (handleRes EventHandlingResult, continueHandling bool) {
|
||||
return (*Portal)(portal).handleRemoteUpsert(ctx, source, evt, existing)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) {
|
||||
(*Portal)(portal).handleRemoteMessage(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteMessage(ctx context.Context, source *UserLogin, evt RemoteMessage) (res EventHandlingResult) {
|
||||
return (*Portal)(portal).handleRemoteMessage(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) SendRemoteErrorNotice(ctx context.Context, intent MatrixAPI, err error, ts time.Time, evtTypeName string) {
|
||||
(*Portal)(portal).sendRemoteErrorNotice(ctx, intent, err, ts, evtTypeName)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) {
|
||||
(*Portal)(portal).handleRemoteEdit(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteEdit(ctx context.Context, source *UserLogin, evt RemoteEdit) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteEdit(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) SendConvertedEdit(ctx context.Context, targetID networkid.MessageID, senderID networkid.UserID, converted *ConvertedEdit, intent MatrixAPI, ts time.Time, streamOrder int64) {
|
||||
(*Portal)(portal).sendConvertedEdit(ctx, targetID, senderID, converted, intent, ts, streamOrder)
|
||||
func (portal *PortalInternals) SendConvertedEdit(ctx context.Context, targetID networkid.MessageID, senderID networkid.UserID, converted *ConvertedEdit, intent MatrixAPI, ts time.Time, streamOrder int64) EventHandlingResult {
|
||||
return (*Portal)(portal).sendConvertedEdit(ctx, targetID, senderID, converted, intent, ts, streamOrder)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) GetTargetMessagePart(ctx context.Context, evt RemoteEventWithTargetMessage) (*database.Message, error) {
|
||||
|
|
@ -181,64 +181,64 @@ func (portal *PortalInternals) GetTargetReaction(ctx context.Context, evt Remote
|
|||
return (*Portal)(portal).getTargetReaction(ctx, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) {
|
||||
(*Portal)(portal).handleRemoteReactionSync(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteReactionSync(ctx context.Context, source *UserLogin, evt RemoteReactionSync) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteReactionSync(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) {
|
||||
(*Portal)(portal).handleRemoteReaction(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteReaction(ctx context.Context, source *UserLogin, evt RemoteReaction) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteReaction(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) SendConvertedReaction(ctx context.Context, senderID networkid.UserID, intent MatrixAPI, targetMessage *database.Message, emojiID networkid.EmojiID, emoji string, ts time.Time, dbMetadata any, extraContent map[string]any, logContext func(*zerolog.Event) *zerolog.Event) {
|
||||
(*Portal)(portal).sendConvertedReaction(ctx, senderID, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extraContent, logContext)
|
||||
func (portal *PortalInternals) SendConvertedReaction(ctx context.Context, senderID networkid.UserID, intent MatrixAPI, targetMessage *database.Message, emojiID networkid.EmojiID, emoji string, ts time.Time, dbMetadata any, extraContent map[string]any, logContext func(*zerolog.Event) *zerolog.Event) EventHandlingResult {
|
||||
return (*Portal)(portal).sendConvertedReaction(ctx, senderID, intent, targetMessage, emojiID, emoji, ts, dbMetadata, extraContent, logContext)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) GetIntentForMXID(ctx context.Context, userID id.UserID) (MatrixAPI, error) {
|
||||
return (*Portal)(portal).getIntentForMXID(ctx, userID)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) {
|
||||
(*Portal)(portal).handleRemoteReactionRemove(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteReactionRemove(ctx context.Context, source *UserLogin, evt RemoteReactionRemove) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteReactionRemove(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) {
|
||||
(*Portal)(portal).handleRemoteMessageRemove(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteMessageRemove(ctx context.Context, source *UserLogin, evt RemoteMessageRemove) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteMessageRemove(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) RedactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) {
|
||||
(*Portal)(portal).redactMessageParts(ctx, parts, intent, ts)
|
||||
func (portal *PortalInternals) RedactMessageParts(ctx context.Context, parts []*database.Message, intent MatrixAPI, ts time.Time) EventHandlingResult {
|
||||
return (*Portal)(portal).redactMessageParts(ctx, parts, intent, ts)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) {
|
||||
(*Portal)(portal).handleRemoteReadReceipt(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteReadReceipt(ctx context.Context, source *UserLogin, evt RemoteReadReceipt) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteReadReceipt(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) {
|
||||
(*Portal)(portal).handleRemoteMarkUnread(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteMarkUnread(ctx context.Context, source *UserLogin, evt RemoteMarkUnread) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteMarkUnread(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) {
|
||||
(*Portal)(portal).handleRemoteDeliveryReceipt(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteDeliveryReceipt(ctx context.Context, source *UserLogin, evt RemoteDeliveryReceipt) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteDeliveryReceipt(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) {
|
||||
(*Portal)(portal).handleRemoteTyping(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteTyping(ctx context.Context, source *UserLogin, evt RemoteTyping) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteTyping(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) {
|
||||
(*Portal)(portal).handleRemoteChatInfoChange(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteChatInfoChange(ctx context.Context, source *UserLogin, evt RemoteChatInfoChange) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteChatInfoChange(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) {
|
||||
(*Portal)(portal).handleRemoteChatResync(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteChatResync(ctx context.Context, source *UserLogin, evt RemoteChatResync) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteChatResync(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) {
|
||||
(*Portal)(portal).handleRemoteChatDelete(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteChatDelete(ctx context.Context, source *UserLogin, evt RemoteChatDelete) EventHandlingResult {
|
||||
return (*Portal)(portal).handleRemoteChatDelete(ctx, source, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) {
|
||||
(*Portal)(portal).handleRemoteBackfill(ctx, source, backfill)
|
||||
func (portal *PortalInternals) HandleRemoteBackfill(ctx context.Context, source *UserLogin, backfill RemoteBackfill) (res EventHandlingResult) {
|
||||
return (*Portal)(portal).handleRemoteBackfill(ctx, source, backfill)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) UpdateName(ctx context.Context, name string, sender MatrixAPI, ts time.Time) bool {
|
||||
|
|
|
|||
|
|
@ -151,11 +151,24 @@ func (br *Bridge) QueueMatrixEvent(ctx context.Context, evt *event.Event) {
|
|||
}
|
||||
}
|
||||
|
||||
func (ul *UserLogin) QueueRemoteEvent(evt RemoteEvent) {
|
||||
ul.Bridge.QueueRemoteEvent(ul, evt)
|
||||
type EventHandlingResult struct {
|
||||
Success bool
|
||||
Ignored bool
|
||||
Queued bool
|
||||
}
|
||||
|
||||
func (br *Bridge) QueueRemoteEvent(login *UserLogin, evt RemoteEvent) {
|
||||
var (
|
||||
EventHandlingResultFailed = EventHandlingResult{}
|
||||
EventHandlingResultQueued = EventHandlingResult{Queued: true}
|
||||
EventHandlingResultSuccess = EventHandlingResult{Success: true}
|
||||
EventHandlingResultIgnored = EventHandlingResult{Success: true, Ignored: true}
|
||||
)
|
||||
|
||||
func (ul *UserLogin) QueueRemoteEvent(evt RemoteEvent) EventHandlingResult {
|
||||
return ul.Bridge.QueueRemoteEvent(ul, evt)
|
||||
}
|
||||
|
||||
func (br *Bridge) QueueRemoteEvent(login *UserLogin, evt RemoteEvent) (res EventHandlingResult) {
|
||||
log := login.Log
|
||||
ctx := log.WithContext(br.BackgroundCtx)
|
||||
maybeUncertain, ok := evt.(RemoteEventWithUncertainPortalReceiver)
|
||||
|
|
@ -182,7 +195,7 @@ func (br *Bridge) QueueRemoteEvent(login *UserLogin, evt RemoteEvent) {
|
|||
}
|
||||
// TODO put this in a better place, and maybe cache to avoid constant db queries
|
||||
login.MarkInPortal(ctx, portal)
|
||||
portal.queueEvent(ctx, &portalRemoteEvent{
|
||||
return portal.queueEvent(ctx, &portalRemoteEvent{
|
||||
evt: evt,
|
||||
source: login,
|
||||
})
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue