diff --git a/bridge/bridgestate.go b/bridge/bridgestate.go index c16b5c2c..651585ec 100644 --- a/bridge/bridgestate.go +++ b/bridge/bridgestate.go @@ -18,90 +18,10 @@ import ( log "maunium.net/go/maulogger/v2" - "maunium.net/go/mautrix/id" + "maunium.net/go/mautrix/bridge/status" ) -type StateEvent string -type StateErrorCode string - -type StateErrorMap map[StateErrorCode]string - -func (bem StateErrorMap) Update(data StateErrorMap) { - for key, value := range data { - bem[key] = value - } -} - -var StateHumanErrors = make(StateErrorMap) - -const ( - StateUnconfigured StateEvent = "UNCONFIGURED" - StateRunning StateEvent = "RUNNING" - StateConnecting StateEvent = "CONNECTING" - StateBackfilling StateEvent = "BACKFILLING" - StateConnected StateEvent = "CONNECTED" - StateTransientDisconnect StateEvent = "TRANSIENT_DISCONNECT" - StateBadCredentials StateEvent = "BAD_CREDENTIALS" - StateUnknownError StateEvent = "UNKNOWN_ERROR" - StateLoggedOut StateEvent = "LOGGED_OUT" -) - -type State struct { - StateEvent StateEvent `json:"state_event"` - Timestamp int64 `json:"timestamp"` - TTL int `json:"ttl"` - - Source string `json:"source,omitempty"` - Error StateErrorCode `json:"error,omitempty"` - Message string `json:"message,omitempty"` - - UserID id.UserID `json:"user_id,omitempty"` - RemoteID string `json:"remote_id,omitempty"` - RemoteName string `json:"remote_name,omitempty"` - - Reason string `json:"reason,omitempty"` - Info map[string]interface{} `json:"info,omitempty"` -} - -type GlobalState struct { - RemoteStates map[string]State `json:"remoteState"` - BridgeState State `json:"bridgeState"` -} - -type StateFiller interface { - GetMXID() id.UserID - GetRemoteID() string - GetRemoteName() string -} - -func (pong State) Fill(user StateFiller) State { - if user != nil { - pong.UserID = user.GetMXID() - pong.RemoteID = user.GetRemoteID() - pong.RemoteName = user.GetRemoteName() - } - - pong.Timestamp = time.Now().Unix() - pong.Source = "bridge" - if len(pong.Error) > 0 { - pong.TTL = 60 - msg, ok := StateHumanErrors[pong.Error] - if ok { - pong.Message = msg - } - } else { - pong.TTL = 240 - } - return pong -} -func (pong *State) shouldDeduplicate(newPong *State) bool { - if pong == nil || pong.StateEvent != newPong.StateEvent || pong.Error != newPong.Error { - return false - } - return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix() -} - -func (br *Bridge) SendBridgeState(ctx context.Context, state *State) error { +func (br *Bridge) SendBridgeState(ctx context.Context, state *status.BridgeState) error { var body bytes.Buffer if err := json.NewEncoder(&body).Encode(&state); err != nil { return fmt.Errorf("failed to encode bridge state JSON: %w", err) @@ -130,7 +50,7 @@ func (br *Bridge) SendBridgeState(ctx context.Context, state *State) error { return nil } -func (br *Bridge) SendGlobalBridgeState(state State) { +func (br *Bridge) SendGlobalBridgeState(state status.BridgeState) { if len(br.Config.Homeserver.StatusEndpoint) == 0 { return } @@ -145,19 +65,19 @@ func (br *Bridge) SendGlobalBridgeState(state State) { } type BridgeStateQueue struct { - prev *State - ch chan State + prev *status.BridgeState + ch chan status.BridgeState log log.Logger bridge *Bridge - user StateFiller + user status.BridgeStateFiller } -func (br *Bridge) NewBridgeStateQueue(user StateFiller, log log.Logger) *BridgeStateQueue { +func (br *Bridge) NewBridgeStateQueue(user status.BridgeStateFiller, log log.Logger) *BridgeStateQueue { if len(br.Config.Homeserver.StatusEndpoint) == 0 { return nil } bsq := &BridgeStateQueue{ - ch: make(chan State, 10), + ch: make(chan status.BridgeState, 10), log: log, bridge: br, user: user, @@ -178,10 +98,10 @@ func (bsq *BridgeStateQueue) loop() { } } -func (bsq *BridgeStateQueue) immediateSendBridgeState(state State) { +func (bsq *BridgeStateQueue) immediateSendBridgeState(state status.BridgeState) { retryIn := 2 for { - if bsq.prev != nil && bsq.prev.shouldDeduplicate(&state) { + if bsq.prev != nil && bsq.prev.ShouldDeduplicate(&state) { bsq.log.Debugfln("Not sending bridge state %s as it's a duplicate", state.StateEvent) return } @@ -205,7 +125,7 @@ func (bsq *BridgeStateQueue) immediateSendBridgeState(state State) { } } -func (bsq *BridgeStateQueue) Send(state State) { +func (bsq *BridgeStateQueue) Send(state status.BridgeState) { if bsq == nil { return } @@ -226,14 +146,14 @@ func (bsq *BridgeStateQueue) Send(state State) { } } -func (bsq *BridgeStateQueue) GetPrev() State { +func (bsq *BridgeStateQueue) GetPrev() status.BridgeState { if bsq != nil && bsq.prev != nil { return *bsq.prev } - return State{} + return status.BridgeState{} } -func (bsq *BridgeStateQueue) SetPrev(prev State) { +func (bsq *BridgeStateQueue) SetPrev(prev status.BridgeState) { if bsq != nil { bsq.prev = &prev } diff --git a/bridge/matrix.go b/bridge/matrix.go index 9625ca5e..18cacd2e 100644 --- a/bridge/matrix.go +++ b/bridge/matrix.go @@ -17,6 +17,7 @@ import ( "maunium.net/go/mautrix" "maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/bridge/bridgeconfig" + "maunium.net/go/mautrix/bridge/status" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/format" "maunium.net/go/mautrix/id" @@ -48,7 +49,7 @@ func NewMatrixHandler(br *Bridge) *MatrixHandler { TrackEventDuration: noopTrack, } - for evtType := range CheckpointTypes { + for evtType := range status.CheckpointTypes { br.EventProcessor.On(evtType, handler.sendBridgeCheckpoint) } br.EventProcessor.On(event.EventMessage, handler.HandleMessage) @@ -68,7 +69,7 @@ func NewMatrixHandler(br *Bridge) *MatrixHandler { func (mx *MatrixHandler) sendBridgeCheckpoint(evt *event.Event) { if !evt.Mautrix.CheckpointSent { - go mx.bridge.SendMessageSuccessCheckpoint(evt, MsgStepBridge, 0) + go mx.bridge.SendMessageSuccessCheckpoint(evt, status.MsgStepBridge, 0) } } @@ -323,7 +324,7 @@ func (mx *MatrixHandler) shouldIgnoreEvent(evt *event.Event) bool { const sessionWaitTimeout = 3 * time.Second func (mx *MatrixHandler) sendCryptoStatusError(evt *event.Event, editEvent id.EventID, err error, retryCount int, isFinal bool) id.EventID { - mx.bridge.SendMessageErrorCheckpoint(evt, MsgStepDecrypted, err, isFinal, retryCount) + mx.bridge.SendMessageErrorCheckpoint(evt, status.MsgStepDecrypted, err, isFinal, retryCount) if mx.bridge.Config.Bridge.EnableMessageStatusEvents() { statusEvent := &event.BeeperMessageStatusEventContent{ @@ -421,7 +422,7 @@ func (mx *MatrixHandler) postDecrypt(original, decrypted *event.Event, retryCoun } copySomeKeys(original, decrypted) - mx.bridge.SendMessageSuccessCheckpoint(decrypted, MsgStepDecrypted, retryCount) + mx.bridge.SendMessageSuccessCheckpoint(decrypted, status.MsgStepDecrypted, retryCount) decrypted.Mautrix.CheckpointSent = true decrypted.Mautrix.DecryptionDuration = duration mx.log.Debugfln("Successfully decrypted %s", decrypted.ID) @@ -445,7 +446,7 @@ func (mx *MatrixHandler) HandleEncrypted(evt *event.Event) { if errors.Is(err, NoSessionFound) { decryptionRetryCount = 1 mx.log.Debugfln("Couldn't find session %s trying to decrypt %s, waiting %d seconds...", content.SessionID, evt.ID, int(sessionWaitTimeout.Seconds())) - mx.bridge.SendMessageErrorCheckpoint(evt, MsgStepDecrypted, err, false, 0) + mx.bridge.SendMessageErrorCheckpoint(evt, status.MsgStepDecrypted, err, false, 0) if mx.bridge.Crypto.WaitForSession(evt.RoomID, content.SenderKey, content.SessionID, sessionWaitTimeout) { mx.log.Debugfln("Got session %s after waiting, trying to decrypt %s again", content.SessionID, evt.ID) decrypted, err = mx.bridge.Crypto.Decrypt(evt) @@ -455,7 +456,7 @@ func (mx *MatrixHandler) HandleEncrypted(evt *event.Event) { } } if err != nil { - mx.bridge.SendMessageErrorCheckpoint(evt, MsgStepDecrypted, err, true, decryptionRetryCount) + mx.bridge.SendMessageErrorCheckpoint(evt, status.MsgStepDecrypted, err, true, decryptionRetryCount) mx.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err) go mx.sendCryptoStatusError(evt, "", err, decryptionRetryCount, true) return diff --git a/bridge/messagecheckpoint.go b/bridge/messagecheckpoint.go index 9d6f37e1..d830a48a 100644 --- a/bridge/messagecheckpoint.go +++ b/bridge/messagecheckpoint.go @@ -7,144 +7,40 @@ package bridge import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "time" - - "maunium.net/go/mautrix" "maunium.net/go/mautrix/appservice" + "maunium.net/go/mautrix/bridge/status" "maunium.net/go/mautrix/event" - "maunium.net/go/mautrix/id" ) -type MessageCheckpointStep string - -const ( - MsgStepClient MessageCheckpointStep = "CLIENT" - MsgStepHomeserver MessageCheckpointStep = "HOMESERVER" - MsgStepBridge MessageCheckpointStep = "BRIDGE" - MsgStepDecrypted MessageCheckpointStep = "DECRYPTED" - MsgStepRemote MessageCheckpointStep = "REMOTE" - MsgStepCommand MessageCheckpointStep = "COMMAND" -) - -type MessageCheckpointStatus string - -const ( - MsgStatusSuccess MessageCheckpointStatus = "SUCCESS" - MsgStatusWillRetry MessageCheckpointStatus = "WILL_RETRY" - MsgStatusPermFailure MessageCheckpointStatus = "PERM_FAILURE" - MsgStatusUnsupported MessageCheckpointStatus = "UNSUPPORTED" - MsgStatusTimeout MessageCheckpointStatus = "TIMEOUT" -) - -func ReasonToCheckpointStatus(reason event.MessageStatusReason, status event.MessageStatus) MessageCheckpointStatus { - if status == event.MessageStatusPending { - return MsgStatusWillRetry - } - switch reason { - case event.MessageStatusUnsupported: - return MsgStatusUnsupported - case event.MessageStatusTooOld: - return MsgStatusTimeout - default: - return MsgStatusPermFailure - } +func (br *Bridge) SendMessageSuccessCheckpoint(evt *event.Event, step status.MessageCheckpointStep, retryNum int) { + br.SendMessageCheckpoint(evt, step, nil, status.MsgStatusSuccess, retryNum) } -type MessageCheckpointReportedBy string - -const ( - MsgReportedByAsmux MessageCheckpointReportedBy = "ASMUX" - MsgReportedByBridge MessageCheckpointReportedBy = "BRIDGE" - MsgReportedByHungry MessageCheckpointReportedBy = "HUNGRYSERV" -) - -type MessageCheckpoint struct { - EventID id.EventID `json:"event_id"` - RoomID id.RoomID `json:"room_id"` - Step MessageCheckpointStep `json:"step"` - Timestamp int64 `json:"timestamp"` - Status MessageCheckpointStatus `json:"status"` - EventType event.Type `json:"event_type"` - ReportedBy MessageCheckpointReportedBy `json:"reported_by"` - RetryNum int `json:"retry_num"` - MessageType event.MessageType `json:"message_type,omitempty"` - Info string `json:"info,omitempty"` - - OriginalEventID id.EventID `json:"original_event_id"` - ManualRetryCount int `json:"manual_retry_count"` -} - -var CheckpointTypes = map[event.Type]struct{}{ - event.EventRedaction: {}, - event.EventMessage: {}, - event.EventEncrypted: {}, - event.EventSticker: {}, - event.EventReaction: {}, - //event.CallInvite: {}, - //event.CallCandidates: {}, - //event.CallSelectAnswer: {}, - //event.CallAnswer: {}, - //event.CallHangup: {}, - //event.CallReject: {}, - //event.CallNegotiate: {}, -} - -func NewMessageCheckpoint(evt *event.Event, step MessageCheckpointStep, status MessageCheckpointStatus, retryNum int) *MessageCheckpoint { - checkpoint := MessageCheckpoint{ - EventID: evt.ID, - RoomID: evt.RoomID, - Step: step, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), - Status: status, - EventType: evt.Type, - ReportedBy: MsgReportedByBridge, - RetryNum: retryNum, - } - if evt.Type == event.EventMessage { - checkpoint.MessageType = evt.Content.AsMessage().MsgType - } - if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil { - checkpoint.OriginalEventID = retryMeta.OriginalEventID - checkpoint.ManualRetryCount = retryMeta.RetryCount - } - return &checkpoint -} - -func (br *Bridge) SendMessageSuccessCheckpoint(evt *event.Event, step MessageCheckpointStep, retryNum int) { - br.SendMessageCheckpoint(evt, step, nil, MsgStatusSuccess, retryNum) -} - -func (br *Bridge) SendMessageErrorCheckpoint(evt *event.Event, step MessageCheckpointStep, err error, permanent bool, retryNum int) { - status := MsgStatusWillRetry +func (br *Bridge) SendMessageErrorCheckpoint(evt *event.Event, step status.MessageCheckpointStep, err error, permanent bool, retryNum int) { + s := status.MsgStatusWillRetry if permanent { - status = MsgStatusPermFailure + s = status.MsgStatusPermFailure } - br.SendMessageCheckpoint(evt, step, err, status, retryNum) + br.SendMessageCheckpoint(evt, step, err, s, retryNum) } -func (br *Bridge) SendMessageCheckpoint(evt *event.Event, step MessageCheckpointStep, err error, status MessageCheckpointStatus, retryNum int) { - checkpoint := NewMessageCheckpoint(evt, step, status, retryNum) +func (br *Bridge) SendMessageCheckpoint(evt *event.Event, step status.MessageCheckpointStep, err error, s status.MessageCheckpointStatus, retryNum int) { + checkpoint := status.NewMessageCheckpoint(evt, step, s, retryNum) if err != nil { checkpoint.Info = err.Error() } - go checkpoint.Send(br) + go br.SendRawMessageCheckpoint(checkpoint) } -func (cp *MessageCheckpoint) Send(br *Bridge) { - err := br.SendMessageCheckpoints([]*MessageCheckpoint{cp}) +func (br *Bridge) SendRawMessageCheckpoint(cp *status.MessageCheckpoint) { + err := br.SendMessageCheckpoints([]*status.MessageCheckpoint{cp}) if err != nil { br.Log.Warnfln("Error sending checkpoint %s/%s for %s: %v", cp.Step, cp.Status, cp.EventID, err) } } -func (br *Bridge) SendMessageCheckpoints(checkpoints []*MessageCheckpoint) error { - checkpointsJSON := CheckpointsJSON{Checkpoints: checkpoints} +func (br *Bridge) SendMessageCheckpoints(checkpoints []*status.MessageCheckpoint) error { + checkpointsJSON := status.CheckpointsJSON{Checkpoints: checkpoints} if br.AS.HasWebsocket() { return br.AS.SendWebsocket(&appservice.WebsocketRequest{ @@ -160,39 +56,3 @@ func (br *Bridge) SendMessageCheckpoints(checkpoints []*MessageCheckpoint) error return checkpointsJSON.SendHTTP(endpoint, br.AS.Registration.AppToken) } - -type CheckpointsJSON struct { - Checkpoints []*MessageCheckpoint `json:"checkpoints"` -} - -func (cj *CheckpointsJSON) SendHTTP(endpoint string, token string) error { - var body bytes.Buffer - if err := json.NewEncoder(&body).Encode(cj); err != nil { - return fmt.Errorf("failed to encode message send checkpoint JSON: %w", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, &body) - if err != nil { - return err - } - - req.Header.Set("Authorization", "Bearer "+token) - req.Header.Set("User-Agent", mautrix.DefaultUserAgent+" checkpoint sender") - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed to send bridge state update: %w", err) - } - defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode > 299 { - respBody, _ := io.ReadAll(resp.Body) - if respBody != nil { - respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n")) - } - return fmt.Errorf("unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody) - } - return nil -} diff --git a/bridge/status/bridgestate.go b/bridge/status/bridgestate.go new file mode 100644 index 00000000..8725d763 --- /dev/null +++ b/bridge/status/bridgestate.go @@ -0,0 +1,94 @@ +// Copyright (c) 2022 Tulir Asokan +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package status + +import ( + "time" + + "maunium.net/go/mautrix/id" +) + +type BridgeStateEvent string +type BridgeStateErrorCode string + +type BridgeStateErrorMap map[BridgeStateErrorCode]string + +func (bem BridgeStateErrorMap) Update(data BridgeStateErrorMap) { + for key, value := range data { + bem[key] = value + } +} + +var BridgeStateHumanErrors = make(BridgeStateErrorMap) + +const ( + StateUnconfigured BridgeStateEvent = "UNCONFIGURED" + StateRunning BridgeStateEvent = "RUNNING" + StateConnecting BridgeStateEvent = "CONNECTING" + StateBackfilling BridgeStateEvent = "BACKFILLING" + StateConnected BridgeStateEvent = "CONNECTED" + StateTransientDisconnect BridgeStateEvent = "TRANSIENT_DISCONNECT" + StateBadCredentials BridgeStateEvent = "BAD_CREDENTIALS" + StateUnknownError BridgeStateEvent = "UNKNOWN_ERROR" + StateLoggedOut BridgeStateEvent = "LOGGED_OUT" +) + +type BridgeState struct { + StateEvent BridgeStateEvent `json:"state_event"` + Timestamp int64 `json:"timestamp"` + TTL int `json:"ttl"` + + Source string `json:"source,omitempty"` + Error BridgeStateErrorCode `json:"error,omitempty"` + Message string `json:"message,omitempty"` + + UserID id.UserID `json:"user_id,omitempty"` + RemoteID string `json:"remote_id,omitempty"` + RemoteName string `json:"remote_name,omitempty"` + + Reason string `json:"reason,omitempty"` + Info map[string]interface{} `json:"info,omitempty"` +} + +type GlobalBridgeState struct { + RemoteStates map[string]BridgeState `json:"remoteState"` + BridgeState BridgeState `json:"bridgeState"` +} + +type BridgeStateFiller interface { + GetMXID() id.UserID + GetRemoteID() string + GetRemoteName() string +} + +func (pong BridgeState) Fill(user BridgeStateFiller) BridgeState { + if user != nil { + pong.UserID = user.GetMXID() + pong.RemoteID = user.GetRemoteID() + pong.RemoteName = user.GetRemoteName() + } + + pong.Timestamp = time.Now().Unix() + pong.Source = "bridge" + if len(pong.Error) > 0 { + pong.TTL = 60 + msg, ok := BridgeStateHumanErrors[pong.Error] + if ok { + pong.Message = msg + } + } else { + pong.TTL = 240 + } + return pong +} + +func (pong *BridgeState) ShouldDeduplicate(newPong *BridgeState) bool { + if pong == nil || pong.StateEvent != newPong.StateEvent || pong.Error != newPong.Error { + return false + } + return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix() +} diff --git a/bridge/status/messagecheckpoint.go b/bridge/status/messagecheckpoint.go new file mode 100644 index 00000000..54d7a804 --- /dev/null +++ b/bridge/status/messagecheckpoint.go @@ -0,0 +1,152 @@ +// Copyright (c) 2021 Sumner Evans +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package status + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "maunium.net/go/mautrix" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +type MessageCheckpointStep string + +const ( + MsgStepClient MessageCheckpointStep = "CLIENT" + MsgStepHomeserver MessageCheckpointStep = "HOMESERVER" + MsgStepBridge MessageCheckpointStep = "BRIDGE" + MsgStepDecrypted MessageCheckpointStep = "DECRYPTED" + MsgStepRemote MessageCheckpointStep = "REMOTE" + MsgStepCommand MessageCheckpointStep = "COMMAND" +) + +type MessageCheckpointStatus string + +const ( + MsgStatusSuccess MessageCheckpointStatus = "SUCCESS" + MsgStatusWillRetry MessageCheckpointStatus = "WILL_RETRY" + MsgStatusPermFailure MessageCheckpointStatus = "PERM_FAILURE" + MsgStatusUnsupported MessageCheckpointStatus = "UNSUPPORTED" + MsgStatusTimeout MessageCheckpointStatus = "TIMEOUT" +) + +func ReasonToCheckpointStatus(reason event.MessageStatusReason, status event.MessageStatus) MessageCheckpointStatus { + if status == event.MessageStatusPending { + return MsgStatusWillRetry + } + switch reason { + case event.MessageStatusUnsupported: + return MsgStatusUnsupported + case event.MessageStatusTooOld: + return MsgStatusTimeout + default: + return MsgStatusPermFailure + } +} + +type MessageCheckpointReportedBy string + +const ( + MsgReportedByAsmux MessageCheckpointReportedBy = "ASMUX" + MsgReportedByBridge MessageCheckpointReportedBy = "BRIDGE" + MsgReportedByHungry MessageCheckpointReportedBy = "HUNGRYSERV" +) + +type MessageCheckpoint struct { + EventID id.EventID `json:"event_id"` + RoomID id.RoomID `json:"room_id"` + Step MessageCheckpointStep `json:"step"` + Timestamp int64 `json:"timestamp"` + Status MessageCheckpointStatus `json:"status"` + EventType event.Type `json:"event_type"` + ReportedBy MessageCheckpointReportedBy `json:"reported_by"` + RetryNum int `json:"retry_num"` + MessageType event.MessageType `json:"message_type,omitempty"` + Info string `json:"info,omitempty"` + + OriginalEventID id.EventID `json:"original_event_id"` + ManualRetryCount int `json:"manual_retry_count"` +} + +var CheckpointTypes = map[event.Type]struct{}{ + event.EventRedaction: {}, + event.EventMessage: {}, + event.EventEncrypted: {}, + event.EventSticker: {}, + event.EventReaction: {}, + //event.CallInvite: {}, + //event.CallCandidates: {}, + //event.CallSelectAnswer: {}, + //event.CallAnswer: {}, + //event.CallHangup: {}, + //event.CallReject: {}, + //event.CallNegotiate: {}, +} + +func NewMessageCheckpoint(evt *event.Event, step MessageCheckpointStep, status MessageCheckpointStatus, retryNum int) *MessageCheckpoint { + checkpoint := MessageCheckpoint{ + EventID: evt.ID, + RoomID: evt.RoomID, + Step: step, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + Status: status, + EventType: evt.Type, + ReportedBy: MsgReportedByBridge, + RetryNum: retryNum, + } + if evt.Type == event.EventMessage { + checkpoint.MessageType = evt.Content.AsMessage().MsgType + } + if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil { + checkpoint.OriginalEventID = retryMeta.OriginalEventID + checkpoint.ManualRetryCount = retryMeta.RetryCount + } + return &checkpoint +} + +type CheckpointsJSON struct { + Checkpoints []*MessageCheckpoint `json:"checkpoints"` +} + +func (cj *CheckpointsJSON) SendHTTP(endpoint string, token string) error { + var body bytes.Buffer + if err := json.NewEncoder(&body).Encode(cj); err != nil { + return fmt.Errorf("failed to encode message send checkpoint JSON: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, &body) + if err != nil { + return err + } + + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("User-Agent", mautrix.DefaultUserAgent+" checkpoint sender") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send bridge state update: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode > 299 { + respBody, _ := io.ReadAll(resp.Body) + if respBody != nil { + respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n")) + } + return fmt.Errorf("unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody) + } + return nil +}