diff --git a/bridgev2/errors.go b/bridgev2/errors.go index a6cf4ceb..c39f8707 100644 --- a/bridgev2/errors.go +++ b/bridgev2/errors.go @@ -38,6 +38,8 @@ var ErrNotLoggedIn = errors.New("not logged in") // but direct media is not enabled. var ErrDirectMediaNotEnabled = errors.New("direct media is not enabled") +var ErrPortalIsDeleted = errors.New("portal is deleted") + // Common message status errors var ( ErrPanicInEventHandler error = WrapErrorInStatus(errors.New("panic in event handler")).WithSendNotice(true).WithErrorAsMessage() diff --git a/bridgev2/portal.go b/bridgev2/portal.go index 7ca3ffab..273b1fd3 100644 --- a/bridgev2/portal.go +++ b/bridgev2/portal.go @@ -93,7 +93,7 @@ type Portal struct { functionalMembersCache *event.ElementFunctionalMembersContent events chan portalEvent - deleted bool + deleted *exsync.Event eventsLock sync.Mutex eventIdx int @@ -127,6 +127,7 @@ func (br *Bridge) loadPortal(ctx context.Context, dbPortal *database.Portal, que outgoingMessages: make(map[networkid.TransactionID]*outgoingMessage), RoomCreated: exsync.NewEvent(), + deleted: exsync.NewEvent(), } if portal.MXID != "" { portal.RoomCreated.Set() @@ -335,6 +336,9 @@ func (br *Bridge) GetExistingPortalByKey(ctx context.Context, key networkid.Port } func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) EventHandlingResult { + if portal.deleted.IsSet() { + return EventHandlingResultIgnored + } if PortalEventBuffer == 0 { portal.eventsLock.Lock() defer portal.eventsLock.Unlock() @@ -347,6 +351,8 @@ func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) EventHand select { case portal.events <- evt: return EventHandlingResultQueued + case <-portal.deleted.GetChan(): + return EventHandlingResultIgnored default: zerolog.Ctx(ctx).Error(). Str("portal_id", string(portal.ID)). @@ -371,16 +377,16 @@ func (portal *Portal) eventLoop() { go portal.pendingMessageTimeoutLoop(ctx, cfg) defer cancel() } - i := 0 - for rawEvt := range portal.events { - if portal.deleted { - return - } - i++ - if portal.Bridge.Config.AsyncEvents { - go portal.handleSingleEventWithDelayLogging(i, rawEvt) - } else { - portal.handleSingleEventWithDelayLogging(i, rawEvt) + deleteCh := portal.deleted.GetChan() + for i := 0; ; i++ { + select { + case rawEvt := <-portal.events: + if portal.Bridge.Config.AsyncEvents { + go portal.handleSingleEventWithDelayLogging(i, rawEvt) + } else { + portal.handleSingleEventWithDelayLogging(i, rawEvt) + } + case <-deleteCh: } } } @@ -4902,6 +4908,9 @@ func (portal *Portal) CreateMatrixRoom(ctx context.Context, source *UserLogin, i } return nil } + if portal.deleted.IsSet() { + return ErrPortalIsDeleted + } waiter := make(chan struct{}) closed := false evt := &portalCreateEvent{ @@ -4919,7 +4928,11 @@ func (portal *Portal) CreateMatrixRoom(ctx context.Context, source *UserLogin, i if PortalEventBuffer == 0 { go portal.queueEvent(ctx, evt) } else { - portal.events <- evt + select { + case portal.events <- evt: + case <-portal.deleted.GetChan(): + return ErrPortalIsDeleted + } } select { case <-ctx.Done(): @@ -5245,11 +5258,11 @@ func (portal *Portal) unlockedDeleteCache() { if portal.MXID != "" { delete(portal.Bridge.portalsByMXID, portal.MXID) } + portal.deleted.Set() if portal.events != nil { // TODO there's a small risk of this racing with a queueEvent call close(portal.events) } - portal.deleted = true } func (portal *Portal) Save(ctx context.Context) error {