From 20cc51c2fe811fc0f440164b86f37d066b98f656 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 29 Jun 2022 13:33:02 +0200 Subject: [PATCH] grpc: Automatically detect if a target is the current server itself. This allows configuring the same list of targets for all instances without having to setup the "own" address differently for each server. --- Makefile | 1 + backend_server_test.go | 20 +++-------------- grpc_client.go | 49 +++++++++++++++++++++++++++++++++--------- grpc_client_test.go | 14 ++++++------ grpc_internal.proto | 37 +++++++++++++++++++++++++++++++ grpc_server.go | 26 ++++++++++++++++++++++ grpc_server_test.go | 9 ++++++++ hub.go | 17 ++++----------- hub_test.go | 20 +++-------------- server.conf.in | 4 ---- server/main.go | 13 ++++++++++- 11 files changed, 140 insertions(+), 70 deletions(-) create mode 100644 grpc_internal.proto diff --git a/Makefile b/Makefile index b951b1f..27c9d48 100644 --- a/Makefile +++ b/Makefile @@ -119,6 +119,7 @@ common_easyjson: \ api_signaling_easyjson.go common_proto: \ + grpc_internal.pb.go \ grpc_mcu.pb.go \ grpc_sessions.pb.go diff --git a/backend_server_test.go b/backend_server_test.go index ff74b71..40416f0 100644 --- a/backend_server_test.go +++ b/backend_server_test.go @@ -88,7 +88,7 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil config.AddOption("clients", "internalsecret", string(testInternalSecret)) config.AddOption("geoip", "url", "none") events := getAsyncEventsForTest(t) - hub, err := NewHub(config, events, nil, r, "no-version") + hub, err := NewHub(config, events, nil, nil, r, "no-version") if err != nil { t.Fatal(err) } @@ -162,7 +162,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g events1.Close() }) client1 := NewGrpcClientsForTest(t, addr2) - hub1, err := NewHub(config1, events1, client1, r1, "no-version") + hub1, err := NewHub(config1, events1, grpcServer1, client1, r1, "no-version") if err != nil { t.Fatal(err) } @@ -191,7 +191,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g events2.Close() }) client2 := NewGrpcClientsForTest(t, addr1) - hub2, err := NewHub(config2, events2, client2, r2, "no-version") + hub2, err := NewHub(config2, events2, grpcServer2, client2, r2, "no-version") if err != nil { t.Fatal(err) } @@ -211,20 +211,6 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g t.Fatal(err) } - grpcServer1.hub = hub1 - grpcServer2.hub = hub2 - - go func() { - if err := grpcServer1.Run(); err != nil { - t.Errorf("Could not start RPC server on %s: %s", addr1, err) - } - }() - go func() { - if err := grpcServer2.Run(); err != nil { - t.Errorf("Could not start RPC server on %s: %s", addr2, err) - } - }() - go hub1.Run() go hub2.Run() diff --git a/grpc_client.go b/grpc_client.go index d46092f..d50b4ab 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -53,12 +53,14 @@ func init() { } type grpcClientImpl struct { + RpcInternalClient RpcMcuClient RpcSessionsClient } func newGrpcClientImpl(conn grpc.ClientConnInterface) *grpcClientImpl { return &grpcClientImpl{ + RpcInternalClient: NewRpcInternalClient(conn), RpcMcuClient: NewRpcMcuClient(conn), RpcSessionsClient: NewRpcSessionsClient(conn), } @@ -90,6 +92,16 @@ func (c *GrpcClient) Close() error { return c.conn.Close() } +func (c *GrpcClient) GetServerId(ctx context.Context) (string, error) { + statsGrpcClientCalls.WithLabelValues("GetServerId").Inc() + response, err := c.impl.GetServerId(ctx, &GetServerIdRequest{}, grpc.WaitForReady(true)) + if err != nil { + return "", err + } + + return response.GetServerId(), nil +} + func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) { statsGrpcClientCalls.WithLabelValues("LookupSessionId").Inc() // TODO: Remove debug logging @@ -154,7 +166,6 @@ type GrpcClients struct { etcdClient *EtcdClient targetPrefix string - targetSelf string targetInformation map[string]*GrpcTargetInformationEtcd dialOptions atomic.Value // []grpc.DialOption @@ -241,6 +252,19 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, opts ...grpc. return err } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if id, err := client.GetServerId(ctx); err != nil { + log.Printf("Error checking server id of %s: %s", client.Target(), err) + } else if id == GrpcServerId { + log.Printf("GRPC target %s is this server, ignoring", client.Target()) + if err := client.Close(); err != nil { + log.Printf("Error closing client to %s: %s", client.Target(), err) + } + continue + } + log.Printf("Adding %s as GRPC target", target) clientsMap[target] = client clients = append(clients, client) @@ -277,9 +301,6 @@ func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, opts ...grpc.Di c.targetInformation = make(map[string]*GrpcTargetInformationEtcd) } - targetSelf, _ := config.GetString("grpc", "targetself") - c.targetSelf = targetSelf - if opts == nil { opts = make([]grpc.DialOption, 0) } @@ -353,12 +374,6 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte c.removeEtcdClientLocked(key) } - if c.targetSelf != "" && info.Address == c.targetSelf { - log.Printf("GRPC target %s is this server, ignoring %s", info.Address, key) - c.wakeupForTesting() - return - } - if _, found := c.clientsMap[info.Address]; found { log.Printf("GRPC target %s already exists, ignoring %s", info.Address, key) return @@ -371,6 +386,20 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte return } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if id, err := cl.GetServerId(ctx); err != nil { + log.Printf("Error checking server id of %s: %s", cl.Target(), err) + } else if id == GrpcServerId { + log.Printf("GRPC target %s is this server, ignoring %s", cl.Target(), key) + if err := cl.Close(); err != nil { + log.Printf("Error closing client to %s: %s", cl.Target(), err) + } + c.wakeupForTesting() + return + } + log.Printf("Adding %s as GRPC target", info.Address) if c.clientsMap == nil { diff --git a/grpc_client_test.go b/grpc_client_test.go index 51bebf5..a2fbb16 100644 --- a/grpc_client_test.go +++ b/grpc_client_test.go @@ -30,10 +30,6 @@ import ( "go.etcd.io/etcd/server/v3/embed" ) -const ( - GrpcSelfTargetForTesting = "testing.grpc.target" -) - func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients { config := goconf.NewConfigFile() config.AddOption("grpc", "targets", addr) @@ -55,7 +51,6 @@ func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) *GrpcClients config.AddOption("grpc", "targettype", "etcd") config.AddOption("grpc", "targetprefix", "/grpctargets") - config.AddOption("grpc", "targetself", GrpcSelfTargetForTesting) etcdClient, err := NewEtcdClient(config, "") if err != nil { @@ -89,11 +84,12 @@ func drainWakeupChannel(ch chan bool) { } func Test_GrpcClients_EtcdInitial(t *testing.T) { + _, addr1 := NewGrpcServerForTest(t) + _, addr2 := NewGrpcServerForTest(t) + etcd := NewEtcdForTest(t) - _, addr1 := NewGrpcServerForTest(t) SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) - _, addr2 := NewGrpcServerForTest(t) SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) client := NewGrpcClientsWithEtcdForTest(t, etcd) @@ -181,7 +177,9 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { } drainWakeupChannel(ch) - SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+GrpcSelfTargetForTesting+"\"}")) + server2, addr2 := NewGrpcServerForTest(t) + server2.serverId = GrpcServerId + SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) <-ch if clients := client.GetClients(); len(clients) != 1 { t.Errorf("Expected one client, got %+v", clients) diff --git a/grpc_internal.proto b/grpc_internal.proto new file mode 100644 index 0000000..6a6978a --- /dev/null +++ b/grpc_internal.proto @@ -0,0 +1,37 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +syntax = "proto3"; + +option go_package = "github.com/strukturag/nextcloud-spreed-signaling;signaling"; + +package signaling; + +service RpcInternal { + rpc GetServerId(GetServerIdRequest) returns (GetServerIdReply) {} +} + +message GetServerIdRequest { +} + +message GetServerIdReply { + string serverId = 1; +} diff --git a/grpc_server.go b/grpc_server.go index d2ba2fb..03c9a34 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -23,10 +23,13 @@ package signaling import ( "context" + "crypto/sha256" + "encoding/hex" "errors" "fmt" "log" "net" + "os" "github.com/dlintw/goconf" "google.golang.org/grpc" @@ -35,16 +38,30 @@ import ( status "google.golang.org/grpc/status" ) +var ( + GrpcServerId string +) + func init() { RegisterGrpcServerStats() + + hostname, err := os.Hostname() + if err != nil { + hostname = newRandomString(8) + } + md := sha256.New() + md.Write([]byte(fmt.Sprintf("%s-%s-%d", newRandomString(32), hostname, os.Getpid()))) + GrpcServerId = hex.EncodeToString(md.Sum(nil)) } type GrpcServer struct { + UnimplementedRpcInternalServer UnimplementedRpcMcuServer UnimplementedRpcSessionsServer conn *grpc.Server listener net.Listener + serverId string // can be overwritten from tests hub *Hub } @@ -77,7 +94,9 @@ func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) { result := &GrpcServer{ conn: conn, listener: listener, + serverId: GrpcServerId, } + RegisterRpcInternalServer(conn, result) RegisterRpcSessionsServer(conn, result) RegisterRpcMcuServer(conn, result) return result, nil @@ -160,3 +179,10 @@ func (s *GrpcServer) GetPublisherId(ctx context.Context, request *GetPublisherId return nil, status.Error(codes.NotFound, "no such publisher") } + +func (s *GrpcServer) GetServerId(ctx context.Context, request *GetServerIdRequest) (*GetServerIdReply, error) { + statsGrpcServerCalls.WithLabelValues("GetServerId").Inc() + return &GetServerIdReply{ + ServerId: s.serverId, + }, nil +} diff --git a/grpc_server_test.go b/grpc_server_test.go index 61b78e8..7292ffc 100644 --- a/grpc_server_test.go +++ b/grpc_server_test.go @@ -48,6 +48,15 @@ func NewGrpcServerForTest(t *testing.T) (server *GrpcServer, addr string) { t.Fatal("could not find free port") } + // Don't match with own server id by default. + server.serverId = "dont-match" + + go func() { + if err := server.Run(); err != nil { + t.Errorf("could not start GRPC server: %s", err) + } + }() + t.Cleanup(func() { server.Close() }) diff --git a/hub.go b/hub.go index 4dd62ee..9edf83e 100644 --- a/hub.go +++ b/hub.go @@ -154,7 +154,7 @@ type Hub struct { rpcClients *GrpcClients } -func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcClients *GrpcClients, r *mux.Router, version string) (*Hub, error) { +func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer, rpcClients *GrpcClients, r *mux.Router, version string) (*Hub, error) { hashKey, _ := config.GetString("sessions", "hashkey") switch len(hashKey) { case 32: @@ -214,11 +214,6 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcClients *GrpcClien decodeCaches = append(decodeCaches, NewLruCache(decodeCacheSize)) } - rpcServer, err := NewGrpcServer(config) - if err != nil { - return nil, err - } - roomSessions, err := NewBuiltinRoomSessions(rpcClients) if err != nil { return nil, err @@ -352,7 +347,9 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcClients *GrpcClien rpcClients: rpcClients, } backend.hub = hub - rpcServer.hub = hub + if rpcServer != nil { + rpcServer.hub = hub + } hub.upgrader.CheckOrigin = hub.checkOrigin r.HandleFunc("/spreed", func(w http.ResponseWriter, r *http.Request) { hub.serveWs(w, r) @@ -450,12 +447,6 @@ func (h *Hub) Run() { go h.updateGeoDatabase() h.roomPing.Start() defer h.roomPing.Stop() - go func() { - if err := h.rpcServer.Run(); err != nil { - log.Fatalf("Could not start RPC server: %s", err) - } - }() - defer h.rpcServer.Close() housekeeping := time.NewTicker(housekeepingInterval) geoipUpdater := time.NewTicker(24 * time.Hour) diff --git a/hub_test.go b/hub_test.go index 3bc4db4..cb92289 100644 --- a/hub_test.go +++ b/hub_test.go @@ -122,7 +122,7 @@ func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Serve if err != nil { t.Fatal(err) } - h, err := NewHub(config, events, nil, r, "no-version") + h, err := NewHub(config, events, nil, nil, r, "no-version") if err != nil { t.Fatal(err) } @@ -190,7 +190,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http t.Fatal(err) } client1 := NewGrpcClientsForTest(t, addr2) - h1, err := NewHub(config1, events1, client1, r1, "no-version") + h1, err := NewHub(config1, events1, grpcServer1, client1, r1, "no-version") if err != nil { t.Fatal(err) } @@ -210,7 +210,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http t.Fatal(err) } client2 := NewGrpcClientsForTest(t, addr1) - h2, err := NewHub(config2, events2, client2, r2, "no-version") + h2, err := NewHub(config2, events2, grpcServer2, client2, r2, "no-version") if err != nil { t.Fatal(err) } @@ -225,20 +225,6 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http t.Fatal(err) } - grpcServer1.hub = h1 - grpcServer2.hub = h2 - - go func() { - if err := grpcServer1.Run(); err != nil { - t.Errorf("Could not start RPC server on %s: %s", addr1, err) - } - }() - go func() { - if err := grpcServer2.Run(); err != nil { - t.Errorf("Could not start RPC server on %s: %s", addr2, err) - } - }() - go h1.Run() go h2.Run() diff --git a/server.conf.in b/server.conf.in index 12f8cde..bb6826a 100644 --- a/server.conf.in +++ b/server.conf.in @@ -270,7 +270,3 @@ connectionsperhost = 8 # "/signaling/cluster/grpc/one" -> {"address": "192.168.0.1:9090"} # "/signaling/cluster/grpc/two" -> {"address": "192.168.0.2:9090"} #targetprefix = /signaling/cluster/grpc - -# For target type "etcd": Address of this signaling server instance. Will be -# ignored when retrieved from the etcd cluster to avoid loopback connections. -#targetself = 192.168.0.1:9090 diff --git a/server/main.go b/server/main.go index 3e39a6a..843ee48 100644 --- a/server/main.go +++ b/server/main.go @@ -164,6 +164,17 @@ func main() { } }() + rpcServer, err := signaling.NewGrpcServer(config) + if err != nil { + log.Fatalf("Could not create RPC server: %s", err) + } + go func() { + if err := rpcServer.Run(); err != nil { + log.Fatalf("Could not start RPC server: %s", err) + } + }() + defer rpcServer.Close() + rpcClients, err := signaling.NewGrpcClients(config, etcdClient) if err != nil { log.Fatalf("Could not create RPC clients: %s", err) @@ -171,7 +182,7 @@ func main() { defer rpcClients.Close() r := mux.NewRouter() - hub, err := signaling.NewHub(config, events, rpcClients, r, version) + hub, err := signaling.NewHub(config, events, rpcServer, rpcClients, r, version) if err != nil { log.Fatal("Could not create hub: ", err) }