diff --git a/appservice/http.go b/appservice/http.go index a478d3ec..15c99345 100644 --- a/appservice/http.go +++ b/appservice/http.go @@ -198,10 +198,6 @@ func (as *AppService) handleEvents(evts []*event.Event, defaultTypeClass event.T as.Log.Debugfln("Failed to parse content of %s (type %s): %v", evt.ID, evt.Type.Type, err) } - if _, ok := CheckpointTypes[evt.Type]; ok { - go as.SendMessageSendCheckpoint(evt, StepBridge, 0) - } - if evt.Type.IsState() { // TODO remove this check after https://github.com/matrix-org/synapse/pull/11265 historical, ok := evt.Content.Raw["org.matrix.msc2716.historical"].(bool) diff --git a/bridge/matrix.go b/bridge/matrix.go index b7cfd2ff..f8c1c7fd 100644 --- a/bridge/matrix.go +++ b/bridge/matrix.go @@ -48,6 +48,9 @@ func NewMatrixHandler(br *Bridge) *MatrixHandler { TrackEventDuration: noopTrack, } + for evtType := range CheckpointTypes { + br.EventProcessor.On(evtType, handler.sendBridgeCheckpoint) + } br.EventProcessor.On(event.EventMessage, handler.HandleMessage) br.EventProcessor.On(event.EventEncrypted, handler.HandleEncrypted) br.EventProcessor.On(event.EventSticker, handler.HandleMessage) @@ -63,6 +66,10 @@ func NewMatrixHandler(br *Bridge) *MatrixHandler { return handler } +func (mx *MatrixHandler) sendBridgeCheckpoint(evt *event.Event) { + go mx.bridge.SendMessageSuccessCheckpoint(evt, StepBridge, 0) +} + func (mx *MatrixHandler) HandleEncryption(evt *event.Event) { defer mx.TrackEventDuration(evt.Type)() if evt.Content.AsEncryption().Algorithm != id.AlgorithmMegolmV1 { @@ -326,26 +333,26 @@ func (mx *MatrixHandler) HandleEncrypted(evt *event.Event) { if errors.Is(err, NoSessionFound) { content := evt.Content.AsEncrypted() mx.log.Debugfln("Couldn't find session %s trying to decrypt %s, waiting %d seconds...", content.SessionID, evt.ID, int(sessionWaitTimeout.Seconds())) - mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, false, decryptionRetryCount) + mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, err, false, decryptionRetryCount) decryptionRetryCount++ if mx.bridge.Crypto.WaitForSession(evt.RoomID, content.SenderKey, content.SessionID, sessionWaitTimeout) { mx.log.Debugfln("Got session %s after waiting, trying to decrypt %s again", content.SessionID, evt.ID) decrypted, err = mx.bridge.Crypto.Decrypt(evt) } else { - mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, fmt.Errorf("didn't receive encryption keys"), false, decryptionRetryCount) + mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, fmt.Errorf("didn't receive encryption keys"), false, decryptionRetryCount) go mx.waitLongerForSession(evt) return } } if err != nil { - mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, true, decryptionRetryCount) + mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, err, true, decryptionRetryCount) mx.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err) _, _ = mx.bridge.Bot.SendNotice(evt.RoomID, fmt.Sprintf( "\u26a0 Your message was not bridged: %v", err)) return } - mx.as.SendMessageSendCheckpoint(decrypted, appservice.StepDecrypted, decryptionRetryCount) + mx.bridge.SendMessageSuccessCheckpoint(decrypted, StepDecrypted, decryptionRetryCount) mx.bridge.EventProcessor.Dispatch(decrypted) } @@ -371,17 +378,17 @@ func (mx *MatrixHandler) waitLongerForSession(evt *event.Event) { mx.log.Debugfln("Got session %s after waiting more, trying to decrypt %s again", content.SessionID, evt.ID) decrypted, err := mx.bridge.Crypto.Decrypt(evt) if err == nil { - mx.as.SendMessageSendCheckpoint(decrypted, appservice.StepDecrypted, 2) + mx.bridge.SendMessageSuccessCheckpoint(decrypted, StepDecrypted, 2) mx.bridge.EventProcessor.Dispatch(decrypted) _, _ = mx.bridge.Bot.RedactEvent(evt.RoomID, resp.EventID) return } mx.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err) - mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, true, 2) + mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, err, true, 2) update.Body = fmt.Sprintf("\u26a0 Your message was not bridged: %v", err) } else { mx.log.Debugfln("Didn't get %s, giving up on %s", content.SessionID, evt.ID) - mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, fmt.Errorf("didn't receive encryption keys"), true, 2) + mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, fmt.Errorf("didn't receive encryption keys"), true, 2) update.Body = "\u26a0 Your message was not bridged: the bridge hasn't received the decryption keys. " + "If this error keeps happening, try restarting your client." } diff --git a/appservice/message_send_checkpoint.go b/bridge/message_send_checkpoint.go similarity index 82% rename from appservice/message_send_checkpoint.go rename to bridge/message_send_checkpoint.go index c1ef5ba2..83771396 100644 --- a/appservice/message_send_checkpoint.go +++ b/bridge/message_send_checkpoint.go @@ -4,7 +4,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -package appservice +package bridge import ( "bytes" @@ -16,6 +16,7 @@ import ( "time" "maunium.net/go/mautrix" + "maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" ) @@ -62,18 +63,18 @@ type MessageSendCheckpoint struct { } var CheckpointTypes = map[event.Type]struct{}{ - event.EventRedaction: {}, - event.EventMessage: {}, - event.EventEncrypted: {}, - event.EventSticker: {}, - event.EventReaction: {}, - event.CallInvite: {}, - event.CallCandidates: {}, - event.CallSelectAnswer: {}, - event.CallAnswer: {}, - event.CallHangup: {}, - event.CallReject: {}, - event.CallNegotiate: {}, + event.EventRedaction: {}, + event.EventMessage: {}, + event.EventEncrypted: {}, + event.EventSticker: {}, + event.EventReaction: {}, + //event.CallInvite: {}, + //event.CallCandidates: {}, + //event.CallSelectAnswer: {}, + //event.CallAnswer: {}, + //event.CallHangup: {}, + //event.CallReject: {}, + //event.CallNegotiate: {}, } func NewMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, status MessageSendCheckpointStatus, retryNum int) *MessageSendCheckpoint { @@ -93,22 +94,22 @@ func NewMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, return &checkpoint } -func (as *AppService) SendMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, retryNum int) { +func (br *Bridge) SendMessageSuccessCheckpoint(evt *event.Event, step MessageSendCheckpointStep, retryNum int) { checkpoint := NewMessageSendCheckpoint(evt, step, StatusSuccesss, retryNum) - go checkpoint.Send(as) + go checkpoint.Send(br.AS) } -func (as *AppService) SendErrorMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, err error, permanent bool, retryNum int) { +func (br *Bridge) SendMessageErrorCheckpoint(evt *event.Event, step MessageSendCheckpointStep, err error, permanent bool, retryNum int) { status := StatusWillRetry if permanent { status = StatusPermFailure } checkpoint := NewMessageSendCheckpoint(evt, step, status, retryNum) checkpoint.Info = err.Error() - go checkpoint.Send(as) + go checkpoint.Send(br.AS) } -func (cp *MessageSendCheckpoint) Send(as *AppService) { +func (cp *MessageSendCheckpoint) Send(as *appservice.AppService) { err := SendCheckpoints(as, []*MessageSendCheckpoint{cp}) if err != nil { as.Log.Warnfln("Error sending checkpoint %s/%s for %s: %v", cp.Step, cp.Status, cp.EventID, err) @@ -119,11 +120,11 @@ type CheckpointsJSON struct { Checkpoints []*MessageSendCheckpoint `json:"checkpoints"` } -func SendCheckpoints(as *AppService, checkpoints []*MessageSendCheckpoint) error { +func SendCheckpoints(as *appservice.AppService, checkpoints []*MessageSendCheckpoint) error { checkpointsJSON := CheckpointsJSON{Checkpoints: checkpoints} if as.HasWebsocket() { - return as.SendWebsocket(&WebsocketRequest{ + return as.SendWebsocket(&appservice.WebsocketRequest{ Command: "message_checkpoint", Data: checkpointsJSON, })