mirror of
https://mau.dev/mautrix/go.git
synced 2026-03-14 14:25:53 +01:00
bridgev2: fix issues in event loop debug logger
This commit is contained in:
parent
e750881a4a
commit
059d9a36e5
2 changed files with 68 additions and 58 deletions
|
|
@ -38,8 +38,9 @@ type portalMatrixEvent struct {
|
|||
}
|
||||
|
||||
type portalRemoteEvent struct {
|
||||
evt RemoteEvent
|
||||
source *UserLogin
|
||||
evt RemoteEvent
|
||||
source *UserLogin
|
||||
evtType RemoteEventType
|
||||
}
|
||||
|
||||
type portalCreateEvent struct {
|
||||
|
|
@ -275,24 +276,31 @@ func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) {
|
|||
}
|
||||
|
||||
func (portal *Portal) eventLoop() {
|
||||
i := 0
|
||||
for rawEvt := range portal.events {
|
||||
portal.handleSingleEventAsync(rawEvt)
|
||||
i++
|
||||
portal.handleSingleEventAsync(i, rawEvt)
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) handleSingleEventAsync(rawEvt any) {
|
||||
log := portal.Log.With().Logger()
|
||||
func (portal *Portal) handleSingleEventAsync(idx int, rawEvt any) {
|
||||
ctx := portal.getEventCtxWithLog(rawEvt, idx)
|
||||
if _, isCreate := rawEvt.(*portalCreateEvent); isCreate {
|
||||
portal.handleSingleEvent(&log, rawEvt, func() {})
|
||||
portal.handleSingleEvent(ctx, rawEvt, func() {})
|
||||
} else if portal.Bridge.Config.AsyncEvents {
|
||||
go portal.handleSingleEvent(&log, rawEvt, func() {})
|
||||
go portal.handleSingleEvent(ctx, rawEvt, func() {})
|
||||
} else {
|
||||
log := zerolog.Ctx(ctx)
|
||||
doneCh := make(chan struct{})
|
||||
var backgrounded atomic.Bool
|
||||
go portal.handleSingleEvent(&log, rawEvt, func() {
|
||||
start := time.Now()
|
||||
var handleDuration time.Duration
|
||||
go portal.handleSingleEvent(ctx, rawEvt, func() {
|
||||
handleDuration = time.Since(start)
|
||||
close(doneCh)
|
||||
if backgrounded.Load() {
|
||||
log.Debug().Msg("Event that took too long finally finished handling")
|
||||
log.Debug().Stringer("duration", handleDuration).
|
||||
Msg("Event that took too long finally finished handling")
|
||||
}
|
||||
})
|
||||
tick := time.NewTicker(30 * time.Second)
|
||||
|
|
@ -301,7 +309,8 @@ func (portal *Portal) handleSingleEventAsync(rawEvt any) {
|
|||
select {
|
||||
case <-doneCh:
|
||||
if i > 0 {
|
||||
log.Debug().Msg("Event that took long finished handling")
|
||||
log.Debug().Stringer("duration", handleDuration).
|
||||
Msg("Event that took long finished handling")
|
||||
}
|
||||
return
|
||||
case <-tick.C:
|
||||
|
|
@ -313,8 +322,34 @@ func (portal *Portal) handleSingleEventAsync(rawEvt any) {
|
|||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) handleSingleEvent(log *zerolog.Logger, rawEvt any, doneCallback func()) {
|
||||
ctx := log.WithContext(context.Background())
|
||||
func (portal *Portal) getEventCtxWithLog(rawEvt any, idx int) context.Context {
|
||||
var logWith zerolog.Context
|
||||
switch evt := rawEvt.(type) {
|
||||
case *portalMatrixEvent:
|
||||
logWith = portal.Log.With().Int("event_loop_index", idx).
|
||||
Str("action", "handle matrix event").
|
||||
Stringer("event_id", evt.evt.ID).
|
||||
Str("event_type", evt.evt.Type.Type)
|
||||
if evt.evt.Mautrix.EventSource&event.SourceEphemeral == 0 {
|
||||
logWith = logWith.
|
||||
Stringer("event_id", evt.evt.ID).
|
||||
Stringer("sender", evt.sender.MXID)
|
||||
}
|
||||
case *portalRemoteEvent:
|
||||
evt.evtType = evt.evt.GetType()
|
||||
logWith = portal.Log.With().Int("event_loop_index", idx).
|
||||
Str("action", "handle remote event").
|
||||
Str("source_id", string(evt.source.ID)).
|
||||
Stringer("bridge_evt_type", evt.evtType)
|
||||
logWith = evt.evt.AddLogContext(logWith)
|
||||
case *portalCreateEvent:
|
||||
return evt.ctx
|
||||
}
|
||||
return logWith.Logger().WithContext(context.Background())
|
||||
}
|
||||
|
||||
func (portal *Portal) handleSingleEvent(ctx context.Context, rawEvt any, doneCallback func()) {
|
||||
log := zerolog.Ctx(ctx)
|
||||
defer func() {
|
||||
doneCallback()
|
||||
if err := recover(); err != nil {
|
||||
|
|
@ -339,20 +374,10 @@ func (portal *Portal) handleSingleEvent(log *zerolog.Logger, rawEvt any, doneCal
|
|||
}()
|
||||
switch evt := rawEvt.(type) {
|
||||
case *portalMatrixEvent:
|
||||
log.UpdateContext(func(c zerolog.Context) zerolog.Context {
|
||||
return c.Str("action", "handle matrix event").
|
||||
Stringer("event_id", evt.evt.ID).
|
||||
Str("event_type", evt.evt.Type.Type)
|
||||
})
|
||||
portal.handleMatrixEvent(ctx, evt.sender, evt.evt)
|
||||
case *portalRemoteEvent:
|
||||
log.UpdateContext(func(c zerolog.Context) zerolog.Context {
|
||||
return c.Str("action", "handle remote event").
|
||||
Str("source_id", string(evt.source.ID))
|
||||
})
|
||||
portal.handleRemoteEvent(ctx, evt.source, evt.evt)
|
||||
portal.handleRemoteEvent(ctx, evt.source, evt.evtType, evt.evt)
|
||||
case *portalCreateEvent:
|
||||
*log = *zerolog.Ctx(evt.ctx)
|
||||
evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil))
|
||||
default:
|
||||
panic(fmt.Errorf("illegal type %T in eventLoop", evt))
|
||||
|
|
@ -457,11 +482,6 @@ func (portal *Portal) handleMatrixEvent(ctx context.Context, sender *User, evt *
|
|||
}
|
||||
return
|
||||
}
|
||||
log.UpdateContext(func(c zerolog.Context) zerolog.Context {
|
||||
return c.
|
||||
Stringer("event_id", evt.ID).
|
||||
Stringer("sender", sender.MXID)
|
||||
})
|
||||
login, _, err := portal.FindPreferredLogin(ctx, sender, true)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to get user login to handle Matrix event")
|
||||
|
|
@ -497,9 +517,8 @@ func (portal *Portal) handleMatrixEvent(ctx context.Context, sender *User, evt *
|
|||
}
|
||||
origSender.FormattedName = portal.Bridge.Config.Relay.FormatName(origSender)
|
||||
}
|
||||
log.UpdateContext(func(c zerolog.Context) zerolog.Context {
|
||||
return c.Str("login_id", string(login.ID))
|
||||
})
|
||||
// Copy logger because many of the handlers will use UpdateContext
|
||||
ctx = log.With().Str("login_id", string(login.ID)).Logger().WithContext(ctx)
|
||||
switch evt.Type {
|
||||
case event.EventMessage, event.EventSticker:
|
||||
portal.handleMatrixMessage(ctx, login, origSender, evt)
|
||||
|
|
@ -1490,26 +1509,8 @@ func (portal *Portal) handleMatrixRedaction(ctx context.Context, sender *UserLog
|
|||
portal.sendSuccessStatus(ctx, evt)
|
||||
}
|
||||
|
||||
func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evt RemoteEvent) {
|
||||
func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) {
|
||||
log := zerolog.Ctx(ctx)
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logEvt := log.Error()
|
||||
if realErr, ok := err.(error); ok {
|
||||
logEvt = logEvt.Err(realErr)
|
||||
} else {
|
||||
logEvt = logEvt.Any(zerolog.ErrorFieldName, err)
|
||||
}
|
||||
logEvt.
|
||||
Bytes("stack", debug.Stack()).
|
||||
Msg("Remote event handler panicked")
|
||||
}
|
||||
}()
|
||||
evtType := evt.GetType()
|
||||
log.UpdateContext(func(c zerolog.Context) zerolog.Context {
|
||||
c = c.Stringer("bridge_evt_type", evtType)
|
||||
return evt.AddLogContext(c)
|
||||
})
|
||||
if portal.MXID == "" {
|
||||
mcp, ok := evt.(RemoteEventThatMayCreatePortal)
|
||||
if !ok || !mcp.ShouldCreatePortal() {
|
||||
|
|
@ -1851,8 +1852,13 @@ func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin,
|
|||
}
|
||||
if len(res.SubEvents) > 0 {
|
||||
for _, subEvt := range res.SubEvents {
|
||||
log := portal.Log.With().Str("source_id", string(source.ID)).Str("action", "handle remote subevent").Logger()
|
||||
portal.handleRemoteEvent(log.WithContext(ctx), source, subEvt)
|
||||
subType := subEvt.GetType()
|
||||
log := portal.Log.With().
|
||||
Str("source_id", string(source.ID)).
|
||||
Str("action", "handle remote subevent").
|
||||
Stringer("bridge_evt_type", subType).
|
||||
Logger()
|
||||
portal.handleRemoteEvent(log.WithContext(ctx), source, subType, subEvt)
|
||||
}
|
||||
}
|
||||
return res.ContinueMessageHandling
|
||||
|
|
|
|||
|
|
@ -37,12 +37,16 @@ func (portal *PortalInternals) EventLoop() {
|
|||
(*Portal)(portal).eventLoop()
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleSingleEventAsync(rawEvt any) {
|
||||
(*Portal)(portal).handleSingleEventAsync(rawEvt)
|
||||
func (portal *PortalInternals) HandleSingleEventAsync(idx int, rawEvt any) {
|
||||
(*Portal)(portal).handleSingleEventAsync(idx, rawEvt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleSingleEvent(log *zerolog.Logger, rawEvt any, doneCallback func()) {
|
||||
(*Portal)(portal).handleSingleEvent(log, rawEvt, doneCallback)
|
||||
func (portal *PortalInternals) GetEventCtxWithLog(rawEvt any, idx int) context.Context {
|
||||
return (*Portal)(portal).getEventCtxWithLog(rawEvt, idx)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleSingleEvent(ctx context.Context, rawEvt any, doneCallback func()) {
|
||||
(*Portal)(portal).handleSingleEvent(ctx, rawEvt, doneCallback)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) SendSuccessStatus(ctx context.Context, evt *event.Event) {
|
||||
|
|
@ -113,8 +117,8 @@ func (portal *PortalInternals) HandleMatrixRedaction(ctx context.Context, sender
|
|||
(*Portal)(portal).handleMatrixRedaction(ctx, sender, origSender, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) HandleRemoteEvent(ctx context.Context, source *UserLogin, evt RemoteEvent) {
|
||||
(*Portal)(portal).handleRemoteEvent(ctx, source, evt)
|
||||
func (portal *PortalInternals) HandleRemoteEvent(ctx context.Context, source *UserLogin, evtType RemoteEventType, evt RemoteEvent) {
|
||||
(*Portal)(portal).handleRemoteEvent(ctx, source, evtType, evt)
|
||||
}
|
||||
|
||||
func (portal *PortalInternals) GetIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue