From 01858a89f498602c11c1d9de9ff3cfb88f86920f Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 29 Jun 2022 16:13:39 +0200 Subject: [PATCH] grpc: Enable DNS discovery for GRPC clients. --- grpc_client.go | 358 ++++++++++++++++++++++++++++++++++++-------- grpc_client_test.go | 61 ++++++++ server.conf.in | 7 + 3 files changed, 363 insertions(+), 63 deletions(-) diff --git a/grpc_client.go b/grpc_client.go index d50b4ab..ecb4f04 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -38,6 +38,7 @@ import ( codes "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/resolver" status "google.golang.org/grpc/status" ) @@ -48,6 +49,12 @@ const ( DefaultGrpcTargetType = GrpcTargetTypeStatic ) +var ( + lookupGrpcIp = net.LookupIP // can be overwritten from tests + + customResolverPrefix uint64 +) + func init() { RegisterGrpcClientStats() } @@ -67,25 +74,90 @@ func newGrpcClientImpl(conn grpc.ClientConnInterface) *grpcClientImpl { } type GrpcClient struct { - conn *grpc.ClientConn - impl *grpcClientImpl + ip net.IP + target string + conn *grpc.ClientConn + impl *grpcClientImpl } -func NewGrpcClient(target string, opts ...grpc.DialOption) (*GrpcClient, error) { - conn, err := grpc.Dial(target, opts...) +type customIpResolver struct { + resolver.Builder + resolver.Resolver + + scheme string + addr string + hostname string +} + +func (r *customIpResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + state := resolver.State{ + Addresses: []resolver.Address{ + { + Addr: r.addr, + ServerName: r.hostname, + }, + }, + } + + if err := cc.UpdateState(state); err != nil { + return nil, err + } + + return r, nil +} + +func (r *customIpResolver) Scheme() string { + return r.scheme +} + +func (r *customIpResolver) ResolveNow(opts resolver.ResolveNowOptions) { + // Noop, we use a static configuration. 
+} + +func (r *customIpResolver) Close() { + // Noop +} + +func NewGrpcClient(target string, ip net.IP, opts ...grpc.DialOption) (*GrpcClient, error) { + var conn *grpc.ClientConn + var err error + if ip != nil { + prefix := atomic.AddUint64(&customResolverPrefix, 1) + addr := ip.String() + hostname := target + if host, port, err := net.SplitHostPort(target); err == nil { + addr = net.JoinHostPort(addr, port) + hostname = host + } + resolver := &customIpResolver{ + scheme: fmt.Sprintf("custom%d", prefix), + addr: addr, + hostname: hostname, + } + opts = append(opts, grpc.WithResolvers(resolver)) + conn, err = grpc.Dial(fmt.Sprintf("%s://%s", resolver.Scheme(), target), opts...) + } else { + conn, err = grpc.Dial(target, opts...) + } if err != nil { return nil, err } result := &GrpcClient{ - conn: conn, - impl: newGrpcClientImpl(conn), + ip: ip, + target: target, + conn: conn, + impl: newGrpcClientImpl(conn), + } + + if ip != nil { + result.target += " (" + ip.String() + ")" } return result, nil } func (c *GrpcClient) Target() string { - return c.conn.Target() + return c.target } func (c *GrpcClient) Close() error { @@ -161,9 +233,13 @@ func (c *GrpcClient) GetPublisherId(ctx context.Context, sessionId string, strea type GrpcClients struct { mu sync.RWMutex - clientsMap map[string]*GrpcClient + clientsMap map[string][]*GrpcClient clients []*GrpcClient + dnsDiscovery bool + stopping chan bool + stopped chan bool + etcdClient *EtcdClient targetPrefix string targetInformation map[string]*GrpcTargetInformationEtcd @@ -180,14 +256,17 @@ func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcCli etcdClient: etcdClient, initializedCtx: initializedCtx, initializedFunc: initializedFunc, + + stopping: make(chan bool, 1), + stopped: make(chan bool, 1), } - if err := result.load(config); err != nil { + if err := result.load(config, false); err != nil { return nil, err } return result, nil } -func (c *GrpcClients) load(config *goconf.ConfigFile) error { 
+func (c *GrpcClients) load(config *goconf.ConfigFile, fromReload bool) error { var opts []grpc.DialOption caFile, _ := config.GetString("grpc", "ca") if caFile != "" { @@ -202,31 +281,41 @@ func (c *GrpcClients) load(config *goconf.ConfigFile) error { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } + if opts == nil { + opts = make([]grpc.DialOption, 0) + } + c.dialOptions.Store(opts) + targetType, _ := config.GetString("grpc", "targettype") if targetType == "" { targetType = DefaultGrpcTargetType } + var err error switch targetType { case GrpcTargetTypeStatic: - return c.loadTargetsStatic(config, opts...) + err = c.loadTargetsStatic(config, fromReload, opts...) + if err == nil && c.dnsDiscovery { + go c.monitorGrpcIPs() + } case GrpcTargetTypeEtcd: - return c.loadTargetsEtcd(config, opts...) + err = c.loadTargetsEtcd(config, fromReload, opts...) default: - return fmt.Errorf("unknown GRPC target type: %s", targetType) + err = fmt.Errorf("unknown GRPC target type: %s", targetType) } + return err } -func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, opts ...grpc.DialOption) error { +func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bool, opts ...grpc.DialOption) error { c.mu.Lock() defer c.mu.Unlock() - clientsMap := make(map[string]*GrpcClient) + clientsMap := make(map[string][]*GrpcClient) var clients []*GrpcClient removeTargets := make(map[string]bool, len(c.clientsMap)) - for target, client := range c.clientsMap { + for target, entries := range c.clientsMap { removeTargets[target] = true - clientsMap[target] = client + clientsMap[target] = entries } targets, _ := config.GetString("grpc", "targets") @@ -236,50 +325,87 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, opts ...grpc. continue } - if client, found := clientsMap[target]; found { - clients = append(clients, client) + if entries, found := clientsMap[target]; found { + clients = append(clients, entries...) 
delete(removeTargets, target) continue } - client, err := NewGrpcClient(target, opts...) - if err != nil { - for target, client := range clientsMap { - if closeerr := client.Close(); closeerr != nil { - log.Printf("Error closing client to %s: %s", target, closeerr) + host := target + if h, _, err := net.SplitHostPort(target); err == nil { + host = h + } + + var ips []net.IP + if net.ParseIP(host) == nil { + // Use dedicated client for each IP address. + var err error + ips, err = lookupGrpcIp(host) + if err != nil { + log.Printf("Could not lookup %s: %s", host, err) + continue + } + } else { + // Connect directly to IP address. + ips = []net.IP{nil} + } + + for _, ip := range ips { + client, err := NewGrpcClient(target, ip, opts...) + if err != nil { + for _, clients := range clientsMap { + for _, client := range clients { + if closeerr := client.Close(); closeerr != nil { + log.Printf("Error closing client to %s: %s", client.Target(), closeerr) + } + } } + return err } - return err - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() - if id, err := client.GetServerId(ctx); err != nil { - log.Printf("Error checking server id of %s: %s", client.Target(), err) - } else if id == GrpcServerId { - log.Printf("GRPC target %s is this server, ignoring", client.Target()) - if err := client.Close(); err != nil { - log.Printf("Error closing client to %s: %s", client.Target(), err) + if id, err := client.GetServerId(ctx); err != nil { + log.Printf("Error checking server id of %s: %s", client.Target(), err) + } else if id == GrpcServerId { + log.Printf("GRPC target %s is this server, ignoring", client.Target()) + if err := client.Close(); err != nil { + log.Printf("Error closing client to %s: %s", client.Target(), err) + } + continue } - continue - } - log.Printf("Adding %s as GRPC target", target) - clientsMap[target] = client - clients = 
append(clients, client) + log.Printf("Adding %s as GRPC target", client.Target()) + clientsMap[target] = append(clientsMap[target], client) + clients = append(clients, client) + } } for target := range removeTargets { - if client, found := clientsMap[target]; found { - log.Printf("Deleting GRPC target %s", target) - if err := client.Close(); err != nil { - log.Printf("Error closing client to %s: %s", target, err) + if clients, found := clientsMap[target]; found { + for _, client := range clients { + log.Printf("Deleting GRPC target %s", client.Target()) + if err := client.Close(); err != nil { + log.Printf("Error closing client to %s: %s", client.Target(), err) + } } delete(clientsMap, target) } } + dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery") + if dnsDiscovery != c.dnsDiscovery { + if !dnsDiscovery && fromReload { + c.stopping <- true + <-c.stopped + } + c.dnsDiscovery = dnsDiscovery + if dnsDiscovery && fromReload { + go c.monitorGrpcIPs() + } + } + c.clients = clients c.clientsMap = clientsMap c.initializedFunc() @@ -287,7 +413,109 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, opts ...grpc. return nil } -func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, opts ...grpc.DialOption) error { +func (c *GrpcClients) monitorGrpcIPs() { + log.Printf("Start monitoring GRPC client IPs") + ticker := time.NewTicker(updateDnsInterval) + for { + select { + case <-ticker.C: + c.updateGrpcIPs() + case <-c.stopping: + c.stopped <- true + return + } + } +} + +func (c *GrpcClients) updateGrpcIPs() { + c.mu.Lock() + defer c.mu.Unlock() + + opts := c.dialOptions.Load().([]grpc.DialOption) + + mapModified := false + for target, clients := range c.clientsMap { + host := target + if h, _, err := net.SplitHostPort(target); err == nil { + host = h + } + + if net.ParseIP(host) != nil { + // No need to lookup endpoints that connect to IP addresses. 
+ continue + } + + ips, err := lookupGrpcIp(host) + if err != nil { + log.Printf("Could not lookup %s: %s", host, err) + continue + } + + var newClients []*GrpcClient + changed := false + for _, client := range clients { + found := false + for idx, ip := range ips { + if ip.Equal(client.ip) { + ips = append(ips[:idx], ips[idx+1:]...) + found = true + newClients = append(newClients, client) + break + } + } + + if !found { + changed = true + log.Printf("Removing connection to %s", client.Target()) + if err := client.Close(); err != nil { + log.Printf("Error closing client to %s: %s", client.Target(), err) + } + c.wakeupForTesting() + } + } + + for _, ip := range ips { + client, err := NewGrpcClient(target, ip, opts...) + if err != nil { + log.Printf("Error creating client to %s with IP %s: %s", target, ip.String(), err) + continue + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if id, err := client.GetServerId(ctx); err != nil { + log.Printf("Error checking server id of %s: %s", client.Target(), err) + } else if id == GrpcServerId { + //log.Printf("GRPC target %s is this server, ignoring", client.Target()) + if err := client.Close(); err != nil { + log.Printf("Error closing client to %s: %s", client.Target(), err) + } + continue + } + + log.Printf("Adding %s as GRPC target", client.Target()) + newClients = append(newClients, client) + changed = true + c.wakeupForTesting() + } + + if changed { + c.clientsMap[target] = newClients + mapModified = true + } + } + + if mapModified { + c.clients = make([]*GrpcClient, 0, len(c.clientsMap)) + for _, clients := range c.clientsMap { + c.clients = append(c.clients, clients...) 
+ } + statsGrpcClients.Set(float64(len(c.clients))) + } +} + +func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, fromReload bool, opts ...grpc.DialOption) error { if !c.etcdClient.IsConfigured() { return fmt.Errorf("No etcd endpoints configured") } @@ -301,11 +529,6 @@ func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, opts ...grpc.Di c.targetInformation = make(map[string]*GrpcTargetInformationEtcd) } - if opts == nil { - opts = make([]grpc.DialOption, 0) - } - c.dialOptions.Store(opts) - c.etcdClient.AddListener(c) return nil } @@ -380,7 +603,7 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte } opts := c.dialOptions.Load().([]grpc.DialOption) - cl, err := NewGrpcClient(info.Address, opts...) + cl, err := NewGrpcClient(info.Address, nil, opts...) if err != nil { log.Printf("Could not create GRPC client for target %s: %s", info.Address, err) return @@ -400,12 +623,12 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte return } - log.Printf("Adding %s as GRPC target", info.Address) + log.Printf("Adding %s as GRPC target", cl.Target()) if c.clientsMap == nil { - c.clientsMap = make(map[string]*GrpcClient) + c.clientsMap = make(map[string][]*GrpcClient) } - c.clientsMap[info.Address] = cl + c.clientsMap[info.Address] = []*GrpcClient{cl} c.clients = append(c.clients, cl) c.targetInformation[key] = &info statsGrpcClients.Inc() @@ -428,19 +651,21 @@ func (c *GrpcClients) removeEtcdClientLocked(key string) { } delete(c.targetInformation, key) - client, found := c.clientsMap[info.Address] + clients, found := c.clientsMap[info.Address] if !found { return } - log.Printf("Removing connection to %s (from %s)", info.Address, key) - if err := client.Close(); err != nil { - log.Printf("Error closing client to %s: %s", client.Target(), err) + for _, client := range clients { + log.Printf("Removing connection to %s (from %s)", client.Target(), key) + if err := client.Close(); err != nil { 
+ log.Printf("Error closing client to %s: %s", client.Target(), err) + } } delete(c.clientsMap, info.Address) c.clients = make([]*GrpcClient, 0, len(c.clientsMap)) - for _, client := range c.clientsMap { - c.clients = append(c.clients, client) + for _, clients := range c.clientsMap { + c.clients = append(c.clients, clients...) } statsGrpcClients.Dec() c.wakeupForTesting() @@ -467,7 +692,7 @@ func (c *GrpcClients) wakeupForTesting() { } func (c *GrpcClients) Reload(config *goconf.ConfigFile) { - if err := c.load(config); err != nil { + if err := c.load(config, true); err != nil { log.Printf("Could not reload RPC clients: %s", err) } } @@ -476,14 +701,21 @@ func (c *GrpcClients) Close() { c.mu.Lock() defer c.mu.Unlock() - for target, client := range c.clientsMap { - if err := client.Close(); err != nil { - log.Printf("Error closing client to %s: %s", target, err) + for _, clients := range c.clientsMap { + for _, client := range clients { + if err := client.Close(); err != nil { + log.Printf("Error closing client to %s: %s", client.Target(), err) + } } } c.clients = nil c.clientsMap = nil + if c.dnsDiscovery { + c.stopping <- true + <-c.stopped + c.dnsDiscovery = false + } if c.etcdClient != nil { c.etcdClient.RemoveListener(c) diff --git a/grpc_client_test.go b/grpc_client_test.go index a2fbb16..8ad4f92 100644 --- a/grpc_client_test.go +++ b/grpc_client_test.go @@ -23,6 +23,8 @@ package signaling import ( "context" + "fmt" + "net" "testing" "time" @@ -33,6 +35,7 @@ import ( func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients { config := goconf.NewConfigFile() config.AddOption("grpc", "targets", addr) + config.AddOption("grpc", "dnsdiscovery", "true") client, err := NewGrpcClients(config, nil) if err != nil { @@ -196,3 +199,61 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { t.Errorf("Expected target %s, got %s", addr1, clients[0].Target()) } } + +func Test_GrpcClients_DnsDiscovery(t *testing.T) { + var ipsResult []net.IP + lookupGrpcIp = 
func(host string) ([]net.IP, error) { + if host == "testgrpc" { + return ipsResult, nil + } + + return nil, fmt.Errorf("unknown host") + } + target := "testgrpc:12345" + ip1 := net.ParseIP("192.168.0.1") + ip2 := net.ParseIP("192.168.0.2") + targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1) + targetWithIp2 := fmt.Sprintf("%s (%s)", target, ip2) + ipsResult = []net.IP{ip1} + client := NewGrpcClientsForTest(t, target) + ch := make(chan bool, 1) + client.wakeupChanForTesting = ch + + if clients := client.GetClients(); len(clients) != 1 { + t.Errorf("Expected one client, got %+v", clients) + } else if clients[0].Target() != targetWithIp1 { + t.Errorf("Expected target %s, got %s", targetWithIp1, clients[0].Target()) + } else if !clients[0].ip.Equal(ip1) { + t.Errorf("Expected IP %s, got %s", ip1, clients[0].ip) + } + + ipsResult = []net.IP{ip1, ip2} + drainWakeupChannel(ch) + client.updateGrpcIPs() + <-ch + + if clients := client.GetClients(); len(clients) != 2 { + t.Errorf("Expected two client, got %+v", clients) + } else if clients[0].Target() != targetWithIp1 { + t.Errorf("Expected target %s, got %s", targetWithIp1, clients[0].Target()) + } else if !clients[0].ip.Equal(ip1) { + t.Errorf("Expected IP %s, got %s", ip1, clients[0].ip) + } else if clients[1].Target() != targetWithIp2 { + t.Errorf("Expected target %s, got %s", targetWithIp2, clients[1].Target()) + } else if !clients[1].ip.Equal(ip2) { + t.Errorf("Expected IP %s, got %s", ip2, clients[1].ip) + } + + ipsResult = []net.IP{ip2} + drainWakeupChannel(ch) + client.updateGrpcIPs() + <-ch + + if clients := client.GetClients(); len(clients) != 1 { + t.Errorf("Expected one client, got %+v", clients) + } else if clients[0].Target() != targetWithIp2 { + t.Errorf("Expected target %s, got %s", targetWithIp2, clients[0].Target()) + } else if !clients[0].ip.Equal(ip2) { + t.Errorf("Expected IP %s, got %s", ip2, clients[0].ip) + } +} diff --git a/server.conf.in b/server.conf.in index bb6826a..9303d31 100644 --- 
a/server.conf.in +++ b/server.conf.in @@ -261,6 +261,13 @@ connectionsperhost = 8 # for clustering mode. #targets = 192.168.0.1:9090, 192.168.0.2:9090 +# For target type "static": Enable DNS discovery on hostnames of GRPC targets. +# If a hostname resolves to multiple IP addresses, a connection is established +# to each of them. +# Changes to the DNS are monitored regularly and GRPC clients are created or +# deleted as necessary. +#dnsdiscovery = true + # For target type "etcd": Key prefix of GRPC target entries. All keys below will # be watched and assumed to contain a JSON document. The entry "address" from # this document will be used as target URL, other contents in the document will