mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-05-11 18:16:33 +02:00
Use DNS monitor from static GRPC clients configuration.
This commit is contained in:
parent
8db4068989
commit
b1c78f6e9d
|
@ -169,7 +169,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
events1.Close()
|
events1.Close()
|
||||||
})
|
})
|
||||||
client1 := NewGrpcClientsForTest(t, addr2)
|
client1, _ := NewGrpcClientsForTest(t, addr2)
|
||||||
hub1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version")
|
hub1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -198,7 +198,7 @@ func CreateBackendServerWithClusteringForTestFromConfig(t *testing.T, config1 *g
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
events2.Close()
|
events2.Close()
|
||||||
})
|
})
|
||||||
client2 := NewGrpcClientsForTest(t, addr1)
|
client2, _ := NewGrpcClientsForTest(t, addr1)
|
||||||
hub2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version")
|
hub2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
|
@ -304,7 +304,7 @@ func TestDnsMonitorIP(t *testing.T) {
|
||||||
rec1 := newDnsMonitorReceiverForTest(t)
|
rec1 := newDnsMonitorReceiverForTest(t)
|
||||||
rec1.Expect(ips, ips, nil, nil)
|
rec1.Expect(ips, ips, nil, nil)
|
||||||
|
|
||||||
entry, err := monitor.Add("https://"+ip, rec1.OnLookup)
|
entry, err := monitor.Add(ip+":12345", rec1.OnLookup)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
262
grpc_client.go
262
grpc_client.go
|
@ -49,8 +49,6 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
lookupGrpcIp = net.LookupIP // can be overwritten from tests
|
|
||||||
|
|
||||||
customResolverPrefix atomic.Uint64
|
customResolverPrefix atomic.Uint64
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -258,15 +256,19 @@ func (c *GrpcClient) GetSessionCount(ctx context.Context, u *url.URL) (uint32, e
|
||||||
return response.GetCount(), nil
|
return response.GetCount(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type grpcClientsList struct {
|
||||||
|
clients []*GrpcClient
|
||||||
|
entry *DnsMonitorEntry
|
||||||
|
}
|
||||||
|
|
||||||
type GrpcClients struct {
|
type GrpcClients struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
|
||||||
clientsMap map[string][]*GrpcClient
|
clientsMap map[string]*grpcClientsList
|
||||||
clients []*GrpcClient
|
clients []*GrpcClient
|
||||||
|
|
||||||
|
dnsMonitor *DnsMonitor
|
||||||
dnsDiscovery bool
|
dnsDiscovery bool
|
||||||
stopping chan struct{}
|
|
||||||
stopped chan struct{}
|
|
||||||
|
|
||||||
etcdClient *EtcdClient
|
etcdClient *EtcdClient
|
||||||
targetPrefix string
|
targetPrefix string
|
||||||
|
@ -280,15 +282,13 @@ type GrpcClients struct {
|
||||||
selfCheckWaitGroup sync.WaitGroup
|
selfCheckWaitGroup sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, error) {
|
func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor) (*GrpcClients, error) {
|
||||||
initializedCtx, initializedFunc := context.WithCancel(context.Background())
|
initializedCtx, initializedFunc := context.WithCancel(context.Background())
|
||||||
result := &GrpcClients{
|
result := &GrpcClients{
|
||||||
|
dnsMonitor: dnsMonitor,
|
||||||
etcdClient: etcdClient,
|
etcdClient: etcdClient,
|
||||||
initializedCtx: initializedCtx,
|
initializedCtx: initializedCtx,
|
||||||
initializedFunc: initializedFunc,
|
initializedFunc: initializedFunc,
|
||||||
|
|
||||||
stopping: make(chan struct{}, 1),
|
|
||||||
stopped: make(chan struct{}, 1),
|
|
||||||
}
|
}
|
||||||
if err := result.load(config, false); err != nil {
|
if err := result.load(config, false); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -313,9 +313,6 @@ func (c *GrpcClients) load(config *goconf.ConfigFile, fromReload bool) error {
|
||||||
switch targetType {
|
switch targetType {
|
||||||
case GrpcTargetTypeStatic:
|
case GrpcTargetTypeStatic:
|
||||||
err = c.loadTargetsStatic(config, fromReload, opts...)
|
err = c.loadTargetsStatic(config, fromReload, opts...)
|
||||||
if err == nil && c.dnsDiscovery {
|
|
||||||
go c.monitorGrpcIPs()
|
|
||||||
}
|
|
||||||
case GrpcTargetTypeEtcd:
|
case GrpcTargetTypeEtcd:
|
||||||
err = c.loadTargetsEtcd(config, fromReload, opts...)
|
err = c.loadTargetsEtcd(config, fromReload, opts...)
|
||||||
default:
|
default:
|
||||||
|
@ -344,7 +341,7 @@ func (c *GrpcClients) isClientAvailable(target string, client *GrpcClient) bool
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, entry := range entries {
|
for _, entry := range entries.clients {
|
||||||
if entry == client {
|
if entry == client {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -401,7 +398,20 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
clientsMap := make(map[string][]*GrpcClient)
|
dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery")
|
||||||
|
if dnsDiscovery != c.dnsDiscovery {
|
||||||
|
if !dnsDiscovery {
|
||||||
|
for _, entry := range c.clientsMap {
|
||||||
|
if entry.entry != nil {
|
||||||
|
c.dnsMonitor.Remove(entry.entry)
|
||||||
|
entry.entry = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.dnsDiscovery = dnsDiscovery
|
||||||
|
}
|
||||||
|
|
||||||
|
clientsMap := make(map[string]*grpcClientsList)
|
||||||
var clients []*GrpcClient
|
var clients []*GrpcClient
|
||||||
removeTargets := make(map[string]bool, len(c.clientsMap))
|
removeTargets := make(map[string]bool, len(c.clientsMap))
|
||||||
for target, entries := range c.clientsMap {
|
for target, entries := range c.clientsMap {
|
||||||
|
@ -417,7 +427,15 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo
|
||||||
}
|
}
|
||||||
|
|
||||||
if entries, found := clientsMap[target]; found {
|
if entries, found := clientsMap[target]; found {
|
||||||
clients = append(clients, entries...)
|
clients = append(clients, entries.clients...)
|
||||||
|
if dnsDiscovery && entries.entry == nil {
|
||||||
|
entry, err := c.dnsMonitor.Add(target, c.onLookup)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
entries.entry = entry
|
||||||
|
}
|
||||||
delete(removeTargets, target)
|
delete(removeTargets, target)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -427,61 +445,58 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo
|
||||||
host = h
|
host = h
|
||||||
}
|
}
|
||||||
|
|
||||||
var ips []net.IP
|
if dnsDiscovery && net.ParseIP(host) == nil {
|
||||||
if net.ParseIP(host) == nil {
|
|
||||||
// Use dedicated client for each IP address.
|
// Use dedicated client for each IP address.
|
||||||
var err error
|
entry, err := c.dnsMonitor.Add(target, c.onLookup)
|
||||||
ips, err = lookupGrpcIp(host)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Could not lookup %s: %s", host, err)
|
|
||||||
// Make sure updating continues even if initial lookup failed.
|
|
||||||
clientsMap[target] = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Connect directly to IP address.
|
|
||||||
ips = []net.IP{nil}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, ip := range ips {
|
|
||||||
client, err := NewGrpcClient(target, ip, opts...)
|
|
||||||
if err != nil {
|
|
||||||
for _, clients := range clientsMap {
|
|
||||||
for _, client := range clients {
|
|
||||||
c.closeClient(client)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.selfCheckWaitGroup.Add(1)
|
clientsMap[target] = &grpcClientsList{
|
||||||
go c.checkIsSelf(context.Background(), target, client)
|
entry: entry,
|
||||||
|
}
|
||||||
log.Printf("Adding %s as GRPC target", client.Target())
|
continue
|
||||||
clientsMap[target] = append(clientsMap[target], client)
|
|
||||||
clients = append(clients, client)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
client, err := NewGrpcClient(target, nil, opts...)
|
||||||
|
if err != nil {
|
||||||
|
for _, entry := range clientsMap {
|
||||||
|
for _, client := range entry.clients {
|
||||||
|
c.closeClient(client)
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.entry != nil {
|
||||||
|
c.dnsMonitor.Remove(entry.entry)
|
||||||
|
entry.entry = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.selfCheckWaitGroup.Add(1)
|
||||||
|
go c.checkIsSelf(context.Background(), target, client)
|
||||||
|
|
||||||
|
log.Printf("Adding %s as GRPC target", client.Target())
|
||||||
|
entry, found := clientsMap[target]
|
||||||
|
if !found {
|
||||||
|
entry = &grpcClientsList{}
|
||||||
|
}
|
||||||
|
entry.clients = append(entry.clients, client)
|
||||||
|
clients = append(clients, client)
|
||||||
}
|
}
|
||||||
|
|
||||||
for target := range removeTargets {
|
for target := range removeTargets {
|
||||||
if clients, found := clientsMap[target]; found {
|
if entry, found := clientsMap[target]; found {
|
||||||
for _, client := range clients {
|
for _, client := range entry.clients {
|
||||||
log.Printf("Deleting GRPC target %s", client.Target())
|
log.Printf("Deleting GRPC target %s", client.Target())
|
||||||
c.closeClient(client)
|
c.closeClient(client)
|
||||||
}
|
}
|
||||||
delete(clientsMap, target)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
dnsDiscovery, _ := config.GetBool("grpc", "dnsdiscovery")
|
if entry.entry != nil {
|
||||||
if dnsDiscovery != c.dnsDiscovery {
|
c.dnsMonitor.Remove(entry.entry)
|
||||||
if !dnsDiscovery && fromReload {
|
entry.entry = nil
|
||||||
c.stopping <- struct{}{}
|
}
|
||||||
<-c.stopped
|
delete(clientsMap, target)
|
||||||
}
|
|
||||||
c.dnsDiscovery = dnsDiscovery
|
|
||||||
if dnsDiscovery && fromReload {
|
|
||||||
go c.monitorGrpcIPs()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -492,91 +507,61 @@ func (c *GrpcClients) loadTargetsStatic(config *goconf.ConfigFile, fromReload bo
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *GrpcClients) monitorGrpcIPs() {
|
func (c *GrpcClients) onLookup(entry *DnsMonitorEntry, all []net.IP, added []net.IP, keep []net.IP, removed []net.IP) {
|
||||||
log.Printf("Start monitoring GRPC client IPs")
|
|
||||||
ticker := time.NewTicker(updateDnsInterval)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
c.updateGrpcIPs()
|
|
||||||
case <-c.stopping:
|
|
||||||
c.stopped <- struct{}{}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *GrpcClients) updateGrpcIPs() {
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
target := entry.URL()
|
||||||
|
e, found := c.clientsMap[target]
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
opts := c.dialOptions.Load().([]grpc.DialOption)
|
opts := c.dialOptions.Load().([]grpc.DialOption)
|
||||||
|
|
||||||
mapModified := false
|
mapModified := false
|
||||||
for target, clients := range c.clientsMap {
|
var newClients []*GrpcClient
|
||||||
host := target
|
for _, ip := range removed {
|
||||||
if h, _, err := net.SplitHostPort(target); err == nil {
|
for _, client := range e.clients {
|
||||||
host = h
|
if ip.Equal(client.ip) {
|
||||||
}
|
mapModified = true
|
||||||
|
|
||||||
if net.ParseIP(host) != nil {
|
|
||||||
// No need to lookup endpoints that connect to IP addresses.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ips, err := lookupGrpcIp(host)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Could not lookup %s: %s", host, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var newClients []*GrpcClient
|
|
||||||
changed := false
|
|
||||||
for _, client := range clients {
|
|
||||||
found := false
|
|
||||||
for idx, ip := range ips {
|
|
||||||
if ip.Equal(client.ip) {
|
|
||||||
ips = append(ips[:idx], ips[idx+1:]...)
|
|
||||||
found = true
|
|
||||||
newClients = append(newClients, client)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !found {
|
|
||||||
changed = true
|
|
||||||
log.Printf("Removing connection to %s", client.Target())
|
log.Printf("Removing connection to %s", client.Target())
|
||||||
c.closeClient(client)
|
c.closeClient(client)
|
||||||
c.wakeupForTesting()
|
c.wakeupForTesting()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, ip := range ips {
|
for _, ip := range keep {
|
||||||
client, err := NewGrpcClient(target, ip, opts...)
|
for _, client := range e.clients {
|
||||||
if err != nil {
|
if ip.Equal(client.ip) {
|
||||||
log.Printf("Error creating client to %s with IP %s: %s", target, ip.String(), err)
|
newClients = append(newClients, client)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.selfCheckWaitGroup.Add(1)
|
|
||||||
go c.checkIsSelf(context.Background(), target, client)
|
|
||||||
|
|
||||||
log.Printf("Adding %s as GRPC target", client.Target())
|
|
||||||
newClients = append(newClients, client)
|
|
||||||
changed = true
|
|
||||||
c.wakeupForTesting()
|
|
||||||
}
|
|
||||||
|
|
||||||
if changed {
|
|
||||||
c.clientsMap[target] = newClients
|
|
||||||
mapModified = true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, ip := range added {
|
||||||
|
client, err := NewGrpcClient(target, ip, opts...)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error creating client to %s with IP %s: %s", target, ip.String(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.selfCheckWaitGroup.Add(1)
|
||||||
|
go c.checkIsSelf(context.Background(), target, client)
|
||||||
|
|
||||||
|
log.Printf("Adding %s as GRPC target", client.Target())
|
||||||
|
newClients = append(newClients, client)
|
||||||
|
mapModified = true
|
||||||
|
c.wakeupForTesting()
|
||||||
|
}
|
||||||
|
|
||||||
if mapModified {
|
if mapModified {
|
||||||
|
c.clientsMap[target].clients = newClients
|
||||||
|
|
||||||
c.clients = make([]*GrpcClient, 0, len(c.clientsMap))
|
c.clients = make([]*GrpcClient, 0, len(c.clientsMap))
|
||||||
for _, clients := range c.clientsMap {
|
for _, entry := range c.clientsMap {
|
||||||
c.clients = append(c.clients, clients...)
|
c.clients = append(c.clients, entry.clients...)
|
||||||
}
|
}
|
||||||
statsGrpcClients.Set(float64(len(c.clients)))
|
statsGrpcClients.Set(float64(len(c.clients)))
|
||||||
}
|
}
|
||||||
|
@ -684,9 +669,11 @@ func (c *GrpcClients) EtcdKeyUpdated(client *EtcdClient, key string, data []byte
|
||||||
log.Printf("Adding %s as GRPC target", cl.Target())
|
log.Printf("Adding %s as GRPC target", cl.Target())
|
||||||
|
|
||||||
if c.clientsMap == nil {
|
if c.clientsMap == nil {
|
||||||
c.clientsMap = make(map[string][]*GrpcClient)
|
c.clientsMap = make(map[string]*grpcClientsList)
|
||||||
|
}
|
||||||
|
c.clientsMap[info.Address] = &grpcClientsList{
|
||||||
|
clients: []*GrpcClient{cl},
|
||||||
}
|
}
|
||||||
c.clientsMap[info.Address] = []*GrpcClient{cl}
|
|
||||||
c.clients = append(c.clients, cl)
|
c.clients = append(c.clients, cl)
|
||||||
c.targetInformation[key] = &info
|
c.targetInformation[key] = &info
|
||||||
statsGrpcClients.Inc()
|
statsGrpcClients.Inc()
|
||||||
|
@ -709,19 +696,19 @@ func (c *GrpcClients) removeEtcdClientLocked(key string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(c.targetInformation, key)
|
delete(c.targetInformation, key)
|
||||||
clients, found := c.clientsMap[info.Address]
|
entry, found := c.clientsMap[info.Address]
|
||||||
if !found {
|
if !found {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, client := range clients {
|
for _, client := range entry.clients {
|
||||||
log.Printf("Removing connection to %s (from %s)", client.Target(), key)
|
log.Printf("Removing connection to %s (from %s)", client.Target(), key)
|
||||||
c.closeClient(client)
|
c.closeClient(client)
|
||||||
}
|
}
|
||||||
delete(c.clientsMap, info.Address)
|
delete(c.clientsMap, info.Address)
|
||||||
c.clients = make([]*GrpcClient, 0, len(c.clientsMap))
|
c.clients = make([]*GrpcClient, 0, len(c.clientsMap))
|
||||||
for _, clients := range c.clientsMap {
|
for _, entry := range c.clientsMap {
|
||||||
c.clients = append(c.clients, clients...)
|
c.clients = append(c.clients, entry.clients...)
|
||||||
}
|
}
|
||||||
statsGrpcClients.Dec()
|
statsGrpcClients.Dec()
|
||||||
c.wakeupForTesting()
|
c.wakeupForTesting()
|
||||||
|
@ -757,21 +744,22 @@ func (c *GrpcClients) Close() {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
for _, clients := range c.clientsMap {
|
for _, entry := range c.clientsMap {
|
||||||
for _, client := range clients {
|
for _, client := range entry.clients {
|
||||||
if err := client.Close(); err != nil {
|
if err := client.Close(); err != nil {
|
||||||
log.Printf("Error closing client to %s: %s", client.Target(), err)
|
log.Printf("Error closing client to %s: %s", client.Target(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if entry.entry != nil {
|
||||||
|
c.dnsMonitor.Remove(entry.entry)
|
||||||
|
entry.entry = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
c.clients = nil
|
c.clients = nil
|
||||||
c.clientsMap = nil
|
c.clientsMap = nil
|
||||||
if c.dnsDiscovery {
|
c.dnsDiscovery = false
|
||||||
c.stopping <- struct{}{}
|
|
||||||
<-c.stopped
|
|
||||||
c.dnsDiscovery = false
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.etcdClient != nil {
|
if c.etcdClient != nil {
|
||||||
c.etcdClient.RemoveListener(c)
|
c.etcdClient.RemoveListener(c)
|
||||||
|
|
|
@ -46,8 +46,9 @@ func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} {
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) *GrpcClients {
|
func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, *DnsMonitor) {
|
||||||
client, err := NewGrpcClients(config, etcdClient)
|
dnsMonitor := newDnsMonitorForTest(t, time.Hour) // will be updated manually
|
||||||
|
client, err := NewGrpcClients(config, etcdClient, dnsMonitor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -55,10 +56,10 @@ func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, et
|
||||||
client.Close()
|
client.Close()
|
||||||
})
|
})
|
||||||
|
|
||||||
return client
|
return client, dnsMonitor
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients {
|
func NewGrpcClientsForTest(t *testing.T, addr string) (*GrpcClients, *DnsMonitor) {
|
||||||
config := goconf.NewConfigFile()
|
config := goconf.NewConfigFile()
|
||||||
config.AddOption("grpc", "targets", addr)
|
config.AddOption("grpc", "targets", addr)
|
||||||
config.AddOption("grpc", "dnsdiscovery", "true")
|
config.AddOption("grpc", "dnsdiscovery", "true")
|
||||||
|
@ -66,7 +67,7 @@ func NewGrpcClientsForTest(t *testing.T, addr string) *GrpcClients {
|
||||||
return NewGrpcClientsForTestWithConfig(t, config, nil)
|
return NewGrpcClientsForTestWithConfig(t, config, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) *GrpcClients {
|
func NewGrpcClientsWithEtcdForTest(t *testing.T, etcd *embed.Etcd) (*GrpcClients, *DnsMonitor) {
|
||||||
config := goconf.NewConfigFile()
|
config := goconf.NewConfigFile()
|
||||||
config.AddOption("etcd", "endpoints", etcd.Config().ListenClientUrls[0].String())
|
config.AddOption("etcd", "endpoints", etcd.Config().ListenClientUrls[0].String())
|
||||||
|
|
||||||
|
@ -116,7 +117,7 @@ func Test_GrpcClients_EtcdInitial(t *testing.T) {
|
||||||
SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}"))
|
SetEtcdValue(etcd, "/grpctargets/one", []byte("{\"address\":\""+addr1+"\"}"))
|
||||||
SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}"))
|
SetEtcdValue(etcd, "/grpctargets/two", []byte("{\"address\":\""+addr2+"\"}"))
|
||||||
|
|
||||||
client := NewGrpcClientsWithEtcdForTest(t, etcd)
|
client, _ := NewGrpcClientsWithEtcdForTest(t, etcd)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := client.WaitForInitialized(ctx); err != nil {
|
if err := client.WaitForInitialized(ctx); err != nil {
|
||||||
|
@ -130,7 +131,7 @@ func Test_GrpcClients_EtcdInitial(t *testing.T) {
|
||||||
|
|
||||||
func Test_GrpcClients_EtcdUpdate(t *testing.T) {
|
func Test_GrpcClients_EtcdUpdate(t *testing.T) {
|
||||||
etcd := NewEtcdForTest(t)
|
etcd := NewEtcdForTest(t)
|
||||||
client := NewGrpcClientsWithEtcdForTest(t, etcd)
|
client, _ := NewGrpcClientsWithEtcdForTest(t, etcd)
|
||||||
ch := client.getWakeupChannelForTesting()
|
ch := client.getWakeupChannelForTesting()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
|
@ -184,7 +185,7 @@ func Test_GrpcClients_EtcdUpdate(t *testing.T) {
|
||||||
|
|
||||||
func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) {
|
func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) {
|
||||||
etcd := NewEtcdForTest(t)
|
etcd := NewEtcdForTest(t)
|
||||||
client := NewGrpcClientsWithEtcdForTest(t, etcd)
|
client, _ := NewGrpcClientsWithEtcdForTest(t, etcd)
|
||||||
ch := client.getWakeupChannelForTesting()
|
ch := client.getWakeupChannelForTesting()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
|
@ -227,26 +228,20 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_GrpcClients_DnsDiscovery(t *testing.T) {
|
func Test_GrpcClients_DnsDiscovery(t *testing.T) {
|
||||||
var ipsResult []net.IP
|
lookup := newMockDnsLookupForTest(t)
|
||||||
lookupGrpcIp = func(host string) ([]net.IP, error) {
|
|
||||||
if host == "testgrpc" {
|
|
||||||
return ipsResult, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("unknown host")
|
|
||||||
}
|
|
||||||
target := "testgrpc:12345"
|
target := "testgrpc:12345"
|
||||||
ip1 := net.ParseIP("192.168.0.1")
|
ip1 := net.ParseIP("192.168.0.1")
|
||||||
ip2 := net.ParseIP("192.168.0.2")
|
ip2 := net.ParseIP("192.168.0.2")
|
||||||
targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1)
|
targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1)
|
||||||
targetWithIp2 := fmt.Sprintf("%s (%s)", target, ip2)
|
targetWithIp2 := fmt.Sprintf("%s (%s)", target, ip2)
|
||||||
ipsResult = []net.IP{ip1}
|
lookup.Set("testgrpc", []net.IP{ip1})
|
||||||
client := NewGrpcClientsForTest(t, target)
|
client, dnsMonitor := NewGrpcClientsForTest(t, target)
|
||||||
ch := client.getWakeupChannelForTesting()
|
ch := client.getWakeupChannelForTesting()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
dnsMonitor.checkHostnames()
|
||||||
if clients := client.GetClients(); len(clients) != 1 {
|
if clients := client.GetClients(); len(clients) != 1 {
|
||||||
t.Errorf("Expected one client, got %+v", clients)
|
t.Errorf("Expected one client, got %+v", clients)
|
||||||
} else if clients[0].Target() != targetWithIp1 {
|
} else if clients[0].Target() != targetWithIp1 {
|
||||||
|
@ -255,9 +250,9 @@ func Test_GrpcClients_DnsDiscovery(t *testing.T) {
|
||||||
t.Errorf("Expected IP %s, got %s", ip1, clients[0].ip)
|
t.Errorf("Expected IP %s, got %s", ip1, clients[0].ip)
|
||||||
}
|
}
|
||||||
|
|
||||||
ipsResult = []net.IP{ip1, ip2}
|
lookup.Set("testgrpc", []net.IP{ip1, ip2})
|
||||||
drainWakeupChannel(ch)
|
drainWakeupChannel(ch)
|
||||||
client.updateGrpcIPs()
|
dnsMonitor.checkHostnames()
|
||||||
waitForEvent(ctx, t, ch)
|
waitForEvent(ctx, t, ch)
|
||||||
|
|
||||||
if clients := client.GetClients(); len(clients) != 2 {
|
if clients := client.GetClients(); len(clients) != 2 {
|
||||||
|
@ -272,9 +267,9 @@ func Test_GrpcClients_DnsDiscovery(t *testing.T) {
|
||||||
t.Errorf("Expected IP %s, got %s", ip2, clients[1].ip)
|
t.Errorf("Expected IP %s, got %s", ip2, clients[1].ip)
|
||||||
}
|
}
|
||||||
|
|
||||||
ipsResult = []net.IP{ip2}
|
lookup.Set("testgrpc", []net.IP{ip2})
|
||||||
drainWakeupChannel(ch)
|
drainWakeupChannel(ch)
|
||||||
client.updateGrpcIPs()
|
dnsMonitor.checkHostnames()
|
||||||
waitForEvent(ctx, t, ch)
|
waitForEvent(ctx, t, ch)
|
||||||
|
|
||||||
if clients := client.GetClients(); len(clients) != 1 {
|
if clients := client.GetClients(); len(clients) != 1 {
|
||||||
|
@ -287,22 +282,11 @@ func Test_GrpcClients_DnsDiscovery(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_GrpcClients_DnsDiscoveryInitialFailed(t *testing.T) {
|
func Test_GrpcClients_DnsDiscoveryInitialFailed(t *testing.T) {
|
||||||
var ipsResult []net.IP
|
lookup := newMockDnsLookupForTest(t)
|
||||||
lookupGrpcIp = func(host string) ([]net.IP, error) {
|
|
||||||
if host == "testgrpc" && len(ipsResult) > 0 {
|
|
||||||
return ipsResult, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, &net.DNSError{
|
|
||||||
Err: "no such host",
|
|
||||||
Name: host,
|
|
||||||
IsNotFound: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
target := "testgrpc:12345"
|
target := "testgrpc:12345"
|
||||||
ip1 := net.ParseIP("192.168.0.1")
|
ip1 := net.ParseIP("192.168.0.1")
|
||||||
targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1)
|
targetWithIp1 := fmt.Sprintf("%s (%s)", target, ip1)
|
||||||
client := NewGrpcClientsForTest(t, target)
|
client, dnsMonitor := NewGrpcClientsForTest(t, target)
|
||||||
ch := client.getWakeupChannelForTesting()
|
ch := client.getWakeupChannelForTesting()
|
||||||
|
|
||||||
testCtx, testCtxCancel := context.WithTimeout(context.Background(), testTimeout)
|
testCtx, testCtxCancel := context.WithTimeout(context.Background(), testTimeout)
|
||||||
|
@ -318,9 +302,9 @@ func Test_GrpcClients_DnsDiscoveryInitialFailed(t *testing.T) {
|
||||||
t.Errorf("Expected no client, got %+v", clients)
|
t.Errorf("Expected no client, got %+v", clients)
|
||||||
}
|
}
|
||||||
|
|
||||||
ipsResult = []net.IP{ip1}
|
lookup.Set("testgrpc", []net.IP{ip1})
|
||||||
drainWakeupChannel(ch)
|
drainWakeupChannel(ch)
|
||||||
client.updateGrpcIPs()
|
dnsMonitor.checkHostnames()
|
||||||
waitForEvent(testCtx, t, ch)
|
waitForEvent(testCtx, t, ch)
|
||||||
|
|
||||||
if clients := client.GetClients(); len(clients) != 1 {
|
if clients := client.GetClients(); len(clients) != 1 {
|
||||||
|
@ -370,7 +354,7 @@ func Test_GrpcClients_Encryption(t *testing.T) {
|
||||||
clientConfig.AddOption("grpc", "clientcertificate", clientCertFile)
|
clientConfig.AddOption("grpc", "clientcertificate", clientCertFile)
|
||||||
clientConfig.AddOption("grpc", "clientkey", clientPrivkeyFile)
|
clientConfig.AddOption("grpc", "clientkey", clientPrivkeyFile)
|
||||||
clientConfig.AddOption("grpc", "serverca", serverCertFile)
|
clientConfig.AddOption("grpc", "serverca", serverCertFile)
|
||||||
clients := NewGrpcClientsForTestWithConfig(t, clientConfig, nil)
|
clients, _ := NewGrpcClientsForTestWithConfig(t, clientConfig, nil)
|
||||||
|
|
||||||
ctx, cancel1 := context.WithTimeout(context.Background(), time.Second)
|
ctx, cancel1 := context.WithTimeout(context.Background(), time.Second)
|
||||||
defer cancel1()
|
defer cancel1()
|
||||||
|
|
|
@ -211,7 +211,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
client1 := NewGrpcClientsForTest(t, addr2)
|
client1, _ := NewGrpcClientsForTest(t, addr2)
|
||||||
h1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version")
|
h1, err := NewHub(config1, events1, grpcServer1, client1, nil, r1, "no-version")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -231,7 +231,7 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
client2 := NewGrpcClientsForTest(t, addr1)
|
client2, _ := NewGrpcClientsForTest(t, addr1)
|
||||||
h2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version")
|
h2, err := NewHub(config2, events2, grpcServer2, client2, nil, r2, "no-version")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
|
@ -66,9 +66,6 @@ const (
|
||||||
defaultProxyTimeoutSeconds = 2
|
defaultProxyTimeoutSeconds = 2
|
||||||
|
|
||||||
rttLogDuration = 500 * time.Millisecond
|
rttLogDuration = 500 * time.Millisecond
|
||||||
|
|
||||||
// Update service IP addresses every 10 seconds.
|
|
||||||
updateDnsInterval = 10 * time.Second
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type McuProxy interface {
|
type McuProxy interface {
|
||||||
|
|
|
@ -183,7 +183,7 @@ func main() {
|
||||||
}()
|
}()
|
||||||
defer rpcServer.Close()
|
defer rpcServer.Close()
|
||||||
|
|
||||||
rpcClients, err := signaling.NewGrpcClients(config, etcdClient)
|
rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Could not create RPC clients: %s", err)
|
log.Fatalf("Could not create RPC clients: %s", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue