mirror of
https://github.com/strukturag/nextcloud-spreed-signaling
synced 2024-06-08 08:52:27 +02:00
Merge pull request #47 from strukturag/proxy-list-etcd
Add support for fetching proxy URLs from etcd cluster.
This commit is contained in:
commit
34de2b9b58
|
@ -99,12 +99,12 @@ connectionsperhost = 8
|
|||
|
||||
[mcu]
|
||||
# The type of the MCU to use. Currently only "janus" and "proxy" are supported.
|
||||
type = janus
|
||||
# Leave empty to disable MCU functionality.
|
||||
#type =
|
||||
|
||||
# For type "janus": the URL to the websocket endpoint of the MCU server.
|
||||
# For type "proxy": a space-separated list of proxy URLs to connect to.
|
||||
# Leave empty to disable MCU functionality.
|
||||
url =
|
||||
#url =
|
||||
|
||||
# For type "janus": the maximum bitrate per publishing stream (in bits per
|
||||
# second).
|
||||
|
@ -116,6 +116,14 @@ url =
|
|||
# Default is 2 mbit/sec.
|
||||
#maxscreenbitrate = 2097152
|
||||
|
||||
# For type "proxy": type of URL configuration for proxy servers.
|
||||
# Defaults to "static".
|
||||
#
|
||||
# Possible values:
|
||||
# - static: A space-separated list of proxy URLs is given in the "url" option.
|
||||
# - etcd: Proxy URLs are retrieved from an etcd cluster (see below).
|
||||
#urltype = "static"
|
||||
|
||||
# For type "proxy": the id of the token to use when connecting to proxy servers.
|
||||
#token_id = server1
|
||||
|
||||
|
@ -123,6 +131,31 @@ url =
|
|||
# connecting to proxy servers.
|
||||
#token_key = privkey.pem
|
||||
|
||||
# For url type "etcd": Comma-separated list of static etcd endpoints to
|
||||
# connect to.
|
||||
#endpoints = 127.0.0.1:2379,127.0.0.1:22379,127.0.0.1:32379
|
||||
|
||||
# For url type "etcd": Options to perform endpoint discovery through DNS SRV.
|
||||
# Only used if no endpoints are configured manually.
|
||||
#discoverysrv = example.com
|
||||
#discoveryservice = foo
|
||||
|
||||
# For url type "etcd": Path to private key, client certificate and CA
|
||||
# certificate if TLS authentication should be used.
|
||||
#clientkey = /path/to/etcd-client.key
|
||||
#clientcert = /path/to/etcd-client.crt
|
||||
#cacert = /path/to/etcd-ca.crt
|
||||
|
||||
# For url type "etcd": Key prefix of MCU proxy entries. All keys below will be
|
||||
# watched and assumed to contain a JSON document. The entry "address" from this
|
||||
# document will be used as proxy URL, other contents in the document will be
|
||||
# ignored.
|
||||
#
|
||||
# Example:
|
||||
# "/signaling/proxy/server/one" -> {"address": "https://proxy1.domain.invalid"}
|
||||
# "/signaling/proxy/server/two" -> {"address": "https://proxy2.domain.invalid"}
|
||||
#keyprefix = /signaling/proxy/server
|
||||
|
||||
[turn]
|
||||
# API key that the MCU will need to send when requesting TURN credentials.
|
||||
#apikey = the-api-key-for-the-rest-service
|
||||
|
|
|
@ -91,7 +91,7 @@ func createTLSListener(addr string, certFile, keyFile string) (net.Listener, err
|
|||
}
|
||||
|
||||
func main() {
|
||||
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds | log.Lshortfile)
|
||||
log.SetFlags(log.Lshortfile)
|
||||
flag.Parse()
|
||||
|
||||
if *showVersion {
|
||||
|
@ -155,12 +155,13 @@ func main() {
|
|||
}
|
||||
|
||||
mcuUrl, _ := config.GetString("mcu", "url")
|
||||
if mcuUrl != "" {
|
||||
mcuType, _ := config.GetString("mcu", "type")
|
||||
if mcuType == "" {
|
||||
mcuType = signaling.McuTypeDefault
|
||||
if mcuType == "" && mcuUrl != "" {
|
||||
log.Printf("WARNING: Old-style MCU configuration detected with url but no type, defaulting to type %s", signaling.McuTypeJanus)
|
||||
mcuType = signaling.McuTypeJanus
|
||||
}
|
||||
|
||||
if mcuType != "" {
|
||||
var mcu signaling.Mcu
|
||||
mcuRetry := initialMcuRetry
|
||||
mcuRetryTimer := time.NewTimer(mcuRetry)
|
||||
|
@ -169,21 +170,21 @@ func main() {
|
|||
case signaling.McuTypeJanus:
|
||||
mcu, err = signaling.NewMcuJanus(mcuUrl, config, nats)
|
||||
case signaling.McuTypeProxy:
|
||||
mcu, err = signaling.NewMcuProxy(mcuUrl, config)
|
||||
mcu, err = signaling.NewMcuProxy(config)
|
||||
default:
|
||||
log.Fatal("Unsupported MCU type: ", mcuType)
|
||||
}
|
||||
if err == nil {
|
||||
err = mcu.Start()
|
||||
if err != nil {
|
||||
log.Printf("Could not create %s MCU at %s: %s", mcuType, mcuUrl, err)
|
||||
log.Printf("Could not create %s MCU: %s", mcuType, err)
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, mcuUrl, err, mcuRetry)
|
||||
log.Printf("Could not initialize %s MCU (%s) will retry in %s", mcuType, err, mcuRetry)
|
||||
mcuRetryTimer.Reset(mcuRetry)
|
||||
select {
|
||||
case sig := <-sigChan:
|
||||
|
@ -197,8 +198,9 @@ func main() {
|
|||
} else {
|
||||
mcuUrl, _ = config.GetString("mcu", "url")
|
||||
mcuType, _ = config.GetString("mcu", "type")
|
||||
if mcuType == "" {
|
||||
mcuType = signaling.McuTypeDefault
|
||||
if mcuType == "" && mcuUrl != "" {
|
||||
log.Printf("WARNING: Old-style MCU configuration detected with url but no type, defaulting to type %s", signaling.McuTypeJanus)
|
||||
mcuType = signaling.McuTypeJanus
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -212,7 +214,7 @@ func main() {
|
|||
}
|
||||
defer mcu.Stop()
|
||||
|
||||
log.Printf("Using MCU %s at %s\n", mcuType, mcuUrl)
|
||||
log.Printf("Using %s MCU", mcuType)
|
||||
hub.SetMcu(mcu)
|
||||
}
|
||||
|
||||
|
|
|
@ -252,3 +252,19 @@ type EventProxyServerMessage struct {
|
|||
ClientId string `json:"clientId,omitempty"`
|
||||
Load int64 `json:"load,omitempty"`
|
||||
}
|
||||
|
||||
// Information on a proxy in the etcd cluster.
|
||||
|
||||
type ProxyInformationEtcd struct {
|
||||
Address string `json:"address"`
|
||||
}
|
||||
|
||||
func (p *ProxyInformationEtcd) CheckValid() error {
|
||||
if p.Address == "" {
|
||||
return fmt.Errorf("address missing")
|
||||
}
|
||||
if p.Address[len(p.Address)-1] != '/' {
|
||||
p.Address += "/"
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -39,6 +39,10 @@ import (
|
|||
"github.com/dlintw/goconf"
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.etcd.io/etcd/pkg/srv"
|
||||
"go.etcd.io/etcd/pkg/transport"
|
||||
|
||||
"gopkg.in/dgrijalva/jwt-go.v3"
|
||||
)
|
||||
|
||||
|
@ -53,6 +57,9 @@ const (
|
|||
// Sort connections by load every 10 publishing requests or once per second.
|
||||
connectionSortRequests = 10
|
||||
connectionSortInterval = time.Second
|
||||
|
||||
proxyUrlTypeStatic = "static"
|
||||
proxyUrlTypeEtcd = "etcd"
|
||||
)
|
||||
|
||||
type mcuProxyPubSubCommon struct {
|
||||
|
@ -871,6 +878,12 @@ type mcuProxy struct {
|
|||
tokenId string
|
||||
tokenKey *rsa.PrivateKey
|
||||
|
||||
etcdMu sync.Mutex
|
||||
client atomic.Value
|
||||
keyPrefix atomic.Value
|
||||
keyInfos map[string]*ProxyInformationEtcd
|
||||
urlToKey map[string]string
|
||||
|
||||
connections []*mcuProxyConnection
|
||||
connectionsMap map[string]*mcuProxyConnection
|
||||
connectionsMu sync.RWMutex
|
||||
|
@ -884,7 +897,9 @@ type mcuProxy struct {
|
|||
publisherWaiters map[uint64]chan bool
|
||||
}
|
||||
|
||||
func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) {
|
||||
func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) {
|
||||
urlType, _ := config.GetString("mcu", "urltype")
|
||||
|
||||
tokenId, _ := config.GetString("mcu", "token_id")
|
||||
if tokenId == "" {
|
||||
return nil, fmt.Errorf("No token id configured")
|
||||
|
@ -913,7 +928,10 @@ func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) {
|
|||
publisherWaiters: make(map[uint64]chan bool),
|
||||
}
|
||||
|
||||
for _, u := range strings.Split(baseUrl, " ") {
|
||||
switch urlType {
|
||||
case proxyUrlTypeStatic:
|
||||
mcuUrl, _ := config.GetString("mcu", "url")
|
||||
for _, u := range strings.Split(mcuUrl, " ") {
|
||||
conn, err := newMcuProxyConnection(mcu, u)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -925,10 +943,28 @@ func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) {
|
|||
if len(mcu.connections) == 0 {
|
||||
return nil, fmt.Errorf("No MCU proxy connections configured")
|
||||
}
|
||||
case proxyUrlTypeEtcd:
|
||||
mcu.keyInfos = make(map[string]*ProxyInformationEtcd)
|
||||
mcu.urlToKey = make(map[string]string)
|
||||
if err := mcu.configureEtcd(config, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("Unsupported proxy URL type %s", urlType)
|
||||
}
|
||||
|
||||
return mcu, nil
|
||||
}
|
||||
|
||||
func (m *mcuProxy) getEtcdClient() *clientv3.Client {
|
||||
c := m.client.Load()
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.(*clientv3.Client)
|
||||
}
|
||||
|
||||
func (m *mcuProxy) Start() error {
|
||||
m.connectionsMu.RLock()
|
||||
defer m.connectionsMu.RUnlock()
|
||||
|
@ -952,6 +988,104 @@ func (m *mcuProxy) Stop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) error {
|
||||
keyPrefix, _ := config.GetString("mcu", "keyprefix")
|
||||
if keyPrefix == "" {
|
||||
keyPrefix = "/%s"
|
||||
}
|
||||
|
||||
var endpoints []string
|
||||
if endpointsString, _ := config.GetString("mcu", "endpoints"); endpointsString != "" {
|
||||
for _, ep := range strings.Split(endpointsString, ",") {
|
||||
ep := strings.TrimSpace(ep)
|
||||
if ep != "" {
|
||||
endpoints = append(endpoints, ep)
|
||||
}
|
||||
}
|
||||
} else if discoverySrv, _ := config.GetString("mcu", "discoverysrv"); discoverySrv != "" {
|
||||
discoveryService, _ := config.GetString("mcu", "discoveryservice")
|
||||
clients, err := srv.GetClient("etcd-client", discoverySrv, discoveryService)
|
||||
if err != nil {
|
||||
if !ignoreErrors {
|
||||
return fmt.Errorf("Could not discover endpoints for %s: %s", discoverySrv, err)
|
||||
}
|
||||
} else {
|
||||
endpoints = clients.Endpoints
|
||||
}
|
||||
}
|
||||
|
||||
if len(endpoints) == 0 {
|
||||
if !ignoreErrors {
|
||||
return fmt.Errorf("No proxy URL endpoints configured")
|
||||
}
|
||||
|
||||
log.Printf("No proxy URL endpoints configured, not changing client")
|
||||
} else {
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: endpoints,
|
||||
|
||||
// set timeout per request to fail fast when the target endpoint is unavailable
|
||||
DialTimeout: time.Second,
|
||||
}
|
||||
|
||||
clientKey, _ := config.GetString("mcu", "clientkey")
|
||||
clientCert, _ := config.GetString("mcu", "clientcert")
|
||||
caCert, _ := config.GetString("mcu", "cacert")
|
||||
if clientKey != "" && clientCert != "" && caCert != "" {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: clientCert,
|
||||
KeyFile: clientKey,
|
||||
TrustedCAFile: caCert,
|
||||
}
|
||||
tlsConfig, err := tlsInfo.ClientConfig()
|
||||
if err != nil {
|
||||
if !ignoreErrors {
|
||||
return fmt.Errorf("Could not setup TLS configuration: %s", err)
|
||||
}
|
||||
|
||||
log.Printf("Could not setup TLS configuration, will be disabled (%s)", err)
|
||||
} else {
|
||||
cfg.TLS = tlsConfig
|
||||
}
|
||||
}
|
||||
|
||||
c, err := clientv3.New(cfg)
|
||||
if err != nil {
|
||||
if !ignoreErrors {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("Could not create new client from proxy URL endpoints %+v: %s", endpoints, err)
|
||||
} else {
|
||||
prev := m.getEtcdClient()
|
||||
if prev != nil {
|
||||
prev.Close()
|
||||
}
|
||||
m.client.Store(c)
|
||||
log.Printf("Using proxy URL endpoints %+v", endpoints)
|
||||
|
||||
ch := c.Watch(clientv3.WithRequireLeader(context.Background()), keyPrefix, clientv3.WithPrefix())
|
||||
go m.processWatches(ch)
|
||||
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
response, err := c.Get(ctx, keyPrefix, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
log.Printf("Could not get initial list of proxy URLs: %s", err)
|
||||
} else {
|
||||
for _, ev := range response.Kvs {
|
||||
m.addEtcdProxy(string(ev.Key), ev.Value)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
|
||||
m.connectionsMu.Lock()
|
||||
defer m.connectionsMu.Unlock()
|
||||
|
@ -1002,6 +1136,98 @@ func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *mcuProxy) processWatches(ch clientv3.WatchChan) {
|
||||
for response := range ch {
|
||||
for _, ev := range response.Events {
|
||||
switch ev.Type {
|
||||
case clientv3.EventTypePut:
|
||||
m.addEtcdProxy(string(ev.Kv.Key), ev.Kv.Value)
|
||||
case clientv3.EventTypeDelete:
|
||||
m.removeEtcdProxy(string(ev.Kv.Key))
|
||||
default:
|
||||
log.Printf("Unsupported event %s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mcuProxy) addEtcdProxy(key string, data []byte) {
|
||||
var info ProxyInformationEtcd
|
||||
if err := json.Unmarshal(data, &info); err != nil {
|
||||
log.Printf("Could not decode proxy information %s: %s", string(data), err)
|
||||
return
|
||||
}
|
||||
if err := info.CheckValid(); err != nil {
|
||||
log.Printf("Received invalid proxy information %s: %s", string(data), err)
|
||||
return
|
||||
}
|
||||
|
||||
m.etcdMu.Lock()
|
||||
defer m.etcdMu.Unlock()
|
||||
|
||||
prev, found := m.keyInfos[key]
|
||||
if found && info.Address != prev.Address {
|
||||
// Address of a proxy has changed.
|
||||
m.removeEtcdProxyLocked(key)
|
||||
}
|
||||
|
||||
if otherKey, found := m.urlToKey[info.Address]; found && otherKey != key {
|
||||
log.Printf("Address %s is already registered for key %s, ignoring %s", info.Address, otherKey, key)
|
||||
return
|
||||
}
|
||||
|
||||
m.connectionsMu.Lock()
|
||||
defer m.connectionsMu.Unlock()
|
||||
if conn, found := m.connectionsMap[info.Address]; found {
|
||||
m.keyInfos[key] = &info
|
||||
m.urlToKey[info.Address] = key
|
||||
conn.stopCloseIfEmpty()
|
||||
} else {
|
||||
conn, err := newMcuProxyConnection(m, info.Address)
|
||||
if err != nil {
|
||||
log.Printf("Could not create proxy connection to %s: %s", info.Address, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := conn.start(); err != nil {
|
||||
log.Printf("Could not start new connection to %s: %s", info.Address, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Adding new connection to %s (from %s)", info.Address, key)
|
||||
m.keyInfos[key] = &info
|
||||
m.urlToKey[info.Address] = key
|
||||
m.connections = append(m.connections, conn)
|
||||
m.connectionsMap[info.Address] = conn
|
||||
atomic.StoreInt64(&m.nextSort, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mcuProxy) removeEtcdProxy(key string) {
|
||||
m.etcdMu.Lock()
|
||||
defer m.etcdMu.Unlock()
|
||||
|
||||
m.removeEtcdProxyLocked(key)
|
||||
}
|
||||
|
||||
func (m *mcuProxy) removeEtcdProxyLocked(key string) {
|
||||
info, found := m.keyInfos[key]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
|
||||
delete(m.keyInfos, key)
|
||||
delete(m.urlToKey, info.Address)
|
||||
|
||||
log.Printf("Removing connection to %s (from %s)", info.Address, key)
|
||||
|
||||
m.connectionsMu.RLock()
|
||||
defer m.connectionsMu.RUnlock()
|
||||
if conn, found := m.connectionsMap[info.Address]; found {
|
||||
go conn.closeIfEmpty()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mcuProxy) removeConnection(c *mcuProxyConnection) {
|
||||
m.connectionsMu.Lock()
|
||||
defer m.connectionsMu.Unlock()
|
||||
|
|
Loading…
Reference in a new issue