From 659730d3715534f7f6b56be79227bf085fde87fa Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Mon, 4 Apr 2022 15:39:49 +0200 Subject: [PATCH] Support DNS discovery for proxy server URLs. If the hostname of a proxy server resolves to multiple IP addresses, a connection is established to each of them. Changes to the DNS are monitored regularly and proxy connections are created or deleted as necessary. --- mcu_proxy.go | 473 ++++++++++++++++++++++++++++++++++++------------- server.conf.in | 7 + 2 files changed, 355 insertions(+), 125 deletions(-) diff --git a/mcu_proxy.go b/mcu_proxy.go index 016c5e0..0d3324e 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -29,6 +29,7 @@ import ( "fmt" "io/ioutil" "log" + "net" "net/http" "net/url" "sort" @@ -68,6 +69,9 @@ const ( defaultProxyTimeoutSeconds = 2 rttLogDuration = 500 * time.Millisecond + + // Update service IP addresses every 10 seconds. + updateDnsInterval = 10 * time.Second ) type mcuProxyPubSubCommon struct { @@ -93,7 +97,7 @@ func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClie } if proxyDebugMessages { - log.Printf("Response from %s: %+v", c.conn.url, response) + log.Printf("Response from %s: %+v", c.conn, response) } if response.Type == "error" { callback(response.Error, nil) @@ -112,7 +116,7 @@ func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadPr case "candidate": c.listener.OnIceCandidate(client, msg.Payload["candidate"]) default: - log.Printf("Unsupported payload from %s: %+v", c.conn.url, msg) + log.Printf("Unsupported payload from %s: %+v", c.conn, msg) } } @@ -157,11 +161,11 @@ func (p *mcuProxyPublisher) Close(ctx context.Context) { } if _, err := p.conn.performSyncRequest(ctx, msg); err != nil { - log.Printf("Could not delete publisher %s at %s: %s", p.proxyId, p.conn.url, err) + log.Printf("Could not delete publisher %s at %s: %s", p.proxyId, p.conn, err) return } - log.Printf("Delete publisher %s at %s", p.proxyId, p.conn.url) + log.Printf("Delete publisher %s at %s", p.proxyId, p.conn) } func (p *mcuProxyPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { @@ -188,7 +192,7 @@ func (p *mcuProxyPublisher) ProcessEvent(msg *EventProxyServerMessage) { case "publisher-closed": p.NotifyClosed() default: - log.Printf("Unsupported event from %s: %+v", p.conn.url, msg) + log.Printf("Unsupported event from %s: %+v", p.conn, msg) } } @@ -232,11 +236,11 @@ func (s *mcuProxySubscriber) Close(ctx context.Context) { } if _, err := s.conn.performSyncRequest(ctx, msg); err != nil { - log.Printf("Could not delete subscriber %s at %s: %s", s.proxyId, s.conn.url, err) + log.Printf("Could not delete subscriber %s at %s: %s", s.proxyId, s.conn, err) return } - log.Printf("Delete subscriber %s at %s", s.proxyId, s.conn.url) + log.Printf("Delete subscriber %s at %s", s.proxyId, s.conn) } func (s *mcuProxySubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { @@ -263,7 +267,7 @@ func (s *mcuProxySubscriber) ProcessEvent(msg *EventProxyServerMessage) { case "subscriber-closed": s.NotifyClosed() default: - log.Printf("Unsupported event from %s: %+v", s.conn.url, msg) + log.Printf("Unsupported event from %s: %+v", s.conn, msg) } } @@ -276,6 +280,7 @@ type mcuProxyConnection struct { proxy *mcuProxy rawUrl string url *url.URL + ip net.IP mu sync.Mutex closeChan chan bool @@ -303,7 +308,7 @@ type mcuProxyConnection struct { subscribers map[string]*mcuProxySubscriber } -func newMcuProxyConnection(proxy *mcuProxy, baseUrl string) (*mcuProxyConnection, error) { +func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP) (*mcuProxyConnection, error) { parsed, err := url.Parse(baseUrl) if err != nil { return nil, err @@ -313,6 +318,7 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string) (*mcuProxyConnection proxy: proxy, rawUrl: baseUrl, url: parsed, + ip: ip, closeChan: make(chan bool, 1), closedChan: make(chan bool, 1), reconnectInterval: int64(initialReconnectInterval), @@ -326,8 +332,17 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string) (*mcuProxyConnection return conn, nil } +func (c *mcuProxyConnection) String() string { + if c.ip != nil { + return fmt.Sprintf("%s (%s)", c.rawUrl, c.ip) + } + + return c.rawUrl +} + type mcuProxyConnectionStats struct { Url string `json:"url"` + IP net.IP `json:"ip,omitempty"` Connected bool `json:"connected"` Publishers int64 `json:"publishers"` Clients int64 `json:"clients"` @@ -339,6 +354,7 @@ type mcuProxyConnectionStats struct { func (c *mcuProxyConnection) GetStats() *mcuProxyConnectionStats { result := &mcuProxyConnectionStats{ Url: c.url.String(), + IP: c.ip, } c.mu.Lock() if c.conn != nil { @@ -397,7 +413,7 @@ func (c *mcuProxyConnection) readPump() { rtt := now.Sub(time.Unix(0, ts)) if rtt >= rttLogDuration { rtt_ms := rtt.Nanoseconds() / time.Millisecond.Nanoseconds() - log.Printf("Proxy at %s has RTT of %d ms (%s)", c.url, rtt_ms, rtt) + log.Printf("Proxy at %s has RTT of %d ms (%s)", c, rtt_ms, rtt) } } return nil @@ -411,14 +427,14 @@ func (c *mcuProxyConnection) readPump() { websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) { - log.Printf("Error reading from %s: %v", c.url, err) + log.Printf("Error reading from %s: %v", c, err) } break } var msg ProxyServerMessage if err := json.Unmarshal(message, &msg); err != nil { - log.Printf("Error unmarshaling %s from %s: %s", string(message), c.url, err) + log.Printf("Error unmarshaling %s from %s: %s", string(message), c, err) continue } @@ -437,7 +453,7 @@ func (c *mcuProxyConnection) sendPing() bool { msg := strconv.FormatInt(now.UnixNano(), 10) c.conn.SetWriteDeadline(now.Add(writeWait)) // nolint if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil { - log.Printf("Could not send ping to proxy at %s: %v", c.url, err) + log.Printf("Could not send ping to proxy at %s: %v", c, err) c.scheduleReconnect() return false } @@ -489,7 +505,7 @@ func (c *mcuProxyConnection) stop(ctx context.Context) { c.closeChan <- true if err := c.sendClose(); err != nil { if err != ErrNotConnected { - log.Printf("Could not send close message to %s: %s", c.url, err) + log.Printf("Could not send close message to %s: %s", c, err) } c.close() return @@ -499,7 +515,7 @@ func (c *mcuProxyConnection) stop(ctx context.Context) { case <-c.closedChan: case <-ctx.Done(): if err := ctx.Err(); err != nil { - log.Printf("Error waiting for connection to %s get closed: %s", c.url, err) + log.Printf("Error waiting for connection to %s get closed: %s", c, err) c.close() } } @@ -534,7 +550,7 @@ func (c *mcuProxyConnection) closeIfEmpty() bool { c.subscribersLock.RUnlock() if total > 0 { // Connection will be closed once all clients have disconnected. - log.Printf("Connection to %s is still used by %d clients, defer closing", c.url, total) + log.Printf("Connection to %s is still used by %d clients, defer closing", c, total) return false } @@ -542,7 +558,7 @@ func (c *mcuProxyConnection) closeIfEmpty() bool { ctx, cancel := context.WithTimeout(context.Background(), closeTimeout) defer cancel() - log.Printf("All clients disconnected, closing connection to %s", c.url) + log.Printf("All clients disconnected, closing connection to %s", c) c.stop(ctx) c.proxy.removeConnection(c) @@ -552,7 +568,7 @@ func (c *mcuProxyConnection) closeIfEmpty() bool { func (c *mcuProxyConnection) scheduleReconnect() { if err := c.sendClose(); err != nil && err != ErrNotConnected { - log.Printf("Could not send close message to %s: %s", c.url, err) + log.Printf("Could not send close message to %s: %s", c, err) } c.close() @@ -569,7 +585,7 @@ func (c *mcuProxyConnection) scheduleReconnect() { func (c *mcuProxyConnection) reconnect() { u, err := c.url.Parse("proxy") if err != nil { - log.Printf("Could not resolve url to proxy at %s: %s", c.url, err) + log.Printf("Could not resolve url to proxy at %s: %s", c, err) c.scheduleReconnect() return } @@ -579,14 +595,31 @@ func (c *mcuProxyConnection) reconnect() { u.Scheme = "wss" } - conn, _, err := c.proxy.dialer.Dial(u.String(), nil) + dialer := c.proxy.dialer + if c.ip != nil { + dialer = &websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: c.proxy.dialer.HandshakeTimeout, + TLSClientConfig: c.proxy.dialer.TLSClientConfig, + + // Override DNS lookup and connect to custom IP address. + NetDialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + if _, port, err := net.SplitHostPort(addr); err == nil { + addr = net.JoinHostPort(c.ip.String(), port) + } + + return net.Dial(network, addr) + }, + } + } + conn, _, err := dialer.Dial(u.String(), nil) if err != nil { - log.Printf("Could not connect to %s: %s", u, err) + log.Printf("Could not connect to %s: %s", c, err) c.scheduleReconnect() return } - log.Printf("Connected to %s", u) + log.Printf("Connected to %s", c) atomic.StoreUint32(&c.closed, 0) c.mu.Lock() @@ -597,7 +630,7 @@ func (c *mcuProxyConnection) reconnect() { atomic.StoreInt64(&c.reconnectInterval, int64(initialReconnectInterval)) atomic.StoreUint32(&c.shutdownScheduled, 0) if err := c.sendHello(); err != nil { - log.Printf("Could not send hello request to %s: %s", c.url, err) + log.Printf("Could not send hello request to %s: %s", c, err) c.scheduleReconnect() return } @@ -697,19 +730,19 @@ func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) { switch msg.Type { case "error": if msg.Error.Code == "no_such_session" { - log.Printf("Session %s could not be resumed on %s, registering new", c.sessionId, c.url) + log.Printf("Session %s could not be resumed on %s, registering new", c.sessionId, c) c.clearPublishers() c.clearSubscribers() c.clearCallbacks() c.sessionId = "" if err := c.sendHello(); err != nil { - log.Printf("Could not send hello request to %s: %s", c.url, err) + log.Printf("Could not send hello request to %s: %s", c, err) c.scheduleReconnect() } return } - log.Printf("Hello connection to %s failed with %+v, reconnecting", c.url, msg.Error) + log.Printf("Hello connection to %s failed with %+v, reconnecting", c, msg.Error) c.scheduleReconnect() case "hello": resumed := c.sessionId == msg.Hello.SessionId @@ -717,30 +750,30 @@ func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) { country := "" if msg.Hello.Server != nil { if country = msg.Hello.Server.Country; country != "" && !IsValidCountry(country) { - log.Printf("Proxy %s sent invalid country %s in hello response", c.url, country) + log.Printf("Proxy %s sent invalid country %s in hello response", c, country) country = "" } } c.country.Store(country) if resumed { - log.Printf("Resumed session %s on %s", c.sessionId, c.url) + log.Printf("Resumed session %s on %s", c.sessionId, c) } else if country != "" { - log.Printf("Received session %s from %s (in %s)", c.sessionId, c.url, country) + log.Printf("Received session %s from %s (in %s)", c.sessionId, c, country) } else { - log.Printf("Received session %s from %s", c.sessionId, c.url) + log.Printf("Received session %s from %s", c.sessionId, c) } if atomic.CompareAndSwapUint32(&c.trackClose, 0, 1) { statsConnectedProxyBackendsCurrent.WithLabelValues(c.Country()).Inc() } default: - log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c.url) + log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c) c.scheduleReconnect() } return } if proxyDebugMessages { - log.Printf("Received from %s: %+v", c.url, msg) + log.Printf("Received from %s: %+v", c, msg) } callback := c.getCallback(msg.Id) if callback != nil { @@ -756,7 +789,7 @@ func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) { case "bye": c.processBye(msg) default: - log.Printf("Unsupported message received from %s: %+v", c.url, msg) + log.Printf("Unsupported message received from %s: %+v", c, msg) } } @@ -778,37 +811,37 @@ func (c *mcuProxyConnection) processPayload(msg *ProxyServerMessage) { return } - log.Printf("Received payload for unknown client %+v from %s", payload, c.url) + log.Printf("Received payload for unknown client %+v from %s", payload, c) } func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) { event := msg.Event switch event.Type { case "backend-disconnected": - log.Printf("Upstream backend at %s got disconnected, reset MCU objects", c.url) + log.Printf("Upstream backend at %s got disconnected, reset MCU objects", c) c.clearPublishers() c.clearSubscribers() c.clearCallbacks() // TODO: Should we also reconnect? return case "backend-connected": - log.Printf("Upstream backend at %s is connected", c.url) + log.Printf("Upstream backend at %s is connected", c) return case "update-load": if proxyDebugMessages { - log.Printf("Load of %s now at %d", c.url, event.Load) + log.Printf("Load of %s now at %d", c, event.Load) } atomic.StoreInt64(&c.load, event.Load) statsProxyBackendLoadCurrent.WithLabelValues(c.url.String()).Set(float64(event.Load)) return case "shutdown-scheduled": - log.Printf("Proxy %s is scheduled to shutdown", c.url) + log.Printf("Proxy %s is scheduled to shutdown", c) atomic.StoreUint32(&c.shutdownScheduled, 1) return } if proxyDebugMessages { - log.Printf("Process event from %s: %+v", c.url, event) + log.Printf("Process event from %s: %+v", c, event) } c.publishersLock.RLock() publisher, found := c.publishers[event.ClientId] @@ -826,17 +859,17 @@ func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) { return } - log.Printf("Received event for unknown client %+v from %s", event, c.url) + log.Printf("Received event for unknown client %+v from %s", event, c) } func (c *mcuProxyConnection) processBye(msg *ProxyServerMessage) { bye := msg.Bye switch bye.Reason { case "session_resumed": - log.Printf("Session %s on %s was resumed by other client, resetting", c.sessionId, c.url) + log.Printf("Session %s on %s was resumed by other client, resetting", c.sessionId, c) c.sessionId = "" default: - log.Printf("Received bye with unsupported reason from %s %+v", c.url, bye) + log.Printf("Received bye with unsupported reason from %s %+v", c, bye) } } @@ -878,7 +911,7 @@ func (c *mcuProxyConnection) sendMessage(msg *ProxyClientMessage) error { func (c *mcuProxyConnection) sendMessageLocked(msg *ProxyClientMessage) error { if proxyDebugMessages { - log.Printf("Send message to %s: %+v", c.url, msg) + log.Printf("Send message to %s: %+v", c, msg) } if c.conn == nil { return ErrNotConnected @@ -946,7 +979,7 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe } proxyId := response.Command.Id - log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c.url, id) + log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id) publisher := newMcuProxyPublisher(id, streamType, mediaTypes, proxyId, c, listener) c.publishersLock.Lock() c.publishers[proxyId] = publisher @@ -981,7 +1014,7 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList } proxyId := response.Command.Id - log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c.url, publisher) + log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisher) subscriber := newMcuProxySubscriber(publisher, streamType, proxyId, c, listener) c.subscribersLock.Lock() c.subscribers[proxyId] = subscriber @@ -996,6 +1029,7 @@ type mcuProxy struct { connRequests int64 nextSort int64 + urlType string tokenId string tokenKey *rsa.PrivateKey @@ -1006,10 +1040,14 @@ type mcuProxy struct { dialer *websocket.Dialer connections []*mcuProxyConnection - connectionsMap map[string]*mcuProxyConnection + connectionsMap map[string][]*mcuProxyConnection connectionsMu sync.RWMutex proxyTimeout time.Duration + dnsDiscovery bool + stopping chan bool + stopped chan bool + maxStreamBitrate int maxScreenBitrate int @@ -1024,6 +1062,9 @@ type mcuProxy struct { func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) { urlType, _ := config.GetString("mcu", "urltype") + if urlType == "" { + urlType = proxyUrlTypeStatic + } tokenId, _ := config.GetString("mcu", "token_id") if tokenId == "" { @@ -1059,6 +1100,7 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) { } mcu := &mcuProxy{ + urlType: urlType, tokenId: tokenId, tokenKey: tokenKey, @@ -1066,9 +1108,12 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) { Proxy: http.ProxyFromEnvironment, HandshakeTimeout: proxyTimeout, }, - connectionsMap: make(map[string]*mcuProxyConnection), + connectionsMap: make(map[string][]*mcuProxyConnection), proxyTimeout: proxyTimeout, + stopping: make(chan bool, 1), + stopped: make(chan bool, 1), + maxStreamBitrate: maxStreamBitrate, maxScreenBitrate: maxScreenBitrate, @@ -1089,21 +1134,10 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) { } } - if urlType == "" { - urlType = proxyUrlTypeStatic - } - switch urlType { case proxyUrlTypeStatic: - mcuUrl, _ := config.GetString("mcu", "url") - for _, u := range strings.Split(mcuUrl, " ") { - conn, err := newMcuProxyConnection(mcu, u) - if err != nil { - return nil, err - } - - mcu.connections = append(mcu.connections, conn) - mcu.connectionsMap[u] = conn + if err := mcu.configureStatic(config, false); err != nil { + return nil, err } if len(mcu.connections) == 0 { return nil, fmt.Errorf("No MCU proxy connections configured") @@ -1180,6 +1214,11 @@ func (m *mcuProxy) Start() error { return err } } + + if m.urlType == proxyUrlTypeStatic && m.dnsDiscovery { + go m.monitorProxyIPs() + } + return nil } @@ -1192,6 +1231,218 @@ func (m *mcuProxy) Stop() { defer cancel() c.stop(ctx) } + + if m.urlType == proxyUrlTypeStatic && m.dnsDiscovery { + m.stopping <- true + <-m.stopped + } +} + +func (m *mcuProxy) monitorProxyIPs() { + log.Printf("Start monitoring proxy IPs") + ticker := time.NewTicker(updateDnsInterval) + for { + select { + case <-ticker.C: + m.updateProxyIPs() + case <-m.stopping: + m.stopped <- true + return + } + } +} + +func (m *mcuProxy) updateProxyIPs() { + m.connectionsMu.Lock() + defer m.connectionsMu.Unlock() + + for u, conns := range m.connectionsMap { + if len(conns) == 0 { + continue + } + + host := conns[0].url.Host + if h, _, err := net.SplitHostPort(host); err == nil { + host = h + } + + ips, err := net.LookupIP(host) + if err != nil { + log.Printf("Could not lookup %s: %s", host, err) + continue + } + + var newConns []*mcuProxyConnection + changed := false + for _, conn := range conns { + found := false + for idx, ip := range ips { + if ip.Equal(conn.ip) { + ips = append(ips[:idx], ips[idx+1:]...) + found = true + conn.stopCloseIfEmpty() + newConns = append(newConns, conn) + break + } + } + + if !found { + changed = true + log.Printf("Removing connection to %s", conn) + conn.closeIfEmpty() + } + } + + for _, ip := range ips { + conn, err := newMcuProxyConnection(m, u, ip) + if err != nil { + log.Printf("Could not create proxy connection to %s (%s): %s", u, ip, err) + continue + } + + if err := conn.start(); err != nil { + log.Printf("Could not start new connection to %s: %s", conn, err) + continue + } + + log.Printf("Adding new connection to %s", conn) + m.connections = append(m.connections, conn) + newConns = append(newConns, conn) + changed = true + } + + if changed { + m.connectionsMap[u] = newConns + } + } +} + +func (m *mcuProxy) configureStatic(config *goconf.ConfigFile, fromReload bool) error { + m.connectionsMu.Lock() + defer m.connectionsMu.Unlock() + + remove := make(map[string][]*mcuProxyConnection) + for u, conns := range m.connectionsMap { + remove[u] = conns + } + created := make(map[string][]*mcuProxyConnection) + changed := false + + mcuUrl, _ := config.GetString("mcu", "url") + dnsDiscovery, _ := config.GetBool("mcu", "dnsdiscovery") + if dnsDiscovery != m.dnsDiscovery { + if !dnsDiscovery && fromReload { + m.stopping <- true + <-m.stopped + } + m.dnsDiscovery = dnsDiscovery + if dnsDiscovery && fromReload { + go m.monitorProxyIPs() + } + } + + for _, u := range strings.Split(mcuUrl, " ") { + if existing, found := remove[u]; found { + // Proxy connection still exists in new configuration + delete(remove, u) + for _, conn := range existing { + conn.stopCloseIfEmpty() + } + continue + } + + var ips []net.IP + if dnsDiscovery { + parsed, err := url.Parse(u) + if err != nil { + if !fromReload { + return err + } + + log.Printf("Could not parse URL %s: %s", u, err) + continue + } + + if host, _, err := net.SplitHostPort(parsed.Host); err == nil { + parsed.Host = host + } + + ips, err = net.LookupIP(parsed.Host) + if err != nil { + // Will be retried later. + log.Printf("Could not lookup %s: %s\n", parsed.Host, err) + continue + } + } + + var conns []*mcuProxyConnection + if ips == nil { + conn, err := newMcuProxyConnection(m, u, nil) + if err != nil { + if !fromReload { + return err + } + + log.Printf("Could not create proxy connection to %s: %s", u, err) + continue + } + + conns = append(conns, conn) + } else { + for _, ip := range ips { + conn, err := newMcuProxyConnection(m, u, ip) + if err != nil { + if !fromReload { + return err + } + + log.Printf("Could not create proxy connection to %s (%s): %s", u, ip, err) + continue + } + + conns = append(conns, conn) + } + } + created[u] = conns + } + + for _, conns := range remove { + for _, conn := range conns { + go conn.closeIfEmpty() + } + } + + if fromReload { + for u, conns := range created { + var started []*mcuProxyConnection + for _, conn := range conns { + if err := conn.start(); err != nil { + log.Printf("Could not start new connection to %s: %s", conn, err) + continue + } + + log.Printf("Adding new connection to %s", conn) + started = append(started, conn) + m.connections = append(m.connections, conn) + } + + if len(started) > 0 { + m.connectionsMap[u] = started + changed = true + } + } + + if changed { + atomic.StoreInt64(&m.nextSort, 0) + } + } else { + for u, conns := range created { + m.connections = append(m.connections, conns...) + m.connectionsMap[u] = conns + } + } + + return nil } func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) error { @@ -1348,56 +1599,17 @@ func (m *mcuProxy) syncClient() error { } func (m *mcuProxy) Reload(config *goconf.ConfigFile) { - m.connectionsMu.Lock() - defer m.connectionsMu.Unlock() - if err := m.loadContinentsMap(config); err != nil { log.Printf("Error loading continents map: %s", err) } - remove := make(map[string]*mcuProxyConnection) - for u, conn := range m.connectionsMap { - remove[u] = conn - } - created := make(map[string]*mcuProxyConnection) - changed := false - - mcuUrl, _ := config.GetString("mcu", "url") - for _, u := range strings.Split(mcuUrl, " ") { - if existing, found := remove[u]; found { - // Proxy connection still exists in new configuration - delete(remove, u) - existing.stopCloseIfEmpty() - continue + switch m.urlType { + case proxyUrlTypeStatic: + if err := m.configureStatic(config, true); err != nil { + log.Printf("Could not configure static proxy urls: %s", err) } - - conn, err := newMcuProxyConnection(m, u) - if err != nil { - log.Printf("Could not create proxy connection to %s: %s", u, err) - continue - } - - created[u] = conn - } - - for _, conn := range remove { - go conn.closeIfEmpty() - } - - for u, conn := range created { - if err := conn.start(); err != nil { - log.Printf("Could not start new connection to %s: %s", u, err) - continue - } - - log.Printf("Adding new connection to %s", u) - m.connections = append(m.connections, conn) - m.connectionsMap[u] = conn - changed = true - } - - if changed { - atomic.StoreInt64(&m.nextSort, 0) + default: + // Reloading not supported yet. } } @@ -1443,12 +1655,14 @@ func (m *mcuProxy) addEtcdProxy(key string, data []byte) { m.connectionsMu.Lock() defer m.connectionsMu.Unlock() - if conn, found := m.connectionsMap[info.Address]; found { + if conns, found := m.connectionsMap[info.Address]; found { m.keyInfos[key] = &info m.urlToKey[info.Address] = key - conn.stopCloseIfEmpty() + for _, conn := range conns { + conn.stopCloseIfEmpty() + } } else { - conn, err := newMcuProxyConnection(m, info.Address) + conn, err := newMcuProxyConnection(m, info.Address, nil) if err != nil { log.Printf("Could not create proxy connection to %s: %s", info.Address, err) return @@ -1463,7 +1677,7 @@ func (m *mcuProxy) addEtcdProxy(key string, data []byte) { m.keyInfos[key] = &info m.urlToKey[info.Address] = key m.connections = append(m.connections, conn) - m.connectionsMap[info.Address] = conn + m.connectionsMap[info.Address] = []*mcuProxyConnection{conn} atomic.StoreInt64(&m.nextSort, 0) } } @@ -1488,8 +1702,10 @@ func (m *mcuProxy) removeEtcdProxyLocked(key string) { m.connectionsMu.RLock() defer m.connectionsMu.RUnlock() - if conn, found := m.connectionsMap[info.Address]; found { - go conn.closeIfEmpty() + if conns, found := m.connectionsMap[info.Address]; found { + for _, conn := range conns { + go conn.closeIfEmpty() + } } } @@ -1497,11 +1713,21 @@ func (m *mcuProxy) removeConnection(c *mcuProxyConnection) { m.connectionsMu.Lock() defer m.connectionsMu.Unlock() - if _, found := m.connectionsMap[c.rawUrl]; found { - delete(m.connectionsMap, c.rawUrl) - m.connections = nil - for _, conn := range m.connectionsMap { - m.connections = append(m.connections, conn) + if conns, found := m.connectionsMap[c.rawUrl]; found { + for idx, conn := range conns { + if conn == c { + conns = append(conns[:idx], conns[idx+1:]...) + break + } + } + if len(conns) == 0 { + delete(m.connectionsMap, c.rawUrl) + m.connections = nil + for _, conns := range m.connectionsMap { + m.connections = append(m.connections, conns...) + } + } else { + m.connectionsMap[c.rawUrl] = conns } atomic.StoreInt64(&m.nextSort, 0) @@ -1517,16 +1743,13 @@ func (m *mcuProxy) SetOnDisconnected(f func()) { } type mcuProxyStats struct { - Publishers int64 `json:"publishers"` - Clients int64 `json:"clients"` - Details map[string]*mcuProxyConnectionStats `json:"details"` + Publishers int64 `json:"publishers"` + Clients int64 `json:"clients"` + Details []*mcuProxyConnectionStats `json:"details"` } func (m *mcuProxy) GetStats() interface{} { - details := make(map[string]*mcuProxyConnectionStats) - result := &mcuProxyStats{ - Details: details, - } + result := &mcuProxyStats{} m.connectionsMu.RLock() defer m.connectionsMu.RUnlock() @@ -1535,7 +1758,7 @@ func (m *mcuProxy) GetStats() interface{} { stats := conn.GetStats() result.Publishers += stats.Publishers result.Clients += stats.Clients - details[stats.Url] = stats + result.Details = append(result.Details, stats) } return result } @@ -1711,7 +1934,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st } publisher, err := conn.newPublisher(subctx, listener, id, streamType, bitrate, mediaTypes) if err != nil { - log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn.url, err) + log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err) continue } diff --git a/server.conf.in b/server.conf.in index bd2d50d..9125b6c 100644 --- a/server.conf.in +++ b/server.conf.in @@ -158,6 +158,13 @@ connectionsperhost = 8 # connecting to proxy servers. #token_key = privkey.pem +# For url type "static": Enable DNS discovery on hostname of configured URL. +# If the hostname resolves to multiple IP addresses, a connection is established +# to each of them. +# Changes to the DNS are monitored regularly and proxy connections are created +# or deleted as necessary. +#dnsdiscovery = true + # For url type "etcd": Comma-separated list of static etcd endpoints to # connect to. #endpoints = 127.0.0.1:2379,127.0.0.1:22379,127.0.0.1:32379