mirror of
https://mau.dev/mautrix/go.git
synced 2026-03-14 14:25:53 +01:00
bridgev2/database: delete messages in chunks if portal has too many
This commit is contained in:
parent
e9b262e671
commit
e38d758a52
2 changed files with 86 additions and 3 deletions
|
|
@ -11,9 +11,11 @@ import (
|
|||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"go.mau.fi/util/dbutil"
|
||||
|
||||
"maunium.net/go/mautrix/bridgev2/networkid"
|
||||
|
|
@ -96,6 +98,10 @@ const (
|
|||
deleteMessagePartByRowIDQuery = `
|
||||
DELETE FROM message WHERE bridge_id=$1 AND rowid=$2
|
||||
`
|
||||
deleteMessageChunkQuery = `
|
||||
DELETE FROM message WHERE bridge_id=$1 AND room_id=$2 AND room_receiver=$3 AND rowid > $4 AND rowid <= $5
|
||||
`
|
||||
getMaxMessageRowIDQuery = `SELECT MAX(rowid) FROM message WHERE bridge_id=$1`
|
||||
)
|
||||
|
||||
func (mq *MessageQuery) GetAllPartsByID(ctx context.Context, receiver networkid.UserLoginID, id networkid.MessageID) ([]*Message, error) {
|
||||
|
|
@ -180,6 +186,75 @@ func (mq *MessageQuery) Delete(ctx context.Context, rowID int64) error {
|
|||
return mq.Exec(ctx, deleteMessagePartByRowIDQuery, mq.BridgeID, rowID)
|
||||
}
|
||||
|
||||
func (mq *MessageQuery) deleteChunk(ctx context.Context, portal networkid.PortalKey, minRowID, maxRowID int64) (int64, error) {
|
||||
res, err := mq.GetDB().Exec(ctx, deleteMessageChunkQuery, mq.BridgeID, portal.ID, portal.Receiver, minRowID, maxRowID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return res.RowsAffected()
|
||||
}
|
||||
|
||||
func (mq *MessageQuery) getMaxRowID(ctx context.Context) (maxRowID int64, err error) {
|
||||
err = mq.GetDB().QueryRow(ctx, getMaxMessageRowIDQuery, mq.BridgeID).Scan(&maxRowID)
|
||||
return
|
||||
}
|
||||
|
||||
const deleteChunkSize = 100_000
|
||||
|
||||
func (mq *MessageQuery) DeleteInChunks(ctx context.Context, portal networkid.PortalKey) error {
|
||||
if mq.GetDB().Dialect != dbutil.SQLite {
|
||||
return nil
|
||||
}
|
||||
total, err := mq.CountMessagesInPortal(ctx, portal)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to count messages in portal: %w", err)
|
||||
} else if total < deleteChunkSize {
|
||||
return nil
|
||||
}
|
||||
globalMaxRowID, err := mq.getMaxRowID(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get max row ID: %w", err)
|
||||
}
|
||||
zerolog.Ctx(ctx).Debug().
|
||||
Int("total_count", total).
|
||||
Int64("global_max_row_id", globalMaxRowID).
|
||||
Msg("Portal has lots of messages, deleting in chunks to avoid database locks")
|
||||
maxRowID := int64(deleteChunkSize)
|
||||
globalMaxRowID += deleteChunkSize * 1.2
|
||||
var dbTimeUsed time.Duration
|
||||
globalStart := time.Now()
|
||||
for total > 500 && maxRowID < globalMaxRowID {
|
||||
start := time.Now()
|
||||
count, err := mq.deleteChunk(ctx, portal, maxRowID-deleteChunkSize, maxRowID)
|
||||
duration := time.Since(start)
|
||||
dbTimeUsed += duration
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete chunk of messages before %d: %w", maxRowID, err)
|
||||
}
|
||||
total -= int(count)
|
||||
maxRowID += deleteChunkSize
|
||||
sleepTime := max(10*time.Millisecond, min(250*time.Millisecond, time.Duration(count/100)*time.Millisecond))
|
||||
zerolog.Ctx(ctx).Debug().
|
||||
Int64("max_row_id", maxRowID).
|
||||
Int64("deleted_count", count).
|
||||
Int("remaining_count", total).
|
||||
Dur("duration", duration).
|
||||
Dur("sleep_time", sleepTime).
|
||||
Msg("Deleted chunk of messages")
|
||||
select {
|
||||
case <-time.After(sleepTime):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
zerolog.Ctx(ctx).Debug().
|
||||
Int("remaining_count", total).
|
||||
Dur("db_time_used", dbTimeUsed).
|
||||
Dur("total_duration", time.Since(globalStart)).
|
||||
Msg("Finished chunked delete of messages in portal")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mq *MessageQuery) CountMessagesInPortal(ctx context.Context, key networkid.PortalKey) (count int, err error) {
|
||||
err = mq.GetDB().QueryRow(ctx, countMessagesInPortalQuery, mq.BridgeID, key.ID, key.Receiver).Scan(&count)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -5202,7 +5202,7 @@ func (portal *Portal) Delete(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
portal.removeInPortalCache(ctx)
|
||||
err := portal.Bridge.DB.Portal.Delete(ctx, portal.PortalKey)
|
||||
err := portal.safeDBDelete(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -5212,6 +5212,15 @@ func (portal *Portal) Delete(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (portal *Portal) safeDBDelete(ctx context.Context) error {
|
||||
err := portal.Bridge.DB.Message.DeleteInChunks(ctx, portal.PortalKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete messages in portal: %w", err)
|
||||
}
|
||||
// TODO delete child portals?
|
||||
return portal.Bridge.DB.Portal.Delete(ctx, portal.PortalKey)
|
||||
}
|
||||
|
||||
func (portal *Portal) RemoveMXID(ctx context.Context) error {
|
||||
if portal.MXID == "" {
|
||||
return nil
|
||||
|
|
@ -5250,8 +5259,7 @@ func (portal *Portal) removeInPortalCache(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (portal *Portal) unlockedDelete(ctx context.Context) error {
|
||||
// TODO delete child portals?
|
||||
err := portal.Bridge.DB.Portal.Delete(ctx, portal.PortalKey)
|
||||
err := portal.safeDBDelete(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue