grpc: Enable DNS discovery for GRPC clients.

This commit is contained in:
Joachim Bauch 2022-06-29 16:13:39 +02:00
parent 20cc51c2fe
commit 01858a89f4
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
3 changed files with 363 additions and 63 deletions

View file

@ -38,6 +38,7 @@ import (
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
) )
@ -48,6 +49,12 @@ const (
DefaultGrpcTargetType = GrpcTargetTypeStatic DefaultGrpcTargetType = GrpcTargetTypeStatic
) )
var (
lookupGrpcIp = net.LookupIP // can be overwritten from tests
customResolverPrefix uint64
)
func init() { func init() {
RegisterGrpcClientStats() RegisterGrpcClientStats()
} }
@ -67,25 +74,90 @@ func newGrpcClientImpl(conn grpc.ClientConnInterface) *grpcClientImpl {
} }
type GrpcClient struct { type GrpcClient struct {
conn *grpc.ClientConn ip net.IP
impl *grpcClientImpl target string
conn *grpc.ClientConn
impl *grpcClientImpl
} }
func NewGrpcClient(target string, opts ...grpc.DialOption) (*GrpcClient, error) { type customIpResolver struct {
conn, err := grpc.Dial(target, opts...) resolver.Builder
resolver.Resolver
scheme string
addr string
hostname string
}
// Build reports the single preconfigured address of r (using r.hostname as
// the server name) to cc and then returns r itself as the resolver. The
// target and opts arguments are not consulted. If cc rejects the state, the
// error from UpdateState is returned and no resolver is created.
func (r *customIpResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	fixed := resolver.Address{
		Addr:       r.addr,
		ServerName: r.hostname,
	}
	err := cc.UpdateState(resolver.State{
		Addresses: []resolver.Address{fixed},
	})
	if err != nil {
		return nil, err
	}

	return r, nil
}
// Scheme returns the URL scheme stored in r.scheme. NewGrpcClient generates a
// distinct "custom<N>" scheme per resolver and dials "<scheme>://<target>".
func (r *customIpResolver) Scheme() string {
return r.scheme
}
// ResolveNow does nothing: the address was already handed to the client
// connection in Build and never changes for this resolver.
func (r *customIpResolver) ResolveNow(opts resolver.ResolveNowOptions) {
// Noop, we use a static configuration.
}
// Close does nothing; the resolver holds no resources that need releasing.
func (r *customIpResolver) Close() {
// Noop
}
func NewGrpcClient(target string, ip net.IP, opts ...grpc.DialOption) (*GrpcClient, error) {
var conn *grpc.ClientConn
var err error
if ip != nil {
prefix := atomic.AddUint64(&customResolverPrefix, 1)
addr := ip.String()
hostname := target
if host, port, err := net.SplitHostPort(target); err == nil {
addr = net.JoinHostPort(addr, port)
hostname = host
}
resolver := &customIpResolver{
scheme: fmt.Sprintf("custom%d", prefix),
addr: addr,
hostname: hostname,
}
opts = append(opts, grpc.WithResolvers(resolver))
conn, err = grpc.Dial(fmt.Sprintf("%s://%s", resolver.Scheme(), target), opts...)
} else {
conn, err = grpc.Dial(target, opts...)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
result := &GrpcClient{ result := &GrpcClient{
conn: conn, ip: ip,
impl: newGrpcClientImpl(conn), target: target,
conn: conn,
impl: newGrpcClientImpl(conn),
}
if ip != nil {
result.target += " (" + ip.String() + ")"
} }
return result, nil return result, nil
} }
func (c *GrpcClient) Target() string { func (c *GrpcClient) Target() string {
return c.conn.Target() return c.target
} }
func (c *GrpcClient) Close() error { func (c *GrpcClient) Close() error {
@ -161,9 +233,13 @@ func (c *GrpcClient) GetPublisherId(ctx context.Context, sessionId string, strea
type GrpcClients struct { type GrpcClients struct {
mu sync.RWMutex mu sync.RWMutex
clientsMap map[string]*GrpcClient clientsMap map[string][]*GrpcClient
clients []*GrpcClient clients []*GrpcClient
dnsDiscovery bool
stopping chan bool
stopped chan bool
etcdClient *EtcdClient etcdClient *EtcdClient
targetPrefix string targetPrefix string
targetInformation map[string]*GrpcTargetInformationEtcd targetInformation map[string]*GrpcTargetInformationEtcd
@ -180,14 +256,17 @@ func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcCli
etcdClient: etcdClient, etcdClient: etcdClient,
initializedCtx: initializedCtx, initializedCtx: initializedCtx,
initializedFunc: initializedFunc, initializedFunc: initializedFunc,
stopping: make(chan bool, 1),
stopped: make(chan bool, 1),
} }
if err := result.load(config); err != nil { if err := result.load(config, false); err != nil {
return nil, err return nil, err
} }
return result, nil return result, nil
} }
func (c *GrpcClients) load(config *goconf.ConfigFile) error { func (c *GrpcClients) load(config *goconf.ConfigFile, fromReload bool) error {
var opts []grpc.DialOption var opts []grpc.DialOption
caFile, _ := config.GetString("grpc", "ca") caFile, _ := config.GetString("grpc", "ca")
if caFile != "" { if caFile != "" {
@ -202,31 +281,41 @@ func (c *GrpcClients) load(config *goconf.ConfigFile) error {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} }
if opts == nil {
opts = make([]grpc.DialOption, 0)
}
c.dialOptions.Store(opts)
targetType, _ := config.GetString("grpc", "targettype") targetType, _ := config.GetString("grpc", "targettype")
if targetType == "" { if targetType == "" {
targetType = DefaultGrpcTargetType targetType = DefaultGrpcTargetType
} }
var err error
switch targetType { switch targetType {
case GrpcTargetTypeStatic: case GrpcTargetTypeStatic:
return c.loadTargetsStatic(config, opts...) err = c.loadTargetsStatic(config, fromReload, opts...)
if err == nil && c.dnsDiscovery {
go c.monitorGrpcIPs()
}
case GrpcTargetTypeEtcd: case GrpcTargetTypeEtcd:
return c.loadTargetsEtcd(config, opts...) err = c.loadTargetsEtcd(config, fromReload, opts...)
default: default:
return fmt.Errorf("unknown GRPC target type: %s", targetType) err = fmt.Errorf("unknown GRPC target type: %s", targetType)
} }
return err
} }
func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, opts ...grpc.DialOption) error { func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bool, opts ...grpc.DialOption) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
clientsMap := make(map[string]*GrpcClient) clientsMap := make(map[string][]*GrpcClient)
var clients []*GrpcClient var clients []*GrpcClient
removeTargets := make(map[string]bool, len(c.clientsMap)) removeTargets := make(map[string]bool, len(c.clientsMap))
for target, client := range c.clientsMap { for target, entries := range c.clientsMap {
removeTargets[target] = true removeTargets[target] = true
clientsMap[target] = client clientsMap[target] = entries
} }
targets, _ := config.GetString("grpc", "targets") targets, _ := config.GetString("grpc", "targets")
@ -236,50 +325,87 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, opts ...grpc.
continue continue
} }
if client, found := clientsMap[target]; found { if entries, found := clientsMap[target]; found {
clients = append(clients, client) clients = append(clients, entries...)
delete(removeTargets, target) delete(removeTargets, target)
continue continue
} }
client, err := NewGrpcClient(target, opts...) host := target
if err != nil { if h, _, err := net.SplitHostPort(target); err == nil {
for target, client := range clientsMap { host = h
if closeerr := client.Close(); closeerr != nil { }
log.Printf("Error closing client to %s: %s", target, closeerr)
var ips []net.IP
if net.ParseIP(host) == nil {
// Use dedicated client for each IP address.
var err error
ips, err = lookupGrpcIp(host)
if err != nil {
log.Printf("Could not lookup %s: %s", host, err)
continue
}
} else {
// Connect directly to IP address.
ips = []net.IP{nil}
}
for _, ip := range ips {
client, err := NewGrpcClient(target, ip, opts...)
if err != nil {
for _, clients := range clientsMap {
for _, client := range clients {
if closeerr := client.Close(); closeerr != nil {
log.Printf("Error closing client to %s: %s", client.Target(), closeerr)
}
}
} }
return err
} }
return err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()
if id, err := client.GetServerId(ctx); err != nil { if id, err := client.GetServerId(ctx); err != nil {
log.Printf("Error checking server id of %s: %s", client.Target(), err) log.Printf("Error checking server id of %s: %s", client.Target(), err)
} else if id == GrpcServerId { } else if id == GrpcServerId {
log.Printf("GRPC target %s is this server, ignoring", client.Target()) log.Printf("GRPC target %s is this server, ignoring", client.Target())
if err := client.Close(); err != nil { if err := client.Close(); err != nil {
log.Printf("Error closing client to %s: %s", client.Target(), err) log.Printf("Error closing client to %s: %s", client.Target(), err)
}
continue
} }
continue
}
log.Printf("Adding %s as GRPC target", target) log.Printf("Adding %s as GRPC target", client.Target())
clientsMap[target] = client clientsMap[target] = append(clientsMap[target], client)
clients = append(clients, client) clients = append(clients, client)
}
} }
for target := range removeTargets { for target := range removeTargets {
if client, found := clientsMap[target]; found { if clients, found := clientsMap[target]; found {
log.Printf("Deleting GRPC target %s", target) for _, client := range clients {
if err := client.Close(); err != nil { log.Printf("Deleting GRPC target %s", client.Target())
log.Printf("Error closing client to %s: %s", target, err) if err := client.Close(); err != nil {
log.Printf("Error closing client to %s: %s", client.Target(), err)
}
} }
delete(clientsMap, target) delete(clientsMap, target)
} }
} }
dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery")
if dnsDiscovery != c.dnsDiscovery {
if !dnsDiscovery && fromReload {
c.stopping <- true
<-c.stopped
}
c.dnsDiscovery = dnsDiscovery
if dnsDiscovery && fromReload {
go c.monitorGrpcIPs()
}
}
c.clients = clients c.clients = clients
c.clientsMap = clientsMap c.clientsMap = clientsMap
c.initializedFunc() c.initializedFunc()
@ -287,7 +413,109 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, opts ...grpc.
return nil return nil
} }
func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, opts ...grpc.DialOption) error { func (c *GrpcClients) monitorGrpcIPs() {
log.Printf("Start monitoring GRPC client IPs")
ticker := time.NewTicker(updateDnsInterval)
for {
select {
case <-ticker.C:
c.updateGrpcIPs()
case <-c.stopping:
c.stopped <- true
return
}
}
}
// updateGrpcIPs re-resolves the hostname of every target in c.clientsMap and
// reconciles the per-IP clients of that target with the lookup result:
//
//   - clients whose IP is still returned are kept,
//   - clients whose IP is no longer returned are closed and dropped,
//   - a new client is created for every returned IP without a client, unless
//     the remote reports the same server id as this process.
//
// Targets whose host part is already an IP address are skipped, and targets
// whose lookup fails keep their current clients. If anything changed,
// c.clients is rebuilt from c.clientsMap and the client count metric is
// updated.
//
// c.mu is held for the whole call, including the DNS lookups and the server
// id checks (each bounded by a one second timeout).
func (c *GrpcClients) updateGrpcIPs() {
	c.mu.Lock()
	defer c.mu.Unlock()

	opts := c.dialOptions.Load().([]grpc.DialOption)

	mapModified := false
	for target, clients := range c.clientsMap {
		host := target
		if h, _, err := net.SplitHostPort(target); err == nil {
			host = h
		}

		if net.ParseIP(host) != nil {
			// No need to lookup endpoints that connect to IP addresses.
			continue
		}

		ips, err := lookupGrpcIp(host)
		if err != nil {
			log.Printf("Could not lookup %s: %s", host, err)
			continue
		}

		// Work on a private copy: entries are removed below as they are matched
		// to existing clients, and the slice returned by the lookup function
		// (which can be replaced in tests) must not be modified in place.
		pending := append([]net.IP(nil), ips...)

		var newClients []*GrpcClient
		changed := false
		for _, client := range clients {
			found := false
			for idx, ip := range pending {
				if ip.Equal(client.ip) {
					pending = append(pending[:idx], pending[idx+1:]...)
					found = true
					newClients = append(newClients, client)
					break
				}
			}

			if !found {
				changed = true
				log.Printf("Removing connection to %s", client.Target())
				if err := client.Close(); err != nil {
					log.Printf("Error closing client to %s: %s", client.Target(), err)
				}
				c.wakeupForTesting()
			}
		}

		// Whatever is left in pending has no client yet.
		for _, ip := range pending {
			client, err := NewGrpcClient(target, ip, opts...)
			if err != nil {
				log.Printf("Error creating client to %s with IP %s: %s", target, ip.String(), err)
				continue
			}

			// Release the timeout context right after the call instead of
			// deferring it, so contexts do not pile up until the whole update
			// has finished.
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			id, err := client.GetServerId(ctx)
			cancel()
			if err != nil {
				// A failed check is only logged; the client is still added.
				log.Printf("Error checking server id of %s: %s", client.Target(), err)
			} else if id == GrpcServerId {
				// The target is this server itself. This case is deliberately not
				// logged here (presumably to avoid repeating the message on
				// every update interval).
				if err := client.Close(); err != nil {
					log.Printf("Error closing client to %s: %s", client.Target(), err)
				}
				continue
			}

			log.Printf("Adding %s as GRPC target", client.Target())
			newClients = append(newClients, client)
			changed = true
			c.wakeupForTesting()
		}

		if changed {
			c.clientsMap[target] = newClients
			mapModified = true
		}
	}

	if mapModified {
		c.clients = make([]*GrpcClient, 0, len(c.clientsMap))
		for _, clients := range c.clientsMap {
			c.clients = append(c.clients, clients...)
		}
		statsGrpcClients.Set(float64(len(c.clients)))
	}
}
func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, fromReload bool, opts ...grpc.DialOption) error {
if !c.etcdClient.IsConfigured() { if !c.etcdClient.IsConfigured() {
return fmt.Errorf("No etcd endpoints configured") return fmt.Errorf("No etcd endpoints configured")
} }
@ -301,11 +529,6 @@ func (c *GrpcClients) loadTargetsEtcd(config *goconf.ConfigFile, opts ...grpc.Di
c.targetInformation = make(map[string]*GrpcTargetInformationEtcd) c.targetInformation = make(map[string]*GrpcTargetInformationEtcd)
} }
if opts == nil {
opts = make([]grpc.DialOption, 0)
}
c.dialOptions.Store(opts)
c.etcdClient.AddListener(c) c.etcdClient.AddListener(c)
return nil return nil
} }
@ -380,7 +603,7 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte
} }
opts := c.dialOptions.Load().([]grpc.DialOption) opts := c.dialOptions.Load().([]grpc.DialOption)
cl, err := NewGrpcClient(info.Address, opts...) cl, err := NewGrpcClient(info.Address, nil, opts...)
if err != nil { if err != nil {
log.Printf("Could not create GRPC client for target %s: %s", info.Address, err) log.Printf("Could not create GRPC client for target %s: %s", info.Address, err)
return return
@ -400,12 +623,12 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte
return return
} }
log.Printf("Adding %s as GRPC target", info.Address) log.Printf("Adding %s as GRPC target", cl.Target())
if c.clientsMap == nil { if c.clientsMap == nil {
c.clientsMap = make(map[string]*GrpcClient) c.clientsMap = make(map[string][]*GrpcClient)
} }
c.clientsMap[info.Address] = cl c.clientsMap[info.Address] = []*GrpcClient{cl}
c.clients = append(c.clients, cl) c.clients = append(c.clients, cl)
c.targetInformation[key] = &info c.targetInformation[key] = &info
statsGrpcClients.Inc() statsGrpcClients.Inc()
@ -428,19 +651,21 @@ func (c *GrpcClients) removeEtcdClientLocked(key string) {
} }
delete(c.targetInformation, key) delete(c.targetInformation, key)
client, found := c.clientsMap[info.Address] clients, found := c.clientsMap[info.Address]
if !found { if !found {
return return
} }
log.Printf("Removing connection to %s (from %s)", info.Address, key) for _, client := range clients {
if err := client.Close(); err != nil { log.Printf("Removing connection to %s (from %s)", client.Target(), key)
log.Printf("Error closing client to %s: %s", client.Target(), err) if err := client.Close(); err != nil {
log.Printf("Error closing client to %s: %s", client.Target(), err)
}
} }
delete(c.clientsMap, info.Address) delete(c.clientsMap, info.Address)
c.clients = make([]*GrpcClient, 0, len(c.clientsMap)) c.clients = make([]*GrpcClient, 0, len(c.clientsMap))
for _, client := range c.clientsMap { for _, clients := range c.clientsMap {
c.clients = append(c.clients, client) c.clients = append(c.clients, clients...)
} }
statsGrpcClients.Dec() statsGrpcClients.Dec()
c.wakeupForTesting() c.wakeupForTesting()
@ -467,7 +692,7 @@ func (c *GrpcClients) wakeupForTesting() {
} }
func (c *GrpcClients) Reload(config *goconf.ConfigFile) { func (c *GrpcClients) Reload(config *goconf.ConfigFile) {
if err := c.load(config); err != nil { if err := c.load(config, true); err != nil {
log.Printf("Could not reload RPC clients: %s", err) log.Printf("Could not reload RPC clients: %s", err)
} }
} }
@ -476,14 +701,21 @@ func (c *GrpcClients) Close() {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
for target, client := range c.clientsMap { for _, clients := range c.clientsMap {
if err := client.Close(); err != nil { for _, client := range clients {
log.Printf("Error closing client to %s: %s", target, err) if err := client.Close(); err != nil {
log.Printf("Error closing client to %s: %s", client.Target(), err)
}
} }
} }
c.clients = nil c.clients = nil
c.clientsMap = nil c.clientsMap = nil
if c.dnsDiscovery {
c.stopping <- true
<-c.stopped
c.dnsDiscovery = false
}
if c.etcdClient != nil { if c.etcdClient != nil {
c.etcdClient.RemoveListener(c) c.etcdClient.RemoveListener(c)

View file

@ -23,6 +23,8 @@ package signaling
import ( import (
"context" "context"
"fmt"
"net"
"testing" "testing"
"time" "time"
@ -33,6 +35,7 @@ import (
func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients { func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients {
config := goconf.NewConfigFile() config := goconf.NewConfigFile()
config.AddOption("grpc", "targets", addr) config.AddOption("grpc", "targets", addr)
config.AddOption("grpc", "dnsdiscovery", "true")
client, err := NewGrpcClients(config, nil) client, err := NewGrpcClients(config, nil)
if err != nil { if err != nil {
@ -196,3 +199,61 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) {
t.Errorf("Expected target %s, got %s", addr1, clients[0].Target()) t.Errorf("Expected target %s, got %s", addr1, clients[0].Target())
} }
} }
// Test_GrpcClients_DnsDiscovery checks that clients are created and removed
// as the set of IP addresses returned for a target hostname changes.
//
// The DNS lookup is replaced by a stub that returns ipsResult for the host
// "testgrpc". The test then walks through three states: one IP, two IPs, and
// only the second IP, calling updateGrpcIPs after each change.
func Test_GrpcClients_DnsDiscovery(t *testing.T) {
	var ipsResult []net.IP
	// Restore the previous lookup function when the test ends so the stub
	// does not leak into other tests. The cleanup is registered before the
	// clients are created, so it runs after any cleanup registered during
	// their creation (cleanups run in reverse order of registration).
	origLookup := lookupGrpcIp
	t.Cleanup(func() {
		lookupGrpcIp = origLookup
	})
	lookupGrpcIp = func(host string) ([]net.IP, error) {
		if host == "testgrpc" {
			return ipsResult, nil
		}

		return nil, fmt.Errorf("unknown host")
	}
	target := "testgrpc:12345"
	ip1 := net.ParseIP("192.168.0.1")
	ip2 := net.ParseIP("192.168.0.2")
	targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1)
	targetWithIp2 := fmt.Sprintf("%s (%s)", target, ip2)

	// Initial state: the hostname resolves to a single address.
	ipsResult = []net.IP{ip1}
	client := NewGrpcClientsForTest(t, target)
	ch := make(chan bool, 1)
	client.wakeupChanForTesting = ch

	if clients := client.GetClients(); len(clients) != 1 {
		t.Errorf("Expected one client, got %+v", clients)
	} else if clients[0].Target() != targetWithIp1 {
		t.Errorf("Expected target %s, got %s", targetWithIp1, clients[0].Target())
	} else if !clients[0].ip.Equal(ip1) {
		t.Errorf("Expected IP %s, got %s", ip1, clients[0].ip)
	}

	// A second address appears: a second client must be added.
	ipsResult = []net.IP{ip1, ip2}
	drainWakeupChannel(ch)
	client.updateGrpcIPs()
	<-ch

	if clients := client.GetClients(); len(clients) != 2 {
		t.Errorf("Expected two client, got %+v", clients)
	} else if clients[0].Target() != targetWithIp1 {
		t.Errorf("Expected target %s, got %s", targetWithIp1, clients[0].Target())
	} else if !clients[0].ip.Equal(ip1) {
		t.Errorf("Expected IP %s, got %s", ip1, clients[0].ip)
	} else if clients[1].Target() != targetWithIp2 {
		t.Errorf("Expected target %s, got %s", targetWithIp2, clients[1].Target())
	} else if !clients[1].ip.Equal(ip2) {
		t.Errorf("Expected IP %s, got %s", ip2, clients[1].ip)
	}

	// The first address disappears: only the second client must remain.
	ipsResult = []net.IP{ip2}
	drainWakeupChannel(ch)
	client.updateGrpcIPs()
	<-ch

	if clients := client.GetClients(); len(clients) != 1 {
		t.Errorf("Expected one client, got %+v", clients)
	} else if clients[0].Target() != targetWithIp2 {
		t.Errorf("Expected target %s, got %s", targetWithIp2, clients[0].Target())
	} else if !clients[0].ip.Equal(ip2) {
		t.Errorf("Expected IP %s, got %s", ip2, clients[0].ip)
	}
}

View file

@ -261,6 +261,13 @@ connectionsperhost = 8
# for clustering mode. # for clustering mode.
#targets = 192.168.0.1:9090, 192.168.0.2:9090 #targets = 192.168.0.1:9090, 192.168.0.2:9090
# For target type "static": Enable DNS discovery on hostnames of GRPC targets.
# If a hostname resolves to multiple IP addresses, a connection is established
# to each of them.
# The DNS entries are checked regularly for changes and GRPC clients are
# created or deleted as necessary.
#dnsdiscovery = true
# For target type "etcd": Key prefix of GRPC target entries. All keys below will # For target type "etcd": Key prefix of GRPC target entries. All keys below will
# be watched and assumed to contain a JSON document. The entry "address" from # be watched and assumed to contain a JSON document. The entry "address" from
# this document will be used as target URL, other contents in the document will # this document will be used as target URL, other contents in the document will