bridgev2: add option to deduplicate Matrix messages by event or transaction ID

This commit is contained in:
Tulir Asokan 2025-05-05 23:38:26 +03:00
commit 970ea996a2
7 changed files with 69 additions and 28 deletions

View file

@ -58,24 +58,25 @@ type CleanupOnLogouts struct {
}
type BridgeConfig struct {
CommandPrefix string `yaml:"command_prefix"`
PersonalFilteringSpaces bool `yaml:"personal_filtering_spaces"`
PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"`
AsyncEvents bool `yaml:"async_events"`
SplitPortals bool `yaml:"split_portals"`
ResendBridgeInfo bool `yaml:"resend_bridge_info"`
NoBridgeInfoStateKey bool `yaml:"no_bridge_info_state_key"`
BridgeStatusNotices string `yaml:"bridge_status_notices"`
BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`
BridgeNotices bool `yaml:"bridge_notices"`
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
OnlyBridgeTags []event.RoomTag `yaml:"only_bridge_tags"`
MuteOnlyOnCreate bool `yaml:"mute_only_on_create"`
OutgoingMessageReID bool `yaml:"outgoing_message_re_id"`
CleanupOnLogout CleanupOnLogouts `yaml:"cleanup_on_logout"`
Relay RelayConfig `yaml:"relay"`
Permissions PermissionConfig `yaml:"permissions"`
Backfill BackfillConfig `yaml:"backfill"`
CommandPrefix string `yaml:"command_prefix"`
PersonalFilteringSpaces bool `yaml:"personal_filtering_spaces"`
PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"`
AsyncEvents bool `yaml:"async_events"`
SplitPortals bool `yaml:"split_portals"`
ResendBridgeInfo bool `yaml:"resend_bridge_info"`
NoBridgeInfoStateKey bool `yaml:"no_bridge_info_state_key"`
BridgeStatusNotices string `yaml:"bridge_status_notices"`
BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`
BridgeNotices bool `yaml:"bridge_notices"`
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
OnlyBridgeTags []event.RoomTag `yaml:"only_bridge_tags"`
MuteOnlyOnCreate bool `yaml:"mute_only_on_create"`
DeduplicateMatrixMessages bool `yaml:"deduplicate_matrix_messages"`
OutgoingMessageReID bool `yaml:"outgoing_message_re_id"`
CleanupOnLogout CleanupOnLogouts `yaml:"cleanup_on_logout"`
Relay RelayConfig `yaml:"relay"`
Permissions PermissionConfig `yaml:"permissions"`
Backfill BackfillConfig `yaml:"backfill"`
}
type MatrixConfig struct {

View file

@ -37,6 +37,7 @@ func doUpgrade(helper up.Helper) {
helper.Copy(up.Bool, "bridge", "tag_only_on_create")
helper.Copy(up.List, "bridge", "only_bridge_tags")
helper.Copy(up.Bool, "bridge", "mute_only_on_create")
helper.Copy(up.Bool, "bridge", "deduplicate_matrix_messages")
helper.Copy(up.Bool, "bridge", "cleanup_on_logout", "enabled")
helper.Copy(up.Str, "bridge", "cleanup_on_logout", "manual", "private")
helper.Copy(up.Str, "bridge", "cleanup_on_logout", "manual", "relayed")

View file

@ -43,19 +43,23 @@ type Message struct {
ThreadRoot networkid.MessageID
ReplyTo networkid.MessageOptionalPartID
SendTxnID networkid.RawTransactionID
Metadata any
}
const (
getMessageBaseQuery = `
SELECT rowid, bridge_id, id, part_id, mxid, room_id, room_receiver, sender_id, sender_mxid,
timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id, metadata
timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id,
send_txn_id, metadata
FROM message
`
getAllMessagePartsByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3`
getMessagePartByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 AND part_id=$4`
getMessagePartByRowIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND rowid=$2`
getMessageByMXIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND mxid=$2`
getMessageByTxnIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND (mxid=$3 OR send_txn_id=$4)`
getLastMessagePartByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 ORDER BY part_id DESC LIMIT 1`
getFirstMessagePartByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 ORDER BY part_id ASC LIMIT 1`
getMessagesBetweenTimeQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND room_id=$2 AND room_receiver=$3 AND timestamp>$4 AND timestamp<=$5`
@ -73,16 +77,17 @@ const (
insertMessageQuery = `
INSERT INTO message (
bridge_id, id, part_id, mxid, room_id, room_receiver, sender_id, sender_mxid,
timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id, metadata
timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id,
send_txn_id, metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING rowid
`
updateMessageQuery = `
UPDATE message SET id=$2, part_id=$3, mxid=$4, room_id=$5, room_receiver=$6, sender_id=$7, sender_mxid=$8,
timestamp=$9, edit_count=$10, double_puppeted=$11, thread_root_id=$12, reply_to_id=$13,
reply_to_part_id=$14, metadata=$15
WHERE bridge_id=$1 AND rowid=$16
reply_to_part_id=$14, send_txn_id=$15, metadata=$16
WHERE bridge_id=$1 AND rowid=$17
`
deleteAllMessagePartsByIDQuery = `
DELETE FROM message WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3
@ -104,6 +109,10 @@ func (mq *MessageQuery) GetPartByMXID(ctx context.Context, mxid id.EventID) (*Me
return mq.QueryOne(ctx, getMessageByMXIDQuery, mq.BridgeID, mxid)
}
func (mq *MessageQuery) GetPartByTxnID(ctx context.Context, receiver networkid.UserLoginID, mxid id.EventID, txnID networkid.RawTransactionID) (*Message, error) {
return mq.QueryOne(ctx, getMessageByTxnIDQuery, mq.BridgeID, mxid, txnID)
}
func (mq *MessageQuery) GetLastPartByID(ctx context.Context, receiver networkid.UserLoginID, id networkid.MessageID) (*Message, error) {
return mq.QueryOne(ctx, getLastMessagePartByIDQuery, mq.BridgeID, receiver, id)
}
@ -173,11 +182,12 @@ func (mq *MessageQuery) CountMessagesInPortal(ctx context.Context, key networkid
func (m *Message) Scan(row dbutil.Scannable) (*Message, error) {
var timestamp int64
var threadRootID, replyToID, replyToPartID sql.NullString
var threadRootID, replyToID, replyToPartID, sendTxnID sql.NullString
var doublePuppeted sql.NullBool
err := row.Scan(
&m.RowID, &m.BridgeID, &m.ID, &m.PartID, &m.MXID, &m.Room.ID, &m.Room.Receiver, &m.SenderID, &m.SenderMXID,
&timestamp, &m.EditCount, &doublePuppeted, &threadRootID, &replyToID, &replyToPartID, dbutil.JSON{Data: m.Metadata},
&timestamp, &m.EditCount, &doublePuppeted, &threadRootID, &replyToID, &replyToPartID, &sendTxnID,
dbutil.JSON{Data: m.Metadata},
)
if err != nil {
return nil, err
@ -191,6 +201,9 @@ func (m *Message) Scan(row dbutil.Scannable) (*Message, error) {
m.ReplyTo.PartID = (*networkid.PartID)(&replyToPartID.String)
}
}
if sendTxnID.Valid {
m.SendTxnID = networkid.RawTransactionID(sendTxnID.String)
}
return m, nil
}
@ -205,7 +218,8 @@ func (m *Message) sqlVariables() []any {
return []any{
m.BridgeID, m.ID, m.PartID, m.MXID, m.Room.ID, m.Room.Receiver, m.SenderID, m.SenderMXID,
m.Timestamp.UnixNano(), m.EditCount, m.IsDoublePuppeted, dbutil.StrPtr(m.ThreadRoot),
dbutil.StrPtr(m.ReplyTo.MessageID), m.ReplyTo.PartID, dbutil.JSON{Data: m.Metadata},
dbutil.StrPtr(m.ReplyTo.MessageID), m.ReplyTo.PartID, dbutil.StrPtr(m.SendTxnID),
dbutil.JSON{Data: m.Metadata},
}
}

View file

@ -1,4 +1,4 @@
-- v0 -> v21 (compatible with v9+): Latest revision
-- v0 -> v22 (compatible with v9+): Latest revision
CREATE TABLE "user" (
bridge_id TEXT NOT NULL,
mxid TEXT NOT NULL,
@ -108,6 +108,7 @@ CREATE TABLE message (
thread_root_id TEXT,
reply_to_id TEXT,
reply_to_part_id TEXT,
send_txn_id TEXT,
metadata jsonb NOT NULL,
CONSTRAINT message_room_fkey FOREIGN KEY (bridge_id, room_id, room_receiver)
@ -117,7 +118,8 @@ CREATE TABLE message (
REFERENCES ghost (bridge_id, id)
ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT message_real_pkey UNIQUE (bridge_id, room_receiver, id, part_id),
CONSTRAINT message_mxid_unique UNIQUE (bridge_id, mxid)
CONSTRAINT message_mxid_unique UNIQUE (bridge_id, mxid),
CONSTRAINT message_txn_id_unique UNIQUE (bridge_id, room_receiver, send_txn_id)
);
CREATE INDEX message_room_idx ON message (bridge_id, room_id, room_receiver);

View file

@ -0,0 +1,6 @@
-- v22 (compatible with v9+): Add message send transaction ID column
ALTER TABLE message ADD COLUMN send_txn_id TEXT;
-- only: postgres
ALTER TABLE message ADD CONSTRAINT message_txn_id_unique UNIQUE (bridge_id, room_receiver, send_txn_id);
-- only: sqlite
CREATE UNIQUE INDEX message_txn_id_unique ON message (bridge_id, room_receiver, send_txn_id);

View file

@ -38,6 +38,8 @@ bridge:
# Should room mute status only be synced when creating the portal?
# Like tags, mutes can't currently be synced back to the remote network.
mute_only_on_create: true
# Should the bridge check the db to ensure that incoming events haven't been handled before
deduplicate_matrix_messages: false
# What should be done to portal rooms when a user logs out or is logged out?
# Permitted values:

View file

@ -951,6 +951,18 @@ func (portal *Portal) handleMatrixMessage(ctx context.Context, sender *UserLogin
ThreadRoot: threadRoot,
ReplyTo: replyTo,
}
if portal.Bridge.Config.DeduplicateMatrixMessages {
if part, err := portal.Bridge.DB.Message.GetPartByTxnID(ctx, portal.Receiver, evt.ID, wrappedMsgEvt.InputTransactionID); err != nil {
log.Err(err).Msg("Failed to check db if message is already sent")
} else if part != nil {
log.Debug().
Stringer("message_mxid", part.MXID).
Stringer("input_event_id", evt.ID).
Msg("Message already sent, ignoring")
return
}
}
var resp *MatrixMessageResponse
if msgContent != nil {
resp, err = sender.Client.HandleMatrixMessage(ctx, wrappedMsgEvt)
@ -1091,6 +1103,9 @@ func (evt *MatrixMessage) fillDBMessage(message *database.Message) *database.Mes
if message.SenderMXID == "" {
message.SenderMXID = evt.Event.Sender
}
if message.SendTxnID != "" {
message.SendTxnID = evt.InputTransactionID
}
return message
}