bridgev2: add option to use deterministic ID for outgoing messages (#292)

This commit is contained in:
Tulir Asokan 2024-10-10 15:51:20 +02:00 committed by GitHub
commit 691c834144
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 53 additions and 42 deletions

View file

@ -66,6 +66,7 @@ type BridgeConfig struct {
BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
MuteOnlyOnCreate bool `yaml:"mute_only_on_create"`
OutgoingMessageReID bool `yaml:"outgoing_message_re_id"`
CleanupOnLogout CleanupOnLogouts `yaml:"cleanup_on_logout"`
Relay RelayConfig `yaml:"relay"`
Permissions PermissionConfig `yaml:"permissions"`

View file

@ -77,10 +77,10 @@ func (proc *Processor) Handle(ctx context.Context, roomID id.RoomID, eventID id.
log := &logCopy
defer func() {
statusInfo := &bridgev2.MessageStatusEventInfo{
RoomID: roomID,
EventID: eventID,
EventType: event.EventMessage,
Sender: user.MXID,
RoomID: roomID,
SourceEventID: eventID,
EventType: event.EventMessage,
Sender: user.MXID,
}
err := recover()
if err != nil {

View file

@ -446,7 +446,7 @@ func (br *Connector) SendMessageStatus(ctx context.Context, ms *bridgev2.Message
}
func (br *Connector) internalSendMessageStatus(ctx context.Context, ms *bridgev2.MessageStatus, evt *bridgev2.MessageStatusEventInfo, editEvent id.EventID) id.EventID {
if evt.EventType.IsEphemeral() || evt.EventID == "" {
if evt.EventType.IsEphemeral() || evt.SourceEventID == "" {
return ""
}
log := zerolog.Ctx(ctx)
@ -460,7 +460,7 @@ func (br *Connector) internalSendMessageStatus(ctx context.Context, ms *bridgev2
if err != nil {
log.Err(err).
Stringer("room_id", evt.RoomID).
Stringer("event_id", evt.EventID).
Stringer("event_id", evt.SourceEventID).
Any("mss_content", mssEvt).
Msg("Failed to send MSS event")
}
@ -474,7 +474,7 @@ func (br *Connector) internalSendMessageStatus(ctx context.Context, ms *bridgev2
if err != nil {
log.Err(err).
Stringer("room_id", evt.RoomID).
Stringer("event_id", evt.EventID).
Stringer("event_id", evt.SourceEventID).
Str("notice_message", content.Body).
Msg("Failed to send notice event")
} else {
@ -482,11 +482,11 @@ func (br *Connector) internalSendMessageStatus(ctx context.Context, ms *bridgev2
}
}
if ms.Status == event.MessageStatusSuccess && br.Config.Matrix.DeliveryReceipts {
err = br.Bot.SendReceipt(ctx, evt.RoomID, evt.EventID, event.ReceiptTypeRead, nil)
err = br.Bot.SendReceipt(ctx, evt.RoomID, evt.SourceEventID, event.ReceiptTypeRead, nil)
if err != nil {
log.Err(err).
Stringer("room_id", evt.RoomID).
Stringer("event_id", evt.EventID).
Stringer("event_id", evt.SourceEventID).
Msg("Failed to send Matrix delivery receipt")
}
}

View file

@ -18,13 +18,14 @@ import (
)
type MessageStatusEventInfo struct {
RoomID id.RoomID
EventID id.EventID
EventType event.Type
MessageType event.MessageType
Sender id.UserID
ThreadRoot id.EventID
StreamOrder int64
RoomID id.RoomID
SourceEventID id.EventID
NewEventID id.EventID
EventType event.Type
MessageType event.MessageType
Sender id.UserID
ThreadRoot id.EventID
StreamOrder int64
}
func StatusEventInfoFromEvent(evt *event.Event) *MessageStatusEventInfo {
@ -33,12 +34,12 @@ func StatusEventInfoFromEvent(evt *event.Event) *MessageStatusEventInfo {
threadRoot = relatable.OptionalGetRelatesTo().GetThreadParent()
}
return &MessageStatusEventInfo{
RoomID: evt.RoomID,
EventID: evt.ID,
EventType: evt.Type,
MessageType: evt.Content.AsMessage().MsgType,
Sender: evt.Sender,
ThreadRoot: threadRoot,
RoomID: evt.RoomID,
SourceEventID: evt.ID,
EventType: evt.Type,
MessageType: evt.Content.AsMessage().MsgType,
Sender: evt.Sender,
ThreadRoot: threadRoot,
}
}
@ -150,7 +151,7 @@ func (ms *MessageStatus) ToCheckpoint(evt *MessageStatusEventInfo) *status.Messa
}
checkpoint := &status.MessageCheckpoint{
RoomID: evt.RoomID,
EventID: evt.EventID,
EventID: evt.SourceEventID,
Step: step,
Timestamp: jsontime.UnixMilliNow(),
Status: ms.checkpointStatus(),
@ -171,7 +172,7 @@ func (ms *MessageStatus) ToMSSEvent(evt *MessageStatusEventInfo) *event.BeeperMe
content := &event.BeeperMessageStatusEventContent{
RelatesTo: event.RelatesTo{
Type: event.RelReference,
EventID: evt.EventID,
EventID: evt.SourceEventID,
},
Status: ms.Status,
Reason: ms.ErrorReason,
@ -216,9 +217,9 @@ func (ms *MessageStatus) ToNoticeEvent(evt *MessageStatusEventInfo) *event.Messa
Mentions: &event.Mentions{},
}
if evt.ThreadRoot != "" {
content.RelatesTo.SetThread(evt.ThreadRoot, evt.EventID)
content.RelatesTo.SetThread(evt.ThreadRoot, evt.SourceEventID)
} else {
content.RelatesTo.SetReplyTo(evt.EventID)
content.RelatesTo.SetReplyTo(evt.SourceEventID)
}
if evt.Sender != "" {
content.Mentions.UserIDs = []id.UserID{evt.Sender}

View file

@ -440,9 +440,12 @@ func (portal *Portal) FindPreferredLogin(ctx context.Context, user *User, allowR
}
}
func (portal *Portal) sendSuccessStatus(ctx context.Context, evt *event.Event, streamOrder int64) {
func (portal *Portal) sendSuccessStatus(ctx context.Context, evt *event.Event, streamOrder int64, newEventID id.EventID) {
info := StatusEventInfoFromEvent(evt)
info.StreamOrder = streamOrder
if newEventID != evt.ID {
info.NewEventID = newEventID
}
portal.Bridge.Matrix.SendMessageStatus(ctx, &MessageStatus{Status: event.MessageStatusSuccess}, info)
}
@ -922,6 +925,9 @@ func (portal *Portal) handleMatrixMessage(ctx context.Context, sender *UserLogin
if resp.DB == nil {
log.Error().Msg("Network connector didn't return a message to save")
} else {
if portal.Bridge.Config.OutgoingMessageReID {
message.MXID = portal.Bridge.Matrix.GenerateDeterministicEventID(portal.MXID, portal.PortalKey, message.ID, message.PartID)
}
// Hack to ensure the ghost row exists
// TODO move to better place (like login)
portal.Bridge.GetGhostByID(ctx, message.SenderID)
@ -937,7 +943,7 @@ func (portal *Portal) handleMatrixMessage(ctx context.Context, sender *UserLogin
portal.outgoingMessagesLock.Unlock()
}
}
portal.sendSuccessStatus(ctx, evt, resp.StreamOrder)
portal.sendSuccessStatus(ctx, evt, resp.StreamOrder, message.MXID)
}
if portal.Disappear.Type != database.DisappearingTypeNone {
go portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{
@ -1090,7 +1096,7 @@ func (portal *Portal) handleMatrixEdit(ctx context.Context, sender *UserLogin, o
log.Err(err).Msg("Failed to save message to database after editing")
}
// TODO allow returning stream order from HandleMatrixEdit
portal.sendSuccessStatus(ctx, evt, 0)
portal.sendSuccessStatus(ctx, evt, 0, "")
}
func (portal *Portal) handleMatrixReaction(ctx context.Context, sender *UserLogin, evt *event.Event) {
@ -1144,7 +1150,7 @@ func (portal *Portal) handleMatrixReaction(ctx context.Context, sender *UserLogi
} else if existing != nil {
if existing.EmojiID != "" || existing.Emoji == preResp.Emoji {
log.Debug().Msg("Ignoring duplicate reaction")
portal.sendSuccessStatus(ctx, evt, 0)
portal.sendSuccessStatus(ctx, evt, 0, "")
return
}
react.ReactionToOverride = existing
@ -1226,7 +1232,7 @@ func (portal *Portal) handleMatrixReaction(ctx context.Context, sender *UserLogi
if err != nil {
log.Err(err).Msg("Failed to save reaction to database")
}
portal.sendSuccessStatus(ctx, evt, 0)
portal.sendSuccessStatus(ctx, evt, 0, "")
}
func handleMatrixRoomMeta[APIType any, ContentType any](
@ -1252,17 +1258,17 @@ func handleMatrixRoomMeta[APIType any, ContentType any](
switch typedContent := evt.Content.Parsed.(type) {
case *event.RoomNameEventContent:
if typedContent.Name == portal.Name {
portal.sendSuccessStatus(ctx, evt, 0)
portal.sendSuccessStatus(ctx, evt, 0, "")
return
}
case *event.TopicEventContent:
if typedContent.Topic == portal.Topic {
portal.sendSuccessStatus(ctx, evt, 0)
portal.sendSuccessStatus(ctx, evt, 0, "")
return
}
case *event.RoomAvatarEventContent:
if typedContent.URL == portal.AvatarMXC {
portal.sendSuccessStatus(ctx, evt, 0)
portal.sendSuccessStatus(ctx, evt, 0, "")
return
}
}
@ -1293,7 +1299,7 @@ func handleMatrixRoomMeta[APIType any, ContentType any](
log.Err(err).Msg("Failed to save portal after updating room metadata")
}
}
portal.sendSuccessStatus(ctx, evt, 0)
portal.sendSuccessStatus(ctx, evt, 0, "")
}
func handleMatrixAccountData[APIType any, ContentType any](
@ -1583,7 +1589,7 @@ func (portal *Portal) handleMatrixRedaction(ctx context.Context, sender *UserLog
return
}
// TODO delete msg/reaction db row
portal.sendSuccessStatus(ctx, evt, 0)
portal.sendSuccessStatus(ctx, evt, 0, "")
}
func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) {
@ -1894,6 +1900,9 @@ func (portal *Portal) checkPendingMessage(ctx context.Context, evt RemoteMessage
saveMessage, statusErr = pending.handle(evt, pending.db)
}
if saveMessage {
if portal.Bridge.Config.OutgoingMessageReID {
pending.db.MXID = portal.Bridge.Matrix.GenerateDeterministicEventID(portal.MXID, portal.PortalKey, pending.db.ID, pending.db.PartID)
}
// Hack to ensure the ghost row exists
// TODO move to better place (like login)
portal.Bridge.GetGhostByID(ctx, pending.db.SenderID)
@ -1906,7 +1915,7 @@ func (portal *Portal) checkPendingMessage(ctx context.Context, evt RemoteMessage
if statusErr != nil {
portal.sendErrorStatus(ctx, pending.evt, statusErr)
} else {
portal.sendSuccessStatus(ctx, pending.evt, getStreamOrder(evt))
portal.sendSuccessStatus(ctx, pending.evt, getStreamOrder(evt), pending.evt.ID)
}
}
zerolog.Ctx(ctx).Debug().Stringer("event_id", pending.evt.ID).Msg("Received remote echo for message")
@ -2571,9 +2580,9 @@ func (portal *Portal) handleRemoteDeliveryReceipt(ctx context.Context, source *U
Status: event.MessageStatusSuccess,
DeliveredTo: []id.UserID{intent.GetMXID()},
}, &MessageStatusEventInfo{
RoomID: portal.MXID,
EventID: part.MXID,
Sender: part.SenderMXID,
RoomID: portal.MXID,
SourceEventID: part.MXID,
Sender: part.SenderMXID,
})
}
}

View file

@ -49,8 +49,8 @@ func (portal *PortalInternals) HandleSingleEvent(ctx context.Context, rawEvt any
(*Portal)(portal).handleSingleEvent(ctx, rawEvt, doneCallback)
}
func (portal *PortalInternals) SendSuccessStatus(ctx context.Context, evt *event.Event, streamOrder int64) {
(*Portal)(portal).sendSuccessStatus(ctx, evt, streamOrder)
func (portal *PortalInternals) SendSuccessStatus(ctx context.Context, evt *event.Event, streamOrder int64, newEventID id.EventID) {
(*Portal)(portal).sendSuccessStatus(ctx, evt, streamOrder, newEventID)
}
func (portal *PortalInternals) SendErrorStatus(ctx context.Context, evt *event.Event, err error) {