From aaaf8cc743eaa315f25cf4a033fc76ba9eb51c14 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 16 Jan 2025 09:37:40 +0100 Subject: [PATCH] Check version of cluster nodes and log warning if different. --- grpc_client.go | 24 ++++++++++++++---------- grpc_client_test.go | 4 ++-- grpc_internal.proto | 1 + grpc_server.go | 5 ++++- grpc_server_test.go | 6 +++--- server/main.go | 4 ++-- 6 files changed, 26 insertions(+), 18 deletions(-) diff --git a/grpc_client.go b/grpc_client.go index 24d2b0c..1d33e62 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -179,14 +179,14 @@ func (c *GrpcClient) SetSelf(self bool) { c.isSelf.Store(self) } -func (c *GrpcClient) GetServerId(ctx context.Context) (string, error) { +func (c *GrpcClient) GetServerId(ctx context.Context) (string, string, error) { statsGrpcClientCalls.WithLabelValues("GetServerId").Inc() response, err := c.impl.GetServerId(ctx, &GetServerIdRequest{}, grpc.WaitForReady(true)) if err != nil { - return "", err + return "", "", err } - return response.GetServerId(), nil + return response.GetServerId(), response.GetVersion(), nil } func (c *GrpcClient) LookupResumeId(ctx context.Context, resumeId string) (*LookupResumeIdReply, error) { @@ -398,7 +398,8 @@ type grpcClientsList struct { } type GrpcClients struct { - mu sync.RWMutex + mu sync.RWMutex + version string clientsMap map[string]*grpcClientsList clients []*GrpcClient @@ -421,10 +422,11 @@ type GrpcClients struct { closeFunc context.CancelFunc } -func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor) (*GrpcClients, error) { +func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor, version string) (*GrpcClients, error) { initializedCtx, initializedFunc := context.WithCancel(context.Background()) closeCtx, closeFunc := context.WithCancel(context.Background()) result := &GrpcClients{ + version: version, dnsMonitor: dnsMonitor, etcdClient: etcdClient, initializedCtx: initializedCtx, 
@@ -499,12 +501,12 @@ func (c *GrpcClients) isClientAvailable(target string, client *GrpcClient) bool return false } -func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, error) { +func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, string, error) { ctx2, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - id, err := client.GetServerId(ctx2) - return id, err + id, version, err := client.GetServerId(ctx2) + return id, version, err } func (c *GrpcClients) checkIsSelf(ctx context.Context, target string, client *GrpcClient) { @@ -522,7 +524,7 @@ loop: return } - id, err := c.getServerIdWithTimeout(ctx, client) + id, version, err := c.getServerIdWithTimeout(ctx, client) if err != nil { if errors.Is(err, context.Canceled) { return @@ -539,8 +541,10 @@ loop: log.Printf("GRPC target %s is this server, removing", client.Target()) c.closeClient(client) client.SetSelf(true) + } else if version != c.version { + log.Printf("WARNING: Node %s is running different version %s than local node (%s)", client.Target(), version, c.version) } else { - log.Printf("Checked GRPC server id of %s", client.Target()) + log.Printf("Checked GRPC server id of %s running version %s", client.Target(), version) } break loop } diff --git a/grpc_client_test.go b/grpc_client_test.go index 3606aec..b536a34 100644 --- a/grpc_client_test.go +++ b/grpc_client_test.go @@ -53,7 +53,7 @@ func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} { func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, *DnsMonitor) { dnsMonitor := newDnsMonitorForTest(t, time.Hour) // will be updated manually - client, err := NewGrpcClients(config, etcdClient, dnsMonitor) + client, err := NewGrpcClients(config, etcdClient, dnsMonitor, "0.0.0") require.NoError(t, err) t.Cleanup(func() { client.Close() @@ -336,7 +336,7 @@ func Test_GrpcClients_Encryption(t 
*testing.T) { require.NoError(clients.WaitForInitialized(ctx)) for _, client := range clients.GetClients() { - _, err := client.GetServerId(ctx) + _, _, err := client.GetServerId(ctx) require.NoError(err) } }) diff --git a/grpc_internal.proto b/grpc_internal.proto index 6a6978a..6093f78 100644 --- a/grpc_internal.proto +++ b/grpc_internal.proto @@ -34,4 +34,5 @@ message GetServerIdRequest { message GetServerIdReply { string serverId = 1; + string version = 2; } diff --git a/grpc_server.go b/grpc_server.go index 4467495..77c6aec 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -70,6 +70,7 @@ type GrpcServer struct { UnimplementedRpcMcuServer UnimplementedRpcSessionsServer + version string creds credentials.TransportCredentials conn *grpc.Server listener net.Listener @@ -78,7 +79,7 @@ type GrpcServer struct { hub GrpcServerHub } -func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) { +func NewGrpcServer(config *goconf.ConfigFile, version string) (*GrpcServer, error) { var listener net.Listener if addr, _ := GetStringOptionWithEnv(config, "grpc", "listen"); addr != "" { var err error @@ -95,6 +96,7 @@ func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) { conn := grpc.NewServer(grpc.Creds(creds)) result := &GrpcServer{ + version: version, creds: creds, conn: conn, listener: listener, @@ -265,6 +267,7 @@ func (s *GrpcServer) GetServerId(ctx context.Context, request *GetServerIdReques statsGrpcServerCalls.WithLabelValues("GetServerId").Inc() return &GetServerIdReply{ ServerId: s.serverId, + Version: s.version, }, nil } diff --git a/grpc_server_test.go b/grpc_server_test.go index ffa6ceb..e985fd2 100644 --- a/grpc_server_test.go +++ b/grpc_server_test.go @@ -66,7 +66,7 @@ func NewGrpcServerForTestWithConfig(t *testing.T, config *goconf.ConfigFile) (se addr = net.JoinHostPort("127.0.0.1", strconv.Itoa(port)) config.AddOption("grpc", "listen", addr) var err error - server, err = NewGrpcServer(config) + server, err = NewGrpcServer(config, 
"0.0.0") if isErrorAddressAlreadyInUse(err) { continue } @@ -218,7 +218,7 @@ func Test_GrpcServer_ReloadCA(t *testing.T) { ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) defer cancel1() - _, err = client1.GetServerId(ctx1) + _, _, err = client1.GetServerId(ctx1) require.NoError(err) org2 := "Updated client" @@ -245,6 +245,6 @@ func Test_GrpcServer_ReloadCA(t *testing.T) { defer cancel2() // This will fail if the CA certificate has not been reloaded by the server. - _, err = client2.GetServerId(ctx2) + _, _, err = client2.GetServerId(ctx2) require.NoError(err) } diff --git a/server/main.go b/server/main.go index b8ace97..e372bd9 100644 --- a/server/main.go +++ b/server/main.go @@ -202,7 +202,7 @@ func main() { } }() - rpcServer, err := signaling.NewGrpcServer(config) + rpcServer, err := signaling.NewGrpcServer(config, version) if err != nil { log.Fatalf("Could not create RPC server: %s", err) } @@ -213,7 +213,7 @@ func main() { }() defer rpcServer.Close() - rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor) + rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor, version) if err != nil { log.Fatalf("Could not create RPC clients: %s", err) }