mirror of
https://mau.dev/mautrix/go.git
synced 2026-03-14 14:25:53 +01:00
Move bridge status and message checkpoints to subpackage
This commit is contained in:
parent
67b7b9948e
commit
ba1fce8fce
5 changed files with 281 additions and 254 deletions
|
|
@ -18,90 +18,10 @@ import (
|
|||
|
||||
log "maunium.net/go/maulogger/v2"
|
||||
|
||||
"maunium.net/go/mautrix/id"
|
||||
"maunium.net/go/mautrix/bridge/status"
|
||||
)
|
||||
|
||||
type StateEvent string
|
||||
type StateErrorCode string
|
||||
|
||||
type StateErrorMap map[StateErrorCode]string
|
||||
|
||||
func (bem StateErrorMap) Update(data StateErrorMap) {
|
||||
for key, value := range data {
|
||||
bem[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
var StateHumanErrors = make(StateErrorMap)
|
||||
|
||||
const (
|
||||
StateUnconfigured StateEvent = "UNCONFIGURED"
|
||||
StateRunning StateEvent = "RUNNING"
|
||||
StateConnecting StateEvent = "CONNECTING"
|
||||
StateBackfilling StateEvent = "BACKFILLING"
|
||||
StateConnected StateEvent = "CONNECTED"
|
||||
StateTransientDisconnect StateEvent = "TRANSIENT_DISCONNECT"
|
||||
StateBadCredentials StateEvent = "BAD_CREDENTIALS"
|
||||
StateUnknownError StateEvent = "UNKNOWN_ERROR"
|
||||
StateLoggedOut StateEvent = "LOGGED_OUT"
|
||||
)
|
||||
|
||||
type State struct {
|
||||
StateEvent StateEvent `json:"state_event"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
TTL int `json:"ttl"`
|
||||
|
||||
Source string `json:"source,omitempty"`
|
||||
Error StateErrorCode `json:"error,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
|
||||
UserID id.UserID `json:"user_id,omitempty"`
|
||||
RemoteID string `json:"remote_id,omitempty"`
|
||||
RemoteName string `json:"remote_name,omitempty"`
|
||||
|
||||
Reason string `json:"reason,omitempty"`
|
||||
Info map[string]interface{} `json:"info,omitempty"`
|
||||
}
|
||||
|
||||
type GlobalState struct {
|
||||
RemoteStates map[string]State `json:"remoteState"`
|
||||
BridgeState State `json:"bridgeState"`
|
||||
}
|
||||
|
||||
type StateFiller interface {
|
||||
GetMXID() id.UserID
|
||||
GetRemoteID() string
|
||||
GetRemoteName() string
|
||||
}
|
||||
|
||||
func (pong State) Fill(user StateFiller) State {
|
||||
if user != nil {
|
||||
pong.UserID = user.GetMXID()
|
||||
pong.RemoteID = user.GetRemoteID()
|
||||
pong.RemoteName = user.GetRemoteName()
|
||||
}
|
||||
|
||||
pong.Timestamp = time.Now().Unix()
|
||||
pong.Source = "bridge"
|
||||
if len(pong.Error) > 0 {
|
||||
pong.TTL = 60
|
||||
msg, ok := StateHumanErrors[pong.Error]
|
||||
if ok {
|
||||
pong.Message = msg
|
||||
}
|
||||
} else {
|
||||
pong.TTL = 240
|
||||
}
|
||||
return pong
|
||||
}
|
||||
func (pong *State) shouldDeduplicate(newPong *State) bool {
|
||||
if pong == nil || pong.StateEvent != newPong.StateEvent || pong.Error != newPong.Error {
|
||||
return false
|
||||
}
|
||||
return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix()
|
||||
}
|
||||
|
||||
func (br *Bridge) SendBridgeState(ctx context.Context, state *State) error {
|
||||
func (br *Bridge) SendBridgeState(ctx context.Context, state *status.BridgeState) error {
|
||||
var body bytes.Buffer
|
||||
if err := json.NewEncoder(&body).Encode(&state); err != nil {
|
||||
return fmt.Errorf("failed to encode bridge state JSON: %w", err)
|
||||
|
|
@ -130,7 +50,7 @@ func (br *Bridge) SendBridgeState(ctx context.Context, state *State) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (br *Bridge) SendGlobalBridgeState(state State) {
|
||||
func (br *Bridge) SendGlobalBridgeState(state status.BridgeState) {
|
||||
if len(br.Config.Homeserver.StatusEndpoint) == 0 {
|
||||
return
|
||||
}
|
||||
|
|
@ -145,19 +65,19 @@ func (br *Bridge) SendGlobalBridgeState(state State) {
|
|||
}
|
||||
|
||||
type BridgeStateQueue struct {
|
||||
prev *State
|
||||
ch chan State
|
||||
prev *status.BridgeState
|
||||
ch chan status.BridgeState
|
||||
log log.Logger
|
||||
bridge *Bridge
|
||||
user StateFiller
|
||||
user status.BridgeStateFiller
|
||||
}
|
||||
|
||||
func (br *Bridge) NewBridgeStateQueue(user StateFiller, log log.Logger) *BridgeStateQueue {
|
||||
func (br *Bridge) NewBridgeStateQueue(user status.BridgeStateFiller, log log.Logger) *BridgeStateQueue {
|
||||
if len(br.Config.Homeserver.StatusEndpoint) == 0 {
|
||||
return nil
|
||||
}
|
||||
bsq := &BridgeStateQueue{
|
||||
ch: make(chan State, 10),
|
||||
ch: make(chan status.BridgeState, 10),
|
||||
log: log,
|
||||
bridge: br,
|
||||
user: user,
|
||||
|
|
@ -178,10 +98,10 @@ func (bsq *BridgeStateQueue) loop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (bsq *BridgeStateQueue) immediateSendBridgeState(state State) {
|
||||
func (bsq *BridgeStateQueue) immediateSendBridgeState(state status.BridgeState) {
|
||||
retryIn := 2
|
||||
for {
|
||||
if bsq.prev != nil && bsq.prev.shouldDeduplicate(&state) {
|
||||
if bsq.prev != nil && bsq.prev.ShouldDeduplicate(&state) {
|
||||
bsq.log.Debugfln("Not sending bridge state %s as it's a duplicate", state.StateEvent)
|
||||
return
|
||||
}
|
||||
|
|
@ -205,7 +125,7 @@ func (bsq *BridgeStateQueue) immediateSendBridgeState(state State) {
|
|||
}
|
||||
}
|
||||
|
||||
func (bsq *BridgeStateQueue) Send(state State) {
|
||||
func (bsq *BridgeStateQueue) Send(state status.BridgeState) {
|
||||
if bsq == nil {
|
||||
return
|
||||
}
|
||||
|
|
@ -226,14 +146,14 @@ func (bsq *BridgeStateQueue) Send(state State) {
|
|||
}
|
||||
}
|
||||
|
||||
func (bsq *BridgeStateQueue) GetPrev() State {
|
||||
func (bsq *BridgeStateQueue) GetPrev() status.BridgeState {
|
||||
if bsq != nil && bsq.prev != nil {
|
||||
return *bsq.prev
|
||||
}
|
||||
return State{}
|
||||
return status.BridgeState{}
|
||||
}
|
||||
|
||||
func (bsq *BridgeStateQueue) SetPrev(prev State) {
|
||||
func (bsq *BridgeStateQueue) SetPrev(prev status.BridgeState) {
|
||||
if bsq != nil {
|
||||
bsq.prev = &prev
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import (
|
|||
"maunium.net/go/mautrix"
|
||||
"maunium.net/go/mautrix/appservice"
|
||||
"maunium.net/go/mautrix/bridge/bridgeconfig"
|
||||
"maunium.net/go/mautrix/bridge/status"
|
||||
"maunium.net/go/mautrix/event"
|
||||
"maunium.net/go/mautrix/format"
|
||||
"maunium.net/go/mautrix/id"
|
||||
|
|
@ -48,7 +49,7 @@ func NewMatrixHandler(br *Bridge) *MatrixHandler {
|
|||
|
||||
TrackEventDuration: noopTrack,
|
||||
}
|
||||
for evtType := range CheckpointTypes {
|
||||
for evtType := range status.CheckpointTypes {
|
||||
br.EventProcessor.On(evtType, handler.sendBridgeCheckpoint)
|
||||
}
|
||||
br.EventProcessor.On(event.EventMessage, handler.HandleMessage)
|
||||
|
|
@ -68,7 +69,7 @@ func NewMatrixHandler(br *Bridge) *MatrixHandler {
|
|||
|
||||
func (mx *MatrixHandler) sendBridgeCheckpoint(evt *event.Event) {
|
||||
if !evt.Mautrix.CheckpointSent {
|
||||
go mx.bridge.SendMessageSuccessCheckpoint(evt, MsgStepBridge, 0)
|
||||
go mx.bridge.SendMessageSuccessCheckpoint(evt, status.MsgStepBridge, 0)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -323,7 +324,7 @@ func (mx *MatrixHandler) shouldIgnoreEvent(evt *event.Event) bool {
|
|||
const sessionWaitTimeout = 3 * time.Second
|
||||
|
||||
func (mx *MatrixHandler) sendCryptoStatusError(evt *event.Event, editEvent id.EventID, err error, retryCount int, isFinal bool) id.EventID {
|
||||
mx.bridge.SendMessageErrorCheckpoint(evt, MsgStepDecrypted, err, isFinal, retryCount)
|
||||
mx.bridge.SendMessageErrorCheckpoint(evt, status.MsgStepDecrypted, err, isFinal, retryCount)
|
||||
|
||||
if mx.bridge.Config.Bridge.EnableMessageStatusEvents() {
|
||||
statusEvent := &event.BeeperMessageStatusEventContent{
|
||||
|
|
@ -421,7 +422,7 @@ func (mx *MatrixHandler) postDecrypt(original, decrypted *event.Event, retryCoun
|
|||
}
|
||||
copySomeKeys(original, decrypted)
|
||||
|
||||
mx.bridge.SendMessageSuccessCheckpoint(decrypted, MsgStepDecrypted, retryCount)
|
||||
mx.bridge.SendMessageSuccessCheckpoint(decrypted, status.MsgStepDecrypted, retryCount)
|
||||
decrypted.Mautrix.CheckpointSent = true
|
||||
decrypted.Mautrix.DecryptionDuration = duration
|
||||
mx.log.Debugfln("Successfully decrypted %s", decrypted.ID)
|
||||
|
|
@ -445,7 +446,7 @@ func (mx *MatrixHandler) HandleEncrypted(evt *event.Event) {
|
|||
if errors.Is(err, NoSessionFound) {
|
||||
decryptionRetryCount = 1
|
||||
mx.log.Debugfln("Couldn't find session %s trying to decrypt %s, waiting %d seconds...", content.SessionID, evt.ID, int(sessionWaitTimeout.Seconds()))
|
||||
mx.bridge.SendMessageErrorCheckpoint(evt, MsgStepDecrypted, err, false, 0)
|
||||
mx.bridge.SendMessageErrorCheckpoint(evt, status.MsgStepDecrypted, err, false, 0)
|
||||
if mx.bridge.Crypto.WaitForSession(evt.RoomID, content.SenderKey, content.SessionID, sessionWaitTimeout) {
|
||||
mx.log.Debugfln("Got session %s after waiting, trying to decrypt %s again", content.SessionID, evt.ID)
|
||||
decrypted, err = mx.bridge.Crypto.Decrypt(evt)
|
||||
|
|
@ -455,7 +456,7 @@ func (mx *MatrixHandler) HandleEncrypted(evt *event.Event) {
|
|||
}
|
||||
}
|
||||
if err != nil {
|
||||
mx.bridge.SendMessageErrorCheckpoint(evt, MsgStepDecrypted, err, true, decryptionRetryCount)
|
||||
mx.bridge.SendMessageErrorCheckpoint(evt, status.MsgStepDecrypted, err, true, decryptionRetryCount)
|
||||
mx.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err)
|
||||
go mx.sendCryptoStatusError(evt, "", err, decryptionRetryCount, true)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -7,144 +7,40 @@
|
|||
package bridge
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"maunium.net/go/mautrix"
|
||||
"maunium.net/go/mautrix/appservice"
|
||||
"maunium.net/go/mautrix/bridge/status"
|
||||
"maunium.net/go/mautrix/event"
|
||||
"maunium.net/go/mautrix/id"
|
||||
)
|
||||
|
||||
type MessageCheckpointStep string
|
||||
|
||||
const (
|
||||
MsgStepClient MessageCheckpointStep = "CLIENT"
|
||||
MsgStepHomeserver MessageCheckpointStep = "HOMESERVER"
|
||||
MsgStepBridge MessageCheckpointStep = "BRIDGE"
|
||||
MsgStepDecrypted MessageCheckpointStep = "DECRYPTED"
|
||||
MsgStepRemote MessageCheckpointStep = "REMOTE"
|
||||
MsgStepCommand MessageCheckpointStep = "COMMAND"
|
||||
)
|
||||
|
||||
type MessageCheckpointStatus string
|
||||
|
||||
const (
|
||||
MsgStatusSuccess MessageCheckpointStatus = "SUCCESS"
|
||||
MsgStatusWillRetry MessageCheckpointStatus = "WILL_RETRY"
|
||||
MsgStatusPermFailure MessageCheckpointStatus = "PERM_FAILURE"
|
||||
MsgStatusUnsupported MessageCheckpointStatus = "UNSUPPORTED"
|
||||
MsgStatusTimeout MessageCheckpointStatus = "TIMEOUT"
|
||||
)
|
||||
|
||||
func ReasonToCheckpointStatus(reason event.MessageStatusReason, status event.MessageStatus) MessageCheckpointStatus {
|
||||
if status == event.MessageStatusPending {
|
||||
return MsgStatusWillRetry
|
||||
}
|
||||
switch reason {
|
||||
case event.MessageStatusUnsupported:
|
||||
return MsgStatusUnsupported
|
||||
case event.MessageStatusTooOld:
|
||||
return MsgStatusTimeout
|
||||
default:
|
||||
return MsgStatusPermFailure
|
||||
}
|
||||
func (br *Bridge) SendMessageSuccessCheckpoint(evt *event.Event, step status.MessageCheckpointStep, retryNum int) {
|
||||
br.SendMessageCheckpoint(evt, step, nil, status.MsgStatusSuccess, retryNum)
|
||||
}
|
||||
|
||||
type MessageCheckpointReportedBy string
|
||||
|
||||
const (
|
||||
MsgReportedByAsmux MessageCheckpointReportedBy = "ASMUX"
|
||||
MsgReportedByBridge MessageCheckpointReportedBy = "BRIDGE"
|
||||
MsgReportedByHungry MessageCheckpointReportedBy = "HUNGRYSERV"
|
||||
)
|
||||
|
||||
type MessageCheckpoint struct {
|
||||
EventID id.EventID `json:"event_id"`
|
||||
RoomID id.RoomID `json:"room_id"`
|
||||
Step MessageCheckpointStep `json:"step"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Status MessageCheckpointStatus `json:"status"`
|
||||
EventType event.Type `json:"event_type"`
|
||||
ReportedBy MessageCheckpointReportedBy `json:"reported_by"`
|
||||
RetryNum int `json:"retry_num"`
|
||||
MessageType event.MessageType `json:"message_type,omitempty"`
|
||||
Info string `json:"info,omitempty"`
|
||||
|
||||
OriginalEventID id.EventID `json:"original_event_id"`
|
||||
ManualRetryCount int `json:"manual_retry_count"`
|
||||
}
|
||||
|
||||
var CheckpointTypes = map[event.Type]struct{}{
|
||||
event.EventRedaction: {},
|
||||
event.EventMessage: {},
|
||||
event.EventEncrypted: {},
|
||||
event.EventSticker: {},
|
||||
event.EventReaction: {},
|
||||
//event.CallInvite: {},
|
||||
//event.CallCandidates: {},
|
||||
//event.CallSelectAnswer: {},
|
||||
//event.CallAnswer: {},
|
||||
//event.CallHangup: {},
|
||||
//event.CallReject: {},
|
||||
//event.CallNegotiate: {},
|
||||
}
|
||||
|
||||
func NewMessageCheckpoint(evt *event.Event, step MessageCheckpointStep, status MessageCheckpointStatus, retryNum int) *MessageCheckpoint {
|
||||
checkpoint := MessageCheckpoint{
|
||||
EventID: evt.ID,
|
||||
RoomID: evt.RoomID,
|
||||
Step: step,
|
||||
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
||||
Status: status,
|
||||
EventType: evt.Type,
|
||||
ReportedBy: MsgReportedByBridge,
|
||||
RetryNum: retryNum,
|
||||
}
|
||||
if evt.Type == event.EventMessage {
|
||||
checkpoint.MessageType = evt.Content.AsMessage().MsgType
|
||||
}
|
||||
if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil {
|
||||
checkpoint.OriginalEventID = retryMeta.OriginalEventID
|
||||
checkpoint.ManualRetryCount = retryMeta.RetryCount
|
||||
}
|
||||
return &checkpoint
|
||||
}
|
||||
|
||||
func (br *Bridge) SendMessageSuccessCheckpoint(evt *event.Event, step MessageCheckpointStep, retryNum int) {
|
||||
br.SendMessageCheckpoint(evt, step, nil, MsgStatusSuccess, retryNum)
|
||||
}
|
||||
|
||||
func (br *Bridge) SendMessageErrorCheckpoint(evt *event.Event, step MessageCheckpointStep, err error, permanent bool, retryNum int) {
|
||||
status := MsgStatusWillRetry
|
||||
func (br *Bridge) SendMessageErrorCheckpoint(evt *event.Event, step status.MessageCheckpointStep, err error, permanent bool, retryNum int) {
|
||||
s := status.MsgStatusWillRetry
|
||||
if permanent {
|
||||
status = MsgStatusPermFailure
|
||||
s = status.MsgStatusPermFailure
|
||||
}
|
||||
br.SendMessageCheckpoint(evt, step, err, status, retryNum)
|
||||
br.SendMessageCheckpoint(evt, step, err, s, retryNum)
|
||||
}
|
||||
|
||||
func (br *Bridge) SendMessageCheckpoint(evt *event.Event, step MessageCheckpointStep, err error, status MessageCheckpointStatus, retryNum int) {
|
||||
checkpoint := NewMessageCheckpoint(evt, step, status, retryNum)
|
||||
func (br *Bridge) SendMessageCheckpoint(evt *event.Event, step status.MessageCheckpointStep, err error, s status.MessageCheckpointStatus, retryNum int) {
|
||||
checkpoint := status.NewMessageCheckpoint(evt, step, s, retryNum)
|
||||
if err != nil {
|
||||
checkpoint.Info = err.Error()
|
||||
}
|
||||
go checkpoint.Send(br)
|
||||
go br.SendRawMessageCheckpoint(checkpoint)
|
||||
}
|
||||
|
||||
func (cp *MessageCheckpoint) Send(br *Bridge) {
|
||||
err := br.SendMessageCheckpoints([]*MessageCheckpoint{cp})
|
||||
func (br *Bridge) SendRawMessageCheckpoint(cp *status.MessageCheckpoint) {
|
||||
err := br.SendMessageCheckpoints([]*status.MessageCheckpoint{cp})
|
||||
if err != nil {
|
||||
br.Log.Warnfln("Error sending checkpoint %s/%s for %s: %v", cp.Step, cp.Status, cp.EventID, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (br *Bridge) SendMessageCheckpoints(checkpoints []*MessageCheckpoint) error {
|
||||
checkpointsJSON := CheckpointsJSON{Checkpoints: checkpoints}
|
||||
func (br *Bridge) SendMessageCheckpoints(checkpoints []*status.MessageCheckpoint) error {
|
||||
checkpointsJSON := status.CheckpointsJSON{Checkpoints: checkpoints}
|
||||
|
||||
if br.AS.HasWebsocket() {
|
||||
return br.AS.SendWebsocket(&appservice.WebsocketRequest{
|
||||
|
|
@ -160,39 +56,3 @@ func (br *Bridge) SendMessageCheckpoints(checkpoints []*MessageCheckpoint) error
|
|||
|
||||
return checkpointsJSON.SendHTTP(endpoint, br.AS.Registration.AppToken)
|
||||
}
|
||||
|
||||
type CheckpointsJSON struct {
|
||||
Checkpoints []*MessageCheckpoint `json:"checkpoints"`
|
||||
}
|
||||
|
||||
func (cj *CheckpointsJSON) SendHTTP(endpoint string, token string) error {
|
||||
var body bytes.Buffer
|
||||
if err := json.NewEncoder(&body).Encode(cj); err != nil {
|
||||
return fmt.Errorf("failed to encode message send checkpoint JSON: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, &body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req.Header.Set("User-Agent", mautrix.DefaultUserAgent+" checkpoint sender")
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send bridge state update: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
if respBody != nil {
|
||||
respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n"))
|
||||
}
|
||||
return fmt.Errorf("unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
94
bridge/status/bridgestate.go
Normal file
94
bridge/status/bridgestate.go
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
// Copyright (c) 2022 Tulir Asokan
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package status
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"maunium.net/go/mautrix/id"
|
||||
)
|
||||
|
||||
type BridgeStateEvent string
|
||||
type BridgeStateErrorCode string
|
||||
|
||||
type BridgeStateErrorMap map[BridgeStateErrorCode]string
|
||||
|
||||
func (bem BridgeStateErrorMap) Update(data BridgeStateErrorMap) {
|
||||
for key, value := range data {
|
||||
bem[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
var BridgeStateHumanErrors = make(BridgeStateErrorMap)
|
||||
|
||||
const (
|
||||
StateUnconfigured BridgeStateEvent = "UNCONFIGURED"
|
||||
StateRunning BridgeStateEvent = "RUNNING"
|
||||
StateConnecting BridgeStateEvent = "CONNECTING"
|
||||
StateBackfilling BridgeStateEvent = "BACKFILLING"
|
||||
StateConnected BridgeStateEvent = "CONNECTED"
|
||||
StateTransientDisconnect BridgeStateEvent = "TRANSIENT_DISCONNECT"
|
||||
StateBadCredentials BridgeStateEvent = "BAD_CREDENTIALS"
|
||||
StateUnknownError BridgeStateEvent = "UNKNOWN_ERROR"
|
||||
StateLoggedOut BridgeStateEvent = "LOGGED_OUT"
|
||||
)
|
||||
|
||||
type BridgeState struct {
|
||||
StateEvent BridgeStateEvent `json:"state_event"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
TTL int `json:"ttl"`
|
||||
|
||||
Source string `json:"source,omitempty"`
|
||||
Error BridgeStateErrorCode `json:"error,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
|
||||
UserID id.UserID `json:"user_id,omitempty"`
|
||||
RemoteID string `json:"remote_id,omitempty"`
|
||||
RemoteName string `json:"remote_name,omitempty"`
|
||||
|
||||
Reason string `json:"reason,omitempty"`
|
||||
Info map[string]interface{} `json:"info,omitempty"`
|
||||
}
|
||||
|
||||
type GlobalBridgeState struct {
|
||||
RemoteStates map[string]BridgeState `json:"remoteState"`
|
||||
BridgeState BridgeState `json:"bridgeState"`
|
||||
}
|
||||
|
||||
type BridgeStateFiller interface {
|
||||
GetMXID() id.UserID
|
||||
GetRemoteID() string
|
||||
GetRemoteName() string
|
||||
}
|
||||
|
||||
func (pong BridgeState) Fill(user BridgeStateFiller) BridgeState {
|
||||
if user != nil {
|
||||
pong.UserID = user.GetMXID()
|
||||
pong.RemoteID = user.GetRemoteID()
|
||||
pong.RemoteName = user.GetRemoteName()
|
||||
}
|
||||
|
||||
pong.Timestamp = time.Now().Unix()
|
||||
pong.Source = "bridge"
|
||||
if len(pong.Error) > 0 {
|
||||
pong.TTL = 60
|
||||
msg, ok := BridgeStateHumanErrors[pong.Error]
|
||||
if ok {
|
||||
pong.Message = msg
|
||||
}
|
||||
} else {
|
||||
pong.TTL = 240
|
||||
}
|
||||
return pong
|
||||
}
|
||||
|
||||
func (pong *BridgeState) ShouldDeduplicate(newPong *BridgeState) bool {
|
||||
if pong == nil || pong.StateEvent != newPong.StateEvent || pong.Error != newPong.Error {
|
||||
return false
|
||||
}
|
||||
return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix()
|
||||
}
|
||||
152
bridge/status/messagecheckpoint.go
Normal file
152
bridge/status/messagecheckpoint.go
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
// Copyright (c) 2021 Sumner Evans
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package status
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"maunium.net/go/mautrix"
|
||||
"maunium.net/go/mautrix/event"
|
||||
"maunium.net/go/mautrix/id"
|
||||
)
|
||||
|
||||
type MessageCheckpointStep string
|
||||
|
||||
const (
|
||||
MsgStepClient MessageCheckpointStep = "CLIENT"
|
||||
MsgStepHomeserver MessageCheckpointStep = "HOMESERVER"
|
||||
MsgStepBridge MessageCheckpointStep = "BRIDGE"
|
||||
MsgStepDecrypted MessageCheckpointStep = "DECRYPTED"
|
||||
MsgStepRemote MessageCheckpointStep = "REMOTE"
|
||||
MsgStepCommand MessageCheckpointStep = "COMMAND"
|
||||
)
|
||||
|
||||
type MessageCheckpointStatus string
|
||||
|
||||
const (
|
||||
MsgStatusSuccess MessageCheckpointStatus = "SUCCESS"
|
||||
MsgStatusWillRetry MessageCheckpointStatus = "WILL_RETRY"
|
||||
MsgStatusPermFailure MessageCheckpointStatus = "PERM_FAILURE"
|
||||
MsgStatusUnsupported MessageCheckpointStatus = "UNSUPPORTED"
|
||||
MsgStatusTimeout MessageCheckpointStatus = "TIMEOUT"
|
||||
)
|
||||
|
||||
func ReasonToCheckpointStatus(reason event.MessageStatusReason, status event.MessageStatus) MessageCheckpointStatus {
|
||||
if status == event.MessageStatusPending {
|
||||
return MsgStatusWillRetry
|
||||
}
|
||||
switch reason {
|
||||
case event.MessageStatusUnsupported:
|
||||
return MsgStatusUnsupported
|
||||
case event.MessageStatusTooOld:
|
||||
return MsgStatusTimeout
|
||||
default:
|
||||
return MsgStatusPermFailure
|
||||
}
|
||||
}
|
||||
|
||||
type MessageCheckpointReportedBy string
|
||||
|
||||
const (
|
||||
MsgReportedByAsmux MessageCheckpointReportedBy = "ASMUX"
|
||||
MsgReportedByBridge MessageCheckpointReportedBy = "BRIDGE"
|
||||
MsgReportedByHungry MessageCheckpointReportedBy = "HUNGRYSERV"
|
||||
)
|
||||
|
||||
type MessageCheckpoint struct {
|
||||
EventID id.EventID `json:"event_id"`
|
||||
RoomID id.RoomID `json:"room_id"`
|
||||
Step MessageCheckpointStep `json:"step"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Status MessageCheckpointStatus `json:"status"`
|
||||
EventType event.Type `json:"event_type"`
|
||||
ReportedBy MessageCheckpointReportedBy `json:"reported_by"`
|
||||
RetryNum int `json:"retry_num"`
|
||||
MessageType event.MessageType `json:"message_type,omitempty"`
|
||||
Info string `json:"info,omitempty"`
|
||||
|
||||
OriginalEventID id.EventID `json:"original_event_id"`
|
||||
ManualRetryCount int `json:"manual_retry_count"`
|
||||
}
|
||||
|
||||
var CheckpointTypes = map[event.Type]struct{}{
|
||||
event.EventRedaction: {},
|
||||
event.EventMessage: {},
|
||||
event.EventEncrypted: {},
|
||||
event.EventSticker: {},
|
||||
event.EventReaction: {},
|
||||
//event.CallInvite: {},
|
||||
//event.CallCandidates: {},
|
||||
//event.CallSelectAnswer: {},
|
||||
//event.CallAnswer: {},
|
||||
//event.CallHangup: {},
|
||||
//event.CallReject: {},
|
||||
//event.CallNegotiate: {},
|
||||
}
|
||||
|
||||
func NewMessageCheckpoint(evt *event.Event, step MessageCheckpointStep, status MessageCheckpointStatus, retryNum int) *MessageCheckpoint {
|
||||
checkpoint := MessageCheckpoint{
|
||||
EventID: evt.ID,
|
||||
RoomID: evt.RoomID,
|
||||
Step: step,
|
||||
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
||||
Status: status,
|
||||
EventType: evt.Type,
|
||||
ReportedBy: MsgReportedByBridge,
|
||||
RetryNum: retryNum,
|
||||
}
|
||||
if evt.Type == event.EventMessage {
|
||||
checkpoint.MessageType = evt.Content.AsMessage().MsgType
|
||||
}
|
||||
if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil {
|
||||
checkpoint.OriginalEventID = retryMeta.OriginalEventID
|
||||
checkpoint.ManualRetryCount = retryMeta.RetryCount
|
||||
}
|
||||
return &checkpoint
|
||||
}
|
||||
|
||||
type CheckpointsJSON struct {
|
||||
Checkpoints []*MessageCheckpoint `json:"checkpoints"`
|
||||
}
|
||||
|
||||
func (cj *CheckpointsJSON) SendHTTP(endpoint string, token string) error {
|
||||
var body bytes.Buffer
|
||||
if err := json.NewEncoder(&body).Encode(cj); err != nil {
|
||||
return fmt.Errorf("failed to encode message send checkpoint JSON: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, &body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req.Header.Set("User-Agent", mautrix.DefaultUserAgent+" checkpoint sender")
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send bridge state update: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
if respBody != nil {
|
||||
respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n"))
|
||||
}
|
||||
return fmt.Errorf("unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue