From 970ea996a2f47d43e48a754dd0c6f7f610ba9a05 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 5 May 2025 23:38:26 +0300 Subject: [PATCH] bridgev2: add option to deduplicate Matrix messages by event or transaction ID --- bridgev2/bridgeconfig/config.go | 37 ++++++++++--------- bridgev2/bridgeconfig/upgrade.go | 1 + bridgev2/database/message.go | 30 +++++++++++---- bridgev2/database/upgrades/00-latest.sql | 6 ++- .../upgrades/22-message-send-txn-id.sql | 6 +++ bridgev2/matrix/mxmain/example-config.yaml | 2 + bridgev2/portal.go | 15 ++++++++ 7 files changed, 69 insertions(+), 28 deletions(-) create mode 100644 bridgev2/database/upgrades/22-message-send-txn-id.sql diff --git a/bridgev2/bridgeconfig/config.go b/bridgev2/bridgeconfig/config.go index 937d9441..bd7746d1 100644 --- a/bridgev2/bridgeconfig/config.go +++ b/bridgev2/bridgeconfig/config.go @@ -58,24 +58,25 @@ type CleanupOnLogouts struct { } type BridgeConfig struct { - CommandPrefix string `yaml:"command_prefix"` - PersonalFilteringSpaces bool `yaml:"personal_filtering_spaces"` - PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"` - AsyncEvents bool `yaml:"async_events"` - SplitPortals bool `yaml:"split_portals"` - ResendBridgeInfo bool `yaml:"resend_bridge_info"` - NoBridgeInfoStateKey bool `yaml:"no_bridge_info_state_key"` - BridgeStatusNotices string `yaml:"bridge_status_notices"` - BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"` - BridgeNotices bool `yaml:"bridge_notices"` - TagOnlyOnCreate bool `yaml:"tag_only_on_create"` - OnlyBridgeTags []event.RoomTag `yaml:"only_bridge_tags"` - MuteOnlyOnCreate bool `yaml:"mute_only_on_create"` - OutgoingMessageReID bool `yaml:"outgoing_message_re_id"` - CleanupOnLogout CleanupOnLogouts `yaml:"cleanup_on_logout"` - Relay RelayConfig `yaml:"relay"` - Permissions PermissionConfig `yaml:"permissions"` - Backfill BackfillConfig `yaml:"backfill"` + CommandPrefix string `yaml:"command_prefix"` + PersonalFilteringSpaces bool `yaml:"personal_filtering_spaces"` + PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"` + AsyncEvents bool `yaml:"async_events"` + SplitPortals bool `yaml:"split_portals"` + ResendBridgeInfo bool `yaml:"resend_bridge_info"` + NoBridgeInfoStateKey bool `yaml:"no_bridge_info_state_key"` + BridgeStatusNotices string `yaml:"bridge_status_notices"` + BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"` + BridgeNotices bool `yaml:"bridge_notices"` + TagOnlyOnCreate bool `yaml:"tag_only_on_create"` + OnlyBridgeTags []event.RoomTag `yaml:"only_bridge_tags"` + MuteOnlyOnCreate bool `yaml:"mute_only_on_create"` + DeduplicateMatrixMessages bool `yaml:"deduplicate_matrix_messages"` + OutgoingMessageReID bool `yaml:"outgoing_message_re_id"` + CleanupOnLogout CleanupOnLogouts `yaml:"cleanup_on_logout"` + Relay RelayConfig `yaml:"relay"` + Permissions PermissionConfig `yaml:"permissions"` + Backfill BackfillConfig `yaml:"backfill"` } type MatrixConfig struct { diff --git a/bridgev2/bridgeconfig/upgrade.go b/bridgev2/bridgeconfig/upgrade.go index 95370681..18b98263 100644 --- a/bridgev2/bridgeconfig/upgrade.go +++ b/bridgev2/bridgeconfig/upgrade.go @@ -37,6 +37,7 @@ func doUpgrade(helper up.Helper) { helper.Copy(up.Bool, "bridge", "tag_only_on_create") helper.Copy(up.List, "bridge", "only_bridge_tags") helper.Copy(up.Bool, "bridge", "mute_only_on_create") + helper.Copy(up.Bool, "bridge", "deduplicate_matrix_messages") helper.Copy(up.Bool, "bridge", "cleanup_on_logout", "enabled") helper.Copy(up.Str, "bridge", "cleanup_on_logout", "manual", "private") helper.Copy(up.Str, "bridge", "cleanup_on_logout", "manual", "relayed") diff --git a/bridgev2/database/message.go b/bridgev2/database/message.go index fd6b65d8..880bd0f1 100644 --- a/bridgev2/database/message.go +++ b/bridgev2/database/message.go @@ -43,19 +43,23 @@ type Message struct { ThreadRoot networkid.MessageID ReplyTo networkid.MessageOptionalPartID + SendTxnID networkid.RawTransactionID + Metadata any } const ( getMessageBaseQuery = ` SELECT rowid, bridge_id, id, part_id, mxid, room_id, room_receiver, sender_id, sender_mxid, - timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id, metadata + timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id, + send_txn_id, metadata FROM message ` getAllMessagePartsByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3` getMessagePartByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 AND part_id=$4` getMessagePartByRowIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND rowid=$2` getMessageByMXIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND mxid=$2` + getMessageByTxnIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND (mxid=$3 OR send_txn_id=$4)` getLastMessagePartByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 ORDER BY part_id DESC LIMIT 1` getFirstMessagePartByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 ORDER BY part_id ASC LIMIT 1` getMessagesBetweenTimeQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND room_id=$2 AND room_receiver=$3 AND timestamp>$4 AND timestamp<=$5` @@ -73,16 +77,17 @@ const ( insertMessageQuery = ` INSERT INTO message ( bridge_id, id, part_id, mxid, room_id, room_receiver, sender_id, sender_mxid, - timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id, metadata + timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id, + send_txn_id, metadata ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING rowid ` updateMessageQuery = ` UPDATE message SET id=$2, part_id=$3, mxid=$4, room_id=$5, room_receiver=$6, sender_id=$7, sender_mxid=$8, timestamp=$9, edit_count=$10, double_puppeted=$11, thread_root_id=$12, reply_to_id=$13, - reply_to_part_id=$14, metadata=$15 - WHERE bridge_id=$1 AND rowid=$16 + reply_to_part_id=$14, send_txn_id=$15, metadata=$16 + WHERE bridge_id=$1 AND rowid=$17 ` deleteAllMessagePartsByIDQuery = ` DELETE FROM message WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 @@ -104,6 +109,10 @@ func (mq *MessageQuery) GetPartByMXID(ctx context.Context, mxid id.EventID) (*Me return mq.QueryOne(ctx, getMessageByMXIDQuery, mq.BridgeID, mxid) } +func (mq *MessageQuery) GetPartByTxnID(ctx context.Context, receiver networkid.UserLoginID, mxid id.EventID, txnID networkid.RawTransactionID) (*Message, error) { + return mq.QueryOne(ctx, getMessageByTxnIDQuery, mq.BridgeID, mxid, txnID) +} + func (mq *MessageQuery) GetLastPartByID(ctx context.Context, receiver networkid.UserLoginID, id networkid.MessageID) (*Message, error) { return mq.QueryOne(ctx, getLastMessagePartByIDQuery, mq.BridgeID, receiver, id) } @@ -173,11 +182,12 @@ func (mq *MessageQuery) CountMessagesInPortal(ctx context.Context, key networkid func (m *Message) Scan(row dbutil.Scannable) (*Message, error) { var timestamp int64 - var threadRootID, replyToID, replyToPartID sql.NullString + var threadRootID, replyToID, replyToPartID, sendTxnID sql.NullString var doublePuppeted sql.NullBool err := row.Scan( &m.RowID, &m.BridgeID, &m.ID, &m.PartID, &m.MXID, &m.Room.ID, &m.Room.Receiver, &m.SenderID, &m.SenderMXID, - ×tamp, &m.EditCount, &doublePuppeted, &threadRootID, &replyToID, &replyToPartID, dbutil.JSON{Data: m.Metadata}, + ×tamp, &m.EditCount, &doublePuppeted, &threadRootID, &replyToID, &replyToPartID, &sendTxnID, + dbutil.JSON{Data: m.Metadata}, ) if err != nil { return nil, err @@ -191,6 +201,9 @@ func (m *Message) Scan(row dbutil.Scannable) (*Message, error) { m.ReplyTo.PartID = (*networkid.PartID)(&replyToPartID.String) } } + if sendTxnID.Valid { + m.SendTxnID = networkid.RawTransactionID(sendTxnID.String) + } return m, nil } @@ -205,7 +218,8 @@ func (m *Message) sqlVariables() []any { return []any{ m.BridgeID, m.ID, m.PartID, m.MXID, m.Room.ID, m.Room.Receiver, m.SenderID, m.SenderMXID, m.Timestamp.UnixNano(), m.EditCount, m.IsDoublePuppeted, dbutil.StrPtr(m.ThreadRoot), - dbutil.StrPtr(m.ReplyTo.MessageID), m.ReplyTo.PartID, dbutil.JSON{Data: m.Metadata}, + dbutil.StrPtr(m.ReplyTo.MessageID), m.ReplyTo.PartID, dbutil.StrPtr(m.SendTxnID), + dbutil.JSON{Data: m.Metadata}, } } diff --git a/bridgev2/database/upgrades/00-latest.sql b/bridgev2/database/upgrades/00-latest.sql index 7ad01a87..4eea05bb 100644 --- a/bridgev2/database/upgrades/00-latest.sql +++ b/bridgev2/database/upgrades/00-latest.sql @@ -1,4 +1,4 @@ --- v0 -> v21 (compatible with v9+): Latest revision +-- v0 -> v22 (compatible with v9+): Latest revision CREATE TABLE "user" ( bridge_id TEXT NOT NULL, mxid TEXT NOT NULL, @@ -108,6 +108,7 @@ CREATE TABLE message ( thread_root_id TEXT, reply_to_id TEXT, reply_to_part_id TEXT, + send_txn_id TEXT, metadata jsonb NOT NULL, CONSTRAINT message_room_fkey FOREIGN KEY (bridge_id, room_id, room_receiver) @@ -117,7 +118,8 @@ CREATE TABLE message ( REFERENCES ghost (bridge_id, id) ON DELETE CASCADE ON UPDATE CASCADE, CONSTRAINT message_real_pkey UNIQUE (bridge_id, room_receiver, id, part_id), - CONSTRAINT message_mxid_unique UNIQUE (bridge_id, mxid) + CONSTRAINT message_mxid_unique UNIQUE (bridge_id, mxid), + CONSTRAINT message_txn_id_unique UNIQUE (bridge_id, room_receiver, send_txn_id) ); CREATE INDEX message_room_idx ON message (bridge_id, room_id, room_receiver); diff --git a/bridgev2/database/upgrades/22-message-send-txn-id.sql b/bridgev2/database/upgrades/22-message-send-txn-id.sql new file mode 100644 index 00000000..8933984e --- /dev/null +++ b/bridgev2/database/upgrades/22-message-send-txn-id.sql @@ -0,0 +1,6 @@ +-- v22 (compatible with v9+): Add message send transaction ID column +ALTER TABLE message ADD COLUMN send_txn_id TEXT; +-- only: postgres +ALTER TABLE message ADD CONSTRAINT message_txn_id_unique UNIQUE (bridge_id, room_receiver, send_txn_id); +-- only: sqlite +CREATE UNIQUE INDEX message_txn_id_unique ON message (bridge_id, room_receiver, send_txn_id); diff --git a/bridgev2/matrix/mxmain/example-config.yaml b/bridgev2/matrix/mxmain/example-config.yaml index 1d4e18cf..4dee2650 100644 --- a/bridgev2/matrix/mxmain/example-config.yaml +++ b/bridgev2/matrix/mxmain/example-config.yaml @@ -38,6 +38,8 @@ bridge: # Should room mute status only be synced when creating the portal? # Like tags, mutes can't currently be synced back to the remote network. mute_only_on_create: true + # Should the bridge check the db to ensure that incoming events haven't been handled before + deduplicate_matrix_messages: false # What should be done to portal rooms when a user logs out or is logged out? # Permitted values: diff --git a/bridgev2/portal.go b/bridgev2/portal.go index d88f5a7c..8256f41e 100644 --- a/bridgev2/portal.go +++ b/bridgev2/portal.go @@ -951,6 +951,18 @@ func (portal *Portal) handleMatrixMessage(ctx context.Context, sender *UserLogin ThreadRoot: threadRoot, ReplyTo: replyTo, } + if portal.Bridge.Config.DeduplicateMatrixMessages { + if part, err := portal.Bridge.DB.Message.GetPartByTxnID(ctx, portal.Receiver, evt.ID, wrappedMsgEvt.InputTransactionID); err != nil { + log.Err(err).Msg("Failed to check db if message is already sent") + } else if part != nil { + log.Debug(). + Stringer("message_mxid", part.MXID). + Stringer("input_event_id", evt.ID). + Msg("Message already sent, ignoring") + return + } + } + var resp *MatrixMessageResponse if msgContent != nil { resp, err = sender.Client.HandleMatrixMessage(ctx, wrappedMsgEvt) @@ -1091,6 +1103,9 @@ func (evt *MatrixMessage) fillDBMessage(message *database.Message) *database.Mes if message.SenderMXID == "" { message.SenderMXID = evt.Event.Sender } + if message.SendTxnID != "" { + message.SendTxnID = evt.InputTransactionID + } return message }