diff --git a/bridgev2/database/disappear.go b/bridgev2/database/disappear.go index 9874e472..c2d7d56c 100644 --- a/bridgev2/database/disappear.go +++ b/bridgev2/database/disappear.go @@ -67,26 +67,27 @@ type DisappearingMessageQuery struct { } type DisappearingMessage struct { - BridgeID networkid.BridgeID - RoomID id.RoomID - EventID id.EventID + BridgeID networkid.BridgeID + RoomID id.RoomID + EventID id.EventID + Timestamp time.Time DisappearingSetting } const ( upsertDisappearingMessageQuery = ` - INSERT INTO disappearing_message (bridge_id, mx_room, mxid, type, timer, disappear_at) - VALUES ($1, $2, $3, $4, $5, $6) + INSERT INTO disappearing_message (bridge_id, mx_room, mxid, timestamp, type, timer, disappear_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (bridge_id, mxid) DO UPDATE SET timer=excluded.timer, disappear_at=excluded.disappear_at ` startDisappearingMessagesQuery = ` UPDATE disappearing_message SET disappear_at=$1 + timer - WHERE bridge_id=$2 AND mx_room=$3 AND disappear_at IS NULL AND type='after_read' - RETURNING bridge_id, mx_room, mxid, type, timer, disappear_at + WHERE bridge_id=$2 AND mx_room=$3 AND disappear_at IS NULL AND type='after_read' AND timestamp<=$4 + RETURNING bridge_id, mx_room, mxid, timestamp, type, timer, disappear_at ` getUpcomingDisappearingMessagesQuery = ` - SELECT bridge_id, mx_room, mxid, type, timer, disappear_at + SELECT bridge_id, mx_room, mxid, timestamp, type, timer, disappear_at FROM disappearing_message WHERE bridge_id = $1 AND disappear_at IS NOT NULL AND disappear_at < $2 ORDER BY disappear_at LIMIT $3 ` @@ -100,8 +101,8 @@ func (dmq *DisappearingMessageQuery) Put(ctx context.Context, dm *DisappearingMe return dmq.Exec(ctx, upsertDisappearingMessageQuery, dm.sqlVariables()...) } -func (dmq *DisappearingMessageQuery) StartAll(ctx context.Context, roomID id.RoomID) ([]*DisappearingMessage, error) { - return dmq.QueryMany(ctx, startDisappearingMessagesQuery, time.Now().UnixNano(), dmq.BridgeID, roomID) +func (dmq *DisappearingMessageQuery) StartAllBefore(ctx context.Context, roomID id.RoomID, beforeTS time.Time) ([]*DisappearingMessage, error) { + return dmq.QueryMany(ctx, startDisappearingMessagesQuery, time.Now().UnixNano(), dmq.BridgeID, roomID, beforeTS.UnixNano()) } func (dmq *DisappearingMessageQuery) GetUpcoming(ctx context.Context, duration time.Duration, limit int) ([]*DisappearingMessage, error) { @@ -113,17 +114,19 @@ func (dmq *DisappearingMessageQuery) Delete(ctx context.Context, eventID id.Even } func (d *DisappearingMessage) Scan(row dbutil.Scannable) (*DisappearingMessage, error) { + var timestamp int64 var disappearAt sql.NullInt64 - err := row.Scan(&d.BridgeID, &d.RoomID, &d.EventID, &d.Type, &d.Timer, &disappearAt) + err := row.Scan(&d.BridgeID, &d.RoomID, &d.EventID, ×tamp, &d.Type, &d.Timer, &disappearAt) if err != nil { return nil, err } if disappearAt.Valid { d.DisappearAt = time.Unix(0, disappearAt.Int64) } + d.Timestamp = time.Unix(0, timestamp) return d, nil } func (d *DisappearingMessage) sqlVariables() []any { - return []any{d.BridgeID, d.RoomID, d.EventID, d.Type, d.Timer, dbutil.ConvertedPtr(d.DisappearAt, time.Time.UnixNano)} + return []any{d.BridgeID, d.RoomID, d.EventID, d.Timestamp.UnixNano(), d.Type, d.Timer, dbutil.ConvertedPtr(d.DisappearAt, time.Time.UnixNano)} } diff --git a/bridgev2/database/upgrades/00-latest.sql b/bridgev2/database/upgrades/00-latest.sql index 4eea05bb..a8bb5c64 100644 --- a/bridgev2/database/upgrades/00-latest.sql +++ b/bridgev2/database/upgrades/00-latest.sql @@ -1,4 +1,4 @@ --- v0 -> v22 (compatible with v9+): Latest revision +-- v0 -> v23 (compatible with v9+): Latest revision CREATE TABLE "user" ( bridge_id TEXT NOT NULL, mxid TEXT NOT NULL, @@ -127,6 +127,7 @@ CREATE TABLE disappearing_message ( bridge_id TEXT NOT NULL, mx_room TEXT NOT NULL, mxid TEXT NOT NULL, + timestamp BIGINT NOT NULL DEFAULT 0, type TEXT NOT NULL, timer BIGINT NOT NULL, disappear_at BIGINT, diff --git a/bridgev2/database/upgrades/23-disappearing-timer-ts.sql b/bridgev2/database/upgrades/23-disappearing-timer-ts.sql new file mode 100644 index 00000000..ecd00b8d --- /dev/null +++ b/bridgev2/database/upgrades/23-disappearing-timer-ts.sql @@ -0,0 +1,2 @@ +-- v23 (compatible with v9+): Add event timestamp for disappearing messages +ALTER TABLE disappearing_message ADD COLUMN timestamp BIGINT NOT NULL DEFAULT 0; diff --git a/bridgev2/disappear.go b/bridgev2/disappear.go index f072c01f..b5c37e8f 100644 --- a/bridgev2/disappear.go +++ b/bridgev2/disappear.go @@ -86,8 +86,8 @@ func (dl *DisappearLoop) Stop() { } } -func (dl *DisappearLoop) StartAll(ctx context.Context, roomID id.RoomID) { - startedMessages, err := dl.br.DB.DisappearingMessage.StartAll(ctx, roomID) +func (dl *DisappearLoop) StartAllBefore(ctx context.Context, roomID id.RoomID, beforeTS time.Time) { + startedMessages, err := dl.br.DB.DisappearingMessage.StartAllBefore(ctx, roomID, beforeTS) if err != nil { zerolog.Ctx(ctx).Err(err).Msg("Failed to start disappearing messages") return diff --git a/bridgev2/portal.go b/bridgev2/portal.go index 8fd29bb3..e87ce9d5 100644 --- a/bridgev2/portal.go +++ b/bridgev2/portal.go @@ -845,7 +845,7 @@ func (portal *Portal) callReadReceiptHandler( if err != nil { zerolog.Ctx(ctx).Err(err).Msg("Failed to save user portal metadata") } - portal.Bridge.DisappearLoop.StartAll(ctx, portal.MXID) + portal.Bridge.DisappearLoop.StartAllBefore(ctx, portal.MXID, evt.ReadUpTo) } func (portal *Portal) handleMatrixTyping(ctx context.Context, evt *event.Event) EventHandlingResult { @@ -1193,6 +1193,7 @@ func (portal *Portal) handleMatrixMessage(ctx context.Context, sender *UserLogin go portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{ RoomID: portal.MXID, EventID: message.MXID, + Timestamp: message.Timestamp, DisappearingSetting: portal.Disappear.StartingAt(message.Timestamp), }) } @@ -2588,6 +2589,7 @@ func (portal *Portal) sendConvertedMessage( portal.Bridge.DisappearLoop.Add(ctx, &database.DisappearingMessage{ RoomID: portal.MXID, EventID: dbMessage.MXID, + Timestamp: dbMessage.Timestamp, DisappearingSetting: converted.Disappear, }) } @@ -3374,11 +3376,15 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL return evt.Int64("target_stream_order", targetStreamOrder) } err = soIntent.MarkStreamOrderRead(ctx, portal.MXID, targetStreamOrder, getEventTS(evt)) + if readUpTo.IsZero() { + readUpTo = getEventTS(evt) + } } else { addTargetLog = func(evt *zerolog.Event) *zerolog.Event { return evt.Stringer("target_mxid", lastTarget.MXID) } err = intent.MarkRead(ctx, portal.MXID, lastTarget.MXID, getEventTS(evt)) + readUpTo = lastTarget.Timestamp } if err != nil { addTargetLog(log.Err(err)).Msg("Failed to bridge read receipt") @@ -3387,7 +3393,7 @@ func (portal *Portal) handleRemoteReadReceipt(ctx context.Context, source *UserL addTargetLog(log.Debug()).Msg("Bridged read receipt") } if sender.IsFromMe { - portal.Bridge.DisappearLoop.StartAll(ctx, portal.MXID) + portal.Bridge.DisappearLoop.StartAllBefore(ctx, portal.MXID, readUpTo) } return EventHandlingResultSuccess } diff --git a/bridgev2/portalbackfill.go b/bridgev2/portalbackfill.go index f7819968..cbbce596 100644 --- a/bridgev2/portalbackfill.go +++ b/bridgev2/portalbackfill.go @@ -387,6 +387,7 @@ func (portal *Portal) compileBatchMessage(ctx context.Context, source *UserLogin out.Disappear = append(out.Disappear, &database.DisappearingMessage{ RoomID: portal.MXID, EventID: evtID, + Timestamp: msg.Timestamp, DisappearingSetting: msg.Disappear, }) }