mirror of
https://mau.dev/mautrix/go.git
synced 2026-03-14 14:25:53 +01:00
Make funcs in the SyncStore interface return errors
This should have been done in #144, but I forgot it. When context is being propagated, the context might be cancelled at any point, which will result in an error that needs to be handled.
This commit is contained in:
parent
db66b4f5d0
commit
02e4140236
3 changed files with 51 additions and 32 deletions
15
client.go
15
client.go
|
|
@ -174,8 +174,15 @@ func (cli *Client) SyncWithContext(ctx context.Context) error {
|
||||||
// We will keep syncing until the syncing state changes. Either because
|
// We will keep syncing until the syncing state changes. Either because
|
||||||
// Sync is called or StopSync is called.
|
// Sync is called or StopSync is called.
|
||||||
syncingID := cli.incrementSyncingID()
|
syncingID := cli.incrementSyncingID()
|
||||||
nextBatch := cli.Store.LoadNextBatch(ctx, cli.UserID)
|
nextBatch, err := cli.Store.LoadNextBatch(ctx, cli.UserID)
|
||||||
filterID := cli.Store.LoadFilterID(ctx, cli.UserID)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
filterID, err := cli.Store.LoadFilterID(ctx, cli.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if filterID == "" {
|
if filterID == "" {
|
||||||
filterJSON := cli.Syncer.GetFilterJSON(cli.UserID)
|
filterJSON := cli.Syncer.GetFilterJSON(cli.UserID)
|
||||||
resFilter, err := cli.CreateFilter(ctx, filterJSON)
|
resFilter, err := cli.CreateFilter(ctx, filterJSON)
|
||||||
|
|
@ -183,7 +190,9 @@ func (cli *Client) SyncWithContext(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
filterID = resFilter.FilterID
|
filterID = resFilter.FilterID
|
||||||
cli.Store.SaveFilterID(ctx, cli.UserID, filterID)
|
if err := cli.Store.SaveFilterID(ctx, cli.UserID, filterID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
lastSuccessfulSync := time.Now().Add(-cli.StreamSyncMinAge - 1*time.Hour)
|
lastSuccessfulSync := time.Now().Add(-cli.StreamSyncMinAge - 1*time.Hour)
|
||||||
for {
|
for {
|
||||||
|
|
|
||||||
|
|
@ -88,22 +88,27 @@ func (store *SQLCryptoStore) GetNextBatch(ctx context.Context) (string, error) {
|
||||||
|
|
||||||
var _ mautrix.SyncStore = (*SQLCryptoStore)(nil)
|
var _ mautrix.SyncStore = (*SQLCryptoStore)(nil)
|
||||||
|
|
||||||
func (store *SQLCryptoStore) SaveFilterID(ctx context.Context, _ id.UserID, _ string) {}
|
func (store *SQLCryptoStore) SaveFilterID(ctx context.Context, _ id.UserID, _ string) error {
|
||||||
func (store *SQLCryptoStore) LoadFilterID(ctx context.Context, _ id.UserID) string { return "" }
|
return nil
|
||||||
|
}
|
||||||
func (store *SQLCryptoStore) SaveNextBatch(ctx context.Context, _ id.UserID, nextBatchToken string) {
|
func (store *SQLCryptoStore) LoadFilterID(ctx context.Context, _ id.UserID) (string, error) {
|
||||||
err := store.PutNextBatch(ctx, nextBatchToken)
|
return "", nil
|
||||||
if err != nil {
|
|
||||||
// TODO handle error
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *SQLCryptoStore) LoadNextBatch(ctx context.Context, _ id.UserID) string {
|
func (store *SQLCryptoStore) SaveNextBatch(ctx context.Context, _ id.UserID, nextBatchToken string) error {
|
||||||
|
err := store.PutNextBatch(ctx, nextBatchToken)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to store batch: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store *SQLCryptoStore) LoadNextBatch(ctx context.Context, _ id.UserID) (string, error) {
|
||||||
nb, err := store.GetNextBatch(ctx)
|
nb, err := store.GetNextBatch(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO handle error
|
return "", fmt.Errorf("unable to load batch: %w", err)
|
||||||
}
|
}
|
||||||
return nb
|
return nb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *SQLCryptoStore) FindDeviceID() (deviceID id.DeviceID) {
|
func (store *SQLCryptoStore) FindDeviceID() (deviceID id.DeviceID) {
|
||||||
|
|
|
||||||
49
syncstore.go
49
syncstore.go
|
|
@ -3,6 +3,7 @@ package mautrix
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"maunium.net/go/mautrix/id"
|
"maunium.net/go/mautrix/id"
|
||||||
)
|
)
|
||||||
|
|
@ -16,10 +17,10 @@ var _ SyncStore = (*AccountDataStore)(nil)
|
||||||
// provided "MemorySyncStore" which just keeps data around in-memory which is lost on
|
// provided "MemorySyncStore" which just keeps data around in-memory which is lost on
|
||||||
// restarts.
|
// restarts.
|
||||||
type SyncStore interface {
|
type SyncStore interface {
|
||||||
SaveFilterID(ctx context.Context, userID id.UserID, filterID string)
|
SaveFilterID(ctx context.Context, userID id.UserID, filterID string) error
|
||||||
LoadFilterID(ctx context.Context, userID id.UserID) string
|
LoadFilterID(ctx context.Context, userID id.UserID) (string, error)
|
||||||
SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string)
|
SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error
|
||||||
LoadNextBatch(ctx context.Context, userID id.UserID) string
|
LoadNextBatch(ctx context.Context, userID id.UserID) (string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deprecated: renamed to SyncStore
|
// Deprecated: renamed to SyncStore
|
||||||
|
|
@ -36,23 +37,25 @@ type MemorySyncStore struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveFilterID to memory.
|
// SaveFilterID to memory.
|
||||||
func (s *MemorySyncStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) {
|
func (s *MemorySyncStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) error {
|
||||||
s.Filters[userID] = filterID
|
s.Filters[userID] = filterID
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadFilterID from memory.
|
// LoadFilterID from memory.
|
||||||
func (s *MemorySyncStore) LoadFilterID(ctx context.Context, userID id.UserID) string {
|
func (s *MemorySyncStore) LoadFilterID(ctx context.Context, userID id.UserID) (string, error) {
|
||||||
return s.Filters[userID]
|
return s.Filters[userID], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveNextBatch to memory.
|
// SaveNextBatch to memory.
|
||||||
func (s *MemorySyncStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) {
|
func (s *MemorySyncStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error {
|
||||||
s.NextBatch[userID] = nextBatchToken
|
s.NextBatch[userID] = nextBatchToken
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadNextBatch from memory.
|
// LoadNextBatch from memory.
|
||||||
func (s *MemorySyncStore) LoadNextBatch(ctx context.Context, userID id.UserID) string {
|
func (s *MemorySyncStore) LoadNextBatch(ctx context.Context, userID id.UserID) (string, error) {
|
||||||
return s.NextBatch[userID]
|
return s.NextBatch[userID], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMemorySyncStore constructs a new MemorySyncStore.
|
// NewMemorySyncStore constructs a new MemorySyncStore.
|
||||||
|
|
@ -76,25 +79,26 @@ type accountData struct {
|
||||||
NextBatch string `json:"next_batch"`
|
NextBatch string `json:"next_batch"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AccountDataStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) {
|
func (s *AccountDataStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) error {
|
||||||
if userID.String() != s.client.UserID.String() {
|
if userID.String() != s.client.UserID.String() {
|
||||||
panic("AccountDataStore must only be used with a single account")
|
panic("AccountDataStore must only be used with a single account")
|
||||||
}
|
}
|
||||||
s.FilterID = filterID
|
s.FilterID = filterID
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AccountDataStore) LoadFilterID(ctx context.Context, userID id.UserID) string {
|
func (s *AccountDataStore) LoadFilterID(ctx context.Context, userID id.UserID) (string, error) {
|
||||||
if userID.String() != s.client.UserID.String() {
|
if userID.String() != s.client.UserID.String() {
|
||||||
panic("AccountDataStore must only be used with a single account")
|
panic("AccountDataStore must only be used with a single account")
|
||||||
}
|
}
|
||||||
return s.FilterID
|
return s.FilterID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) {
|
func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error {
|
||||||
if userID.String() != s.client.UserID.String() {
|
if userID.String() != s.client.UserID.String() {
|
||||||
panic("AccountDataStore must only be used with a single account")
|
return fmt.Errorf("AccountDataStore must only be used with a single account")
|
||||||
} else if nextBatchToken == s.nextBatch {
|
} else if nextBatchToken == s.nextBatch {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
data := accountData{
|
data := accountData{
|
||||||
|
|
@ -103,7 +107,7 @@ func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID,
|
||||||
|
|
||||||
err := s.client.SetAccountData(ctx, s.EventType, data)
|
err := s.client.SetAccountData(ctx, s.EventType, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.client.Log.Warn().Err(err).Msg("Failed to save next batch token to account data")
|
return fmt.Errorf("failed to save next batch token to account data: %w", err)
|
||||||
} else {
|
} else {
|
||||||
s.client.Log.Debug().
|
s.client.Log.Debug().
|
||||||
Str("old_token", s.nextBatch).
|
Str("old_token", s.nextBatch).
|
||||||
|
|
@ -111,11 +115,12 @@ func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID,
|
||||||
Msg("Saved next batch token")
|
Msg("Saved next batch token")
|
||||||
s.nextBatch = nextBatchToken
|
s.nextBatch = nextBatchToken
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AccountDataStore) LoadNextBatch(ctx context.Context, userID id.UserID) string {
|
func (s *AccountDataStore) LoadNextBatch(ctx context.Context, userID id.UserID) (string, error) {
|
||||||
if userID.String() != s.client.UserID.String() {
|
if userID.String() != s.client.UserID.String() {
|
||||||
panic("AccountDataStore must only be used with a single account")
|
return "", fmt.Errorf("AccountDataStore must only be used with a single account")
|
||||||
}
|
}
|
||||||
|
|
||||||
data := &accountData{}
|
data := &accountData{}
|
||||||
|
|
@ -124,15 +129,15 @@ func (s *AccountDataStore) LoadNextBatch(ctx context.Context, userID id.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, MNotFound) {
|
if errors.Is(err, MNotFound) {
|
||||||
s.client.Log.Debug().Msg("No next batch token found in account data")
|
s.client.Log.Debug().Msg("No next batch token found in account data")
|
||||||
|
return "", nil
|
||||||
} else {
|
} else {
|
||||||
s.client.Log.Warn().Err(err).Msg("Failed to load next batch token from account data")
|
return "", fmt.Errorf("failed to load next batch token from account data: %w", err)
|
||||||
}
|
}
|
||||||
return ""
|
|
||||||
}
|
}
|
||||||
s.nextBatch = data.NextBatch
|
s.nextBatch = data.NextBatch
|
||||||
s.client.Log.Debug().Str("next_batch", data.NextBatch).Msg("Loaded next batch token from account data")
|
s.client.Log.Debug().Str("next_batch", data.NextBatch).Msg("Loaded next batch token from account data")
|
||||||
|
|
||||||
return s.nextBatch
|
return s.nextBatch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAccountDataStore returns a new AccountDataStore, which stores
|
// NewAccountDataStore returns a new AccountDataStore, which stores
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue