Check version of cluster nodes and log warning if different.

This commit is contained in:
Joachim Bauch 2025-01-16 09:37:40 +01:00
commit aaaf8cc743
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
6 changed files with 26 additions and 18 deletions

View file

@ -179,14 +179,14 @@ func (c *GrpcClient) SetSelf(self bool) {
c.isSelf.Store(self)
}
func (c *GrpcClient) GetServerId(ctx context.Context) (string, error) {
func (c *GrpcClient) GetServerId(ctx context.Context) (string, string, error) {
statsGrpcClientCalls.WithLabelValues("GetServerId").Inc()
response, err := c.impl.GetServerId(ctx, &GetServerIdRequest{}, grpc.WaitForReady(true))
if err != nil {
return "", err
return "", "", err
}
return response.GetServerId(), nil
return response.GetServerId(), response.GetVersion(), nil
}
func (c *GrpcClient) LookupResumeId(ctx context.Context, resumeId string) (*LookupResumeIdReply, error) {
@ -398,7 +398,8 @@ type grpcClientsList struct {
}
type GrpcClients struct {
mu sync.RWMutex
mu sync.RWMutex
version string
clientsMap map[string]*grpcClientsList
clients []*GrpcClient
@ -421,10 +422,11 @@ type GrpcClients struct {
closeFunc context.CancelFunc
}
func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor) (*GrpcClients, error) {
func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor, version string) (*GrpcClients, error) {
initializedCtx, initializedFunc := context.WithCancel(context.Background())
closeCtx, closeFunc := context.WithCancel(context.Background())
result := &GrpcClients{
version: version,
dnsMonitor: dnsMonitor,
etcdClient: etcdClient,
initializedCtx: initializedCtx,
@ -499,12 +501,12 @@ func (c *GrpcClients) isClientAvailable(target string, client *GrpcClient) bool
return false
}
func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, error) {
func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, string, error) {
ctx2, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
id, err := client.GetServerId(ctx2)
return id, err
id, version, err := client.GetServerId(ctx2)
return id, version, err
}
func (c *GrpcClients) checkIsSelf(ctx context.Context, target string, client *GrpcClient) {
@ -522,7 +524,7 @@ loop:
return
}
id, err := c.getServerIdWithTimeout(ctx, client)
id, version, err := c.getServerIdWithTimeout(ctx, client)
if err != nil {
if errors.Is(err, context.Canceled) {
return
@ -539,8 +541,10 @@ loop:
log.Printf("GRPC target %s is this server, removing", client.Target())
c.closeClient(client)
client.SetSelf(true)
} else if version != c.version {
log.Printf("WARNING: Node %s is runing different version %s than local node (%s)", client.Target(), version, c.version)
} else {
log.Printf("Checked GRPC server id of %s", client.Target())
log.Printf("Checked GRPC server id of %s running version %s", client.Target(), version)
}
break loop
}

View file

@ -53,7 +53,7 @@ func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} {
func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, *DnsMonitor) {
dnsMonitor := newDnsMonitorForTest(t, time.Hour) // will be updated manually
client, err := NewGrpcClients(config, etcdClient, dnsMonitor)
client, err := NewGrpcClients(config, etcdClient, dnsMonitor, "0.0.0")
require.NoError(t, err)
t.Cleanup(func() {
client.Close()
@ -336,7 +336,7 @@ func Test_GrpcClients_Encryption(t *testing.T) {
require.NoError(clients.WaitForInitialized(ctx))
for _, client := range clients.GetClients() {
_, err := client.GetServerId(ctx)
_, _, err := client.GetServerId(ctx)
require.NoError(err)
}
})

View file

@ -34,4 +34,5 @@ message GetServerIdRequest {
message GetServerIdReply {
string serverId = 1;
string version = 2;
}

View file

@ -70,6 +70,7 @@ type GrpcServer struct {
UnimplementedRpcMcuServer
UnimplementedRpcSessionsServer
version string
creds credentials.TransportCredentials
conn *grpc.Server
listener net.Listener
@ -78,7 +79,7 @@ type GrpcServer struct {
hub GrpcServerHub
}
func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) {
func NewGrpcServer(config *goconf.ConfigFile, version string) (*GrpcServer, error) {
var listener net.Listener
if addr, _ := GetStringOptionWithEnv(config, "grpc", "listen"); addr != "" {
var err error
@ -95,6 +96,7 @@ func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) {
conn := grpc.NewServer(grpc.Creds(creds))
result := &GrpcServer{
version: version,
creds: creds,
conn: conn,
listener: listener,
@ -265,6 +267,7 @@ func (s *GrpcServer) GetServerId(ctx context.Context, request *GetServerIdReques
statsGrpcServerCalls.WithLabelValues("GetServerId").Inc()
return &GetServerIdReply{
ServerId: s.serverId,
Version: s.version,
}, nil
}

View file

@ -66,7 +66,7 @@ func NewGrpcServerForTestWithConfig(t *testing.T, config *goconf.ConfigFile) (se
addr = net.JoinHostPort("127.0.0.1", strconv.Itoa(port))
config.AddOption("grpc", "listen", addr)
var err error
server, err = NewGrpcServer(config)
server, err = NewGrpcServer(config, "0.0.0")
if isErrorAddressAlreadyInUse(err) {
continue
}
@ -218,7 +218,7 @@ func Test_GrpcServer_ReloadCA(t *testing.T) {
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second)
defer cancel1()
_, err = client1.GetServerId(ctx1)
_, _, err = client1.GetServerId(ctx1)
require.NoError(err)
org2 := "Updated client"
@ -245,6 +245,6 @@ func Test_GrpcServer_ReloadCA(t *testing.T) {
defer cancel2()
// This will fail if the CA certificate has not been reloaded by the server.
_, err = client2.GetServerId(ctx2)
_, _, err = client2.GetServerId(ctx2)
require.NoError(err)
}

View file

@ -202,7 +202,7 @@ func main() {
}
}()
rpcServer, err := signaling.NewGrpcServer(config)
rpcServer, err := signaling.NewGrpcServer(config, version)
if err != nil {
log.Fatalf("Could not create RPC server: %s", err)
}
@ -213,7 +213,7 @@ func main() {
}()
defer rpcServer.Close()
rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor)
rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor, version)
if err != nil {
log.Fatalf("Could not create RPC clients: %s", err)
}