diff --git a/backend_server_test.go b/backend_server_test.go index 1e14008..e2d3f94 100644 --- a/backend_server_test.go +++ b/backend_server_test.go @@ -169,7 +169,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g t.Cleanup(func() { events1.Close() }) - client1 := NewGrpcClientsForTest(t, addr2) + client1, _ := NewGrpcClientsForTest(t, addr2) hub1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version") if err != nil { t.Fatal(err) @@ -198,7 +198,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g t.Cleanup(func() { events2.Close() }) - client2 := NewGrpcClientsForTest(t, addr1) + client2, _ := NewGrpcClientsForTest(t, addr1) hub2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version") if err != nil { t.Fatal(err) diff --git a/dns_monitor_test.go b/dns_monitor_test.go index 4fa7b49..72cd89d 100644 --- a/dns_monitor_test.go +++ b/dns_monitor_test.go @@ -304,7 +304,7 @@ func TestDnsMonitorIP(t *testing.T) { rec1 := newDnsMonitorReceiverForTest(t) rec1.Expect(ips, ips, nil, nil) - entry, err := monitor.Add("https://"+ip, rec1.OnLookup) + entry, err := monitor.Add(ip+":12345", rec1.OnLookup) if err != nil { t.Fatal(err) } diff --git a/grpc_client.go b/grpc_client.go index 1f74d62..b2a1855 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -49,8 +49,6 @@ const ( ) var ( - lookupGrpcIp = net.LookupIP // can be overwritten from tests - customResolverPrefix atomic.Uint64 ) @@ -258,15 +256,19 @@ func (c *GrpcClient) GetSessionCount(ctx context.Context, u *url.URL) (uint32, e return response.GetCount(), nil } +type grpcClientsList struct { + clients []*GrpcClient + entry *DnsMonitorEntry +} + type GrpcClients struct { mu sync.RWMutex - clientsMap map[string][]*GrpcClient + clientsMap map[string]*grpcClientsList clients []*GrpcClient + dnsMonitor *DnsMonitor dnsDiscovery bool - stopping chan struct{} - stopped chan struct{} etcdClient *EtcdClient targetPrefix 
string @@ -280,15 +282,13 @@ type GrpcClients struct { selfCheckWaitGroup sync.WaitGroup } -func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, error) { +func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor) (*GrpcClients, error) { initializedCtx, initializedFunc := context.WithCancel(context.Background()) result := &GrpcClients{ + dnsMonitor: dnsMonitor, etcdClient: etcdClient, initializedCtx: initializedCtx, initializedFunc: initializedFunc, - - stopping: make(chan struct{}, 1), - stopped: make(chan struct{}, 1), } if err := result.load(config, false); err != nil { return nil, err @@ -313,9 +313,6 @@ func (c *GrpcClients) load(config *goconf.ConfigFile, fromReload bool) error { switch targetType { case GrpcTargetTypeStatic: err = c.loadTargetsStatic(config, fromReload, opts...) - if err == nil && c.dnsDiscovery { - go c.monitorGrpcIPs() - } case GrpcTargetTypeEtcd: err = c.loadTargetsEtcd(config, fromReload, opts...) 
default: @@ -344,7 +341,7 @@ func (c *GrpcClients) isClientAvailable(target string, client *GrpcClient) bool return false } - for _, entry := range entries { + for _, entry := range entries.clients { if entry == client { return true } @@ -401,7 +398,20 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo c.mu.Lock() defer c.mu.Unlock() - clientsMap := make(map[string][]*GrpcClient) + dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery") + if dnsDiscovery != c.dnsDiscovery { + if !dnsDiscovery { + for _, entry := range c.clientsMap { + if entry.entry != nil { + c.dnsMonitor.Remove(entry.entry) + entry.entry = nil + } + } + } + c.dnsDiscovery = dnsDiscovery + } + + clientsMap := make(map[string]*grpcClientsList) var clients []*GrpcClient removeTargets := make(map[string]bool, len(c.clientsMap)) for target, entries := range c.clientsMap { @@ -417,7 +427,15 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo } if entries, found := clientsMap[target]; found { - clients = append(clients, entries...) + clients = append(clients, entries.clients...) + if dnsDiscovery && entries.entry == nil { + entry, err := c.dnsMonitor.Add(target, c.onLookup) + if err != nil { + return err + } + + entries.entry = entry + } delete(removeTargets, target) continue } @@ -427,61 +445,58 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo host = h } - var ips []net.IP - if net.ParseIP(host) == nil { + if dnsDiscovery && net.ParseIP(host) == nil { // Use dedicated client for each IP address. - var err error - ips, err = lookupGrpcIp(host) + entry, err := c.dnsMonitor.Add(target, c.onLookup) if err != nil { - log.Printf("Could not lookup %s: %s", host, err) - // Make sure updating continues even if initial lookup failed. - clientsMap[target] = nil - continue - } - } else { - // Connect directly to IP address. 
- ips = []net.IP{nil} - } - - for _, ip := range ips { - client, err := NewGrpcClient(target, ip, opts...) - if err != nil { - for _, clients := range clientsMap { - for _, client := range clients { - c.closeClient(client) - } - } return err } - c.selfCheckWaitGroup.Add(1) - go c.checkIsSelf(context.Background(), target, client) - - log.Printf("Adding %s as GRPC target", client.Target()) - clientsMap[target] = append(clientsMap[target], client) - clients = append(clients, client) + clientsMap[target] = &grpcClientsList{ + entry: entry, + } + continue } + + client, err := NewGrpcClient(target, nil, opts...) + if err != nil { + for _, entry := range clientsMap { + for _, client := range entry.clients { + c.closeClient(client) + } + + if entry.entry != nil { + c.dnsMonitor.Remove(entry.entry) + entry.entry = nil + } + } + return err + } + + c.selfCheckWaitGroup.Add(1) + go c.checkIsSelf(context.Background(), target, client) + + log.Printf("Adding %s as GRPC target", client.Target()) + if _, found := clientsMap[target]; !found { + // Register the target; otherwise the new client never lands in clientsMap. + clientsMap[target] = &grpcClientsList{} + } + clientsMap[target].clients = append(clientsMap[target].clients, client) + clients = append(clients, client) } for target := range removeTargets { - if clients, found := clientsMap[target]; found { - for _, client := range clients { + if entry, found := clientsMap[target]; found { + for _, client := range entry.clients { log.Printf("Deleting GRPC target %s", client.Target()) c.closeClient(client) } - delete(clientsMap, target) - } - } - dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery") - if dnsDiscovery != c.dnsDiscovery { - if !dnsDiscovery && fromReload { - c.stopping <- struct{}{} - <-c.stopped - } - c.dnsDiscovery = dnsDiscovery - if dnsDiscovery && fromReload { - go c.monitorGrpcIPs() + if entry.entry != nil { + c.dnsMonitor.Remove(entry.entry) + entry.entry = nil + } + delete(clientsMap, target) } } @@ -492,91 +507,61 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo return nil } 
-func (c *GrpcClients) monitorGrpcIPs() { - log.Printf("Start monitoring GRPC client IPs") - ticker := time.NewTicker(updateDnsInterval) - for { - select { - case <-ticker.C: - c.updateGrpcIPs() - case <-c.stopping: - c.stopped <- struct{}{} - return - } - } -} - -func (c *GrpcClients) updateGrpcIPs() { +func (c *GrpcClients) onLookup(entry *DnsMonitorEntry, all []net.IP, added []net.IP, keep []net.IP, removed []net.IP) { c.mu.Lock() defer c.mu.Unlock() + target := entry.URL() + e, found := c.clientsMap[target] + if !found { + return + } + opts := c.dialOptions.Load().([]grpc.DialOption) mapModified := false - for target, clients := range c.clientsMap { - host := target - if h, _, err := net.SplitHostPort(target); err == nil { - host = h - } - - if net.ParseIP(host) != nil { - // No need to lookup endpoints that connect to IP addresses. - continue - } - - ips, err := lookupGrpcIp(host) - if err != nil { - log.Printf("Could not lookup %s: %s", host, err) - continue - } - - var newClients []*GrpcClient - changed := false - for _, client := range clients { - found := false - for idx, ip := range ips { - if ip.Equal(client.ip) { - ips = append(ips[:idx], ips[idx+1:]...) - found = true - newClients = append(newClients, client) - break - } - } - - if !found { - changed = true + var newClients []*GrpcClient + for _, ip := range removed { + for _, client := range e.clients { + if ip.Equal(client.ip) { + mapModified = true log.Printf("Removing connection to %s", client.Target()) c.closeClient(client) c.wakeupForTesting() } } + } - for _, ip := range ips { - client, err := NewGrpcClient(target, ip, opts...) 
- if err != nil { - log.Printf("Error creating client to %s with IP %s: %s", target, ip.String(), err) - continue + for _, ip := range keep { + for _, client := range e.clients { + if ip.Equal(client.ip) { + newClients = append(newClients, client) } - - c.selfCheckWaitGroup.Add(1) - go c.checkIsSelf(context.Background(), target, client) - - log.Printf("Adding %s as GRPC target", client.Target()) - newClients = append(newClients, client) - changed = true - c.wakeupForTesting() - } - - if changed { - c.clientsMap[target] = newClients - mapModified = true } } + for _, ip := range added { + client, err := NewGrpcClient(target, ip, opts...) + if err != nil { + log.Printf("Error creating client to %s with IP %s: %s", target, ip.String(), err) + continue + } + + c.selfCheckWaitGroup.Add(1) + go c.checkIsSelf(context.Background(), target, client) + + log.Printf("Adding %s as GRPC target", client.Target()) + newClients = append(newClients, client) + mapModified = true + c.wakeupForTesting() + } + if mapModified { + c.clientsMap[target].clients = newClients + c.clients = make([]*GrpcClient, 0, len(c.clientsMap)) - for _, clients := range c.clientsMap { - c.clients = append(c.clients, clients...) + for _, entry := range c.clientsMap { + c.clients = append(c.clients, entry.clients...) 
} statsGrpcClients.Set(float64(len(c.clients))) } @@ -684,9 +669,11 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte log.Printf("Adding %s as GRPC target", cl.Target()) if c.clientsMap == nil { - c.clientsMap = make(map[string][]*GrpcClient) + c.clientsMap = make(map[string]*grpcClientsList) + } + c.clientsMap[info.Address] = &grpcClientsList{ + clients: []*GrpcClient{cl}, } - c.clientsMap[info.Address] = []*GrpcClient{cl} c.clients = append(c.clients, cl) c.targetInformation[key] = &info statsGrpcClients.Inc() @@ -709,19 +696,19 @@ func (c *GrpcClients) removeEtcdClientLocked(key string) { } delete(c.targetInformation, key) - clients, found := c.clientsMap[info.Address] + entry, found := c.clientsMap[info.Address] if !found { return } - for _, client := range clients { + for _, client := range entry.clients { log.Printf("Removing connection to %s (from %s)", client.Target(), key) c.closeClient(client) } delete(c.clientsMap, info.Address) c.clients = make([]*GrpcClient, 0, len(c.clientsMap)) - for _, clients := range c.clientsMap { - c.clients = append(c.clients, clients...) + for _, entry := range c.clientsMap { + c.clients = append(c.clients, entry.clients...) 
} statsGrpcClients.Dec() c.wakeupForTesting() @@ -757,21 +744,22 @@ func (c *GrpcClients) Close() { c.mu.Lock() defer c.mu.Unlock() - for _, clients := range c.clientsMap { - for _, client := range clients { + for _, entry := range c.clientsMap { + for _, client := range entry.clients { if err := client.Close(); err != nil { log.Printf("Error closing client to %s: %s", client.Target(), err) } } + + if entry.entry != nil { + c.dnsMonitor.Remove(entry.entry) + entry.entry = nil + } } c.clients = nil c.clientsMap = nil - if c.dnsDiscovery { - c.stopping <- struct{}{} - <-c.stopped - c.dnsDiscovery = false - } + c.dnsDiscovery = false if c.etcdClient != nil { c.etcdClient.RemoveListener(c) diff --git a/grpc_client_test.go b/grpc_client_test.go index bc63c9b..a2db4b3 100644 --- a/grpc_client_test.go +++ b/grpc_client_test.go @@ -46,8 +46,9 @@ func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} { return ch } -func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) *GrpcClients { - client, err := NewGrpcClients(config, etcdClient) +func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, *DnsMonitor) { + dnsMonitor := newDnsMonitorForTest(t, time.Hour) // will be updated manually + client, err := NewGrpcClients(config, etcdClient, dnsMonitor) if err != nil { t.Fatal(err) } @@ -55,10 +56,10 @@ func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, et client.Close() }) - return client + return client, dnsMonitor } -func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients { +func NewGrpcClientsForTest(t *testing.T, addr string) (*GrpcClients, *DnsMonitor) { config := goconf.NewConfigFile() config.AddOption("grpc", "targets", addr) config.AddOption("grpc", "dnsdiscovery", "true") @@ -66,7 +67,7 @@ func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients { return NewGrpcClientsForTestWithConfig(t, config, nil) } 
-func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) *GrpcClients { +func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) (*GrpcClients, *DnsMonitor) { config := goconf.NewConfigFile() config.AddOption("etcd", "endpoints", etcd.Config().ListenClientUrls[0].String()) @@ -116,7 +117,7 @@ func Test_GrpcClients_EtcdInitial(t *testing.T) { SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}")) SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}")) - client := NewGrpcClientsWithEtcdForTest(t, etcd) + client, _ := NewGrpcClientsWithEtcdForTest(t, etcd) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() if err := client.WaitForInitialized(ctx); err != nil { @@ -130,7 +131,7 @@ func Test_GrpcClients_EtcdInitial(t *testing.T) { func Test_GrpcClients_EtcdUpdate(t *testing.T) { etcd := NewEtcdForTest(t) - client := NewGrpcClientsWithEtcdForTest(t, etcd) + client, _ := NewGrpcClientsWithEtcdForTest(t, etcd) ch := client.getWakeupChannelForTesting() ctx, cancel := context.WithTimeout(context.Background(), testTimeout) @@ -184,7 +185,7 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) { func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { etcd := NewEtcdForTest(t) - client := NewGrpcClientsWithEtcdForTest(t, etcd) + client, _ := NewGrpcClientsWithEtcdForTest(t, etcd) ch := client.getWakeupChannelForTesting() ctx, cancel := context.WithTimeout(context.Background(), testTimeout) @@ -227,26 +228,20 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) { } func Test_GrpcClients_DnsDiscovery(t *testing.T) { - var ipsResult []net.IP - lookupGrpcIp = func(host string) ([]net.IP, error) { - if host == "testgrpc" { - return ipsResult, nil - } - - return nil, fmt.Errorf("unknown host") - } + lookup := newMockDnsLookupForTest(t) target := "testgrpc:12345" ip1 := net.ParseIP("192.168.0.1") ip2 := net.ParseIP("192.168.0.2") targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1) 
targetWithIp2 := fmt.Sprintf("%s (%s)", target, ip2) - ipsResult = []net.IP{ip1} - client := NewGrpcClientsForTest(t, target) + lookup.Set("testgrpc", []net.IP{ip1}) + client, dnsMonitor := NewGrpcClientsForTest(t, target) ch := client.getWakeupChannelForTesting() ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() + dnsMonitor.checkHostnames() if clients := client.GetClients(); len(clients) != 1 { t.Errorf("Expected one client, got %+v", clients) } else if clients[0].Target() != targetWithIp1 { @@ -255,9 +250,9 @@ func Test_GrpcClients_DnsDiscovery(t *testing.T) { t.Errorf("Expected IP %s, got %s", ip1, clients[0].ip) } - ipsResult = []net.IP{ip1, ip2} + lookup.Set("testgrpc", []net.IP{ip1, ip2}) drainWakeupChannel(ch) - client.updateGrpcIPs() + dnsMonitor.checkHostnames() waitForEvent(ctx, t, ch) if clients := client.GetClients(); len(clients) != 2 { @@ -272,9 +267,9 @@ func Test_GrpcClients_DnsDiscovery(t *testing.T) { t.Errorf("Expected IP %s, got %s", ip2, clients[1].ip) } - ipsResult = []net.IP{ip2} + lookup.Set("testgrpc", []net.IP{ip2}) drainWakeupChannel(ch) - client.updateGrpcIPs() + dnsMonitor.checkHostnames() waitForEvent(ctx, t, ch) if clients := client.GetClients(); len(clients) != 1 { @@ -287,22 +282,11 @@ func Test_GrpcClients_DnsDiscovery(t *testing.T) { } func Test_GrpcClients_DnsDiscoveryInitialFailed(t *testing.T) { - var ipsResult []net.IP - lookupGrpcIp = func(host string) ([]net.IP, error) { - if host == "testgrpc" && len(ipsResult) > 0 { - return ipsResult, nil - } - - return nil, &net.DNSError{ - Err: "no such host", - Name: host, - IsNotFound: true, - } - } + lookup := newMockDnsLookupForTest(t) target := "testgrpc:12345" ip1 := net.ParseIP("192.168.0.1") targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1) - client := NewGrpcClientsForTest(t, target) + client, dnsMonitor := NewGrpcClientsForTest(t, target) ch := client.getWakeupChannelForTesting() testCtx, testCtxCancel := 
context.WithTimeout(context.Background(), testTimeout) @@ -318,9 +302,9 @@ func Test_GrpcClients_DnsDiscoveryInitialFailed(t *testing.T) { t.Errorf("Expected no client, got %+v", clients) } - ipsResult = []net.IP{ip1} + lookup.Set("testgrpc", []net.IP{ip1}) drainWakeupChannel(ch) - client.updateGrpcIPs() + dnsMonitor.checkHostnames() waitForEvent(testCtx, t, ch) if clients := client.GetClients(); len(clients) != 1 { @@ -370,7 +354,7 @@ func Test_GrpcClients_Encryption(t *testing.T) { clientConfig.AddOption("grpc", "clientcertificate", clientCertFile) clientConfig.AddOption("grpc", "clientkey", clientPrivkeyFile) clientConfig.AddOption("grpc", "serverca", serverCertFile) - clients := NewGrpcClientsForTestWithConfig(t, clientConfig, nil) + clients, _ := NewGrpcClientsForTestWithConfig(t, clientConfig, nil) ctx, cancel1 := context.WithTimeout(context.Background(), time.Second) defer cancel1() diff --git a/hub_test.go b/hub_test.go index f01684b..9321733 100644 --- a/hub_test.go +++ b/hub_test.go @@ -211,7 +211,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http if err != nil { t.Fatal(err) } - client1 := NewGrpcClientsForTest(t, addr2) + client1, _ := NewGrpcClientsForTest(t, addr2) h1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version") if err != nil { t.Fatal(err) @@ -231,7 +231,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http if err != nil { t.Fatal(err) } - client2 := NewGrpcClientsForTest(t, addr1) + client2, _ := NewGrpcClientsForTest(t, addr1) h2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version") if err != nil { t.Fatal(err) diff --git a/mcu_proxy.go b/mcu_proxy.go index 67ea4fb..9186a64 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -66,9 +66,6 @@ const ( defaultProxyTimeoutSeconds = 2 rttLogDuration = 500 * time.Millisecond - - // Update service IP addresses every 10 seconds. 
- updateDnsInterval = 10 * time.Second ) type McuProxy interface { diff --git a/server/main.go b/server/main.go index 8e7d944..53da6a4 100644 --- a/server/main.go +++ b/server/main.go @@ -183,7 +183,7 @@ func main() { }() defer rpcServer.Close() - rpcClients, err := signaling.NewGrpcClients(config, etcdClient) + rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor) if err != nil { log.Fatalf("Could not create RPC clients: %s", err) }