mirror of
https://mau.dev/mautrix/go.git
synced 2026-03-14 14:25:53 +01:00
bridgev2/portal: make queueEvent slightly safer when deleting portals
This commit is contained in:
parent
31579be20a
commit
2c62641c73
2 changed files with 28 additions and 13 deletions
|
|
@ -38,6 +38,8 @@ var ErrNotLoggedIn = errors.New("not logged in")
|
|||
// but direct media is not enabled.
|
||||
var ErrDirectMediaNotEnabled = errors.New("direct media is not enabled")
|
||||
|
||||
var ErrPortalIsDeleted = errors.New("portal is deleted")
|
||||
|
||||
// Common message status errors
|
||||
var (
|
||||
ErrPanicInEventHandler error = WrapErrorInStatus(errors.New("panic in event handler")).WithSendNotice(true).WithErrorAsMessage()
|
||||
|
|
|
|||
|
|
@ -93,7 +93,7 @@ type Portal struct {
|
|||
functionalMembersCache *event.ElementFunctionalMembersContent
|
||||
|
||||
events chan portalEvent
|
||||
deleted bool
|
||||
deleted *exsync.Event
|
||||
|
||||
eventsLock sync.Mutex
|
||||
eventIdx int
|
||||
|
|
@ -127,6 +127,7 @@ func (br *Bridge) loadPortal(ctx context.Context, dbPortal *database.Portal, que
|
|||
outgoingMessages: make(map[networkid.TransactionID]*outgoingMessage),
|
||||
|
||||
RoomCreated: exsync.NewEvent(),
|
||||
deleted: exsync.NewEvent(),
|
||||
}
|
||||
if portal.MXID != "" {
|
||||
portal.RoomCreated.Set()
|
||||
|
|
@ -335,6 +336,9 @@ func (br *Bridge) GetExistingPortalByKey(ctx context.Context, key networkid.Port
|
|||
}
|
||||
|
||||
func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) EventHandlingResult {
|
||||
if portal.deleted.IsSet() {
|
||||
return EventHandlingResultIgnored
|
||||
}
|
||||
if PortalEventBuffer == 0 {
|
||||
portal.eventsLock.Lock()
|
||||
defer portal.eventsLock.Unlock()
|
||||
|
|
@ -347,6 +351,8 @@ func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) EventHand
|
|||
select {
|
||||
case portal.events <- evt:
|
||||
return EventHandlingResultQueued
|
||||
case <-portal.deleted.GetChan():
|
||||
return EventHandlingResultIgnored
|
||||
default:
|
||||
zerolog.Ctx(ctx).Error().
|
||||
Str("portal_id", string(portal.ID)).
|
||||
|
|
@ -371,16 +377,16 @@ func (portal *Portal) eventLoop() {
|
|||
go portal.pendingMessageTimeoutLoop(ctx, cfg)
|
||||
defer cancel()
|
||||
}
|
||||
i := 0
|
||||
for rawEvt := range portal.events {
|
||||
if portal.deleted {
|
||||
return
|
||||
}
|
||||
i++
|
||||
if portal.Bridge.Config.AsyncEvents {
|
||||
go portal.handleSingleEventWithDelayLogging(i, rawEvt)
|
||||
} else {
|
||||
portal.handleSingleEventWithDelayLogging(i, rawEvt)
|
||||
deleteCh := portal.deleted.GetChan()
|
||||
for i := 0; ; i++ {
|
||||
select {
|
||||
case rawEvt := <-portal.events:
|
||||
if portal.Bridge.Config.AsyncEvents {
|
||||
go portal.handleSingleEventWithDelayLogging(i, rawEvt)
|
||||
} else {
|
||||
portal.handleSingleEventWithDelayLogging(i, rawEvt)
|
||||
}
|
||||
case <-deleteCh:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -4902,6 +4908,9 @@ func (portal *Portal) CreateMatrixRoom(ctx context.Context, source *UserLogin, i
|
|||
}
|
||||
return nil
|
||||
}
|
||||
if portal.deleted.IsSet() {
|
||||
return ErrPortalIsDeleted
|
||||
}
|
||||
waiter := make(chan struct{})
|
||||
closed := false
|
||||
evt := &portalCreateEvent{
|
||||
|
|
@ -4919,7 +4928,11 @@ func (portal *Portal) CreateMatrixRoom(ctx context.Context, source *UserLogin, i
|
|||
if PortalEventBuffer == 0 {
|
||||
go portal.queueEvent(ctx, evt)
|
||||
} else {
|
||||
portal.events <- evt
|
||||
select {
|
||||
case portal.events <- evt:
|
||||
case <-portal.deleted.GetChan():
|
||||
return ErrPortalIsDeleted
|
||||
}
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
@ -5245,11 +5258,11 @@ func (portal *Portal) unlockedDeleteCache() {
|
|||
if portal.MXID != "" {
|
||||
delete(portal.Bridge.portalsByMXID, portal.MXID)
|
||||
}
|
||||
portal.deleted.Set()
|
||||
if portal.events != nil {
|
||||
// TODO there's a small risk of this racing with a queueEvent call
|
||||
close(portal.events)
|
||||
}
|
||||
portal.deleted = true
|
||||
}
|
||||
|
||||
func (portal *Portal) Save(ctx context.Context) error {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue