From 25dabf910d39c70abcc5b1c018db6c7e70f7e3a3 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 28 Jun 2022 13:55:24 +0200 Subject: [PATCH] Allow configuring GRPC targets through etcd. --- Makefile | 1 + api_grpc.go | 41 ++++++++ grpc_client.go | 231 +++++++++++++++++++++++++++++++++++++++++++- grpc_client_test.go | 185 +++++++++++++++++++++++++++++++++++ grpc_server_test.go | 55 +++++++++++ server.conf.in | 27 +++++- server/main.go | 2 +- 7 files changed, 534 insertions(+), 8 deletions(-) create mode 100644 api_grpc.go create mode 100644 grpc_client_test.go create mode 100644 grpc_server_test.go diff --git a/Makefile b/Makefile index 8cd998f..b951b1f 100644 --- a/Makefile +++ b/Makefile @@ -114,6 +114,7 @@ common: common_easyjson common_proto common_easyjson: \ api_async_easyjson.go \ api_backend_easyjson.go \ + api_grpc_easyjson.go \ api_proxy_easyjson.go \ api_signaling_easyjson.go diff --git a/api_grpc.go b/api_grpc.go new file mode 100644 index 0000000..127e880 --- /dev/null +++ b/api_grpc.go @@ -0,0 +1,41 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "fmt" +) + +// Information on a GRPC target in the etcd cluster. + +type GrpcTargetInformationEtcd struct { + Address string `json:"address"` +} + +func (p *GrpcTargetInformationEtcd) CheckValid() error { + if l := len(p.Address); l == 0 { + return fmt.Errorf("address missing") + } else if p.Address[l-1] == '/' { + p.Address = p.Address[:l-1] + } + return nil +} diff --git a/grpc_client.go b/grpc_client.go index 49190f4..d46092f 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -23,13 +23,17 @@ package signaling import ( "context" + "encoding/json" "fmt" "log" "net" "strings" "sync" + "sync/atomic" + "time" "github.com/dlintw/goconf" + clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" codes "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -37,6 +41,13 @@ import ( status "google.golang.org/grpc/status" ) +const ( + GrpcTargetTypeStatic = "static" + GrpcTargetTypeEtcd = "etcd" + + DefaultGrpcTargetType = GrpcTargetTypeStatic +) + func init() { RegisterGrpcClientStats() } @@ -140,10 +151,25 @@ type GrpcClients struct { clientsMap map[string]*GrpcClient clients []*GrpcClient + + etcdClient *EtcdClient + targetPrefix string + targetSelf string + targetInformation map[string]*GrpcTargetInformationEtcd + dialOptions atomic.Value // []grpc.DialOption + + initializedCtx context.Context + initializedFunc context.CancelFunc + wakeupChanForTesting chan bool } -func NewGrpcClients(config *goconf.ConfigFile) (*GrpcClients, error) { - result := &GrpcClients{} +func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, error) { + initializedCtx, initializedFunc := context.WithCancel(context.Background()) + result := &GrpcClients{ + etcdClient: etcdClient, + initializedCtx: initializedCtx, + initializedFunc: initializedFunc, + } if err := result.load(config); err != nil { return nil, err } @@ -151,9 +177,6 @@ func NewGrpcClients(config *goconf.ConfigFile) (*GrpcClients, error) { } func (c *GrpcClients) load(config *goconf.ConfigFile) error { - c.mu.Lock() - defer c.mu.Unlock() - var opts []grpc.DialOption caFile, _ := config.GetString("grpc", "ca") if caFile != "" { @@ -168,6 +191,25 @@ func (c *GrpcClients) load(config *goconf.ConfigFile) error { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } + targetType, _ := config.GetString("grpc", "targettype") + if targetType == "" { + targetType = DefaultGrpcTargetType + } + + switch targetType { + case GrpcTargetTypeStatic: + return c.loadTargetsStatic(config, opts...) + case GrpcTargetTypeEtcd: + return c.loadTargetsEtcd(config, opts...) + default: + return fmt.Errorf("unknown GRPC target type: %s", targetType) + } +} + +func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, opts ...grpc.DialOption) error { + c.mu.Lock() + defer c.mu.Unlock() + clientsMap := make(map[string]*GrpcClient) var clients []*GrpcClient removeTargets := make(map[string]bool, len(c.clientsMap)) @@ -216,10 +258,185 @@ func (c *GrpcClients) load(config *goconf.ConfigFile) error { c.clients = clients c.clientsMap = clientsMap + c.initializedFunc() statsGrpcClients.Set(float64(len(clients))) return nil } +func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, opts ...grpc.DialOption) error { + if !c.etcdClient.IsConfigured() { + return fmt.Errorf("No etcd endpoints configured") + } + + targetPrefix, _ := config.GetString("grpc", "targetprefix") + if targetPrefix == "" { + return fmt.Errorf("No GRPC target prefix configured") + } + c.targetPrefix = targetPrefix + if c.targetInformation == nil { + c.targetInformation = make(map[string]*GrpcTargetInformationEtcd) + } + + targetSelf, _ := config.GetString("grpc", "targetself") + c.targetSelf = targetSelf + + if opts == nil { + opts = make([]grpc.DialOption, 0) + } + c.dialOptions.Store(opts) + + c.etcdClient.AddListener(c) + return nil +} + +func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) { + go func() { + if err := client.Watch(context.Background(), c.targetPrefix, c, clientv3.WithPrefix()); err != nil { + log.Printf("Error processing watch for %s: %s", c.targetPrefix, err) + } + }() + + go func() { + client.WaitForConnection() + + waitDelay := initialWaitDelay + for { + response, err := c.getGrpcTargets(client, c.targetPrefix) + if err != nil { + if err == context.DeadlineExceeded { + log.Printf("Timeout getting initial list of GRPC targets, retry in %s", waitDelay) + } else { + log.Printf("Could not get initial list of GRPC targets, retry in %s: %s", waitDelay, err) + } + + time.Sleep(waitDelay) + waitDelay = waitDelay * 2 + if waitDelay > maxWaitDelay { + waitDelay = maxWaitDelay + } + continue + } + + for _, ev := range response.Kvs { + c.EtcdKeyUpdated(client, string(ev.Key), ev.Value) + } + c.initializedFunc() + return + } + }() +} + +func (c *GrpcClients) getGrpcTargets(client *EtcdClient, targetPrefix string) (*clientv3.GetResponse, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + return client.Get(ctx, targetPrefix, clientv3.WithPrefix()) +} + +func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) { + var info GrpcTargetInformationEtcd + if err := json.Unmarshal(data, &info); err != nil { + log.Printf("Could not decode GRPC target %s=%s: %s", key, string(data), err) + return + } + if err := info.CheckValid(); err != nil { + log.Printf("Received invalid GRPC target %s=%s: %s", key, string(data), err) + return + } + + c.mu.Lock() + defer c.mu.Unlock() + + prev, found := c.targetInformation[key] + if found && prev.Address != info.Address { + // Address of endpoint has changed, remove old one. + c.removeEtcdClientLocked(key) + } + + if c.targetSelf != "" && info.Address == c.targetSelf { + log.Printf("GRPC target %s is this server, ignoring %s", info.Address, key) + c.wakeupForTesting() + return + } + + if _, found := c.clientsMap[info.Address]; found { + log.Printf("GRPC target %s already exists, ignoring %s", info.Address, key) + return + } + + opts := c.dialOptions.Load().([]grpc.DialOption) + cl, err := NewGrpcClient(info.Address, opts...) + if err != nil { + log.Printf("Could not create GRPC client for target %s: %s", info.Address, err) + return + } + + log.Printf("Adding %s as GRPC target", info.Address) + + if c.clientsMap == nil { + c.clientsMap = make(map[string]*GrpcClient) + } + c.clientsMap[info.Address] = cl + c.clients = append(c.clients, cl) + c.targetInformation[key] = &info + statsGrpcClients.Inc() + c.wakeupForTesting() +} + +func (c *GrpcClients) EtcdKeyDeleted(client *EtcdClient, key string) { + c.mu.Lock() + defer c.mu.Unlock() + + c.removeEtcdClientLocked(key) +} + +func (c *GrpcClients) removeEtcdClientLocked(key string) { + info, found := c.targetInformation[key] + if !found { + log.Printf("No connection found for %s, ignoring", key) + c.wakeupForTesting() + return + } + + delete(c.targetInformation, key) + client, found := c.clientsMap[info.Address] + if !found { + return + } + + log.Printf("Removing connection to %s (from %s)", info.Address, key) + if err := client.Close(); err != nil { + log.Printf("Error closing client to %s: %s", client.Target(), err) + } + delete(c.clientsMap, info.Address) + c.clients = make([]*GrpcClient, 0, len(c.clientsMap)) + for _, client := range c.clientsMap { + c.clients = append(c.clients, client) + } + statsGrpcClients.Dec() + c.wakeupForTesting() +} + +func (c *GrpcClients) WaitForInitialized(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.initializedCtx.Done(): + return nil + } +} + +func (c *GrpcClients) wakeupForTesting() { + if c.wakeupChanForTesting == nil { + return + } + + select { + case c.wakeupChanForTesting <- true: + default: + } +} + func (c *GrpcClients) Reload(config *goconf.ConfigFile) { if err := c.load(config); err != nil { log.Printf("Could not reload RPC clients: %s", err) @@ -238,6 +455,10 @@ func (c *GrpcClients) Close() { c.clients = nil c.clientsMap = nil + + if c.etcdClient != nil { + c.etcdClient.RemoveListener(c) + } } func (c *GrpcClients) GetClients() []*GrpcClient { diff --git a/grpc_client_test.go b/grpc_client_test.go new file mode 100644 index 0000000..692297a --- /dev/null +++ b/grpc_client_test.go @@ -0,0 +1,185 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "testing" + "time" + + "github.com/dlintw/goconf" + "go.etcd.io/etcd/server/v3/embed" +) + +const ( + GrpcSelfTargetForTesting = "testing.grpc.target" +) + +func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) *GrpcClients { + config := goconf.NewConfigFile() + config.AddOption("etcd", "endpoints", etcd.Config().LCUrls[0].String()) + + config.AddOption("grpc", "targettype", "etcd") + config.AddOption("grpc", "targetprefix", "/grpctargets") + config.AddOption("grpc", "targetself", GrpcSelfTargetForTesting) + + etcdClient, err := NewEtcdClient(config, "") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if err := etcdClient.Close(); err != nil { + t.Error(err) + } + }) + + client, err := NewGrpcClients(config, etcdClient) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + client.Close() + }) + + return client +} + +func drainWakeupChannel(ch chan bool) { + for { + select { + case <-ch: + default: + return + } + } +} + +func Test_GrpcClients_EtcdInitial(t *testing.T) { + etcd := NewEtcdForTest(t) + + _, addr1 := NewGrpcServerForTest(t) + SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + _, addr2 := NewGrpcServerForTest(t) + SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + + client := NewGrpcClientsWithEtcdForTest(t, etcd) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := client.WaitForInitialized(ctx); err != nil { + t.Fatal(err) + } + + if clients := client.GetClients(); len(clients) != 2 { + t.Errorf("Expected two clients, got %+v", clients) + } +} + +func Test_GrpcClients_EtcdUpdate(t *testing.T) { + etcd := NewEtcdForTest(t) + client := NewGrpcClientsWithEtcdForTest(t, etcd) + ch := make(chan bool, 1) + client.wakeupChanForTesting = ch + + if clients := client.GetClients(); len(clients) != 0 { + t.Errorf("Expected no clients, got %+v", clients) + } + + drainWakeupChannel(ch) + _, addr1 := NewGrpcServerForTest(t) + SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + <-ch + if clients := client.GetClients(); len(clients) != 1 { + t.Errorf("Expected one client, got %+v", clients) + } else if clients[0].Target() != addr1 { + t.Errorf("Expected target %s, got %s", addr1, clients[0].Target()) + } + + drainWakeupChannel(ch) + _, addr2 := NewGrpcServerForTest(t) + SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) + <-ch + if clients := client.GetClients(); len(clients) != 2 { + t.Errorf("Expected two clients, got %+v", clients) + } else if clients[0].Target() != addr1 { + t.Errorf("Expected target %s, got %s", addr1, clients[0].Target()) + } else if clients[1].Target() != addr2 { + t.Errorf("Expected target %s, got %s", addr2, clients[1].Target()) + } + + drainWakeupChannel(ch) + DeleteEtcdValue(etcd, "/grpctargets/one") + <-ch + if clients := client.GetClients(); len(clients) != 1 { + t.Errorf("Expected one client, got %+v", clients) + } else if clients[0].Target() != addr2 { + t.Errorf("Expected target %s, got %s", addr2, clients[0].Target()) + } + + drainWakeupChannel(ch) + _, addr3 := NewGrpcServerForTest(t) + SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr3+"\"}")) + <-ch + if clients := client.GetClients(); len(clients) != 1 { + t.Errorf("Expected one client, got %+v", clients) + } else if clients[0].Target() != addr3 { + t.Errorf("Expected target %s, got %s", addr3, clients[0].Target()) + } +} + +func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { + etcd := NewEtcdForTest(t) + client := NewGrpcClientsWithEtcdForTest(t, etcd) + ch := make(chan bool, 1) + client.wakeupChanForTesting = ch + + if clients := client.GetClients(); len(clients) != 0 { + t.Errorf("Expected no clients, got %+v", clients) + } + + drainWakeupChannel(ch) + _, addr1 := NewGrpcServerForTest(t) + SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) + <-ch + if clients := client.GetClients(); len(clients) != 1 { + t.Errorf("Expected one client, got %+v", clients) + } else if clients[0].Target() != addr1 { + t.Errorf("Expected target %s, got %s", addr1, clients[0].Target()) + } + + drainWakeupChannel(ch) + SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+GrpcSelfTargetForTesting+"\"}")) + <-ch + if clients := client.GetClients(); len(clients) != 1 { + t.Errorf("Expected one client, got %+v", clients) + } else if clients[0].Target() != addr1 { + t.Errorf("Expected target %s, got %s", addr1, clients[0].Target()) + } + + drainWakeupChannel(ch) + DeleteEtcdValue(etcd, "/grpctargets/two") + <-ch + if clients := client.GetClients(); len(clients) != 1 { + t.Errorf("Expected one client, got %+v", clients) + } else if clients[0].Target() != addr1 { + t.Errorf("Expected target %s, got %s", addr1, clients[0].Target()) + } +} diff --git a/grpc_server_test.go b/grpc_server_test.go new file mode 100644 index 0000000..61b78e8 --- /dev/null +++ b/grpc_server_test.go @@ -0,0 +1,55 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "net" + "strconv" + "testing" + + "github.com/dlintw/goconf" +) + +func NewGrpcServerForTest(t *testing.T) (server *GrpcServer, addr string) { + config := goconf.NewConfigFile() + for port := 50000; port < 50100; port++ { + addr = net.JoinHostPort("127.0.0.1", strconv.Itoa(port)) + config.AddOption("grpc", "listen", addr) + var err error + server, err = NewGrpcServer(config) + if isErrorAddressAlreadyInUse(err) { + continue + } else if err != nil { + t.Fatal(err) + } + break + } + + if server == nil { + t.Fatal("could not find free port") + } + + t.Cleanup(func() { + server.Close() + }) + return server, addr +} diff --git a/server.conf.in b/server.conf.in index d6a88ce..12f8cde 100644 --- a/server.conf.in +++ b/server.conf.in @@ -249,5 +249,28 @@ connectionsperhost = 8 # Omit to expect unencrypted connections. #ca = /path/to/grpc-ca.crt -# Comma-separated list of GRPC targets to connect to for clustering mode. -#targets = 192.168.0.1:9090, 192.168.0.1:9091 +# Type of GRPC target configuration. +# Defaults to "static". +# +# Possible values: +# - static: A comma-separated list of targets is given in the "targets" option. +# - etcd: Target URLs are retrieved from an etcd cluster. +#targettype = static + +# For target type "static": Comma-separated list of GRPC targets to connect to +# for clustering mode. +#targets = 192.168.0.1:9090, 192.168.0.2:9090 + +# For target type "etcd": Key prefix of GRPC target entries. All keys below will +# be watched and assumed to contain a JSON document. The entry "address" from +# this document will be used as target URL, other contents in the document will +# be ignored. +# +# Example: +# "/signaling/cluster/grpc/one" -> {"address": "192.168.0.1:9090"} +# "/signaling/cluster/grpc/two" -> {"address": "192.168.0.2:9090"} +#targetprefix = /signaling/cluster/grpc + +# For target type "etcd": Address of this signaling server instance. Will be +# ignored when retrieved from the etcd cluster to avoid loopback connections. +#targetself = 192.168.0.1:9090 diff --git a/server/main.go b/server/main.go index ec9d84f..3e39a6a 100644 --- a/server/main.go +++ b/server/main.go @@ -164,7 +164,7 @@ func main() { } }() - rpcClients, err := signaling.NewGrpcClients(config) + rpcClients, err := signaling.NewGrpcClients(config, etcdClient) if err != nil { log.Fatalf("Could not create RPC clients: %s", err) }