bridgev2/disappear: only start timers for read messages rather than all pending ones (#415)

This commit is contained in:
Tulir Asokan 2025-10-23 15:12:42 +03:00 committed by GitHub
commit 756196ad4f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 30 additions and 17 deletions

View file

@ -67,26 +67,27 @@ type DisappearingMessageQuery struct {
}
type DisappearingMessage struct {
BridgeID networkid.BridgeID
RoomID id.RoomID
EventID id.EventID
BridgeID networkid.BridgeID
RoomID id.RoomID
EventID id.EventID
Timestamp time.Time
DisappearingSetting
}
const (
upsertDisappearingMessageQuery = `
INSERT INTO disappearing_message (bridge_id, mx_room, mxid, type, timer, disappear_at)
VALUES ($1, $2, $3, $4, $5, $6)
INSERT INTO disappearing_message (bridge_id, mx_room, mxid, timestamp, type, timer, disappear_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (bridge_id, mxid) DO UPDATE SET timer=excluded.timer, disappear_at=excluded.disappear_at
`
startDisappearingMessagesQuery = `
UPDATE disappearing_message
SET disappear_at=$1 + timer
WHERE bridge_id=$2 AND mx_room=$3 AND disappear_at IS NULL AND type='after_read'
RETURNING bridge_id, mx_room, mxid, type, timer, disappear_at
WHERE bridge_id=$2 AND mx_room=$3 AND disappear_at IS NULL AND type='after_read' AND timestamp<=$4
RETURNING bridge_id, mx_room, mxid, timestamp, type, timer, disappear_at
`
getUpcomingDisappearingMessagesQuery = `
SELECT bridge_id, mx_room, mxid, type, timer, disappear_at
SELECT bridge_id, mx_room, mxid, timestamp, type, timer, disappear_at
FROM disappearing_message WHERE bridge_id = $1 AND disappear_at IS NOT NULL AND disappear_at < $2
ORDER BY disappear_at LIMIT $3
`
@ -100,8 +101,8 @@ func (dmq *DisappearingMessageQuery) Put(ctx context.Context, dm *DisappearingMe
return dmq.Exec(ctx, upsertDisappearingMessageQuery, dm.sqlVariables()...)
}
func (dmq *DisappearingMessageQuery) StartAll(ctx context.Context, roomID id.RoomID) ([]*DisappearingMessage, error) {
return dmq.QueryMany(ctx, startDisappearingMessagesQuery, time.Now().UnixNano(), dmq.BridgeID, roomID)
func (dmq *DisappearingMessageQuery) StartAllBefore(ctx context.Context, roomID id.RoomID, beforeTS time.Time) ([]*DisappearingMessage, error) {
return dmq.QueryMany(ctx, startDisappearingMessagesQuery, time.Now().UnixNano(), dmq.BridgeID, roomID, beforeTS.UnixNano())
}
func (dmq *DisappearingMessageQuery) GetUpcoming(ctx context.Context, duration time.Duration, limit int) ([]*DisappearingMessage, error) {
@ -113,17 +114,19 @@ func (dmq *DisappearingMessageQuery) Delete(ctx context.Context, eventID id.Even
}
func (d *DisappearingMessage) Scan(row dbutil.Scannable) (*DisappearingMessage, error) {
var timestamp int64
var disappearAt sql.NullInt64
err := row.Scan(&d.BridgeID, &d.RoomID, &d.EventID, &d.Type, &d.Timer, &disappearAt)
err := row.Scan(&d.BridgeID, &d.RoomID, &d.EventID, &timestamp, &d.Type, &d.Timer, &disappearAt)
if err != nil {
return nil, err
}
if disappearAt.Valid {
d.DisappearAt = time.Unix(0, disappearAt.Int64)
}
d.Timestamp = time.Unix(0, timestamp)
return d, nil
}
func (d *DisappearingMessage) sqlVariables() []any {
return []any{d.BridgeID, d.RoomID, d.EventID, d.Type, d.Timer, dbutil.ConvertedPtr(d.DisappearAt, time.Time.UnixNano)}
return []any{d.BridgeID, d.RoomID, d.EventID, d.Timestamp.UnixNano(), d.Type, d.Timer, dbutil.ConvertedPtr(d.DisappearAt, time.Time.UnixNano)}
}

View file

@ -1,4 +1,4 @@
-- v0 -> v22 (compatible with v9+): Latest revision
-- v0 -> v23 (compatible with v9+): Latest revision
CREATE TABLE "user" (
bridge_id TEXT NOT NULL,
mxid TEXT NOT NULL,
@ -127,6 +127,7 @@ CREATE TABLE disappearing_message (
bridge_id TEXT NOT NULL,
mx_room TEXT NOT NULL,
mxid TEXT NOT NULL,
timestamp BIGINT NOT NULL DEFAULT 0,
type TEXT NOT NULL,
timer BIGINT NOT NULL,
disappear_at BIGINT,

View file

@ -0,0 +1,2 @@
-- v23 (compatible with v9+): Add event timestamp for disappearing messages
ALTER TABLE disappearing_message ADD COLUMN timestamp BIGINT NOT NULL DEFAULT 0;

View file

@ -86,8 +86,8 @@ func (dl *DisappearLoop) Stop() {
}
}
func (dl *DisappearLoop) StartAll(ctx context.Context, roomID id.RoomID) {
startedMessages, err := dl.br.DB.DisappearingMessage.StartAll(ctx, roomID)
func (dl *DisappearLoop) StartAllBefore(ctx context.Context, roomID id.RoomID, beforeTS time.Time) {
startedMessages, err := dl.br.DB.DisappearingMessage.StartAllBefore(ctx, roomID, beforeTS)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to start disappearing messages")
return

View file

@ -845,7 +845,7 @@ func (portal *Portal) callReadReceiptHandler(
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to save user portal metadata")
}
portal.Bridge.DisappearLoop.StartAll(ctx, portal.MXID)
portal.Bridge.DisappearLoop.StartAllBefore(ctx, portal.MXID, evt.ReadUpTo)
}
func (portal *Portal) handleMatrixTyping(ctx context.Context, evt *event.Event) EventHandlingResult {
@ -1193,6 +1193,7 @@ func (portal *Portal) handleMatrixMessage(ctx context.Context, sender *UserLogin
go portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{
RoomID: portal.MXID,
EventID: message.MXID,
Timestamp: message.Timestamp,
DisappearingSetting: portal.Disappear.StartingAt(message.Timestamp),
})
}
@ -2588,6 +2589,7 @@ func (portal *Portal) sendConvertedMessage(
portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{
RoomID: portal.MXID,
EventID: dbMessage.MXID,
Timestamp: dbMessage.Timestamp,
DisappearingSetting: converted.Disappear,
})
}
@ -3374,11 +3376,15 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
return evt.Int64("target_stream_order", targetStreamOrder)
}
err = soIntent.MarkStreamOrderRead(ctx, portal.MXID, targetStreamOrder, getEventTS(evt))
if readUpTo.IsZero() {
readUpTo = getEventTS(evt)
}
} else {
addTargetLog = func(evt *zerolog.Event) *zerolog.Event {
return evt.Stringer("target_mxid", lastTarget.MXID)
}
err = intent.MarkRead(ctx, portal.MXID, lastTarget.MXID, getEventTS(evt))
readUpTo = lastTarget.Timestamp
}
if err != nil {
addTargetLog(log.Err(err)).Msg("Failed to bridge read receipt")
@ -3387,7 +3393,7 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL
addTargetLog(log.Debug()).Msg("Bridged read receipt")
}
if sender.IsFromMe {
portal.Bridge.DisappearLoop.StartAll(ctx, portal.MXID)
portal.Bridge.DisappearLoop.StartAllBefore(ctx, portal.MXID, readUpTo)
}
return EventHandlingResultSuccess
}

View file

@ -387,6 +387,7 @@ func (portal *Portal) compileBatchMessage(ctx context.Context, source *UserLogin
out.Disappear = append(out.Disappear, &database.DisappearingMessage{
RoomID: portal.MXID,
EventID: evtID,
Timestamp: msg.Timestamp,
DisappearingSetting: msg.Disappear,
})
}