Move message send checkpoints to bridge module

This commit is contained in:
Tulir Asokan 2022-05-30 14:22:33 +03:00
commit fe1dfbeb4e
3 changed files with 35 additions and 31 deletions

View file

@ -198,10 +198,6 @@ func (as *AppService) handleEvents(evts []*event.Event, defaultTypeClass event.T
as.Log.Debugfln("Failed to parse content of %s (type %s): %v", evt.ID, evt.Type.Type, err)
}
if _, ok := CheckpointTypes[evt.Type]; ok {
go as.SendMessageSendCheckpoint(evt, StepBridge, 0)
}
if evt.Type.IsState() {
// TODO remove this check after https://github.com/matrix-org/synapse/pull/11265
historical, ok := evt.Content.Raw["org.matrix.msc2716.historical"].(bool)

View file

@ -48,6 +48,9 @@ func NewMatrixHandler(br *Bridge) *MatrixHandler {
TrackEventDuration: noopTrack,
}
for evtType := range CheckpointTypes {
br.EventProcessor.On(evtType, handler.sendBridgeCheckpoint)
}
br.EventProcessor.On(event.EventMessage, handler.HandleMessage)
br.EventProcessor.On(event.EventEncrypted, handler.HandleEncrypted)
br.EventProcessor.On(event.EventSticker, handler.HandleMessage)
@ -63,6 +66,10 @@ func NewMatrixHandler(br *Bridge) *MatrixHandler {
return handler
}
func (mx *MatrixHandler) sendBridgeCheckpoint(evt *event.Event) {
go mx.bridge.SendMessageSuccessCheckpoint(evt, StepBridge, 0)
}
func (mx *MatrixHandler) HandleEncryption(evt *event.Event) {
defer mx.TrackEventDuration(evt.Type)()
if evt.Content.AsEncryption().Algorithm != id.AlgorithmMegolmV1 {
@ -326,26 +333,26 @@ func (mx *MatrixHandler) HandleEncrypted(evt *event.Event) {
if errors.Is(err, NoSessionFound) {
content := evt.Content.AsEncrypted()
mx.log.Debugfln("Couldn't find session %s trying to decrypt %s, waiting %d seconds...", content.SessionID, evt.ID, int(sessionWaitTimeout.Seconds()))
mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, false, decryptionRetryCount)
mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, err, false, decryptionRetryCount)
decryptionRetryCount++
if mx.bridge.Crypto.WaitForSession(evt.RoomID, content.SenderKey, content.SessionID, sessionWaitTimeout) {
mx.log.Debugfln("Got session %s after waiting, trying to decrypt %s again", content.SessionID, evt.ID)
decrypted, err = mx.bridge.Crypto.Decrypt(evt)
} else {
mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, fmt.Errorf("didn't receive encryption keys"), false, decryptionRetryCount)
mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, fmt.Errorf("didn't receive encryption keys"), false, decryptionRetryCount)
go mx.waitLongerForSession(evt)
return
}
}
if err != nil {
mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, true, decryptionRetryCount)
mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, err, true, decryptionRetryCount)
mx.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err)
_, _ = mx.bridge.Bot.SendNotice(evt.RoomID, fmt.Sprintf(
"\u26a0 Your message was not bridged: %v", err))
return
}
mx.as.SendMessageSendCheckpoint(decrypted, appservice.StepDecrypted, decryptionRetryCount)
mx.bridge.SendMessageSuccessCheckpoint(decrypted, StepDecrypted, decryptionRetryCount)
mx.bridge.EventProcessor.Dispatch(decrypted)
}
@ -371,17 +378,17 @@ func (mx *MatrixHandler) waitLongerForSession(evt *event.Event) {
mx.log.Debugfln("Got session %s after waiting more, trying to decrypt %s again", content.SessionID, evt.ID)
decrypted, err := mx.bridge.Crypto.Decrypt(evt)
if err == nil {
mx.as.SendMessageSendCheckpoint(decrypted, appservice.StepDecrypted, 2)
mx.bridge.SendMessageSuccessCheckpoint(decrypted, StepDecrypted, 2)
mx.bridge.EventProcessor.Dispatch(decrypted)
_, _ = mx.bridge.Bot.RedactEvent(evt.RoomID, resp.EventID)
return
}
mx.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err)
mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, true, 2)
mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, err, true, 2)
update.Body = fmt.Sprintf("\u26a0 Your message was not bridged: %v", err)
} else {
mx.log.Debugfln("Didn't get %s, giving up on %s", content.SessionID, evt.ID)
mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, fmt.Errorf("didn't receive encryption keys"), true, 2)
mx.bridge.SendMessageErrorCheckpoint(evt, StepDecrypted, fmt.Errorf("didn't receive encryption keys"), true, 2)
update.Body = "\u26a0 Your message was not bridged: the bridge hasn't received the decryption keys. " +
"If this error keeps happening, try restarting your client."
}

View file

@ -4,7 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package appservice
package bridge
import (
"bytes"
@ -16,6 +16,7 @@ import (
"time"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)
@ -62,18 +63,18 @@ type MessageSendCheckpoint struct {
}
var CheckpointTypes = map[event.Type]struct{}{
event.EventRedaction: {},
event.EventMessage: {},
event.EventEncrypted: {},
event.EventSticker: {},
event.EventReaction: {},
event.CallInvite: {},
event.CallCandidates: {},
event.CallSelectAnswer: {},
event.CallAnswer: {},
event.CallHangup: {},
event.CallReject: {},
event.CallNegotiate: {},
event.EventRedaction: {},
event.EventMessage: {},
event.EventEncrypted: {},
event.EventSticker: {},
event.EventReaction: {},
//event.CallInvite: {},
//event.CallCandidates: {},
//event.CallSelectAnswer: {},
//event.CallAnswer: {},
//event.CallHangup: {},
//event.CallReject: {},
//event.CallNegotiate: {},
}
func NewMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, status MessageSendCheckpointStatus, retryNum int) *MessageSendCheckpoint {
@ -93,22 +94,22 @@ func NewMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep,
return &checkpoint
}
func (as *AppService) SendMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, retryNum int) {
func (br *Bridge) SendMessageSuccessCheckpoint(evt *event.Event, step MessageSendCheckpointStep, retryNum int) {
checkpoint := NewMessageSendCheckpoint(evt, step, StatusSuccesss, retryNum)
go checkpoint.Send(as)
go checkpoint.Send(br.AS)
}
func (as *AppService) SendErrorMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, err error, permanent bool, retryNum int) {
func (br *Bridge) SendMessageErrorCheckpoint(evt *event.Event, step MessageSendCheckpointStep, err error, permanent bool, retryNum int) {
status := StatusWillRetry
if permanent {
status = StatusPermFailure
}
checkpoint := NewMessageSendCheckpoint(evt, step, status, retryNum)
checkpoint.Info = err.Error()
go checkpoint.Send(as)
go checkpoint.Send(br.AS)
}
func (cp *MessageSendCheckpoint) Send(as *AppService) {
func (cp *MessageSendCheckpoint) Send(as *appservice.AppService) {
err := SendCheckpoints(as, []*MessageSendCheckpoint{cp})
if err != nil {
as.Log.Warnfln("Error sending checkpoint %s/%s for %s: %v", cp.Step, cp.Status, cp.EventID, err)
@ -119,11 +120,11 @@ type CheckpointsJSON struct {
Checkpoints []*MessageSendCheckpoint `json:"checkpoints"`
}
func SendCheckpoints(as *AppService, checkpoints []*MessageSendCheckpoint) error {
func SendCheckpoints(as *appservice.AppService, checkpoints []*MessageSendCheckpoint) error {
checkpointsJSON := CheckpointsJSON{Checkpoints: checkpoints}
if as.HasWebsocket() {
return as.SendWebsocket(&WebsocketRequest{
return as.SendWebsocket(&appservice.WebsocketRequest{
Command: "message_checkpoint",
Data: checkpointsJSON,
})