Create temporary connection to proxy used by remote publisher.

This commit is contained in:
Joachim Bauch 2022-06-22 15:34:54 +02:00
parent 2ca9fb21c4
commit 6f64ff901d
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02

View file

@ -309,6 +309,9 @@ type mcuProxyConnection struct {
shutdownScheduled uint32
closeScheduled uint32
trackClose uint32
temporary uint32
connectedNotifier SingleNotifier
helloMsgId string
sessionId string
@ -364,6 +367,7 @@ type mcuProxyConnectionStats struct {
Clients int64 `json:"clients"`
Load *int64 `json:"load,omitempty"`
Shutdown *bool `json:"shutdown,omitempty"`
Temporary *bool `json:"temporary,omitempty"`
Uptime *time.Time `json:"uptime,omitempty"`
}
@ -380,6 +384,8 @@ func (c *mcuProxyConnection) GetStats() *mcuProxyConnectionStats {
result.Load = &load
shutdown := c.IsShutdownScheduled()
result.Shutdown = &shutdown
temporary := c.IsTemporary()
result.Temporary = &temporary
}
c.mu.Unlock()
c.publishersLock.RLock()
@ -400,6 +406,18 @@ func (c *mcuProxyConnection) Country() string {
return c.country.Load().(string)
}
func (c *mcuProxyConnection) IsTemporary() bool {
return atomic.LoadUint32(&c.temporary) != 0
}
func (c *mcuProxyConnection) setTemporary() {
atomic.StoreUint32(&c.temporary, 1)
}
func (c *mcuProxyConnection) clearTemporary() {
atomic.StoreUint32(&c.temporary, 0)
}
func (c *mcuProxyConnection) IsShutdownScheduled() bool {
return atomic.LoadUint32(&c.shutdownScheduled) != 0 || atomic.LoadUint32(&c.closeScheduled) != 0
}
@ -484,6 +502,7 @@ func (c *mcuProxyConnection) writePump() {
}()
c.reconnectTimer = time.NewTimer(0)
defer c.reconnectTimer.Stop()
for {
select {
case <-c.reconnectTimer.C:
@ -541,6 +560,8 @@ func (c *mcuProxyConnection) close() {
c.mu.Lock()
defer c.mu.Unlock()
c.connectedNotifier.Reset()
if c.conn != nil {
c.conn.Close()
c.conn = nil
@ -588,6 +609,11 @@ func (c *mcuProxyConnection) scheduleReconnect() {
}
c.close()
if c.IsShutdownScheduled() {
c.proxy.removeConnection(c)
return
}
interval := atomic.LoadInt64(&c.reconnectInterval)
c.reconnectTimer.Reset(time.Duration(interval))
@ -635,6 +661,11 @@ func (c *mcuProxyConnection) reconnect() {
return
}
if c.IsShutdownScheduled() {
c.proxy.removeConnection(c)
return
}
log.Printf("Connected to %s", c)
atomic.StoreUint32(&c.closed, 0)
@ -658,6 +689,22 @@ func (c *mcuProxyConnection) reconnect() {
go c.readPump()
}
func (c *mcuProxyConnection) waitUntilConnected(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn != nil {
return nil
}
waiter := c.connectedNotifier.NewWaiter()
defer c.connectedNotifier.Release(waiter)
c.mu.Unlock()
defer c.mu.Lock()
return waiter.Wait(ctx)
}
func (c *mcuProxyConnection) removePublisher(publisher *mcuProxyPublisher) {
c.proxy.removePublisher(publisher)
@ -670,7 +717,7 @@ func (c *mcuProxyConnection) removePublisher(publisher *mcuProxyPublisher) {
}
delete(c.publisherIds, publisher.id+"|"+publisher.StreamType())
if len(c.publishers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 {
if len(c.publishers) == 0 && (atomic.LoadUint32(&c.closeScheduled) != 0 || c.IsTemporary()) {
go c.closeIfEmpty()
}
}
@ -687,7 +734,7 @@ func (c *mcuProxyConnection) clearPublishers() {
c.publishers = make(map[string]*mcuProxyPublisher)
c.publisherIds = make(map[string]string)
if atomic.LoadUint32(&c.closeScheduled) != 0 {
if atomic.LoadUint32(&c.closeScheduled) != 0 || c.IsTemporary() {
go c.closeIfEmpty()
}
}
@ -701,7 +748,7 @@ func (c *mcuProxyConnection) removeSubscriber(subscriber *mcuProxySubscriber) {
statsSubscribersCurrent.WithLabelValues(subscriber.StreamType()).Dec()
}
if len(c.subscribers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 {
if len(c.subscribers) == 0 && (atomic.LoadUint32(&c.closeScheduled) != 0 || c.IsTemporary()) {
go c.closeIfEmpty()
}
}
@ -717,7 +764,7 @@ func (c *mcuProxyConnection) clearSubscribers() {
}(c.subscribers)
c.subscribers = make(map[string]*mcuProxySubscriber)
if atomic.LoadUint32(&c.closeScheduled) != 0 {
if atomic.LoadUint32(&c.closeScheduled) != 0 || c.IsTemporary() {
go c.closeIfEmpty()
}
}
@ -781,6 +828,8 @@ func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) {
if atomic.CompareAndSwapUint32(&c.trackClose, 0, 1) {
statsConnectedProxyBackendsCurrent.WithLabelValues(c.Country()).Inc()
}
c.connectedNotifier.Notify()
default:
log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c)
c.scheduleReconnect()
@ -1280,6 +1329,11 @@ func (m *mcuProxy) updateProxyIPs() {
host = h
}
if net.ParseIP(host) != nil {
// No need to lookup endpoints that connect to IP addresses.
continue
}
ips, err := net.LookupIP(host)
if err != nil {
log.Printf("Could not lookup %s: %s", host, err)
@ -1295,6 +1349,7 @@ func (m *mcuProxy) updateProxyIPs() {
ips = append(ips[:idx], ips[idx+1:]...)
found = true
conn.stopCloseIfEmpty()
conn.clearTemporary()
newConns = append(newConns, conn)
break
}
@ -1361,6 +1416,7 @@ func (m *mcuProxy) configureStatic(config *goconf.ConfigFile, fromReload bool) e
delete(remove, u)
for _, conn := range existing {
conn.stopCloseIfEmpty()
conn.clearTemporary()
}
continue
}
@ -1559,6 +1615,7 @@ func (m *mcuProxy) EtcdKeyUpdated(client *EtcdClient, key string, data []byte) {
m.urlToKey[info.Address] = key
for _, conn := range conns {
conn.stopCloseIfEmpty()
conn.clearTemporary()
}
} else {
conn, err := newMcuProxyConnection(m, info.Address, nil)
@ -1813,7 +1870,7 @@ func (m *mcuProxy) removeWaiter(id uint64) {
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType string, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
connections := m.getSortedConnections(initiator)
for _, conn := range connections {
if conn.IsShutdownScheduled() {
if conn.IsShutdownScheduled() || conn.IsTemporary() {
continue
}
@ -1947,24 +2004,59 @@ func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publ
m.connectionsMu.RLock()
connections := m.connections
m.connectionsMu.RUnlock()
var publisherConn *mcuProxyConnection
for _, conn := range connections {
if conn.rawUrl != url || !ip.Equal(conn.ip) {
continue
}
// Simple case, signaling server has a connection to the same endpoint
subscriber, err := conn.newSubscriber(ctx, listener, id, publisher, streamType)
publisherConn = conn
break
}
if publisherConn == nil {
publisherConn, err = newMcuProxyConnection(m, url, ip)
if err != nil {
log.Printf("Could not create subscriber for %s publisher %s: %s", streamType, publisher, err)
log.Printf("Could not create temporary connection to %s for %s publisher %s: %s", url, streamType, publisher, err)
return
}
publisherConn.setTemporary()
if err := publisherConn.start(); err != nil {
log.Printf("Could not start new connection to %s: %s", publisherConn, err)
publisherConn.closeIfEmpty()
return
}
ch <- subscriber
if err := publisherConn.waitUntilConnected(ctx); err != nil {
log.Printf("Could not establish new connection to %s: %s", publisherConn, err)
publisherConn.closeIfEmpty()
return
}
m.connectionsMu.Lock()
m.connections = append(m.connections, publisherConn)
conns, found := m.connectionsMap[url]
if found {
conns = append(conns, publisherConn)
} else {
conns = []*mcuProxyConnection{publisherConn}
}
m.connectionsMap[url] = conns
m.connectionsMu.Unlock()
}
subscriber, err := publisherConn.newSubscriber(ctx, listener, id, publisher, streamType)
if err != nil {
if publisherConn.IsTemporary() {
publisherConn.closeIfEmpty()
}
log.Printf("Could not create subscriber for %s publisher %s: %s", streamType, publisher, err)
return
}
// TODO: Create temporary connection to new proxy and tear down when last subscriber left.
log.Printf("Not implemented yet: need new connection to %s (%s) for %s publisher %s (%s)", url, ip, streamType, publisher, id)
ch <- subscriber
}(client)
}
}