From 675652044735a85e80b751463148246931ad0aba Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 15 Dec 2025 10:04:32 +0100 Subject: [PATCH] Move etcd client code to "etcd" package. --- Makefile | 2 +- api_backend.go | 14 +- backend_client.go | 3 +- backend_configuration.go | 5 +- backend_configuration_test.go | 27 +- backend_storage_etcd.go | 14 +- backend_storage_etcd_test.go | 12 +- etcd/api.go | 32 ++ etcd_client.go => etcd/client.go | 51 +-- etcd_client_test.go => etcd/client_test.go | 58 +-- etcd/etcdtest/etcdtest.go | 466 +++++++++++++++++++++ etcd/etcdtest/etcdtest_test.go | 319 ++++++++++++++ grpc_client.go | 15 +- grpc_client_test.go | 41 +- hub.go | 5 +- mcu_proxy.go | 3 +- mcu_proxy_test.go | 75 ++-- proxy/proxy_tokens_etcd.go | 6 +- proxy_config_etcd.go | 15 +- proxy_config_etcd_test.go | 26 +- server/main.go | 3 +- 21 files changed, 1004 insertions(+), 188 deletions(-) rename etcd_client.go => etcd/client.go (83%) rename etcd_client_test.go => etcd/client_test.go (87%) create mode 100644 etcd/etcdtest/etcdtest.go create mode 100644 etcd/etcdtest/etcdtest_test.go diff --git a/Makefile b/Makefile index 28f78fe..b726693 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ PROTO_FILES := $(filter-out $(GRPC_PROTO_FILES),$(basename $(wildcard *.proto))) PROTO_GO_FILES := $(addsuffix .pb.go,$(PROTO_FILES)) GRPC_PROTO_GO_FILES := $(addsuffix .pb.go,$(GRPC_PROTO_FILES)) $(addsuffix _grpc.pb.go,$(GRPC_PROTO_FILES)) TEST_GO_FILES := $(wildcard *_test.go)) -EASYJSON_FILES := $(filter-out $(TEST_GO_FILES),$(wildcard api*.go api/signaling.go talk/ocs.go)) +EASYJSON_FILES := $(filter-out $(TEST_GO_FILES),$(wildcard api*.go api/signaling.go */api.go talk/ocs.go)) EASYJSON_GO_FILES := $(patsubst %.go,%_easyjson.go,$(EASYJSON_FILES)) COMMON_GO_FILES := $(filter-out continentmap.go $(PROTO_GO_FILES) $(GRPC_PROTO_GO_FILES) $(EASYJSON_GO_FILES) $(TEST_GO_FILES),$(wildcard *.go)) CLIENT_TEST_GO_FILES := $(wildcard client/*_test.go)) diff --git a/api_backend.go b/api_backend.go index 1a1a236..322d075 100644 --- a/api_backend.go +++ b/api_backend.go @@ -34,6 +34,7 @@ import ( "time" "github.com/strukturag/nextcloud-spreed-signaling/api" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" ) const ( @@ -503,13 +504,6 @@ type BackendServerInfoGrpc struct { Version string `json:"version,omitempty"` } -type BackendServerInfoEtcd struct { - Endpoints []string `json:"endpoints"` - - Active string `json:"active,omitempty"` - Connected *bool `json:"connected,omitempty"` -} - type BackendServerInfo struct { Version string `json:"version"` Features []string `json:"features"` @@ -517,7 +511,7 @@ type BackendServerInfo struct { Sfu *BackendServerInfoSfu `json:"sfu,omitempty"` Dialout []BackendServerInfoDialout `json:"dialout,omitempty"` - Nats *BackendServerInfoNats `json:"nats,omitempty"` - Grpc []BackendServerInfoGrpc `json:"grpc,omitempty"` - Etcd *BackendServerInfoEtcd `json:"etcd,omitempty"` + Nats *BackendServerInfoNats `json:"nats,omitempty"` + Grpc []BackendServerInfoGrpc `json:"grpc,omitempty"` + Etcd *etcd.BackendServerInfoEtcd `json:"etcd,omitempty"` } diff --git a/backend_client.go b/backend_client.go index 2d22e91..54a7def 100644 --- a/backend_client.go +++ b/backend_client.go @@ -33,6 +33,7 @@ import ( "github.com/dlintw/goconf" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/pool" "github.com/strukturag/nextcloud-spreed-signaling/talk" @@ -59,7 +60,7 @@ type BackendClient struct { buffers pool.BufferPool } -func NewBackendClient(ctx context.Context, config *goconf.ConfigFile, maxConcurrentRequestsPerHost int, version string, etcdClient *EtcdClient) (*BackendClient, error) { +func NewBackendClient(ctx context.Context, config *goconf.ConfigFile, maxConcurrentRequestsPerHost int, version string, etcdClient etcd.Client) (*BackendClient, error) { logger := log.LoggerFromContext(ctx) backends, err := NewBackendConfiguration(logger, config, etcdClient) if err != nil { diff --git a/backend_configuration.go b/backend_configuration.go index 99569dc..6329930 100644 --- a/backend_configuration.go +++ b/backend_configuration.go @@ -30,6 +30,7 @@ import ( "github.com/dlintw/goconf" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/talk" @@ -135,11 +136,11 @@ var ( defaultBackendStats = &prometheusBackendStats{} ) -func NewBackendConfiguration(logger log.Logger, config *goconf.ConfigFile, etcdClient *EtcdClient) (*BackendConfiguration, error) { +func NewBackendConfiguration(logger log.Logger, config *goconf.ConfigFile, etcdClient etcd.Client) (*BackendConfiguration, error) { return NewBackendConfigurationWithStats(logger, config, etcdClient, nil) } -func NewBackendConfigurationWithStats(logger log.Logger, config *goconf.ConfigFile, etcdClient *EtcdClient, stats BackendStorageStats) (*BackendConfiguration, error) { +func NewBackendConfigurationWithStats(logger log.Logger, config *goconf.ConfigFile, etcdClient etcd.Client, stats BackendStorageStats) (*BackendConfiguration, error) { backendType, _ := config.GetString("backend", "backendtype") if backendType == "" { backendType = DefaultBackendType diff --git a/backend_configuration_test.go b/backend_configuration_test.go index e460035..a452df4 100644 --- a/backend_configuration_test.go +++ b/backend_configuration_test.go @@ -33,6 +33,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/strukturag/nextcloud-spreed-signaling/etcd/etcdtest" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/talk" ) @@ -510,13 +511,13 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) { logger := log.NewLoggerForTest(t) require := require.New(t) assert := assert.New(t) - etcd, client := NewEtcdClientForTest(t) + embedEtcd, client := etcdtest.NewClientForTest(t) url1 := "https://domain1.invalid/foo" initialSecret1 := string(testBackendSecret) + "-backend1-initial" secret1 := string(testBackendSecret) + "-backend1" - SetEtcdValue(etcd, "/backends/1_one", []byte("{\"url\":\""+url1+"\",\"secret\":\""+initialSecret1+"\"}")) + embedEtcd.SetValue("/backends/1_one", []byte("{\"url\":\""+url1+"\",\"secret\":\""+initialSecret1+"\"}")) config := goconf.NewConfigFile() config.AddOption("backend", "backendtype", "etcd") @@ -543,7 +544,7 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) { } drainWakeupChannel(ch) - SetEtcdValue(etcd, "/backends/1_one", []byte("{\"url\":\""+url1+"\",\"secret\":\""+secret1+"\"}")) + embedEtcd.SetValue("/backends/1_one", []byte("{\"url\":\""+url1+"\",\"secret\":\""+secret1+"\"}")) <-ch assert.Equal(1, stats.value) if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) && @@ -558,7 +559,7 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) { secret2 := string(testBackendSecret) + "-backend2" drainWakeupChannel(ch) - SetEtcdValue(etcd, "/backends/2_two", []byte("{\"url\":\""+url2+"\",\"secret\":\""+secret2+"\"}")) + embedEtcd.SetValue("/backends/2_two", []byte("{\"url\":\""+url2+"\",\"secret\":\""+secret2+"\"}")) <-ch assert.Equal(2, stats.value) if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 2) && @@ -577,7 +578,7 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) { secret3 := string(testBackendSecret) + "-backend3" drainWakeupChannel(ch) - SetEtcdValue(etcd, "/backends/3_three", []byte("{\"url\":\""+url3+"\",\"secret\":\""+secret3+"\"}")) + embedEtcd.SetValue("/backends/3_three", []byte("{\"url\":\""+url3+"\",\"secret\":\""+secret3+"\"}")) <-ch assert.Equal(3, stats.value) if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 3) && @@ -597,7 +598,7 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) { } drainWakeupChannel(ch) - DeleteEtcdValue(etcd, "/backends/1_one") + embedEtcd.DeleteValue("/backends/1_one") <-ch assert.Equal(2, stats.value) if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 2) { @@ -608,7 +609,7 @@ func TestBackendConfiguration_EtcdCompat(t *testing.T) { } drainWakeupChannel(ch) - DeleteEtcdValue(etcd, "/backends/2_two") + embedEtcd.DeleteValue("/backends/2_two") <-ch assert.Equal(1, stats.value) if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) { @@ -760,13 +761,13 @@ func TestBackendConfiguration_EtcdChangeUrls(t *testing.T) { logger := log.NewLoggerForTest(t) require := require.New(t) assert := assert.New(t) - etcd, client := NewEtcdClientForTest(t) + embedEtcd, client := etcdtest.NewClientForTest(t) url1 := "https://domain1.invalid/foo" initialSecret1 := string(testBackendSecret) + "-backend1-initial" secret1 := string(testBackendSecret) + "-backend1" - SetEtcdValue(etcd, "/backends/1_one", []byte("{\"urls\":[\""+url1+"\"],\"secret\":\""+initialSecret1+"\"}")) + embedEtcd.SetValue("/backends/1_one", []byte("{\"urls\":[\""+url1+"\"],\"secret\":\""+initialSecret1+"\"}")) config := goconf.NewConfigFile() config.AddOption("backend", "backendtype", "etcd") @@ -796,7 +797,7 @@ func TestBackendConfiguration_EtcdChangeUrls(t *testing.T) { url2 := "https://domain1.invalid/bar" drainWakeupChannel(ch) - SetEtcdValue(etcd, "/backends/1_one", []byte("{\"urls\":[\""+url1+"\",\""+url2+"\"],\"secret\":\""+secret1+"\"}")) + embedEtcd.SetValue("/backends/1_one", []byte("{\"urls\":[\""+url1+"\",\""+url2+"\"],\"secret\":\""+secret1+"\"}")) <-ch assert.Equal(1, stats.value) if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) && @@ -816,7 +817,7 @@ func TestBackendConfiguration_EtcdChangeUrls(t *testing.T) { url4 := "https://domain3.invalid/foo" drainWakeupChannel(ch) - SetEtcdValue(etcd, "/backends/3_three", []byte("{\"urls\":[\""+url3+"\",\""+url4+"\"],\"secret\":\""+secret3+"\"}")) + embedEtcd.SetValue("/backends/3_three", []byte("{\"urls\":[\""+url3+"\",\""+url4+"\"],\"secret\":\""+secret3+"\"}")) <-ch assert.Equal(2, stats.value) if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 2) && @@ -836,7 +837,7 @@ func TestBackendConfiguration_EtcdChangeUrls(t *testing.T) { } drainWakeupChannel(ch) - DeleteEtcdValue(etcd, "/backends/1_one") + embedEtcd.DeleteValue("/backends/1_one") <-ch assert.Equal(1, stats.value) if backends := sortBackends(cfg.GetBackends()); assert.Len(backends, 1) { @@ -845,7 +846,7 @@ func TestBackendConfiguration_EtcdChangeUrls(t *testing.T) { } drainWakeupChannel(ch) - DeleteEtcdValue(etcd, "/backends/3_three") + embedEtcd.DeleteValue("/backends/3_three") <-ch assert.Equal(0, stats.value) diff --git a/backend_storage_etcd.go b/backend_storage_etcd.go index acb4b7c..03a04ac 100644 --- a/backend_storage_etcd.go +++ b/backend_storage_etcd.go @@ -42,7 +42,7 @@ type backendStorageEtcd struct { backendStorageCommon logger log.Logger - etcdClient *EtcdClient + etcdClient etcd.Client keyPrefix string keyInfos map[string]*etcd.BackendInformationEtcd @@ -54,7 +54,7 @@ type backendStorageEtcd struct { closeFunc context.CancelFunc } -func NewBackendStorageEtcd(logger log.Logger, config *goconf.ConfigFile, etcdClient *EtcdClient, stats BackendStorageStats) (BackendStorage, error) { +func NewBackendStorageEtcd(logger log.Logger, config *goconf.ConfigFile, etcdClient etcd.Client, stats BackendStorageStats) (BackendStorage, error) { if etcdClient == nil || !etcdClient.IsConfigured() { return nil, errors.New("no etcd endpoints configured") } @@ -106,7 +106,7 @@ func (s *backendStorageEtcd) wakeupForTesting() { } } -func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { +func (s *backendStorageEtcd) EtcdClientCreated(client etcd.Client) { go func() { if err := client.WaitForConnection(s.closeCtx); err != nil { if errors.Is(err, context.Canceled) { @@ -164,17 +164,17 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { }() } -func (s *backendStorageEtcd) EtcdWatchCreated(client *EtcdClient, key string) { +func (s *backendStorageEtcd) EtcdWatchCreated(client etcd.Client, key string) { } -func (s *backendStorageEtcd) getBackends(ctx context.Context, client *EtcdClient, keyPrefix string) (*clientv3.GetResponse, error) { +func (s *backendStorageEtcd) getBackends(ctx context.Context, client etcd.Client, keyPrefix string) (*clientv3.GetResponse, error) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() return client.Get(ctx, keyPrefix, clientv3.WithPrefix()) } -func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) { +func (s *backendStorageEtcd) EtcdKeyUpdated(client etcd.Client, key string, data []byte, prevValue []byte) { var info etcd.BackendInformationEtcd if err := json.Unmarshal(data, &info); err != nil { s.logger.Printf("Could not decode backend information %s: %s", string(data), err) @@ -228,7 +228,7 @@ func (s *backendStorageEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data s.wakeupForTesting() } -func (s *backendStorageEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) { +func (s *backendStorageEtcd) EtcdKeyDeleted(client etcd.Client, key string, prevValue []byte) { s.mu.Lock() defer s.mu.Unlock() diff --git a/backend_storage_etcd_test.go b/backend_storage_etcd_test.go index 255882a..0ff8efe 100644 --- a/backend_storage_etcd_test.go +++ b/backend_storage_etcd_test.go @@ -26,8 +26,9 @@ import ( "github.com/dlintw/goconf" "github.com/stretchr/testify/require" - "go.etcd.io/etcd/server/v3/embed" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" + "github.com/strukturag/nextcloud-spreed-signaling/etcd/etcdtest" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/test" ) @@ -46,21 +47,20 @@ func (s *backendStorageEtcd) getWakeupChannelForTesting() <-chan struct{} { } type testListener struct { - etcd *embed.Etcd + etcd *etcdtest.TestServer closed chan struct{} } -func (tl *testListener) EtcdClientCreated(client *EtcdClient) { - tl.etcd.Server.Stop() +func (tl *testListener) EtcdClientCreated(client etcd.Client) { close(tl.closed) } func Test_BackendStorageEtcdNoLeak(t *testing.T) { // nolint:paralleltest logger := log.NewLoggerForTest(t) test.EnsureNoGoroutinesLeak(t, func(t *testing.T) { - etcd, client := NewEtcdClientForTest(t) + embedEtcd, client := etcdtest.NewClientForTest(t) tl := &testListener{ - etcd: etcd, + etcd: embedEtcd, closed: make(chan struct{}), } client.AddListener(tl) diff --git a/etcd/api.go b/etcd/api.go index 60b5ad1..557269a 100644 --- a/etcd/api.go +++ b/etcd/api.go @@ -22,6 +22,7 @@ package etcd import ( + "context" "errors" "fmt" "net/url" @@ -29,8 +30,32 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/internal" + clientv3 "go.etcd.io/etcd/client/v3" ) +type ClientListener interface { + EtcdClientCreated(client Client) +} + +type ClientWatcher interface { + EtcdWatchCreated(client Client, key string) + EtcdKeyUpdated(client Client, key string, value []byte, prevValue []byte) + EtcdKeyDeleted(client Client, key string, prevValue []byte) +} + +type Client interface { + IsConfigured() bool + WaitForConnection(ctx context.Context) error + GetServerInfoEtcd() *BackendServerInfoEtcd + Close() error + + AddListener(listener ClientListener) + RemoveListener(listener ClientListener) + + Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) + Watch(ctx context.Context, key string, nextRevision int64, watcher ClientWatcher, opts ...clientv3.OpOption) (int64, error) +} + // Information on a backend in the etcd cluster. type BackendInformationEtcd struct { @@ -97,3 +122,10 @@ func (p *BackendInformationEtcd) CheckValid() (err error) { return nil } + +type BackendServerInfoEtcd struct { + Endpoints []string `json:"endpoints"` + + Active string `json:"active,omitempty"` + Connected *bool `json:"connected,omitempty"` +} diff --git a/etcd_client.go b/etcd/client.go similarity index 83% rename from etcd_client.go rename to etcd/client.go index ef498ab..a9cbc93 100644 --- a/etcd_client.go +++ b/etcd/client.go @@ -19,7 +19,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package etcd import ( "context" @@ -43,28 +43,23 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/log" ) -type EtcdClientListener interface { - EtcdClientCreated(client *EtcdClient) -} +var ( + initialWaitDelay = time.Second + maxWaitDelay = 8 * time.Second +) -type EtcdClientWatcher interface { - EtcdWatchCreated(client *EtcdClient, key string) - EtcdKeyUpdated(client *EtcdClient, key string, value []byte, prevValue []byte) - EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) -} - -type EtcdClient struct { +type etcdClient struct { logger log.Logger compatSection string mu sync.Mutex client atomic.Value // +checklocks:mu - listeners map[EtcdClientListener]bool + listeners map[ClientListener]bool } -func NewEtcdClient(logger log.Logger, config *goconf.ConfigFile, compatSection string) (*EtcdClient, error) { - result := &EtcdClient{ +func NewClient(logger log.Logger, config *goconf.ConfigFile, compatSection string) (Client, error) { + result := &etcdClient{ logger: logger, compatSection: compatSection, } @@ -75,7 +70,7 @@ func NewEtcdClient(logger log.Logger, config *goconf.ConfigFile, compatSection s return result, nil } -func (c *EtcdClient) GetServerInfoEtcd() *BackendServerInfoEtcd { +func (c *etcdClient) GetServerInfoEtcd() *BackendServerInfoEtcd { client := c.getEtcdClient() if client == nil { return nil @@ -94,7 +89,7 @@ func (c *EtcdClient) GetServerInfoEtcd() *BackendServerInfoEtcd { return result } -func (c *EtcdClient) getConfigStringWithFallback(config *goconf.ConfigFile, option string) string { +func (c *etcdClient) getConfigStringWithFallback(config *goconf.ConfigFile, option string) string { value, _ := config.GetString("etcd", option) if value == "" && c.compatSection != "" { value, _ = config.GetString(c.compatSection, option) @@ -106,7 +101,7 @@ func (c *EtcdClient) getConfigStringWithFallback(config *goconf.ConfigFile, opti return value } -func (c *EtcdClient) load(config *goconf.ConfigFile, ignoreErrors bool) error { +func (c *etcdClient) load(config *goconf.ConfigFile, ignoreErrors bool) error { var endpoints []string if endpointsString := c.getConfigStringWithFallback(config, "endpoints"); endpointsString != "" { endpoints = slices.Collect(internal.SplitEntries(endpointsString, ",")) @@ -189,7 +184,7 @@ func (c *EtcdClient) load(config *goconf.ConfigFile, ignoreErrors bool) error { return nil } -func (c *EtcdClient) Close() error { +func (c *etcdClient) Close() error { client := c.getEtcdClient() if client != nil { return client.Close() @@ -198,11 +193,11 @@ func (c *EtcdClient) Close() error { return nil } -func (c *EtcdClient) IsConfigured() bool { +func (c *etcdClient) IsConfigured() bool { return c.getEtcdClient() != nil } -func (c *EtcdClient) getEtcdClient() *clientv3.Client { +func (c *etcdClient) getEtcdClient() *clientv3.Client { client := c.client.Load() if client == nil { return nil @@ -211,14 +206,14 @@ func (c *EtcdClient) getEtcdClient() *clientv3.Client { return client.(*clientv3.Client) } -func (c *EtcdClient) syncClient(ctx context.Context) error { +func (c *etcdClient) syncClient(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() return c.getEtcdClient().Sync(ctx) } -func (c *EtcdClient) notifyListeners() { +func (c *etcdClient) notifyListeners() { c.mu.Lock() defer c.mu.Unlock() @@ -227,12 +222,12 @@ func (c *EtcdClient) notifyListeners() { } } -func (c *EtcdClient) AddListener(listener EtcdClientListener) { +func (c *etcdClient) AddListener(listener ClientListener) { c.mu.Lock() defer c.mu.Unlock() if c.listeners == nil { - c.listeners = make(map[EtcdClientListener]bool) + c.listeners = make(map[ClientListener]bool) } c.listeners[listener] = true if client := c.getEtcdClient(); client != nil { @@ -240,14 +235,14 @@ func (c *EtcdClient) AddListener(listener EtcdClientListener) { } } -func (c *EtcdClient) RemoveListener(listener EtcdClientListener) { +func (c *etcdClient) RemoveListener(listener ClientListener) { c.mu.Lock() defer c.mu.Unlock() delete(c.listeners, listener) } -func (c *EtcdClient) WaitForConnection(ctx context.Context) error { +func (c *etcdClient) WaitForConnection(ctx context.Context) error { backoff, err := async.NewExponentialBackoff(initialWaitDelay, maxWaitDelay) if err != nil { return err @@ -276,11 +271,11 @@ func (c *EtcdClient) WaitForConnection(ctx context.Context) error { } } -func (c *EtcdClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { +func (c *etcdClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { return c.getEtcdClient().Get(ctx, key, opts...) } -func (c *EtcdClient) Watch(ctx context.Context, key string, nextRevision int64, watcher EtcdClientWatcher, opts ...clientv3.OpOption) (int64, error) { +func (c *etcdClient) Watch(ctx context.Context, key string, nextRevision int64, watcher ClientWatcher, opts ...clientv3.OpOption) (int64, error) { c.logger.Printf("Wait for leader and start watching on %s (rev=%d)", key, nextRevision) opts = append(opts, clientv3.WithRev(nextRevision), clientv3.WithPrevKV()) ch := c.getEtcdClient().Watch(clientv3.WithRequireLeader(ctx), key, opts...) diff --git a/etcd_client_test.go b/etcd/client_test.go similarity index 87% rename from etcd_client_test.go rename to etcd/client_test.go index da8b383..b789125 100644 --- a/etcd_client_test.go +++ b/etcd/client_test.go @@ -19,7 +19,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package signaling +package etcd import ( "context" @@ -48,6 +48,10 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/test" ) +const ( + testTimeout = 10 * time.Second +) + var ( etcdListenUrl = "http://localhost:8080" ) @@ -129,7 +133,7 @@ func NewEtcdForTest(t *testing.T) *embed.Etcd { return etcd } -func NewEtcdClientForTest(t *testing.T) (*embed.Etcd, *EtcdClient) { +func NewClientForTest(t *testing.T) (*embed.Etcd, Client) { etcd := NewEtcdForTest(t) config := goconf.NewConfigFile() @@ -137,7 +141,7 @@ func NewEtcdClientForTest(t *testing.T) (*embed.Etcd, *EtcdClient) { config.AddOption("etcd", "loglevel", "error") logger := log.NewLoggerForTest(t) - client, err := NewEtcdClient(logger, config, "") + client, err := NewClient(logger, config, "") require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, client.Close()) @@ -145,7 +149,7 @@ func NewEtcdClientForTest(t *testing.T) (*embed.Etcd, *EtcdClient) { return etcd, client } -func NewEtcdClientWithTLSForTest(t *testing.T) (*embed.Etcd, *EtcdClient) { +func NewEtcdClientWithTLSForTest(t *testing.T) (*embed.Etcd, Client) { etcd, keyfile, certfile := NewEtcdForTestWithTls(t, true) config := goconf.NewConfigFile() @@ -156,7 +160,7 @@ func NewEtcdClientWithTLSForTest(t *testing.T) (*embed.Etcd, *EtcdClient) { config.AddOption("etcd", "cacert", certfile) logger := log.NewLoggerForTest(t) - client, err := NewEtcdClient(logger, config, "") + client, err := NewClient(logger, config, "") require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, client.Close()) @@ -164,14 +168,14 @@ func NewEtcdClientWithTLSForTest(t *testing.T) (*embed.Etcd, *EtcdClient) { return etcd, client } -func SetEtcdValue(etcd *embed.Etcd, key string, value []byte) { +func SetValue(etcd *embed.Etcd, key string, value []byte) { if kv := etcd.Server.KV(); kv != nil { kv.Put([]byte(key), value, lease.NoLease) kv.Commit() } } -func DeleteEtcdValue(etcd *embed.Etcd, key string) { +func DeleteValue(etcd *embed.Etcd, key string) { if kv := etcd.Server.KV(); kv != nil { kv.DeleteRange([]byte(key), nil) kv.Commit() @@ -184,7 +188,7 @@ func Test_EtcdClient_Get(t *testing.T) { ctx := log.NewLoggerContext(t.Context(), logger) assert := assert.New(t) require := require.New(t) - etcd, client := NewEtcdClientForTest(t) + etcd, client := NewClientForTest(t) ctx, cancel := context.WithTimeout(ctx, testTimeout) defer cancel() @@ -194,9 +198,7 @@ func Test_EtcdClient_Get(t *testing.T) { assert.Equal([]string{ etcd.Config().ListenClientUrls[0].String(), }, info.Endpoints) - if connected := info.Connected; assert.NotNil(connected) { - assert.False(*connected) - } + assert.NotNil(info.Connected) } require.NoError(client.WaitForConnection(ctx)) @@ -215,7 +217,7 @@ func Test_EtcdClient_Get(t *testing.T) { assert.EqualValues(0, response.Count) } - SetEtcdValue(etcd, "foo", []byte("bar")) + SetValue(etcd, "foo", []byte("bar")) if response, err := client.Get(ctx, "foo"); assert.NoError(err) { if assert.EqualValues(1, response.Count) { @@ -241,9 +243,7 @@ func Test_EtcdClientTLS_Get(t *testing.T) { assert.Equal([]string{ etcd.Config().ListenClientUrls[0].String(), }, info.Endpoints) - if connected := info.Connected; assert.NotNil(connected) { - assert.False(*connected) - } + assert.NotNil(info.Connected) } require.NoError(client.WaitForConnection(ctx)) @@ -262,7 +262,7 @@ func Test_EtcdClientTLS_Get(t *testing.T) { assert.EqualValues(0, response.Count) } - SetEtcdValue(etcd, "foo", []byte("bar")) + SetValue(etcd, "foo", []byte("bar")) if response, err := client.Get(ctx, "foo"); assert.NoError(err) { if assert.EqualValues(1, response.Count) { @@ -277,15 +277,15 @@ func Test_EtcdClient_GetPrefix(t *testing.T) { logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) assert := assert.New(t) - etcd, client := NewEtcdClientForTest(t) + etcd, client := NewClientForTest(t) if response, err := client.Get(ctx, "foo"); assert.NoError(err) { assert.EqualValues(0, response.Count) } - SetEtcdValue(etcd, "foo", []byte("1")) - SetEtcdValue(etcd, "foo/lala", []byte("2")) - SetEtcdValue(etcd, "lala/foo", []byte("3")) + SetValue(etcd, "foo", []byte("1")) + SetValue(etcd, "foo/lala", []byte("2")) + SetValue(etcd, "lala/foo", []byte("3")) if response, err := client.Get(ctx, "foo", clientv3.WithPrefix()); assert.NoError(err) { if assert.EqualValues(2, response.Count) { @@ -332,7 +332,7 @@ func (l *EtcdClientTestListener) Close() { l.cancel() } -func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) { +func (l *EtcdClientTestListener) EtcdClientCreated(client Client) { go func() { assert := assert.New(l.t) if err := client.WaitForConnection(l.ctx); !assert.NoError(err) { @@ -358,10 +358,10 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) { }() } -func (l *EtcdClientTestListener) EtcdWatchCreated(client *EtcdClient, key string) { +func (l *EtcdClientTestListener) EtcdWatchCreated(client Client, key string) { } -func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, value []byte, prevValue []byte) { +func (l *EtcdClientTestListener) EtcdKeyUpdated(client Client, key string, value []byte, prevValue []byte) { evt := etcdEvent{ t: clientv3.EventTypePut, key: string(key), @@ -373,7 +373,7 @@ func (l *EtcdClientTestListener) EtcdKeyUpdated(client *EtcdClient, key string, l.events <- evt } -func (l *EtcdClientTestListener) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) { +func (l *EtcdClientTestListener) EtcdKeyDeleted(client Client, key string, prevValue []byte) { evt := etcdEvent{ t: clientv3.EventTypeDelete, key: string(key), @@ -389,9 +389,9 @@ func Test_EtcdClient_Watch(t *testing.T) { logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) assert := assert.New(t) - etcd, client := NewEtcdClientForTest(t) + etcd, client := NewClientForTest(t) - SetEtcdValue(etcd, "foo/a", []byte("1")) + SetValue(etcd, "foo/a", []byte("1")) listener := NewEtcdClientTestListener(ctx, t) defer listener.Close() @@ -401,19 +401,19 @@ func Test_EtcdClient_Watch(t *testing.T) { <-listener.initial - SetEtcdValue(etcd, "foo/b", []byte("2")) + SetValue(etcd, "foo/b", []byte("2")) event := <-listener.events assert.Equal(clientv3.EventTypePut, event.t) assert.Equal("foo/b", event.key) assert.Equal("2", event.value) - SetEtcdValue(etcd, "foo/a", []byte("3")) + SetValue(etcd, "foo/a", []byte("3")) event = <-listener.events assert.Equal(clientv3.EventTypePut, event.t) assert.Equal("foo/a", event.key) assert.Equal("3", event.value) - DeleteEtcdValue(etcd, "foo/a") + DeleteValue(etcd, "foo/a") event = <-listener.events assert.Equal(clientv3.EventTypeDelete, event.t) assert.Equal("foo/a", event.key) diff --git a/etcd/etcdtest/etcdtest.go b/etcd/etcdtest/etcdtest.go new file mode 100644 index 0000000..4a8603d --- /dev/null +++ b/etcd/etcdtest/etcdtest.go @@ -0,0 +1,466 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2025 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package etcdtest + +import ( + "bytes" + "context" + "errors" + "net" + "net/url" + "os" + "slices" + "strconv" + "strings" + "sync" + "testing" + + "github.com/dlintw/goconf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/embed" + "go.etcd.io/etcd/server/v3/lease" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "github.com/strukturag/nextcloud-spreed-signaling/etcd" + "github.com/strukturag/nextcloud-spreed-signaling/log" + "github.com/strukturag/nextcloud-spreed-signaling/test" +) + +var ( + etcdListenUrl = "http://localhost:8080" +) + +type Server struct { + embed *embed.Etcd +} + +func (s *Server) URL() *url.URL { + return &s.embed.Config().ListenClientUrls[0] +} + +func (s *Server) SetValue(key string, value []byte) { + if kv := s.embed.Server.KV(); kv != nil { + kv.Put([]byte(key), value, lease.NoLease) + kv.Commit() + } +} + +func (s *Server) DeleteValue(key string) { + if kv := s.embed.Server.KV(); kv != nil { + kv.DeleteRange([]byte(key), nil) + kv.Commit() + } +} + +func NewServerForTest(t *testing.T) *Server { + t.Helper() + require := require.New(t) + cfg := embed.NewConfig() + cfg.Dir = t.TempDir() + os.Chmod(cfg.Dir, 0700) // nolint + cfg.LogLevel = "warn" + cfg.Name = "signalingtest" + cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel))) + + u, err := url.Parse(etcdListenUrl) + require.NoError(err) + + // Find a free port to bind the server to. + var etcd *embed.Etcd + for port := 50000; port < 50100; port++ { + u.Host = net.JoinHostPort("localhost", strconv.Itoa(port)) + cfg.ListenClientUrls = []url.URL{*u} + cfg.AdvertiseClientUrls = []url.URL{*u} + httpListener := u + httpListener.Host = net.JoinHostPort("localhost", strconv.Itoa(port+1)) + cfg.ListenClientHttpUrls = []url.URL{*httpListener} + peerListener := u + peerListener.Host = net.JoinHostPort("localhost", strconv.Itoa(port+2)) + cfg.ListenPeerUrls = []url.URL{*peerListener} + cfg.AdvertisePeerUrls = []url.URL{*peerListener} + cfg.InitialCluster = "signalingtest=" + peerListener.String() + etcd, err = embed.StartEtcd(cfg) + if test.IsErrorAddressAlreadyInUse(err) { + continue + } + + require.NoError(err) + break + } + require.NotNil(etcd, "could not find free port") + + t.Cleanup(func() { + etcd.Close() + <-etcd.Server.StopNotify() + }) + // Wait for server to be ready. + <-etcd.Server.ReadyNotify() + + server := &Server{ + embed: etcd, + } + return server +} + +func NewEtcdClientForTest(t *testing.T, server *Server) etcd.Client { + t.Helper() + + logger := log.NewLoggerForTest(t) + + config := goconf.NewConfigFile() + config.AddOption("etcd", "endpoints", server.URL().String()) + + client, err := etcd.NewClient(logger, config, "") + require.NoError(t, err) + + t.Cleanup(func() { + assert.NoError(t, client.Close()) + }) + + return client +} + +type testWatch struct { + key string + op clientv3.Op + rev int64 + + watcher etcd.ClientWatcher +} + +type testClient struct { + mu sync.Mutex + server *TestServer + + // +checklocks:mu + closed bool + closeCh chan struct{} + processCh chan func() + // +checklocks:mu + listeners []etcd.ClientListener + // +checklocks:mu + watchers []*testWatch +} + +func newTestClient(server *TestServer) *testClient { + client := &testClient{ + server: server, + closeCh: make(chan struct{}), + processCh: make(chan func(), 1), + } + go func() { + defer close(client.closeCh) + for { + f := <-client.processCh + if f == nil { + return + } + + f() + } + }() + return client +} + +func (c *testClient) IsConfigured() bool { + return true +} + +func (c *testClient) WaitForConnection(ctx context.Context) error { + return nil +} + +func (c *testClient) GetServerInfoEtcd() *etcd.BackendServerInfoEtcd { + return &etcd.BackendServerInfoEtcd{} +} + +func (c *testClient) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return nil + } + + c.closed = true + c.server.removeClient(c) + close(c.processCh) + <-c.closeCh + return nil +} + +func (c *testClient) AddListener(listener etcd.ClientListener) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return + } + + c.listeners = append(c.listeners, listener) + c.processCh <- func() { + listener.EtcdClientCreated(c) + } +} + +func (c *testClient) RemoveListener(listener etcd.ClientListener) { + c.mu.Lock() + defer c.mu.Unlock() + + c.listeners = slices.DeleteFunc(c.listeners, func(l etcd.ClientListener) bool { + return l == listener + }) +} + +func (c *testClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + keys, values, revision := c.server.getValues(key, 0, opts...) + response := &clientv3.GetResponse{ + Count: int64(len(values)), + Header: &etcdserverpb.ResponseHeader{ + Revision: revision, + }, + } + for idx, key := range keys { + response.Kvs = append(response.Kvs, &mvccpb.KeyValue{ + Key: []byte(key), + Value: values[idx], + }) + } + return response, nil +} + +func (c *testClient) notifyUpdated(key string, oldValue []byte, newValue []byte) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return + } + + for _, w := range c.watchers { + if withPrefix := w.op.IsOptsWithPrefix(); (withPrefix && strings.HasPrefix(key, w.key)) || (!withPrefix && key == w.key) { + c.processCh <- func() { + w.watcher.EtcdKeyUpdated(c, key, newValue, oldValue) + } + } + } +} + +func (c *testClient) notifyDeleted(key string, oldValue []byte) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return + } + + for _, w := range c.watchers { + if withPrefix := w.op.IsOptsWithPrefix(); (withPrefix && strings.HasPrefix(key, w.key)) || (!withPrefix && key == w.key) { + c.processCh <- func() { + w.watcher.EtcdKeyDeleted(c, key, oldValue) + } + } + } +} + +func (c *testClient) addWatcher(w *testWatch, opts ...clientv3.OpOption) error { + keys, values, _ := c.server.getValues(w.key, w.rev, opts...) + + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return errors.New("closed") + } + + c.watchers = append(c.watchers, w) + c.processCh <- func() { + w.watcher.EtcdWatchCreated(c, w.key) + } + + for idx, key := range keys { + c.processCh <- func() { + w.watcher.EtcdKeyUpdated(c, key, values[idx], nil) + } + } + + return nil +} + +func (c *testClient) Watch(ctx context.Context, key string, nextRevision int64, watcher etcd.ClientWatcher, opts ...clientv3.OpOption) (int64, error) { + w := &testWatch{ + key: key, + rev: nextRevision, + + watcher: watcher, + } + for _, o := range opts { + o(&w.op) + } + + if err := c.addWatcher(w, opts...); err != nil { + return 0, err + } + + select { + case <-c.closeCh: + // Client is closed. + case <-ctx.Done(): + // Watch context was cancelled / timed out. + } + return c.server.getRevision(), nil +} + +type testServerValue struct { + value []byte + revision int64 +} + +type TestServer struct { + t *testing.T + mu sync.Mutex + // +checklocks:mu + clients []*testClient + // +checklocks:mu + values map[string]*testServerValue + // +checklocks:mu + revision int64 +} + +func (s *TestServer) newClient() *testClient { + client := newTestClient(s) + s.addClient(client) + return client +} + +func (s *TestServer) addClient(client *testClient) { + s.mu.Lock() + defer s.mu.Unlock() + + s.clients = append(s.clients, client) +} + +func (s *TestServer) removeClient(client *testClient) { + s.mu.Lock() + defer s.mu.Unlock() + + s.clients = slices.DeleteFunc(s.clients, func(c *testClient) bool { + return c == client + }) +} + +func (s *TestServer) getRevision() int64 { + s.mu.Lock() + defer s.mu.Unlock() + + return s.revision +} + +func (s *TestServer) getValues(key string, minRevision int64, opts ...clientv3.OpOption) (keys []string, values [][]byte, revision int64) { + s.mu.Lock() + defer s.mu.Unlock() + + var op clientv3.Op + for _, o := range opts { + o(&op) + } + if op.IsOptsWithPrefix() { + for k, value := range s.values { + if minRevision > 0 && value.revision < minRevision { + continue + } + if strings.HasPrefix(k, key) { + keys = append(keys, k) + values = append(values, value.value) + } + } + } else { + if value, found := s.values[key]; found && (minRevision == 0 || value.revision >= minRevision) { + keys = append(keys, key) + values = append(values, value.value) + } + } + + revision = s.revision + return +} + +func (s *TestServer) SetValue(key string, value []byte) { + s.mu.Lock() + defer s.mu.Unlock() + + prev, found := s.values[key] + if found && bytes.Equal(prev.value, value) { + return + } + + if s.values == nil { + s.values = make(map[string]*testServerValue) + } + if prev == nil { + prev = &testServerValue{} + s.values[key] = prev + } + s.revision++ + prevValue := prev.value + prev.value = value + prev.revision = s.revision + + for _, c := range s.clients { + c.notifyUpdated(key, prevValue, value) + } +} + +func (s *TestServer) DeleteValue(key string) { + s.mu.Lock() + defer s.mu.Unlock() + + prev, found := s.values[key] + if !found { + return + } + + delete(s.values, key) + s.revision++ + + for _, c := range s.clients { + c.notifyDeleted(key, prev.value) + } +} + +func NewClientForTest(t *testing.T) (*TestServer, etcd.Client) { + t.Helper() + server := &TestServer{ + t: t, + revision: 1, + } + client := server.newClient() + t.Cleanup(func() { + assert.NoError(t, client.Close()) + }) + return server, client +} diff --git a/etcd/etcdtest/etcdtest_test.go b/etcd/etcdtest/etcdtest_test.go new file mode 100644 index 0000000..2302c73 --- /dev/null +++ b/etcd/etcdtest/etcdtest_test.go @@ -0,0 +1,319 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2025 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package etcdtest + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/strukturag/nextcloud-spreed-signaling/etcd" +) + +var ( + testTimeout = 10 * time.Second +) + +type updateEvent struct { + key string + value string + prev []byte +} + +type deleteEvent struct { + key string + prev []byte +} + +type testWatcher struct { + created chan struct{} + updated chan updateEvent + deleted chan deleteEvent +} + +func newTestWatcher() *testWatcher { + return &testWatcher{ + created: make(chan struct{}), + updated: make(chan updateEvent), + deleted: make(chan deleteEvent), + } +} + +func (w *testWatcher) EtcdWatchCreated(client etcd.Client, key string) { + close(w.created) +} + +func (w *testWatcher) EtcdKeyUpdated(client etcd.Client, key string, value []byte, prevValue []byte) { + w.updated <- updateEvent{ + key: key, + value: string(value), + prev: prevValue, + } +} + +func (w *testWatcher) EtcdKeyDeleted(client etcd.Client, key string, prevValue []byte) { + w.deleted <- deleteEvent{ + key: key, + prev: prevValue, + } +} + +type serverInterface interface { + SetValue(key string, value []byte) + DeleteValue(key string) +} + +type testClientListener struct { + called chan struct{} +} + +func (l *testClientListener) EtcdClientCreated(c etcd.Client) { + close(l.called) +} + +func testServerWatch(t *testing.T, server serverInterface, client etcd.Client) { + require := require.New(t) + assert := assert.New(t) + + ctx, cancel := context.WithTimeout(t.Context(), testTimeout) + defer cancel() + + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + + assert.True(client.IsConfigured(), "should be configured") + require.NoError(client.WaitForConnection(ctx)) + + listener := &testClientListener{ + called: make(chan struct{}), + } + client.AddListener(listener) + defer client.RemoveListener(listener) + + select { + case <-listener.called: + case <-ctx.Done(): + require.NoError(ctx.Err()) + } + + watcher := newTestWatcher() + + go func() { + if _, err := client.Watch(cancelCtx, "foo", 0, watcher); err != nil { + assert.ErrorIs(err, context.Canceled) + } + }() + + select { + case <-watcher.created: + case <-ctx.Done(): + require.NoError(ctx.Err()) + } + + key := "foo" + value := "bar" + server.SetValue("foo", []byte(value)) + select { + case evt := <-watcher.updated: + assert.Equal(key, evt.key) + assert.Equal(value, evt.value) + assert.Empty(evt.prev) + case <-ctx.Done(): + require.NoError(ctx.Err()) + } + + if response, err := client.Get(ctx, "foo"); assert.NoError(err) { + assert.EqualValues(1, response.Count) + if assert.Len(response.Kvs, 1) { + assert.Equal(key, string(response.Kvs[0].Key)) + assert.Equal(value, string(response.Kvs[0].Value)) + } + } + if response, err := client.Get(ctx, "f"); assert.NoError(err) { + assert.EqualValues(0, response.Count) + assert.Empty(response.Kvs) + } + if response, err := client.Get(ctx, "f", clientv3.WithPrefix()); assert.NoError(err) { + assert.EqualValues(1, response.Count) + if assert.Len(response.Kvs, 1) { + assert.Equal(key, string(response.Kvs[0].Key)) + assert.Equal(value, string(response.Kvs[0].Value)) + } + } + + server.DeleteValue("foo") + select { + case evt := <-watcher.deleted: + assert.Equal(key, evt.key) + assert.Equal(value, string(evt.prev)) + case <-ctx.Done(): + require.NoError(ctx.Err()) + } + + select { + case evt := <-watcher.updated: + assert.Fail("unexpected update event", "got %+v", evt) + case evt := <-watcher.deleted: + assert.Fail("unexpected deleted event", "got %+v", evt) + default: + } +} + +func TestServerWatch_Mock(t *testing.T) { + t.Parallel() + + server, client := NewClientForTest(t) + testServerWatch(t, server, client) +} + +func TestServerWatch_Real(t *testing.T) { + t.Parallel() + + server := NewServerForTest(t) + client := NewEtcdClientForTest(t, server) + testServerWatch(t, server, client) +} + +func testServerWatchInitialData(t *testing.T, server serverInterface, client etcd.Client) { + require := require.New(t) + assert := assert.New(t) + + ctx, cancel := context.WithTimeout(t.Context(), testTimeout) + defer cancel() + + key := "foo" + value := "bar" + server.SetValue("foo", []byte(value)) + + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + + watcher := newTestWatcher() + + go func() { + if _, err := client.Watch(cancelCtx, "foo", 1, watcher); err != nil { + assert.ErrorIs(err, context.Canceled) + } + }() + + select { + case <-watcher.created: + case <-ctx.Done(): + require.NoError(ctx.Err()) + } + + select { + case evt := <-watcher.updated: + assert.Equal(key, evt.key) + assert.Equal(value, evt.value) + assert.Empty(evt.prev) + case <-ctx.Done(): + require.NoError(ctx.Err()) + } + + select { + case evt := <-watcher.updated: + assert.Fail("unexpected update event", "got %+v", evt) + case evt := <-watcher.deleted: + assert.Fail("unexpected deleted event", "got %+v", evt) + default: + } +} + +func TestServerWatchInitialData_Mock(t *testing.T) { + t.Parallel() + + server, client := NewClientForTest(t) + testServerWatchInitialData(t, server, client) +} + +func TestServerWatchInitialData_Real(t *testing.T) { + t.Parallel() + + server := NewServerForTest(t) + client := NewEtcdClientForTest(t, server) + testServerWatchInitialData(t, server, client) +} + +func testServerWatchInitialOldData(t *testing.T, server serverInterface, client etcd.Client) { + require := require.New(t) + assert := assert.New(t) + + ctx, cancel := context.WithTimeout(t.Context(), testTimeout) + defer cancel() + + key := "foo" + value := "bar" + server.SetValue("foo", []byte(value)) + + response, err := client.Get(ctx, key) + require.NoError(err) + + if assert.EqualValues(1, response.Count) && assert.Len(response.Kvs, 1) { + assert.Equal(key, string(response.Kvs[0].Key)) + assert.Equal(value, string(response.Kvs[0].Value)) + } + + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + + watcher := newTestWatcher() + + go func() { + if _, err := client.Watch(cancelCtx, "foo", response.Header.GetRevision()+1, watcher); err != nil { + assert.ErrorIs(err, context.Canceled) + } + }() + + select { + case <-watcher.created: + case <-ctx.Done(): + require.NoError(ctx.Err()) + } + + select { + case evt := <-watcher.updated: + assert.Fail("unexpected update event", "got %+v", evt) + case evt := <-watcher.deleted: + assert.Fail("unexpected deleted event", "got %+v", evt) + default: + } +} + +func TestServerWatchInitialOldData_Mock(t *testing.T) { + t.Parallel() + + server, client := NewClientForTest(t) + testServerWatchInitialOldData(t, server, client) +} + +func TestServerWatchInitialOldData_Real(t *testing.T) { + t.Parallel() + + server := NewServerForTest(t) + client := NewEtcdClientForTest(t, server) + testServerWatchInitialOldData(t, server, client) +} diff --git a/grpc_client.go b/grpc_client.go index 82f79c5..f6c154e 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -45,6 +45,7 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/async" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" ) @@ -466,7 +467,7 @@ type GrpcClients struct { // +checklocks:mu dnsDiscovery bool - etcdClient *EtcdClient // +checklocksignore: Only written to from constructor. + etcdClient etcd.Client // +checklocksignore: Only written to from constructor. targetPrefix string // +checklocks:mu targetInformation map[string]*GrpcTargetInformationEtcd @@ -482,7 +483,7 @@ type GrpcClients struct { closeFunc context.CancelFunc // +checklocksignore: No locking necessary. } -func NewGrpcClients(ctx context.Context, config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor, version string) (*GrpcClients, error) { +func NewGrpcClients(ctx context.Context, config *goconf.ConfigFile, etcdClient etcd.Client, dnsMonitor *DnsMonitor, version string) (*GrpcClients, error) { initializedCtx, initializedFunc := context.WithCancel(context.Background()) closeCtx, closeFunc := context.WithCancel(context.Background()) result := &GrpcClients{ @@ -823,7 +824,7 @@ func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, fromReload bool return nil } -func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) { +func (c *GrpcClients) EtcdClientCreated(client etcd.Client) { go func() { if err := client.WaitForConnection(c.closeCtx); err != nil { if errors.Is(err, context.Canceled) { @@ -879,17 +880,17 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) { }() } -func (c *GrpcClients) EtcdWatchCreated(client *EtcdClient, key string) { +func (c *GrpcClients) EtcdWatchCreated(client etcd.Client, key string) { } -func (c *GrpcClients) getGrpcTargets(ctx context.Context, client *EtcdClient, targetPrefix string) (*clientv3.GetResponse, error) { +func (c *GrpcClients) getGrpcTargets(ctx context.Context, client etcd.Client, targetPrefix string) (*clientv3.GetResponse, error) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() return client.Get(ctx, targetPrefix, clientv3.WithPrefix()) } -func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) { +func (c *GrpcClients) EtcdKeyUpdated(client etcd.Client, key string, data []byte, prevValue []byte) { var info GrpcTargetInformationEtcd if err := json.Unmarshal(data, &info); err != nil { c.logger.Printf("Could not decode GRPC target %s=%s: %s", key, string(data), err) @@ -938,7 +939,7 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte c.wakeupForTesting() } -func (c *GrpcClients) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) { +func (c *GrpcClients) EtcdKeyDeleted(client etcd.Client, key string, prevValue []byte) { c.mu.Lock() defer c.mu.Unlock() diff --git a/grpc_client_test.go b/grpc_client_test.go index 3885aaf..4fe72a2 100644 --- a/grpc_client_test.go +++ b/grpc_client_test.go @@ -34,8 +34,9 @@ import ( "github.com/dlintw/goconf" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.etcd.io/etcd/server/v3/embed" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" + "github.com/strukturag/nextcloud-spreed-signaling/etcd/etcdtest" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/test" @@ -54,7 +55,7 @@ func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} { return ch } -func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient, lookup *mockDnsLookup) (*GrpcClients, *DnsMonitor) { +func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient etcd.Client, lookup *mockDnsLookup) (*GrpcClients, *DnsMonitor) { dnsMonitor := newDnsMonitorForTest(t, time.Hour, lookup) // will be updated manually logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) @@ -75,15 +76,15 @@ func NewGrpcClientsForTest(t *testing.T, addr string, lookup *mockDnsLookup) (*G return NewGrpcClientsForTestWithConfig(t, config, nil, lookup) } -func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd, lookup *mockDnsLookup) (*GrpcClients, *DnsMonitor) { +func NewGrpcClientsWithEtcdForTest(t *testing.T, embedEtcd *etcdtest.Server, lookup *mockDnsLookup) (*GrpcClients, *DnsMonitor) { config := goconf.NewConfigFile() - config.AddOption("etcd", "endpoints", etcd.Config().ListenClientUrls[0].String()) + config.AddOption("etcd", "endpoints", embedEtcd.URL().String()) config.AddOption("grpc", "targettype", "etcd") config.AddOption("grpc", "targetprefix", "/grpctargets") logger := log.NewLoggerForTest(t) - etcdClient, err := NewEtcdClient(logger, config, "") + etcdClient, err := etcd.NewClient(logger, config, "") require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, etcdClient.Close()) @@ -120,12 +121,12 @@ func Test_GrpcClients_EtcdInitial(t *testing.T) { // nolint:paralleltest _, addr1 := NewGrpcServerForTest(t) _, addr2 := NewGrpcServerForTest(t) - etcd := NewEtcdForTest(t) + embedEtcd := etcdtest.NewServerForTest(t) - SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) - client, _ := NewGrpcClientsWithEtcdForTest(t, etcd, nil) + client, _ := NewGrpcClientsWithEtcdForTest(t, embedEtcd, nil) ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() require.NoError(t, client.WaitForInitialized(ctx)) @@ -140,8 +141,8 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) { logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) assert := assert.New(t) - etcd := NewEtcdForTest(t) - client, _ := NewGrpcClientsWithEtcdForTest(t, etcd, nil) + embedEtcd := etcdtest.NewServerForTest(t) + client, _ := NewGrpcClientsWithEtcdForTest(t, embedEtcd, nil) ch := client.getWakeupChannelForTesting() ctx, cancel := context.WithTimeout(ctx, testTimeout) @@ -151,7 +152,7 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) { drainWakeupChannel(ch) _, addr1 := NewGrpcServerForTest(t) - SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) waitForEvent(ctx, t, ch) if clients := client.GetClients(); assert.Len(clients, 1) { assert.Equal(addr1, clients[0].Target()) @@ -159,7 +160,7 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) { drainWakeupChannel(ch) _, addr2 := NewGrpcServerForTest(t) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) waitForEvent(ctx, t, ch) if clients := client.GetClients(); assert.Len(clients, 2) { assert.Equal(addr1, clients[0].Target()) @@ -167,7 +168,7 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) { } drainWakeupChannel(ch) - DeleteEtcdValue(etcd, "/grpctargets/one") + embedEtcd.DeleteValue("/grpctargets/one") waitForEvent(ctx, t, ch) if clients := client.GetClients(); assert.Len(clients, 1) { assert.Equal(addr2, clients[0].Target()) @@ -175,7 +176,7 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) { drainWakeupChannel(ch) _, addr3 := NewGrpcServerForTest(t) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr3+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr3+"\"}")) waitForEvent(ctx, t, ch) if clients := client.GetClients(); assert.Len(clients, 1) { assert.Equal(addr3, clients[0].Target()) @@ -187,8 +188,8 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) assert := assert.New(t) - etcd := NewEtcdForTest(t) - client, _ := NewGrpcClientsWithEtcdForTest(t, etcd, nil) + embedEtcd := etcdtest.NewServerForTest(t) + client, _ := NewGrpcClientsWithEtcdForTest(t, embedEtcd, nil) ch := client.getWakeupChannelForTesting() ctx, cancel := context.WithTimeout(ctx, testTimeout) @@ -198,7 +199,7 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { drainWakeupChannel(ch) _, addr1 := NewGrpcServerForTest(t) - SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) waitForEvent(ctx, t, ch) if clients := client.GetClients(); assert.Len(clients, 1) { assert.Equal(addr1, clients[0].Target()) @@ -207,7 +208,7 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { drainWakeupChannel(ch) server2, addr2 := NewGrpcServerForTest(t) server2.serverId = GrpcServerId - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) waitForEvent(ctx, t, ch) client.selfCheckWaitGroup.Wait() if clients := client.GetClients(); assert.Len(clients, 1) { @@ -215,7 +216,7 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { } drainWakeupChannel(ch) - DeleteEtcdValue(etcd, "/grpctargets/two") + embedEtcd.DeleteValue("/grpctargets/two") waitForEvent(ctx, t, ch) if clients := client.GetClients(); assert.Len(clients, 1) { assert.Equal(addr1, clients[0].Target()) diff --git a/hub.go b/hub.go index 5e11afb..1db57fa 100644 --- a/hub.go +++ b/hub.go @@ -55,6 +55,7 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/async" "github.com/strukturag/nextcloud-spreed-signaling/config" "github.com/strukturag/nextcloud-spreed-signaling/container" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/talk" @@ -210,7 +211,7 @@ type Hub struct { geoipOverrides atomic.Pointer[map[*net.IPNet]string] geoipUpdating atomic.Bool - etcdClient *EtcdClient + etcdClient etcd.Client rpcServer *GrpcServer rpcClients *GrpcClients @@ -223,7 +224,7 @@ type Hub struct { blockedCandidates atomic.Pointer[container.IPList] } -func NewHub(ctx context.Context, cfg *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer, rpcClients *GrpcClients, etcdClient *EtcdClient, r *mux.Router, version string) (*Hub, error) { +func NewHub(ctx context.Context, cfg *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer, rpcClients *GrpcClients, etcdClient etcd.Client, r *mux.Router, version string) (*Hub, error) { logger := log.LoggerFromContext(ctx) hashKey, _ := config.GetStringOptionWithEnv(cfg, "sessions", "hashkey") switch len(hashKey) { diff --git a/mcu_proxy.go b/mcu_proxy.go index 0383a91..1575ba5 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -48,6 +48,7 @@ import ( "github.com/strukturag/nextcloud-spreed-signaling/api" "github.com/strukturag/nextcloud-spreed-signaling/async" "github.com/strukturag/nextcloud-spreed-signaling/config" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" ) @@ -1512,7 +1513,7 @@ type mcuProxy struct { rpcClients *GrpcClients } -func NewMcuProxy(ctx context.Context, config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *GrpcClients, dnsMonitor *DnsMonitor) (Mcu, error) { +func NewMcuProxy(ctx context.Context, config *goconf.ConfigFile, etcdClient etcd.Client, rpcClients *GrpcClients, dnsMonitor *DnsMonitor) (Mcu, error) { logger := log.LoggerFromContext(ctx) urlType, _ := config.GetString("mcu", "urltype") if urlType == "" { diff --git a/mcu_proxy_test.go b/mcu_proxy_test.go index fd7bb00..bd81f3b 100644 --- a/mcu_proxy_test.go +++ b/mcu_proxy_test.go @@ -47,9 +47,10 @@ import ( "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.etcd.io/etcd/server/v3/embed" "github.com/strukturag/nextcloud-spreed-signaling/api" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" + "github.com/strukturag/nextcloud-spreed-signaling/etcd/etcdtest" "github.com/strukturag/nextcloud-spreed-signaling/internal" "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/talk" @@ -846,7 +847,7 @@ func NewProxyServerForTest(t *testing.T, country string) *TestProxyServerHandler } type proxyTestOptions struct { - etcd *embed.Etcd + etcd *etcdtest.Server servers []*TestProxyServerHandler } @@ -854,7 +855,7 @@ func newMcuProxyForTestWithOptions(t *testing.T, options proxyTestOptions, idx i t.Helper() require := require.New(t) if options.etcd == nil { - options.etcd = NewEtcdForTest(t) + options.etcd = etcdtest.NewServerForTest(t) } grpcClients, dnsMonitor := NewGrpcClientsWithEtcdForTest(t, options.etcd, lookup) @@ -891,12 +892,12 @@ func newMcuProxyForTestWithOptions(t *testing.T, options proxyTestOptions, idx i cfg.AddOption("mcu", "token_key", privkeyFile) etcdConfig := goconf.NewConfigFile() - etcdConfig.AddOption("etcd", "endpoints", options.etcd.Config().ListenClientUrls[0].String()) + etcdConfig.AddOption("etcd", "endpoints", options.etcd.URL().String()) etcdConfig.AddOption("etcd", "loglevel", "error") logger := log.NewLoggerForTest(t) ctx := log.NewLoggerContext(t.Context(), logger) - etcdClient, err := NewEtcdClient(logger, etcdConfig, "") + etcdClient, err := etcd.NewClient(logger, etcdConfig, "") require.NoError(err) t.Cleanup(func() { assert.NoError(t, etcdClient.Close()) @@ -1808,7 +1809,7 @@ func (h *mockGrpcServerHub) CreateProxyToken(publisherId string) (string, error) func Test_ProxyRemotePublisher(t *testing.T) { t.Parallel() - etcd := NewEtcdForTest(t) + embedEtcd := etcdtest.NewServerForTest(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) @@ -1818,14 +1819,14 @@ func Test_ProxyRemotePublisher(t *testing.T) { grpcServer1.hub = hub1 grpcServer2.hub = hub2 - SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") mcu1, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, server2, @@ -1833,7 +1834,7 @@ func Test_ProxyRemotePublisher(t *testing.T) { }, 1, nil) hub1.proxy.Store(mcu1) mcu2, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, server2, @@ -1887,7 +1888,7 @@ func Test_ProxyRemotePublisher(t *testing.T) { func Test_ProxyMultipleRemotePublisher(t *testing.T) { t.Parallel() - etcd := NewEtcdForTest(t) + embedEtcd := etcdtest.NewServerForTest(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) @@ -1900,16 +1901,16 @@ func Test_ProxyMultipleRemotePublisher(t *testing.T) { grpcServer2.hub = hub2 grpcServer3.hub = hub3 - SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) - SetEtcdValue(etcd, "/grpctargets/three", []byte("{\"address\":\""+addr3+"\"}")) + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/three", []byte("{\"address\":\""+addr3+"\"}")) server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "US") server3 := NewProxyServerForTest(t, "US") mcu1, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, server2, @@ -1918,7 +1919,7 @@ func Test_ProxyMultipleRemotePublisher(t *testing.T) { }, 1, nil) hub1.proxy.Store(mcu1) mcu2, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, server2, @@ -1927,7 +1928,7 @@ func Test_ProxyMultipleRemotePublisher(t *testing.T) { }, 2, nil) hub2.proxy.Store(mcu2) mcu3, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, server2, @@ -1993,7 +1994,7 @@ func Test_ProxyMultipleRemotePublisher(t *testing.T) { func Test_ProxyRemotePublisherWait(t *testing.T) { t.Parallel() - etcd := NewEtcdForTest(t) + embedEtcd := etcdtest.NewServerForTest(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) @@ -2003,14 +2004,14 @@ func Test_ProxyRemotePublisherWait(t *testing.T) { grpcServer1.hub = hub1 grpcServer2.hub = hub2 - SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") mcu1, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, server2, @@ -2018,7 +2019,7 @@ func Test_ProxyRemotePublisherWait(t *testing.T) { }, 1, nil) hub1.proxy.Store(mcu1) mcu2, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, server2, @@ -2088,7 +2089,7 @@ func Test_ProxyRemotePublisherWait(t *testing.T) { func Test_ProxyRemotePublisherTemporary(t *testing.T) { t.Parallel() - etcd := NewEtcdForTest(t) + embedEtcd := etcdtest.NewServerForTest(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) @@ -2098,21 +2099,21 @@ func Test_ProxyRemotePublisherTemporary(t *testing.T) { grpcServer1.hub = hub1 grpcServer2.hub = hub2 - SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") mcu1, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, }, }, 1, nil) hub1.proxy.Store(mcu1) mcu2, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server2, }, @@ -2196,7 +2197,7 @@ loop: func Test_ProxyConnectToken(t *testing.T) { t.Parallel() - etcd := NewEtcdForTest(t) + embedEtcd := etcdtest.NewServerForTest(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) @@ -2206,8 +2207,8 @@ func Test_ProxyConnectToken(t *testing.T) { grpcServer1.hub = hub1 grpcServer2.hub = hub2 - SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "DE") @@ -2216,14 +2217,14 @@ func Test_ProxyConnectToken(t *testing.T) { // i.e. they are only known to their local proxy, not the one of the other // signaling server - so the connection token must be passed between them. mcu1, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, }, }, 1, nil) hub1.proxy.Store(mcu1) mcu2, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server2, }, @@ -2276,7 +2277,7 @@ func Test_ProxyConnectToken(t *testing.T) { func Test_ProxyPublisherToken(t *testing.T) { t.Parallel() - etcd := NewEtcdForTest(t) + embedEtcd := etcdtest.NewServerForTest(t) grpcServer1, addr1 := NewGrpcServerForTest(t) grpcServer2, addr2 := NewGrpcServerForTest(t) @@ -2286,8 +2287,8 @@ func Test_ProxyPublisherToken(t *testing.T) { grpcServer1.hub = hub1 grpcServer2.hub = hub2 - SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + embedEtcd.SetValue("/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + embedEtcd.SetValue("/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) server1 := NewProxyServerForTest(t, "DE") server2 := NewProxyServerForTest(t, "US") @@ -2298,14 +2299,14 @@ func Test_ProxyPublisherToken(t *testing.T) { // Also the subscriber is connecting from a different country, so a remote // stream will be created that needs a valid token from the remote proxy. mcu1, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server1, }, }, 1, nil) hub1.proxy.Store(mcu1) mcu2, _ := newMcuProxyForTestWithOptions(t, proxyTestOptions{ - etcd: etcd, + etcd: embedEtcd, servers: []*TestProxyServerHandler{ server2, }, diff --git a/proxy/proxy_tokens_etcd.go b/proxy/proxy_tokens_etcd.go index 00a94c2..22d22d9 100644 --- a/proxy/proxy_tokens_etcd.go +++ b/proxy/proxy_tokens_etcd.go @@ -33,8 +33,8 @@ import ( "github.com/dlintw/goconf" "github.com/golang-jwt/jwt/v5" - signaling "github.com/strukturag/nextcloud-spreed-signaling" "github.com/strukturag/nextcloud-spreed-signaling/container" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" "github.com/strukturag/nextcloud-spreed-signaling/log" ) @@ -49,14 +49,14 @@ type tokenCacheEntry struct { type tokensEtcd struct { logger log.Logger - client *signaling.EtcdClient + client etcd.Client tokenFormats atomic.Value tokenCache *container.LruCache[*tokenCacheEntry] } func NewProxyTokensEtcd(logger log.Logger, config *goconf.ConfigFile) (ProxyTokens, error) { - client, err := signaling.NewEtcdClient(logger, config, "tokens") + client, err := etcd.NewClient(logger, config, "tokens") if err != nil { return nil, err } diff --git a/proxy_config_etcd.go b/proxy_config_etcd.go index 354e771..69e7b43 100644 --- a/proxy_config_etcd.go +++ b/proxy_config_etcd.go @@ -32,6 +32,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "github.com/strukturag/nextcloud-spreed-signaling/async" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" "github.com/strukturag/nextcloud-spreed-signaling/log" ) @@ -40,7 +41,7 @@ type proxyConfigEtcd struct { mu sync.Mutex proxy McuProxy // +checklocksignore: Only written to from constructor. - client *EtcdClient + client etcd.Client keyPrefix string // +checklocks:mu keyInfos map[string]*ProxyInformationEtcd @@ -51,7 +52,7 @@ type proxyConfigEtcd struct { closeFunc context.CancelFunc } -func NewProxyConfigEtcd(logger log.Logger, config *goconf.ConfigFile, etcdClient *EtcdClient, proxy McuProxy) (ProxyConfig, error) { +func NewProxyConfigEtcd(logger log.Logger, config *goconf.ConfigFile, etcdClient etcd.Client, proxy McuProxy) (ProxyConfig, error) { if !etcdClient.IsConfigured() { return nil, errors.New("no etcd endpoints configured") } @@ -100,7 +101,7 @@ func (p *proxyConfigEtcd) Stop() { p.closeFunc() } -func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) { +func (p *proxyConfigEtcd) EtcdClientCreated(client etcd.Client) { go func() { if err := client.WaitForConnection(p.closeCtx); err != nil { if errors.Is(err, context.Canceled) { @@ -159,17 +160,17 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) { }() } -func (p *proxyConfigEtcd) EtcdWatchCreated(client *EtcdClient, key string) { +func (p *proxyConfigEtcd) EtcdWatchCreated(client etcd.Client, key string) { } -func (p *proxyConfigEtcd) getProxyUrls(ctx context.Context, client *EtcdClient, keyPrefix string) (*clientv3.GetResponse, error) { +func (p *proxyConfigEtcd) getProxyUrls(ctx context.Context, client etcd.Client, keyPrefix string) (*clientv3.GetResponse, error) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() return client.Get(ctx, keyPrefix, clientv3.WithPrefix()) } -func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []byte, prevValue []byte) { +func (p *proxyConfigEtcd) EtcdKeyUpdated(client etcd.Client, key string, data []byte, prevValue []byte) { var info ProxyInformationEtcd if err := json.Unmarshal(data, &info); err != nil { p.logger.Printf("Could not decode proxy information %s: %s", string(data), err) @@ -210,7 +211,7 @@ func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data [] } } -func (p *proxyConfigEtcd) EtcdKeyDeleted(client *EtcdClient, key string, prevValue []byte) { +func (p *proxyConfigEtcd) EtcdKeyDeleted(client etcd.Client, key string, prevValue []byte) { p.mu.Lock() defer p.mu.Unlock() diff --git a/proxy_config_etcd_test.go b/proxy_config_etcd_test.go index 938ae9c..ef9a453 100644 --- a/proxy_config_etcd_test.go +++ b/proxy_config_etcd_test.go @@ -29,8 +29,8 @@ import ( "github.com/dlintw/goconf" "github.com/stretchr/testify/require" - "go.etcd.io/etcd/server/v3/embed" + "github.com/strukturag/nextcloud-spreed-signaling/etcd/etcdtest" "github.com/strukturag/nextcloud-spreed-signaling/log" ) @@ -40,9 +40,9 @@ type TestProxyInformationEtcd struct { OtherData string `json:"otherdata,omitempty"` } -func newProxyConfigEtcd(t *testing.T, proxy McuProxy) (*embed.Etcd, ProxyConfig) { +func newProxyConfigEtcd(t *testing.T, proxy McuProxy) (*etcdtest.TestServer, ProxyConfig) { t.Helper() - etcd, client := NewEtcdClientForTest(t) + embedEtcd, client := etcdtest.NewClientForTest(t) cfg := goconf.NewConfigFile() cfg.AddOption("mcu", "keyprefix", "proxies/") logger := log.NewLoggerForTest(t) @@ -51,24 +51,24 @@ func newProxyConfigEtcd(t *testing.T, proxy McuProxy) (*embed.Etcd, ProxyConfig) t.Cleanup(func() { p.Stop() }) - return etcd, p + return embedEtcd, p } -func SetEtcdProxy(t *testing.T, etcd *embed.Etcd, path string, proxy *TestProxyInformationEtcd) { +func SetEtcdProxy(t *testing.T, server *etcdtest.TestServer, path string, proxy *TestProxyInformationEtcd) { t.Helper() data, _ := json.Marshal(proxy) - SetEtcdValue(etcd, path, data) + server.SetValue(path, data) } func TestProxyConfigEtcd(t *testing.T) { t.Parallel() proxy := newMcuProxyForConfig(t) - etcd, config := newProxyConfigEtcd(t, proxy) + embedEtcd, config := newProxyConfigEtcd(t, proxy) ctx, cancel := context.WithTimeout(t.Context(), time.Second) defer cancel() - SetEtcdProxy(t, etcd, "proxies/a", &TestProxyInformationEtcd{ + SetEtcdProxy(t, embedEtcd, "proxies/a", &TestProxyInformationEtcd{ Address: "https://foo/", }) proxy.Expect("add", "https://foo/") @@ -76,31 +76,31 @@ func TestProxyConfigEtcd(t *testing.T) { proxy.WaitForEvents(ctx) proxy.Expect("add", "https://bar/") - SetEtcdProxy(t, etcd, "proxies/b", &TestProxyInformationEtcd{ + SetEtcdProxy(t, embedEtcd, "proxies/b", &TestProxyInformationEtcd{ Address: "https://bar/", }) proxy.WaitForEvents(ctx) proxy.Expect("keep", "https://bar/") - SetEtcdProxy(t, etcd, "proxies/b", &TestProxyInformationEtcd{ + SetEtcdProxy(t, embedEtcd, "proxies/b", &TestProxyInformationEtcd{ Address: "https://bar/", OtherData: "ignore-me", }) proxy.WaitForEvents(ctx) proxy.Expect("remove", "https://foo/") - DeleteEtcdValue(etcd, "proxies/a") + embedEtcd.DeleteValue("proxies/a") proxy.WaitForEvents(ctx) proxy.Expect("remove", "https://bar/") proxy.Expect("add", "https://baz/") - SetEtcdProxy(t, etcd, "proxies/b", &TestProxyInformationEtcd{ + SetEtcdProxy(t, embedEtcd, "proxies/b", &TestProxyInformationEtcd{ Address: "https://baz/", }) proxy.WaitForEvents(ctx) // Adding the same hostname multiple times should not trigger an event. - SetEtcdProxy(t, etcd, "proxies/c", &TestProxyInformationEtcd{ + SetEtcdProxy(t, embedEtcd, "proxies/c", &TestProxyInformationEtcd{ Address: "https://baz/", }) time.Sleep(100 * time.Millisecond) diff --git a/server/main.go b/server/main.go index 022ef7c..e1e8a33 100644 --- a/server/main.go +++ b/server/main.go @@ -43,6 +43,7 @@ import ( signaling "github.com/strukturag/nextcloud-spreed-signaling" "github.com/strukturag/nextcloud-spreed-signaling/config" + "github.com/strukturag/nextcloud-spreed-signaling/etcd" "github.com/strukturag/nextcloud-spreed-signaling/internal" signalinglog "github.com/strukturag/nextcloud-spreed-signaling/log" "github.com/strukturag/nextcloud-spreed-signaling/nats" @@ -204,7 +205,7 @@ func main() { } defer dnsMonitor.Stop() - etcdClient, err := signaling.NewEtcdClient(logger, cfg, "mcu") + etcdClient, err := etcd.NewClient(logger, cfg, "mcu") if err != nil { logger.Fatalf("Could not create etcd client: %s", err) }