bridgev2/database: only allow one chunked portal deletion at a time

This commit is contained in:
Tulir Asokan 2025-12-16 18:57:39 +02:00
commit b44f81d114

View file

@ -13,6 +13,7 @@ import (
"encoding/base64"
"fmt"
"strings"
"sync"
"time"
"github.com/rs/zerolog"
@ -26,6 +27,7 @@ type MessageQuery struct {
BridgeID networkid.BridgeID
MetaType MetaTypeCreator
*dbutil.QueryHelper[*Message]
chunkDeleteLock sync.Mutex
}
type Message struct {
@ -205,6 +207,16 @@ func (mq *MessageQuery) DeleteInChunks(ctx context.Context, portal networkid.Por
if mq.GetDB().Dialect != dbutil.SQLite {
return nil
}
log := zerolog.Ctx(ctx).With().
Str("action", "delete messages in chunks").
Stringer("portal_key", portal).
Logger()
if !mq.chunkDeleteLock.TryLock() {
log.Warn().Msg("Portal deletion lock is being held, waiting...")
mq.chunkDeleteLock.Lock()
log.Debug().Msg("Acquired portal deletion lock after waiting")
}
defer mq.chunkDeleteLock.Unlock()
total, err := mq.CountMessagesInPortal(ctx, portal)
if err != nil {
return fmt.Errorf("failed to count messages in portal: %w", err)
@ -215,7 +227,7 @@ func (mq *MessageQuery) DeleteInChunks(ctx context.Context, portal networkid.Por
if err != nil {
return fmt.Errorf("failed to get max row ID: %w", err)
}
zerolog.Ctx(ctx).Debug().
log.Debug().
Int("total_count", total).
Int64("global_max_row_id", globalMaxRowID).
Msg("Portal has lots of messages, deleting in chunks to avoid database locks")
@ -234,7 +246,7 @@ func (mq *MessageQuery) DeleteInChunks(ctx context.Context, portal networkid.Por
total -= int(count)
maxRowID += deleteChunkSize
sleepTime := max(10*time.Millisecond, min(250*time.Millisecond, time.Duration(count/100)*time.Millisecond))
zerolog.Ctx(ctx).Debug().
log.Debug().
Int64("max_row_id", maxRowID).
Int64("deleted_count", count).
Int("remaining_count", total).
@ -247,7 +259,7 @@ func (mq *MessageQuery) DeleteInChunks(ctx context.Context, portal networkid.Por
return ctx.Err()
}
}
zerolog.Ctx(ctx).Debug().
log.Debug().
Int("remaining_count", total).
Dur("db_time_used", dbTimeUsed).
Dur("total_duration", time.Since(globalStart)).