Support DNS discovery for proxy server URLs.

If the hostname of a proxy server resolves to multiple IP addresses,
a connection is established to each of them.

Changes to the DNS are monitored regularly and proxy connections are
created or deleted as necessary.
This commit is contained in:
Joachim Bauch 2022-04-04 15:39:49 +02:00
parent b267f0dc50
commit 659730d371
No known key found for this signature in database
GPG Key ID: 77C1D22D53E15F02
2 changed files with 355 additions and 125 deletions

View File

@ -29,6 +29,7 @@ import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"sort"
@ -68,6 +69,9 @@ const (
defaultProxyTimeoutSeconds = 2
rttLogDuration = 500 * time.Millisecond
// Update service IP addresses every 10 seconds.
updateDnsInterval = 10 * time.Second
)
type mcuProxyPubSubCommon struct {
@ -93,7 +97,7 @@ func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClie
}
if proxyDebugMessages {
log.Printf("Response from %s: %+v", c.conn.url, response)
log.Printf("Response from %s: %+v", c.conn, response)
}
if response.Type == "error" {
callback(response.Error, nil)
@ -112,7 +116,7 @@ func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadPr
case "candidate":
c.listener.OnIceCandidate(client, msg.Payload["candidate"])
default:
log.Printf("Unsupported payload from %s: %+v", c.conn.url, msg)
log.Printf("Unsupported payload from %s: %+v", c.conn, msg)
}
}
@ -157,11 +161,11 @@ func (p *mcuProxyPublisher) Close(ctx context.Context) {
}
if _, err := p.conn.performSyncRequest(ctx, msg); err != nil {
log.Printf("Could not delete publisher %s at %s: %s", p.proxyId, p.conn.url, err)
log.Printf("Could not delete publisher %s at %s: %s", p.proxyId, p.conn, err)
return
}
log.Printf("Delete publisher %s at %s", p.proxyId, p.conn.url)
log.Printf("Delete publisher %s at %s", p.proxyId, p.conn)
}
func (p *mcuProxyPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
@ -188,7 +192,7 @@ func (p *mcuProxyPublisher) ProcessEvent(msg *EventProxyServerMessage) {
case "publisher-closed":
p.NotifyClosed()
default:
log.Printf("Unsupported event from %s: %+v", p.conn.url, msg)
log.Printf("Unsupported event from %s: %+v", p.conn, msg)
}
}
@ -232,11 +236,11 @@ func (s *mcuProxySubscriber) Close(ctx context.Context) {
}
if _, err := s.conn.performSyncRequest(ctx, msg); err != nil {
log.Printf("Could not delete subscriber %s at %s: %s", s.proxyId, s.conn.url, err)
log.Printf("Could not delete subscriber %s at %s: %s", s.proxyId, s.conn, err)
return
}
log.Printf("Delete subscriber %s at %s", s.proxyId, s.conn.url)
log.Printf("Delete subscriber %s at %s", s.proxyId, s.conn)
}
func (s *mcuProxySubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
@ -263,7 +267,7 @@ func (s *mcuProxySubscriber) ProcessEvent(msg *EventProxyServerMessage) {
case "subscriber-closed":
s.NotifyClosed()
default:
log.Printf("Unsupported event from %s: %+v", s.conn.url, msg)
log.Printf("Unsupported event from %s: %+v", s.conn, msg)
}
}
@ -276,6 +280,7 @@ type mcuProxyConnection struct {
proxy *mcuProxy
rawUrl string
url *url.URL
ip net.IP
mu sync.Mutex
closeChan chan bool
@ -303,7 +308,7 @@ type mcuProxyConnection struct {
subscribers map[string]*mcuProxySubscriber
}
func newMcuProxyConnection(proxy *mcuProxy, baseUrl string) (*mcuProxyConnection, error) {
func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP) (*mcuProxyConnection, error) {
parsed, err := url.Parse(baseUrl)
if err != nil {
return nil, err
@ -313,6 +318,7 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string) (*mcuProxyConnection
proxy: proxy,
rawUrl: baseUrl,
url: parsed,
ip: ip,
closeChan: make(chan bool, 1),
closedChan: make(chan bool, 1),
reconnectInterval: int64(initialReconnectInterval),
@ -326,8 +332,17 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string) (*mcuProxyConnection
return conn, nil
}
func (c *mcuProxyConnection) String() string {
if c.ip != nil {
return fmt.Sprintf("%s (%s)", c.rawUrl, c.ip)
}
return c.rawUrl
}
type mcuProxyConnectionStats struct {
Url string `json:"url"`
IP net.IP `json:"ip,omitempty"`
Connected bool `json:"connected"`
Publishers int64 `json:"publishers"`
Clients int64 `json:"clients"`
@ -339,6 +354,7 @@ type mcuProxyConnectionStats struct {
func (c *mcuProxyConnection) GetStats() *mcuProxyConnectionStats {
result := &mcuProxyConnectionStats{
Url: c.url.String(),
IP: c.ip,
}
c.mu.Lock()
if c.conn != nil {
@ -397,7 +413,7 @@ func (c *mcuProxyConnection) readPump() {
rtt := now.Sub(time.Unix(0, ts))
if rtt >= rttLogDuration {
rtt_ms := rtt.Nanoseconds() / time.Millisecond.Nanoseconds()
log.Printf("Proxy at %s has RTT of %d ms (%s)", c.url, rtt_ms, rtt)
log.Printf("Proxy at %s has RTT of %d ms (%s)", c, rtt_ms, rtt)
}
}
return nil
@ -411,14 +427,14 @@ func (c *mcuProxyConnection) readPump() {
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived) {
log.Printf("Error reading from %s: %v", c.url, err)
log.Printf("Error reading from %s: %v", c, err)
}
break
}
var msg ProxyServerMessage
if err := json.Unmarshal(message, &msg); err != nil {
log.Printf("Error unmarshaling %s from %s: %s", string(message), c.url, err)
log.Printf("Error unmarshaling %s from %s: %s", string(message), c, err)
continue
}
@ -437,7 +453,7 @@ func (c *mcuProxyConnection) sendPing() bool {
msg := strconv.FormatInt(now.UnixNano(), 10)
c.conn.SetWriteDeadline(now.Add(writeWait)) // nolint
if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil {
log.Printf("Could not send ping to proxy at %s: %v", c.url, err)
log.Printf("Could not send ping to proxy at %s: %v", c, err)
c.scheduleReconnect()
return false
}
@ -489,7 +505,7 @@ func (c *mcuProxyConnection) stop(ctx context.Context) {
c.closeChan <- true
if err := c.sendClose(); err != nil {
if err != ErrNotConnected {
log.Printf("Could not send close message to %s: %s", c.url, err)
log.Printf("Could not send close message to %s: %s", c, err)
}
c.close()
return
@ -499,7 +515,7 @@ func (c *mcuProxyConnection) stop(ctx context.Context) {
case <-c.closedChan:
case <-ctx.Done():
if err := ctx.Err(); err != nil {
log.Printf("Error waiting for connection to %s get closed: %s", c.url, err)
log.Printf("Error waiting for connection to %s get closed: %s", c, err)
c.close()
}
}
@ -534,7 +550,7 @@ func (c *mcuProxyConnection) closeIfEmpty() bool {
c.subscribersLock.RUnlock()
if total > 0 {
// Connection will be closed once all clients have disconnected.
log.Printf("Connection to %s is still used by %d clients, defer closing", c.url, total)
log.Printf("Connection to %s is still used by %d clients, defer closing", c, total)
return false
}
@ -542,7 +558,7 @@ func (c *mcuProxyConnection) closeIfEmpty() bool {
ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
defer cancel()
log.Printf("All clients disconnected, closing connection to %s", c.url)
log.Printf("All clients disconnected, closing connection to %s", c)
c.stop(ctx)
c.proxy.removeConnection(c)
@ -552,7 +568,7 @@ func (c *mcuProxyConnection) closeIfEmpty() bool {
func (c *mcuProxyConnection) scheduleReconnect() {
if err := c.sendClose(); err != nil && err != ErrNotConnected {
log.Printf("Could not send close message to %s: %s", c.url, err)
log.Printf("Could not send close message to %s: %s", c, err)
}
c.close()
@ -569,7 +585,7 @@ func (c *mcuProxyConnection) scheduleReconnect() {
func (c *mcuProxyConnection) reconnect() {
u, err := c.url.Parse("proxy")
if err != nil {
log.Printf("Could not resolve url to proxy at %s: %s", c.url, err)
log.Printf("Could not resolve url to proxy at %s: %s", c, err)
c.scheduleReconnect()
return
}
@ -579,14 +595,31 @@ func (c *mcuProxyConnection) reconnect() {
u.Scheme = "wss"
}
conn, _, err := c.proxy.dialer.Dial(u.String(), nil)
dialer := c.proxy.dialer
if c.ip != nil {
dialer = &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: c.proxy.dialer.HandshakeTimeout,
TLSClientConfig: c.proxy.dialer.TLSClientConfig,
// Override DNS lookup and connect to custom IP address.
NetDialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
if _, port, err := net.SplitHostPort(addr); err == nil {
addr = net.JoinHostPort(c.ip.String(), port)
}
return net.Dial(network, addr)
},
}
}
conn, _, err := dialer.Dial(u.String(), nil)
if err != nil {
log.Printf("Could not connect to %s: %s", u, err)
log.Printf("Could not connect to %s: %s", c, err)
c.scheduleReconnect()
return
}
log.Printf("Connected to %s", u)
log.Printf("Connected to %s", c)
atomic.StoreUint32(&c.closed, 0)
c.mu.Lock()
@ -597,7 +630,7 @@ func (c *mcuProxyConnection) reconnect() {
atomic.StoreInt64(&c.reconnectInterval, int64(initialReconnectInterval))
atomic.StoreUint32(&c.shutdownScheduled, 0)
if err := c.sendHello(); err != nil {
log.Printf("Could not send hello request to %s: %s", c.url, err)
log.Printf("Could not send hello request to %s: %s", c, err)
c.scheduleReconnect()
return
}
@ -697,19 +730,19 @@ func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) {
switch msg.Type {
case "error":
if msg.Error.Code == "no_such_session" {
log.Printf("Session %s could not be resumed on %s, registering new", c.sessionId, c.url)
log.Printf("Session %s could not be resumed on %s, registering new", c.sessionId, c)
c.clearPublishers()
c.clearSubscribers()
c.clearCallbacks()
c.sessionId = ""
if err := c.sendHello(); err != nil {
log.Printf("Could not send hello request to %s: %s", c.url, err)
log.Printf("Could not send hello request to %s: %s", c, err)
c.scheduleReconnect()
}
return
}
log.Printf("Hello connection to %s failed with %+v, reconnecting", c.url, msg.Error)
log.Printf("Hello connection to %s failed with %+v, reconnecting", c, msg.Error)
c.scheduleReconnect()
case "hello":
resumed := c.sessionId == msg.Hello.SessionId
@ -717,30 +750,30 @@ func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) {
country := ""
if msg.Hello.Server != nil {
if country = msg.Hello.Server.Country; country != "" && !IsValidCountry(country) {
log.Printf("Proxy %s sent invalid country %s in hello response", c.url, country)
log.Printf("Proxy %s sent invalid country %s in hello response", c, country)
country = ""
}
}
c.country.Store(country)
if resumed {
log.Printf("Resumed session %s on %s", c.sessionId, c.url)
log.Printf("Resumed session %s on %s", c.sessionId, c)
} else if country != "" {
log.Printf("Received session %s from %s (in %s)", c.sessionId, c.url, country)
log.Printf("Received session %s from %s (in %s)", c.sessionId, c, country)
} else {
log.Printf("Received session %s from %s", c.sessionId, c.url)
log.Printf("Received session %s from %s", c.sessionId, c)
}
if atomic.CompareAndSwapUint32(&c.trackClose, 0, 1) {
statsConnectedProxyBackendsCurrent.WithLabelValues(c.Country()).Inc()
}
default:
log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c.url)
log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c)
c.scheduleReconnect()
}
return
}
if proxyDebugMessages {
log.Printf("Received from %s: %+v", c.url, msg)
log.Printf("Received from %s: %+v", c, msg)
}
callback := c.getCallback(msg.Id)
if callback != nil {
@ -756,7 +789,7 @@ func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) {
case "bye":
c.processBye(msg)
default:
log.Printf("Unsupported message received from %s: %+v", c.url, msg)
log.Printf("Unsupported message received from %s: %+v", c, msg)
}
}
@ -778,37 +811,37 @@ func (c *mcuProxyConnection) processPayload(msg *ProxyServerMessage) {
return
}
log.Printf("Received payload for unknown client %+v from %s", payload, c.url)
log.Printf("Received payload for unknown client %+v from %s", payload, c)
}
func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) {
event := msg.Event
switch event.Type {
case "backend-disconnected":
log.Printf("Upstream backend at %s got disconnected, reset MCU objects", c.url)
log.Printf("Upstream backend at %s got disconnected, reset MCU objects", c)
c.clearPublishers()
c.clearSubscribers()
c.clearCallbacks()
// TODO: Should we also reconnect?
return
case "backend-connected":
log.Printf("Upstream backend at %s is connected", c.url)
log.Printf("Upstream backend at %s is connected", c)
return
case "update-load":
if proxyDebugMessages {
log.Printf("Load of %s now at %d", c.url, event.Load)
log.Printf("Load of %s now at %d", c, event.Load)
}
atomic.StoreInt64(&c.load, event.Load)
statsProxyBackendLoadCurrent.WithLabelValues(c.url.String()).Set(float64(event.Load))
return
case "shutdown-scheduled":
log.Printf("Proxy %s is scheduled to shutdown", c.url)
log.Printf("Proxy %s is scheduled to shutdown", c)
atomic.StoreUint32(&c.shutdownScheduled, 1)
return
}
if proxyDebugMessages {
log.Printf("Process event from %s: %+v", c.url, event)
log.Printf("Process event from %s: %+v", c, event)
}
c.publishersLock.RLock()
publisher, found := c.publishers[event.ClientId]
@ -826,17 +859,17 @@ func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) {
return
}
log.Printf("Received event for unknown client %+v from %s", event, c.url)
log.Printf("Received event for unknown client %+v from %s", event, c)
}
func (c *mcuProxyConnection) processBye(msg *ProxyServerMessage) {
bye := msg.Bye
switch bye.Reason {
case "session_resumed":
log.Printf("Session %s on %s was resumed by other client, resetting", c.sessionId, c.url)
log.Printf("Session %s on %s was resumed by other client, resetting", c.sessionId, c)
c.sessionId = ""
default:
log.Printf("Received bye with unsupported reason from %s %+v", c.url, bye)
log.Printf("Received bye with unsupported reason from %s %+v", c, bye)
}
}
@ -878,7 +911,7 @@ func (c *mcuProxyConnection) sendMessage(msg *ProxyClientMessage) error {
func (c *mcuProxyConnection) sendMessageLocked(msg *ProxyClientMessage) error {
if proxyDebugMessages {
log.Printf("Send message to %s: %+v", c.url, msg)
log.Printf("Send message to %s: %+v", c, msg)
}
if c.conn == nil {
return ErrNotConnected
@ -946,7 +979,7 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe
}
proxyId := response.Command.Id
log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c.url, id)
log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id)
publisher := newMcuProxyPublisher(id, streamType, mediaTypes, proxyId, c, listener)
c.publishersLock.Lock()
c.publishers[proxyId] = publisher
@ -981,7 +1014,7 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList
}
proxyId := response.Command.Id
log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c.url, publisher)
log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisher)
subscriber := newMcuProxySubscriber(publisher, streamType, proxyId, c, listener)
c.subscribersLock.Lock()
c.subscribers[proxyId] = subscriber
@ -996,6 +1029,7 @@ type mcuProxy struct {
connRequests int64
nextSort int64
urlType string
tokenId string
tokenKey *rsa.PrivateKey
@ -1006,10 +1040,14 @@ type mcuProxy struct {
dialer *websocket.Dialer
connections []*mcuProxyConnection
connectionsMap map[string]*mcuProxyConnection
connectionsMap map[string][]*mcuProxyConnection
connectionsMu sync.RWMutex
proxyTimeout time.Duration
dnsDiscovery bool
stopping chan bool
stopped chan bool
maxStreamBitrate int
maxScreenBitrate int
@ -1024,6 +1062,9 @@ type mcuProxy struct {
func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) {
urlType, _ := config.GetString("mcu", "urltype")
if urlType == "" {
urlType = proxyUrlTypeStatic
}
tokenId, _ := config.GetString("mcu", "token_id")
if tokenId == "" {
@ -1059,6 +1100,7 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) {
}
mcu := &mcuProxy{
urlType: urlType,
tokenId: tokenId,
tokenKey: tokenKey,
@ -1066,9 +1108,12 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) {
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: proxyTimeout,
},
connectionsMap: make(map[string]*mcuProxyConnection),
connectionsMap: make(map[string][]*mcuProxyConnection),
proxyTimeout: proxyTimeout,
stopping: make(chan bool, 1),
stopped: make(chan bool, 1),
maxStreamBitrate: maxStreamBitrate,
maxScreenBitrate: maxScreenBitrate,
@ -1089,21 +1134,10 @@ func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) {
}
}
if urlType == "" {
urlType = proxyUrlTypeStatic
}
switch urlType {
case proxyUrlTypeStatic:
mcuUrl, _ := config.GetString("mcu", "url")
for _, u := range strings.Split(mcuUrl, " ") {
conn, err := newMcuProxyConnection(mcu, u)
if err != nil {
return nil, err
}
mcu.connections = append(mcu.connections, conn)
mcu.connectionsMap[u] = conn
if err := mcu.configureStatic(config, false); err != nil {
return nil, err
}
if len(mcu.connections) == 0 {
return nil, fmt.Errorf("No MCU proxy connections configured")
@ -1180,6 +1214,11 @@ func (m *mcuProxy) Start() error {
return err
}
}
if m.urlType == proxyUrlTypeStatic && m.dnsDiscovery {
go m.monitorProxyIPs()
}
return nil
}
@ -1192,6 +1231,218 @@ func (m *mcuProxy) Stop() {
defer cancel()
c.stop(ctx)
}
if m.urlType == proxyUrlTypeStatic && m.dnsDiscovery {
m.stopping <- true
<-m.stopped
}
}
func (m *mcuProxy) monitorProxyIPs() {
log.Printf("Start monitoring proxy IPs")
ticker := time.NewTicker(updateDnsInterval)
for {
select {
case <-ticker.C:
m.updateProxyIPs()
case <-m.stopping:
m.stopped <- true
return
}
}
}
func (m *mcuProxy) updateProxyIPs() {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
for u, conns := range m.connectionsMap {
if len(conns) == 0 {
continue
}
host := conns[0].url.Host
if h, _, err := net.SplitHostPort(host); err == nil {
host = h
}
ips, err := net.LookupIP(host)
if err != nil {
log.Printf("Could not lookup %s: %s", host, err)
continue
}
var newConns []*mcuProxyConnection
changed := false
for _, conn := range conns {
found := false
for idx, ip := range ips {
if ip.Equal(conn.ip) {
ips = append(ips[:idx], ips[idx+1:]...)
found = true
conn.stopCloseIfEmpty()
newConns = append(newConns, conn)
break
}
}
if !found {
changed = true
log.Printf("Removing connection to %s", conn)
conn.closeIfEmpty()
}
}
for _, ip := range ips {
conn, err := newMcuProxyConnection(m, u, ip)
if err != nil {
log.Printf("Could not create proxy connection to %s (%s): %s", u, ip, err)
continue
}
if err := conn.start(); err != nil {
log.Printf("Could not start new connection to %s: %s", conn, err)
continue
}
log.Printf("Adding new connection to %s", conn)
m.connections = append(m.connections, conn)
newConns = append(newConns, conn)
changed = true
}
if changed {
m.connectionsMap[u] = newConns
}
}
}
func (m *mcuProxy) configureStatic(config *goconf.ConfigFile, fromReload bool) error {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
remove := make(map[string][]*mcuProxyConnection)
for u, conns := range m.connectionsMap {
remove[u] = conns
}
created := make(map[string][]*mcuProxyConnection)
changed := false
mcuUrl, _ := config.GetString("mcu", "url")
dnsDiscovery, _ := config.GetBool("mcu", "dnsdiscovery")
if dnsDiscovery != m.dnsDiscovery {
if !dnsDiscovery && fromReload {
m.stopping <- true
<-m.stopped
}
m.dnsDiscovery = dnsDiscovery
if dnsDiscovery && fromReload {
go m.monitorProxyIPs()
}
}
for _, u := range strings.Split(mcuUrl, " ") {
if existing, found := remove[u]; found {
// Proxy connection still exists in new configuration
delete(remove, u)
for _, conn := range existing {
conn.stopCloseIfEmpty()
}
continue
}
var ips []net.IP
if dnsDiscovery {
parsed, err := url.Parse(u)
if err != nil {
if !fromReload {
return err
}
log.Printf("Could not parse URL %s: %s", u, err)
continue
}
if host, _, err := net.SplitHostPort(parsed.Host); err == nil {
parsed.Host = host
}
ips, err = net.LookupIP(parsed.Host)
if err != nil {
// Will be retried later.
log.Printf("Could not lookup %s: %s\n", parsed.Host, err)
continue
}
}
var conns []*mcuProxyConnection
if ips == nil {
conn, err := newMcuProxyConnection(m, u, nil)
if err != nil {
if !fromReload {
return err
}
log.Printf("Could not create proxy connection to %s: %s", u, err)
continue
}
conns = append(conns, conn)
} else {
for _, ip := range ips {
conn, err := newMcuProxyConnection(m, u, ip)
if err != nil {
if !fromReload {
return err
}
log.Printf("Could not create proxy connection to %s (%s): %s", u, ip, err)
continue
}
conns = append(conns, conn)
}
}
created[u] = conns
}
for _, conns := range remove {
for _, conn := range conns {
go conn.closeIfEmpty()
}
}
if fromReload {
for u, conns := range created {
var started []*mcuProxyConnection
for _, conn := range conns {
if err := conn.start(); err != nil {
log.Printf("Could not start new connection to %s: %s", conn, err)
continue
}
log.Printf("Adding new connection to %s", conn)
started = append(started, conn)
m.connections = append(m.connections, conn)
}
if len(started) > 0 {
m.connectionsMap[u] = started
changed = true
}
}
if changed {
atomic.StoreInt64(&m.nextSort, 0)
}
} else {
for u, conns := range created {
m.connections = append(m.connections, conns...)
m.connectionsMap[u] = conns
}
}
return nil
}
func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) error {
@ -1348,56 +1599,17 @@ func (m *mcuProxy) syncClient() error {
}
func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
if err := m.loadContinentsMap(config); err != nil {
log.Printf("Error loading continents map: %s", err)
}
remove := make(map[string]*mcuProxyConnection)
for u, conn := range m.connectionsMap {
remove[u] = conn
}
created := make(map[string]*mcuProxyConnection)
changed := false
mcuUrl, _ := config.GetString("mcu", "url")
for _, u := range strings.Split(mcuUrl, " ") {
if existing, found := remove[u]; found {
// Proxy connection still exists in new configuration
delete(remove, u)
existing.stopCloseIfEmpty()
continue
switch m.urlType {
case proxyUrlTypeStatic:
if err := m.configureStatic(config, true); err != nil {
log.Printf("Could not configure static proxy urls: %s", err)
}
conn, err := newMcuProxyConnection(m, u)
if err != nil {
log.Printf("Could not create proxy connection to %s: %s", u, err)
continue
}
created[u] = conn
}
for _, conn := range remove {
go conn.closeIfEmpty()
}
for u, conn := range created {
if err := conn.start(); err != nil {
log.Printf("Could not start new connection to %s: %s", u, err)
continue
}
log.Printf("Adding new connection to %s", u)
m.connections = append(m.connections, conn)
m.connectionsMap[u] = conn
changed = true
}
if changed {
atomic.StoreInt64(&m.nextSort, 0)
default:
// Reloading not supported yet.
}
}
@ -1443,12 +1655,14 @@ func (m *mcuProxy) addEtcdProxy(key string, data []byte) {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
if conn, found := m.connectionsMap[info.Address]; found {
if conns, found := m.connectionsMap[info.Address]; found {
m.keyInfos[key] = &info
m.urlToKey[info.Address] = key
conn.stopCloseIfEmpty()
for _, conn := range conns {
conn.stopCloseIfEmpty()
}
} else {
conn, err := newMcuProxyConnection(m, info.Address)
conn, err := newMcuProxyConnection(m, info.Address, nil)
if err != nil {
log.Printf("Could not create proxy connection to %s: %s", info.Address, err)
return
@ -1463,7 +1677,7 @@ func (m *mcuProxy) addEtcdProxy(key string, data []byte) {
m.keyInfos[key] = &info
m.urlToKey[info.Address] = key
m.connections = append(m.connections, conn)
m.connectionsMap[info.Address] = conn
m.connectionsMap[info.Address] = []*mcuProxyConnection{conn}
atomic.StoreInt64(&m.nextSort, 0)
}
}
@ -1488,8 +1702,10 @@ func (m *mcuProxy) removeEtcdProxyLocked(key string) {
m.connectionsMu.RLock()
defer m.connectionsMu.RUnlock()
if conn, found := m.connectionsMap[info.Address]; found {
go conn.closeIfEmpty()
if conns, found := m.connectionsMap[info.Address]; found {
for _, conn := range conns {
go conn.closeIfEmpty()
}
}
}
@ -1497,11 +1713,21 @@ func (m *mcuProxy) removeConnection(c *mcuProxyConnection) {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
if _, found := m.connectionsMap[c.rawUrl]; found {
delete(m.connectionsMap, c.rawUrl)
m.connections = nil
for _, conn := range m.connectionsMap {
m.connections = append(m.connections, conn)
if conns, found := m.connectionsMap[c.rawUrl]; found {
for idx, conn := range conns {
if conn == c {
conns = append(conns[:idx], conns[idx+1:]...)
break
}
}
if len(conns) == 0 {
delete(m.connectionsMap, c.rawUrl)
m.connections = nil
for _, conns := range m.connectionsMap {
m.connections = append(m.connections, conns...)
}
} else {
m.connectionsMap[c.rawUrl] = conns
}
atomic.StoreInt64(&m.nextSort, 0)
@ -1517,16 +1743,13 @@ func (m *mcuProxy) SetOnDisconnected(f func()) {
}
type mcuProxyStats struct {
Publishers int64 `json:"publishers"`
Clients int64 `json:"clients"`
Details map[string]*mcuProxyConnectionStats `json:"details"`
Publishers int64 `json:"publishers"`
Clients int64 `json:"clients"`
Details []*mcuProxyConnectionStats `json:"details"`
}
func (m *mcuProxy) GetStats() interface{} {
details := make(map[string]*mcuProxyConnectionStats)
result := &mcuProxyStats{
Details: details,
}
result := &mcuProxyStats{}
m.connectionsMu.RLock()
defer m.connectionsMu.RUnlock()
@ -1535,7 +1758,7 @@ func (m *mcuProxy) GetStats() interface{} {
stats := conn.GetStats()
result.Publishers += stats.Publishers
result.Clients += stats.Clients
details[stats.Url] = stats
result.Details = append(result.Details, stats)
}
return result
}
@ -1711,7 +1934,7 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
}
publisher, err := conn.newPublisher(subctx, listener, id, streamType, bitrate, mediaTypes)
if err != nil {
log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn.url, err)
log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err)
continue
}

View File

@ -158,6 +158,13 @@ connectionsperhost = 8
# connecting to proxy servers.
#token_key = privkey.pem
# For url type "static": Enable DNS discovery on hostname of configured URL.
# If the hostname resolves to multiple IP addresses, a connection is established
# to each of them.
# Changes to the DNS are monitored regularly and proxy connections are created
# or deleted as necessary.
#dnsdiscovery = true
# For url type "etcd": Comma-separated list of static etcd endpoints to
# connect to.
#endpoints = 127.0.0.1:2379,127.0.0.1:22379,127.0.0.1:32379