Add support for fetching proxy URLs from etcd cluster.

This commit is contained in:
Joachim Bauch 2020-08-31 15:23:55 +02:00
parent 744b514e9f
commit dcf533b8f1
Failed to extract signature
4 changed files with 302 additions and 25 deletions

View file

@ -99,12 +99,12 @@ connectionsperhost = 8
[mcu]
# The type of the MCU to use. Currently only "janus" and "proxy" are supported.
type = janus
# Leave empty to disable MCU functionality.
#type =
# For type "janus": the URL to the websocket endpoint of the MCU server.
# For type "proxy": a space-separated list of proxy URLs to connect to.
# Leave empty to disable MCU functionality.
url =
#url =
# For type "janus": the maximum bitrate per publishing stream (in bits per
# second).
@ -116,6 +116,14 @@ url =
# Default is 2 mbit/sec.
#maxscreenbitrate = 2097152
# For type "proxy": type of URL configuration for proxy servers.
# Defaults to "static".
#
# Possible values:
# - static: A space-separated list of proxy URLs is given in the "url" option.
# - etcd: Proxy URLs are retrieved from an etcd cluster (see below).
#urltype = "static"
# For type "proxy": the id of the token to use when connecting to proxy servers.
#token_id = server1
@ -123,6 +131,31 @@ url =
# connecting to proxy servers.
#token_key = privkey.pem
# For url type "etcd": Comma-separated list of static etcd endpoints to
# connect to.
#endpoints = 127.0.0.1:2379,127.0.0.1:22379,127.0.0.1:32379
# For url type "etcd": Options to perform endpoint discovery through DNS SRV.
# Only used if no endpoints are configured manually.
#discoverysrv = example.com
#discoveryservice = foo
# For url type "etcd": Path to private key, client certificate and CA
# certificate if TLS authentication should be used.
#clientkey = /path/to/etcd-client.key
#clientcert = /path/to/etcd-client.crt
#cacert = /path/to/etcd-ca.crt
# For url type "etcd": Key prefix of MCU proxy entries. All keys below will be
# watched and assumed to contain a JSON document. The entry "address" from this
# document will be used as proxy URL, other contents in the document will be
# ignored.
#
# Example:
# "/signaling/proxy/server/one" -> {"address": "https://proxy1.domain.invalid"}
# "/signaling/proxy/server/two" -> {"address": "https://proxy2.domain.invalid"}
#keyprefix = /signaling/proxy/server
[turn]
# API key that the MCU will need to send when requesting TURN credentials.
#apikey = the-api-key-for-the-rest-service

View file

@ -91,7 +91,7 @@ func createTLSListener(addr string, certFile, keyFile string) (net.Listener, err
}
func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds | log.Lshortfile)
log.SetFlags(log.Lshortfile)
flag.Parse()
if *showVersion {
@ -155,12 +155,13 @@ func main() {
}
mcuUrl, _ := config.GetString("mcu", "url")
if mcuUrl != "" {
mcuType, _ := config.GetString("mcu", "type")
if mcuType == "" {
mcuType = signaling.McuTypeDefault
}
mcuType, _ := config.GetString("mcu", "type")
if mcuType == "" && mcuUrl != "" {
log.Printf("WARNING: Old-style MCU configuration detected with url but no type, defaulting to type %s", signaling.McuTypeJanus)
mcuType = signaling.McuTypeJanus
}
if mcuType != "" {
var mcu signaling.Mcu
mcuRetry := initialMcuRetry
mcuRetryTimer := time.NewTimer(mcuRetry)
@ -169,21 +170,21 @@ func main() {
case signaling.McuTypeJanus:
mcu, err = signaling.NewMcuJanus(mcuUrl, config, nats)
case signaling.McuTypeProxy:
mcu, err = signaling.NewMcuProxy(mcuUrl, config)
mcu, err = signaling.NewMcuProxy(config)
default:
log.Fatal("Unsupported MCU type: ", mcuType)
}
if err == nil {
err = mcu.Start()
if err != nil {
log.Printf("Could not create %s MCU at %s: %s", mcuType, mcuUrl, err)
log.Printf("Could not create %s MCU: %s", mcuType, err)
}
}
if err == nil {
break
}
log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, mcuUrl, err, mcuRetry)
log.Printf("Could not initialize %s MCU (%s) will retry in %s", mcuType, err, mcuRetry)
mcuRetryTimer.Reset(mcuRetry)
select {
case sig := <-sigChan:
@ -197,8 +198,9 @@ func main() {
} else {
mcuUrl, _ = config.GetString("mcu", "url")
mcuType, _ = config.GetString("mcu", "type")
if mcuType == "" {
mcuType = signaling.McuTypeDefault
if mcuType == "" && mcuUrl != "" {
log.Printf("WARNING: Old-style MCU configuration detected with url but no type, defaulting to type %s", signaling.McuTypeJanus)
mcuType = signaling.McuTypeJanus
}
}
}
@ -212,7 +214,7 @@ func main() {
}
defer mcu.Stop()
log.Printf("Using MCU %s at %s\n", mcuType, mcuUrl)
log.Printf("Using %s MCU", mcuType)
hub.SetMcu(mcu)
}

View file

@ -252,3 +252,19 @@ type EventProxyServerMessage struct {
ClientId string `json:"clientId,omitempty"`
Load int64 `json:"load,omitempty"`
}
// Information on a proxy in the etcd cluster.
type ProxyInformationEtcd struct {
Address string `json:"address"`
}
func (p *ProxyInformationEtcd) CheckValid() error {
if p.Address == "" {
return fmt.Errorf("address missing")
}
if p.Address[len(p.Address)-1] != '/' {
p.Address += "/"
}
return nil
}

View file

@ -39,6 +39,10 @@ import (
"github.com/dlintw/goconf"
"github.com/gorilla/websocket"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/srv"
"go.etcd.io/etcd/pkg/transport"
"gopkg.in/dgrijalva/jwt-go.v3"
)
@ -53,6 +57,9 @@ const (
// Sort connections by load every 10 publishing requests or once per second.
connectionSortRequests = 10
connectionSortInterval = time.Second
proxyUrlTypeStatic = "static"
proxyUrlTypeEtcd = "etcd"
)
type mcuProxyPubSubCommon struct {
@ -871,6 +878,12 @@ type mcuProxy struct {
tokenId string
tokenKey *rsa.PrivateKey
etcdMu sync.Mutex
client atomic.Value
keyPrefix atomic.Value
keyInfos map[string]*ProxyInformationEtcd
urlToKey map[string]string
connections []*mcuProxyConnection
connectionsMap map[string]*mcuProxyConnection
connectionsMu sync.RWMutex
@ -884,7 +897,9 @@ type mcuProxy struct {
publisherWaiters map[uint64]chan bool
}
func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) {
func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) {
urlType, _ := config.GetString("mcu", "urltype")
tokenId, _ := config.GetString("mcu", "token_id")
if tokenId == "" {
return nil, fmt.Errorf("No token id configured")
@ -913,22 +928,43 @@ func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) {
publisherWaiters: make(map[uint64]chan bool),
}
for _, u := range strings.Split(baseUrl, " ") {
conn, err := newMcuProxyConnection(mcu, u)
if err != nil {
switch urlType {
case proxyUrlTypeStatic:
mcuUrl, _ := config.GetString("mcu", "url")
for _, u := range strings.Split(mcuUrl, " ") {
conn, err := newMcuProxyConnection(mcu, u)
if err != nil {
return nil, err
}
mcu.connections = append(mcu.connections, conn)
mcu.connectionsMap[u] = conn
}
if len(mcu.connections) == 0 {
return nil, fmt.Errorf("No MCU proxy connections configured")
}
case proxyUrlTypeEtcd:
mcu.keyInfos = make(map[string]*ProxyInformationEtcd)
mcu.urlToKey = make(map[string]string)
if err := mcu.configureEtcd(config, false); err != nil {
return nil, err
}
mcu.connections = append(mcu.connections, conn)
mcu.connectionsMap[u] = conn
}
if len(mcu.connections) == 0 {
return nil, fmt.Errorf("No MCU proxy connections configured")
default:
return nil, fmt.Errorf("Unsupported proxy URL type %s", urlType)
}
return mcu, nil
}
func (m *mcuProxy) getEtcdClient() *clientv3.Client {
c := m.client.Load()
if c == nil {
return nil
}
return c.(*clientv3.Client)
}
func (m *mcuProxy) Start() error {
m.connectionsMu.RLock()
defer m.connectionsMu.RUnlock()
@ -952,6 +988,104 @@ func (m *mcuProxy) Stop() {
}
}
func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) error {
keyPrefix, _ := config.GetString("mcu", "keyprefix")
if keyPrefix == "" {
keyPrefix = "/%s"
}
var endpoints []string
if endpointsString, _ := config.GetString("mcu", "endpoints"); endpointsString != "" {
for _, ep := range strings.Split(endpointsString, ",") {
ep := strings.TrimSpace(ep)
if ep != "" {
endpoints = append(endpoints, ep)
}
}
} else if discoverySrv, _ := config.GetString("mcu", "discoverysrv"); discoverySrv != "" {
discoveryService, _ := config.GetString("mcu", "discoveryservice")
clients, err := srv.GetClient("etcd-client", discoverySrv, discoveryService)
if err != nil {
if !ignoreErrors {
return fmt.Errorf("Could not discover endpoints for %s: %s", discoverySrv, err)
}
} else {
endpoints = clients.Endpoints
}
}
if len(endpoints) == 0 {
if !ignoreErrors {
return fmt.Errorf("No proxy URL endpoints configured")
}
log.Printf("No proxy URL endpoints configured, not changing client")
} else {
cfg := clientv3.Config{
Endpoints: endpoints,
// set timeout per request to fail fast when the target endpoint is unavailable
DialTimeout: time.Second,
}
clientKey, _ := config.GetString("mcu", "clientkey")
clientCert, _ := config.GetString("mcu", "clientcert")
caCert, _ := config.GetString("mcu", "cacert")
if clientKey != "" && clientCert != "" && caCert != "" {
tlsInfo := transport.TLSInfo{
CertFile: clientCert,
KeyFile: clientKey,
TrustedCAFile: caCert,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
if !ignoreErrors {
return fmt.Errorf("Could not setup TLS configuration: %s", err)
}
log.Printf("Could not setup TLS configuration, will be disabled (%s)", err)
} else {
cfg.TLS = tlsConfig
}
}
c, err := clientv3.New(cfg)
if err != nil {
if !ignoreErrors {
return err
}
log.Printf("Could not create new client from proxy URL endpoints %+v: %s", endpoints, err)
} else {
prev := m.getEtcdClient()
if prev != nil {
prev.Close()
}
m.client.Store(c)
log.Printf("Using proxy URL endpoints %+v", endpoints)
ch := c.Watch(clientv3.WithRequireLeader(context.Background()), keyPrefix, clientv3.WithPrefix())
go m.processWatches(ch)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
response, err := c.Get(ctx, keyPrefix, clientv3.WithPrefix())
if err != nil {
log.Printf("Could not get initial list of proxy URLs: %s", err)
} else {
for _, ev := range response.Kvs {
m.addEtcdProxy(string(ev.Key), ev.Value)
}
}
}()
}
}
return nil
}
func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
@ -1002,6 +1136,98 @@ func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
}
}
func (m *mcuProxy) processWatches(ch clientv3.WatchChan) {
for response := range ch {
for _, ev := range response.Events {
switch ev.Type {
case clientv3.EventTypePut:
m.addEtcdProxy(string(ev.Kv.Key), ev.Kv.Value)
case clientv3.EventTypeDelete:
m.removeEtcdProxy(string(ev.Kv.Key))
default:
log.Printf("Unsupported event %s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
}
func (m *mcuProxy) addEtcdProxy(key string, data []byte) {
var info ProxyInformationEtcd
if err := json.Unmarshal(data, &info); err != nil {
log.Printf("Could not decode proxy information %s: %s", string(data), err)
return
}
if err := info.CheckValid(); err != nil {
log.Printf("Received invalid proxy information %s: %s", string(data), err)
return
}
m.etcdMu.Lock()
defer m.etcdMu.Unlock()
prev, found := m.keyInfos[key]
if found && info.Address != prev.Address {
// Address of a proxy has changed.
m.removeEtcdProxyLocked(key)
}
if otherKey, found := m.urlToKey[info.Address]; found && otherKey != key {
log.Printf("Address %s is already registered for key %s, ignoring %s", info.Address, otherKey, key)
return
}
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
if conn, found := m.connectionsMap[info.Address]; found {
m.keyInfos[key] = &info
m.urlToKey[info.Address] = key
conn.stopCloseIfEmpty()
} else {
conn, err := newMcuProxyConnection(m, info.Address)
if err != nil {
log.Printf("Could not create proxy connection to %s: %s", info.Address, err)
return
}
if err := conn.start(); err != nil {
log.Printf("Could not start new connection to %s: %s", info.Address, err)
return
}
log.Printf("Adding new connection to %s (from %s)", info.Address, key)
m.keyInfos[key] = &info
m.urlToKey[info.Address] = key
m.connections = append(m.connections, conn)
m.connectionsMap[info.Address] = conn
atomic.StoreInt64(&m.nextSort, 0)
}
}
func (m *mcuProxy) removeEtcdProxy(key string) {
m.etcdMu.Lock()
defer m.etcdMu.Unlock()
m.removeEtcdProxyLocked(key)
}
func (m *mcuProxy) removeEtcdProxyLocked(key string) {
info, found := m.keyInfos[key]
if !found {
return
}
delete(m.keyInfos, key)
delete(m.urlToKey, info.Address)
log.Printf("Removing connection to %s (from %s)", info.Address, key)
m.connectionsMu.RLock()
defer m.connectionsMu.RUnlock()
if conn, found := m.connectionsMap[info.Address]; found {
go conn.closeIfEmpty()
}
}
func (m *mcuProxy) removeConnection(c *mcuProxyConnection) {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()