mirror of
https://mau.dev/mautrix/go.git
synced 2026-03-14 14:25:53 +01:00
153 lines
4.6 KiB
Go
153 lines
4.6 KiB
Go
// Copyright (c) 2024 Tulir Asokan
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package bridgev2
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog"
|
|
"golang.org/x/exp/slices"
|
|
|
|
"maunium.net/go/mautrix/bridgev2/database"
|
|
"maunium.net/go/mautrix/event"
|
|
"maunium.net/go/mautrix/id"
|
|
)
|
|
|
|
type DisappearLoop struct {
|
|
br *Bridge
|
|
nextCheck atomic.Pointer[time.Time]
|
|
stop atomic.Pointer[context.CancelFunc]
|
|
}
|
|
|
|
const DisappearCheckInterval = 1 * time.Hour
|
|
|
|
func (dl *DisappearLoop) Start() {
|
|
log := dl.br.Log.With().Str("component", "disappear loop").Logger()
|
|
ctx, stop := context.WithCancel(log.WithContext(context.Background()))
|
|
if oldStop := dl.stop.Swap(&stop); oldStop != nil {
|
|
(*oldStop)()
|
|
}
|
|
log.Debug().Msg("Disappearing message loop starting")
|
|
for {
|
|
nextCheck := time.Now().Add(DisappearCheckInterval)
|
|
dl.nextCheck.Store(&nextCheck)
|
|
const MessageLimit = 200
|
|
messages, err := dl.br.DB.DisappearingMessage.GetUpcoming(ctx, DisappearCheckInterval, MessageLimit)
|
|
if err != nil {
|
|
log.Err(err).Msg("Failed to get upcoming disappearing messages")
|
|
} else if len(messages) > 0 {
|
|
if len(messages) >= MessageLimit {
|
|
lastDisappearTime := messages[len(messages)-1].DisappearAt
|
|
log.Debug().
|
|
Int("message_count", len(messages)).
|
|
Time("last_due", lastDisappearTime).
|
|
Msg("Deleting disappearing messages synchronously and checking again immediately")
|
|
// Store the expected next check time to avoid Add spawning unnecessary goroutines.
|
|
// This can be in the past, in which case Add will put everything in the db, which is also fine.
|
|
dl.nextCheck.Store(&lastDisappearTime)
|
|
// If there are many messages, process them synchronously and then check again.
|
|
dl.sleepAndDisappear(ctx, messages...)
|
|
continue
|
|
}
|
|
go dl.sleepAndDisappear(ctx, messages...)
|
|
}
|
|
select {
|
|
case <-time.After(time.Until(dl.GetNextCheck())):
|
|
case <-ctx.Done():
|
|
log.Debug().Msg("Disappearing message loop stopping")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (dl *DisappearLoop) GetNextCheck() time.Time {
|
|
if dl == nil {
|
|
return time.Time{}
|
|
}
|
|
nextCheck := dl.nextCheck.Load()
|
|
if nextCheck == nil {
|
|
return time.Time{}
|
|
}
|
|
return *nextCheck
|
|
}
|
|
|
|
func (dl *DisappearLoop) Stop() {
|
|
if dl == nil {
|
|
return
|
|
}
|
|
if stop := dl.stop.Load(); stop != nil {
|
|
(*stop)()
|
|
}
|
|
}
|
|
|
|
func (dl *DisappearLoop) StartAllBefore(ctx context.Context, roomID id.RoomID, beforeTS time.Time) {
|
|
startedMessages, err := dl.br.DB.DisappearingMessage.StartAllBefore(ctx, roomID, beforeTS)
|
|
if err != nil {
|
|
zerolog.Ctx(ctx).Err(err).Msg("Failed to start disappearing messages")
|
|
return
|
|
}
|
|
startedMessages = slices.DeleteFunc(startedMessages, func(dm *database.DisappearingMessage) bool {
|
|
return dm.DisappearAt.After(dl.GetNextCheck())
|
|
})
|
|
slices.SortFunc(startedMessages, func(a, b *database.DisappearingMessage) int {
|
|
return a.DisappearAt.Compare(b.DisappearAt)
|
|
})
|
|
if len(startedMessages) > 0 {
|
|
go dl.sleepAndDisappear(ctx, startedMessages...)
|
|
}
|
|
}
|
|
|
|
func (dl *DisappearLoop) Add(ctx context.Context, dm *database.DisappearingMessage) {
|
|
err := dl.br.DB.DisappearingMessage.Put(ctx, dm)
|
|
if err != nil {
|
|
zerolog.Ctx(ctx).Err(err).
|
|
Stringer("event_id", dm.EventID).
|
|
Msg("Failed to save disappearing message")
|
|
}
|
|
if !dm.DisappearAt.IsZero() && dm.DisappearAt.Before(dl.GetNextCheck()) {
|
|
go dl.sleepAndDisappear(zerolog.Ctx(ctx).WithContext(dl.br.BackgroundCtx), dm)
|
|
}
|
|
}
|
|
|
|
func (dl *DisappearLoop) sleepAndDisappear(ctx context.Context, dms ...*database.DisappearingMessage) {
|
|
for _, msg := range dms {
|
|
timeUntilDisappear := time.Until(msg.DisappearAt)
|
|
if timeUntilDisappear <= 0 {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
} else {
|
|
select {
|
|
case <-time.After(timeUntilDisappear):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
resp, err := dl.br.Bot.SendMessage(ctx, msg.RoomID, event.EventRedaction, &event.Content{
|
|
Parsed: &event.RedactionEventContent{
|
|
Redacts: msg.EventID,
|
|
Reason: "Message disappeared",
|
|
},
|
|
}, nil)
|
|
if err != nil {
|
|
zerolog.Ctx(ctx).Err(err).Stringer("target_event_id", msg.EventID).Msg("Failed to disappear message")
|
|
} else {
|
|
zerolog.Ctx(ctx).Debug().
|
|
Stringer("target_event_id", msg.EventID).
|
|
Stringer("redaction_event_id", resp.EventID).
|
|
Msg("Disappeared message")
|
|
}
|
|
err = dl.br.DB.DisappearingMessage.Delete(ctx, msg.EventID)
|
|
if err != nil {
|
|
zerolog.Ctx(ctx).Err(err).
|
|
Stringer("event_id", msg.EventID).
|
|
Msg("Failed to delete disappearing message entry from database")
|
|
}
|
|
}
|
|
}
|