From 4446b079514df3d5e3a6ea80e4bc9af7d807748a Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Fri, 7 Aug 2020 10:27:28 +0200 Subject: [PATCH] Add MCU type "proxy" that delegates to one or multiple MCU proxies. --- Makefile | 1 + dependencies.tsv | 1 + server.conf.in | 20 +- src/server/main.go | 2 + src/signaling/api_proxy.go | 254 +++++++++ src/signaling/mcu_common.go | 1 + src/signaling/mcu_proxy.go | 1035 +++++++++++++++++++++++++++++++++++ 7 files changed, 1309 insertions(+), 5 deletions(-) create mode 100644 src/signaling/api_proxy.go create mode 100644 src/signaling/mcu_proxy.go diff --git a/Makefile b/Makefile index ee0c9c1..9d1cdcf 100644 --- a/Makefile +++ b/Makefile @@ -97,6 +97,7 @@ coverhtml: dependencies vet common common: easyjson \ src/signaling/api_signaling_easyjson.go \ src/signaling/api_backend_easyjson.go \ + src/signaling/api_proxy_easyjson.go \ src/signaling/natsclient_easyjson.go \ src/signaling/room_easyjson.go diff --git a/dependencies.tsv b/dependencies.tsv index ae64cf9..1a865cd 100644 --- a/dependencies.tsv +++ b/dependencies.tsv @@ -10,3 +10,4 @@ github.com/notedit/janus-go git 8e6e2c423c03884d938d84442d37d6f6f5294197 2017-06 github.com/oschwald/maxminddb-golang git 1960b16a5147df3a4c61ac83b2f31cd8f811d609 2019-05-23T23:57:38Z golang.org/x/net git f01ecb60fe3835d80d9a0b7b2bf24b228c89260e 2017-07-11T18:12:19Z golang.org/x/sys git ac767d655b305d4e9612f5f6e33120b9176c4ad4 2018-07-15T08:55:29Z +gopkg.in/dgrijalva/jwt-go.v3 git 06ea1031745cb8b3dab3f6a236daf2b0aa468b7e 2018-03-08T23:13:08Z diff --git a/server.conf.in b/server.conf.in index 6488e4e..aaf10c2 100644 --- a/server.conf.in +++ b/server.conf.in @@ -98,21 +98,31 @@ connectionsperhost = 8 #url = nats://localhost:4222 [mcu] -# The type of the MCU to use. Currently only "janus" is supported. +# The type of the MCU to use. Currently only "janus" and "proxy" are supported. type = janus -# The URL to the websocket endpoint of the MCU server. Leave empty to disable -# MCU functionality. +# For type "janus": the URL to the websocket endpoint of the MCU server. +# For type "proxy": a space-separated list of proxy URLs to connect to. +# Leave empty to disable MCU functionality. url = -# The maximum bitrate per publishing stream (in bits per second). +# For type "janus": the maximum bitrate per publishing stream (in bits per +# second). # Defaults to 1 mbit/sec. #maxstreambitrate = 1048576 -# The maximum bitrate per screensharing stream (in bits per second). +# For type "janus": the maximum bitrate per screensharing stream (in bits per +# second). # Default is 2 mbit/sec. #maxscreenbitrate = 2097152 +# For type "proxy": the id of the token to use when connecting to proxy servers. +#token_id = server1 + +# For type "proxy": the private key for the configured token id to use when +# connecting to proxy servers. +#token_key = privkey.pem + [turn] # API key that the MCU will need to send when requesting TURN credentials. #apikey = the-api-key-for-the-rest-service diff --git a/src/server/main.go b/src/server/main.go index 468b899..f052aa7 100644 --- a/src/server/main.go +++ b/src/server/main.go @@ -166,6 +166,8 @@ func main() { switch mcuType { case signaling.McuTypeJanus: mcu, err = signaling.NewMcuJanus(mcuUrl, config, nats) + case signaling.McuTypeProxy: + mcu, err = signaling.NewMcuProxy(mcuUrl, config) default: log.Fatal("Unsupported MCU type: ", mcuType) } diff --git a/src/signaling/api_proxy.go b/src/signaling/api_proxy.go new file mode 100644 index 0000000..ad78a4c --- /dev/null +++ b/src/signaling/api_proxy.go @@ -0,0 +1,254 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2020 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "fmt" + + "gopkg.in/dgrijalva/jwt-go.v3" +) + +type ProxyClientMessage struct { + // The unique request id (optional). + Id string `json:"id,omitempty"` + + // The type of the request. + Type string `json:"type"` + + // Filled for type "hello" + Hello *HelloProxyClientMessage `json:"hello,omitempty"` + + Bye *ByeProxyClientMessage `json:"bye,omitempty"` + + Command *CommandProxyClientMessage `json:"command,omitempty"` + + Payload *PayloadProxyClientMessage `json:"payload,omitempty"` +} + +func (m *ProxyClientMessage) CheckValid() error { + switch m.Type { + case "": + return fmt.Errorf("type missing") + case "hello": + if m.Hello == nil { + return fmt.Errorf("hello missing") + } else if err := m.Hello.CheckValid(); err != nil { + return err + } + case "bye": + if m.Bye != nil { + // Bye contents are optional + if err := m.Bye.CheckValid(); err != nil { + return err + } + } + case "command": + if m.Command == nil { + return fmt.Errorf("command missing") + } else if err := m.Command.CheckValid(); err != nil { + return err + } + case "payload": + if m.Payload == nil { + return fmt.Errorf("payload missing") + } else if err := m.Payload.CheckValid(); err != nil { + return err + } + } + return nil +} + +func (m *ProxyClientMessage) NewErrorServerMessage(e *Error) *ProxyServerMessage { + return &ProxyServerMessage{ + Id: m.Id, + Type: "error", + Error: e, + } +} + +func (m *ProxyClientMessage) NewWrappedErrorServerMessage(e error) *ProxyServerMessage { + return m.NewErrorServerMessage(NewError("internal_error", e.Error())) +} + +// ProxyServerMessage is a message that is sent from the server to a client. +type ProxyServerMessage struct { + Id string `json:"id,omitempty"` + + Type string `json:"type"` + + Error *Error `json:"error,omitempty"` + + Hello *HelloProxyServerMessage `json:"hello,omitempty"` + + Bye *ByeProxyServerMessage `json:"bye,omitempty"` + + Command *CommandProxyServerMessage `json:"command,omitempty"` + + Payload *PayloadProxyServerMessage `json:"payload,omitempty"` + + Event *EventProxyServerMessage `json:"event,omitempty"` +} + +func (r *ProxyServerMessage) CloseAfterSend(session Session) bool { + if r.Type == "bye" { + return true + } + + return false +} + +// Type "hello" + +type TokenClaims struct { + jwt.StandardClaims +} + +type HelloProxyClientMessage struct { + Version string `json:"version"` + + ResumeId string `json:"resumeid"` + + Features []string `json:"features,omitempty"` + + // The authentication credentials. + Token string `json:"token"` +} + +func (m *HelloProxyClientMessage) CheckValid() error { + if m.Version != HelloVersion { + return fmt.Errorf("unsupported hello version: %s", m.Version) + } + if m.ResumeId == "" { + if m.Token == "" { + return fmt.Errorf("token missing") + } + } + return nil +} + +type HelloProxyServerMessage struct { + Version string `json:"version"` + + SessionId string `json:"sessionid"` + Server *HelloServerMessageServer `json:"server,omitempty"` +} + +// Type "bye" + +type ByeProxyClientMessage struct { +} + +func (m *ByeProxyClientMessage) CheckValid() error { + // No additional validation required. + return nil +} + +type ByeProxyServerMessage struct { + Reason string `json:"reason"` +} + +// Type "command" + +type CommandProxyClientMessage struct { + Type string `json:"type"` + + StreamType string `json:"streamType,omitempty"` + PublisherId string `json:"publisherId,omitempty"` + ClientId string `json:"clientId,omitempty"` +} + +func (m *CommandProxyClientMessage) CheckValid() error { + switch m.Type { + case "": + return fmt.Errorf("type missing") + case "create-publisher": + if m.StreamType == "" { + return fmt.Errorf("stream type missing") + } + case "create-subscriber": + if m.PublisherId == "" { + return fmt.Errorf("publisher id missing") + } + if m.StreamType == "" { + return fmt.Errorf("stream type missing") + } + case "delete-publisher": + fallthrough + case "delete-subscriber": + if m.ClientId == "" { + return fmt.Errorf("client id missing") + } + } + return nil +} + +type CommandProxyServerMessage struct { + Id string `json:"id,omitempty"` +} + +// Type "payload" + +type PayloadProxyClientMessage struct { + Type string `json:"type"` + + ClientId string `json:"clientId"` + Payload map[string]interface{} `json:"payload,omitempty"` +} + +func (m *PayloadProxyClientMessage) CheckValid() error { + switch m.Type { + case "": + return fmt.Errorf("type missing") + case "offer": + fallthrough + case "answer": + fallthrough + case "candidate": + if len(m.Payload) == 0 { + return fmt.Errorf("payload missing") + } + case "endOfCandidates": + fallthrough + case "requestoffer": + // No payload required. + } + if m.ClientId == "" { + return fmt.Errorf("client id missing") + } + return nil +} + +type PayloadProxyServerMessage struct { + Type string `json:"type"` + + ClientId string `json:"clientId"` + Payload map[string]interface{} `json:"payload"` +} + +// Type "event" + +type EventProxyServerMessage struct { + Type string `json:"type"` + + ClientId string `json:"clientId,omitempty"` + Load int64 `json:"load,omitempty"` +} diff --git a/src/signaling/mcu_common.go b/src/signaling/mcu_common.go index 3429e9d..20721af 100644 --- a/src/signaling/mcu_common.go +++ b/src/signaling/mcu_common.go @@ -29,6 +29,7 @@ import ( const ( McuTypeJanus = "janus" + McuTypeProxy = "proxy" McuTypeDefault = McuTypeJanus ) diff --git a/src/signaling/mcu_proxy.go b/src/signaling/mcu_proxy.go new file mode 100644 index 0000000..f5c0bbb --- /dev/null +++ b/src/signaling/mcu_proxy.go @@ -0,0 +1,1035 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2020 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "crypto/rsa" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/url" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/dlintw/goconf" + "github.com/gorilla/websocket" + + "golang.org/x/net/context" + + "gopkg.in/dgrijalva/jwt-go.v3" +) + +const ( + closeTimeout = time.Second + + proxyDebugMessages = false + + // Very high value so the connections get sorted at the end. + loadNotConnected = 1000000 + + // Sort connections by load every 10 publishing requests or once per second. + connectionSortRequests = 10 + connectionSortInterval = time.Second +) + +type mcuProxyPubSubCommon struct { + streamType string + proxyId string + conn *mcuProxyConnection + listener McuListener +} + +func (c *mcuProxyPubSubCommon) Id() string { + return c.proxyId +} + +func (c *mcuProxyPubSubCommon) StreamType() string { + return c.streamType +} + +func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClientMessage, callback func(error, map[string]interface{})) { + c.conn.performAsyncRequest(ctx, msg, func(err error, response *ProxyServerMessage) { + if err != nil { + callback(err, nil) + return + } + + if proxyDebugMessages { + log.Printf("Response from %s: %+v", c.conn.url, response) + } + if response.Type == "error" { + callback(response.Error, nil) + } else if response.Payload != nil { + callback(nil, response.Payload.Payload) + } else { + callback(nil, nil) + } + }) +} + +func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadProxyServerMessage) { + switch msg.Type { + case "candidate": + c.listener.OnIceCandidate(client, msg.Payload["candidate"]) + default: + log.Printf("Unsupported payload from %s: %+v", c.conn.url, msg) + } +} + +type mcuProxyPublisher struct { + mcuProxyPubSubCommon + + id string +} + +func newMcuProxyPublisher(id string, streamType string, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher { + return &mcuProxyPublisher{ + mcuProxyPubSubCommon: mcuProxyPubSubCommon{ + streamType: streamType, + proxyId: proxyId, + conn: conn, + listener: listener, + }, + id: id, + } +} + +func (p *mcuProxyPublisher) NotifyClosed() { + p.listener.PublisherClosed(p) + p.conn.removePublisher(p) +} + +func (p *mcuProxyPublisher) Close(ctx context.Context) { + p.NotifyClosed() + + msg := &ProxyClientMessage{ + Type: "command", + Command: &CommandProxyClientMessage{ + Type: "delete-publisher", + ClientId: p.proxyId, + }, + } + + if _, err := p.conn.performSyncRequest(ctx, msg); err != nil { + log.Printf("Could not delete publisher %s at %s: %s", p.proxyId, p.conn.url, err) + return + } +} + +func (p *mcuProxyPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { + msg := &ProxyClientMessage{ + Type: "payload", + Payload: &PayloadProxyClientMessage{ + Type: data.Type, + ClientId: p.proxyId, + Payload: data.Payload, + }, + } + + p.doSendMessage(ctx, msg, callback) +} + +func (p *mcuProxyPublisher) ProcessPayload(msg *PayloadProxyServerMessage) { + p.doProcessPayload(p, msg) +} + +func (p *mcuProxyPublisher) ProcessEvent(msg *EventProxyServerMessage) { + switch msg.Type { + case "ice-completed": + p.listener.OnIceCompleted(p) + case "publisher-closed": + p.NotifyClosed() + default: + log.Printf("Unsupported event from %s: %+v", p.conn.url, msg) + } +} + +type mcuProxySubscriber struct { + mcuProxyPubSubCommon + + publisherId string +} + +func newMcuProxySubscriber(publisherId string, streamType string, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber { + return &mcuProxySubscriber{ + mcuProxyPubSubCommon: mcuProxyPubSubCommon{ + streamType: streamType, + proxyId: proxyId, + conn: conn, + listener: listener, + }, + + publisherId: publisherId, + } +} + +func (s *mcuProxySubscriber) Publisher() string { + return s.publisherId +} + +func (s *mcuProxySubscriber) NotifyClosed() { + s.listener.SubscriberClosed(s) + s.conn.removeSubscriber(s) +} + +func (s *mcuProxySubscriber) Close(ctx context.Context) { + s.NotifyClosed() + + msg := &ProxyClientMessage{ + Type: "command", + Command: &CommandProxyClientMessage{ + Type: "delete-subscriber", + ClientId: s.proxyId, + }, + } + + if _, err := s.conn.performSyncRequest(ctx, msg); err != nil { + log.Printf("Could not delete subscriber %s at %s: %s", s.proxyId, s.conn.url, err) + return + } +} + +func (s *mcuProxySubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) { + msg := &ProxyClientMessage{ + Type: "payload", + Payload: &PayloadProxyClientMessage{ + Type: data.Type, + ClientId: s.proxyId, + Payload: data.Payload, + }, + } + + s.doSendMessage(ctx, msg, callback) +} + +func (s *mcuProxySubscriber) ProcessPayload(msg *PayloadProxyServerMessage) { + s.doProcessPayload(s, msg) +} + +func (s *mcuProxySubscriber) ProcessEvent(msg *EventProxyServerMessage) { + switch msg.Type { + case "ice-completed": + s.listener.OnIceCompleted(s) + case "subscriber-closed": + s.NotifyClosed() + default: + log.Printf("Unsupported event from %s: %+v", s.conn.url, msg) + } +} + +type mcuProxyConnection struct { + proxy *mcuProxy + url *url.URL + + mu sync.Mutex + closeChan chan bool + closedChan chan bool + closed uint32 + conn *websocket.Conn + + connectedSince time.Time + reconnectInterval int64 + reconnectTimer *time.Timer + + msgId int64 + helloMsgId string + sessionId string + load int64 + + callbacks map[string]func(*ProxyServerMessage) + + publishersLock sync.RWMutex + publishers map[string]*mcuProxyPublisher + publisherIds map[string]string + + subscribersLock sync.RWMutex + subscribers map[string]*mcuProxySubscriber +} + +func newMcuProxyConnection(proxy *mcuProxy, baseUrl string) (*mcuProxyConnection, error) { + parsed, err := url.Parse(baseUrl) + if err != nil { + return nil, err + } + + conn := &mcuProxyConnection{ + proxy: proxy, + url: parsed, + closeChan: make(chan bool, 1), + closedChan: make(chan bool, 1), + reconnectInterval: int64(initialReconnectInterval), + load: loadNotConnected, + callbacks: make(map[string]func(*ProxyServerMessage)), + publishers: make(map[string]*mcuProxyPublisher), + publisherIds: make(map[string]string), + subscribers: make(map[string]*mcuProxySubscriber), + } + return conn, nil +} + +type mcuProxyConnectionStats struct { + Url string `json:"url"` + Connected bool `json:"connected"` + Publishers int64 `json:"publishers"` + Clients int64 `json:"clients"` + Uptime *time.Time `json:"uptime,omitempty"` +} + +func (c *mcuProxyConnection) GetStats() *mcuProxyConnectionStats { + result := &mcuProxyConnectionStats{ + Url: c.url.String(), + } + c.mu.Lock() + if c.conn != nil { + result.Connected = true + result.Uptime = &c.connectedSince + } + c.mu.Unlock() + c.publishersLock.RLock() + result.Publishers = int64(len(c.publishers)) + c.publishersLock.RUnlock() + c.subscribersLock.RLock() + result.Clients = int64(len(c.subscribers)) + c.subscribersLock.RUnlock() + result.Clients += result.Publishers + return result +} + +func (c *mcuProxyConnection) Load() int64 { + return atomic.LoadInt64(&c.load) +} + +func (c *mcuProxyConnection) readPump() { + defer func() { + if atomic.LoadUint32(&c.closed) == 0 { + c.scheduleReconnect() + } else { + c.closedChan <- true + } + }() + defer c.close() + defer atomic.StoreInt64(&c.load, loadNotConnected) + + c.mu.Lock() + conn := c.conn + c.mu.Unlock() + + for { + _, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived) { + log.Printf("Error reading from %s: %v", c.url, err) + } + break + } + + var msg ProxyServerMessage + if err := json.Unmarshal(message, &msg); err != nil { + log.Printf("Error unmarshaling %s from %s: %s", string(message), c.url, err) + continue + } + + c.processMessage(&msg) + } +} + +func (c *mcuProxyConnection) writePump() { + c.reconnectTimer = time.NewTimer(0) + for { + select { + case <-c.reconnectTimer.C: + c.reconnect() + case <-c.closeChan: + return + } + } +} + +func (c *mcuProxyConnection) start() error { + go c.writePump() + return nil +} + +func (c *mcuProxyConnection) sendClose() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn == nil { + return ErrNotConnected + } + + return c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) +} + +func (c *mcuProxyConnection) stop(ctx context.Context) { + if !atomic.CompareAndSwapUint32(&c.closed, 0, 1) { + return + } + + c.closeChan <- true + if err := c.sendClose(); err != nil { + if err != ErrNotConnected { + log.Printf("Could not send close message to %s: %s", c.url, err) + } + c.close() + return + } + + select { + case <-c.closedChan: + case <-ctx.Done(): + if err := ctx.Err(); err != nil { + log.Printf("Error waiting for connection to %s get closed: %s", c.url, err) + c.close() + } + } +} + +func (c *mcuProxyConnection) close() { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn != nil { + c.conn.Close() + c.conn = nil + } +} + +func (c *mcuProxyConnection) scheduleReconnect() { + if err := c.sendClose(); err != nil && err != ErrNotConnected { + log.Printf("Could not send close message to %s: %s", c.url, err) + c.close() + } + + interval := atomic.LoadInt64(&c.reconnectInterval) + c.reconnectTimer.Reset(time.Duration(interval)) + + interval = interval * 2 + if interval > int64(maxReconnectInterval) { + interval = int64(maxReconnectInterval) + } + atomic.StoreInt64(&c.reconnectInterval, interval) +} + +func (c *mcuProxyConnection) reconnect() { + u, err := c.url.Parse("proxy") + if err != nil { + log.Printf("Could not resolve url to proxy at %s: %s", c.url, err) + c.scheduleReconnect() + return + } + if u.Scheme == "http" { + u.Scheme = "ws" + } else if u.Scheme == "https" { + u.Scheme = "wss" + } + + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Printf("Could not connect to %s: %s", u, err) + c.scheduleReconnect() + return + } + + log.Printf("Connected to %s", u) + atomic.StoreUint32(&c.closed, 0) + + c.mu.Lock() + c.connectedSince = time.Now() + c.conn = conn + c.mu.Unlock() + + atomic.StoreInt64(&c.reconnectInterval, int64(initialReconnectInterval)) + if err := c.sendHello(); err != nil { + log.Printf("Could not send hello request to %s: %s", c.url, err) + c.scheduleReconnect() + return + } + + go c.readPump() +} + +func (c *mcuProxyConnection) removePublisher(publisher *mcuProxyPublisher) { + c.proxy.removePublisher(publisher) + + c.publishersLock.Lock() + defer c.publishersLock.Unlock() + + delete(c.publishers, publisher.proxyId) + delete(c.publisherIds, publisher.id+"|"+publisher.StreamType()) +} + +func (c *mcuProxyConnection) clearPublishers() { + c.publishersLock.Lock() + defer c.publishersLock.Unlock() + + go func(publishers map[string]*mcuProxyPublisher) { + for _, publisher := range publishers { + publisher.NotifyClosed() + } + }(c.publishers) + c.publishers = make(map[string]*mcuProxyPublisher) + c.publisherIds = make(map[string]string) +} + +func (c *mcuProxyConnection) removeSubscriber(subscriber *mcuProxySubscriber) { + c.subscribersLock.Lock() + defer c.subscribersLock.Unlock() + + delete(c.subscribers, subscriber.proxyId) +} + +func (c *mcuProxyConnection) clearSubscribers() { + c.subscribersLock.Lock() + defer c.subscribersLock.Unlock() + + go func(subscribers map[string]*mcuProxySubscriber) { + for _, subscriber := range subscribers { + subscriber.NotifyClosed() + } + }(c.subscribers) + c.subscribers = make(map[string]*mcuProxySubscriber) +} + +func (c *mcuProxyConnection) clearCallbacks() { + c.mu.Lock() + defer c.mu.Unlock() + + c.callbacks = make(map[string]func(*ProxyServerMessage)) +} + +func (c *mcuProxyConnection) getCallback(id string) func(*ProxyServerMessage) { + c.mu.Lock() + defer c.mu.Unlock() + + callback, found := c.callbacks[id] + if found { + delete(c.callbacks, id) + } + return callback +} + +func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) { + if c.helloMsgId != "" && msg.Id == c.helloMsgId { + c.helloMsgId = "" + switch msg.Type { + case "error": + if msg.Error.Code == "no_such_session" { + log.Printf("Session %s could not be resumed on %s, registering new", c.sessionId, c.url) + c.clearPublishers() + c.clearSubscribers() + c.clearCallbacks() + c.sessionId = "" + if err := c.sendHello(); err != nil { + log.Printf("Could not send hello request to %s: %s", c.url, err) + c.scheduleReconnect() + } + return + } + + log.Printf("Hello connection to %s failed with %+v, reconnecting", c.url, msg.Error) + c.scheduleReconnect() + case "hello": + c.sessionId = msg.Hello.SessionId + log.Printf("Received session %s from %s", c.sessionId, c.url) + default: + log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c.url) + c.scheduleReconnect() + } + return + } + + if proxyDebugMessages { + log.Printf("Received from %s: %+v", c.url, msg) + } + callback := c.getCallback(msg.Id) + if callback != nil { + callback(msg) + return + } + + switch msg.Type { + case "payload": + c.processPayload(msg) + case "event": + c.processEvent(msg) + default: + log.Printf("Unsupported message received from %s: %+v", c.url, msg) + } +} + +func (c *mcuProxyConnection) processPayload(msg *ProxyServerMessage) { + payload := msg.Payload + c.publishersLock.RLock() + publisher, found := c.publishers[payload.ClientId] + c.publishersLock.RUnlock() + if found { + publisher.ProcessPayload(payload) + return + } + + c.subscribersLock.RLock() + subscriber, found := c.subscribers[payload.ClientId] + c.subscribersLock.RUnlock() + if found { + subscriber.ProcessPayload(payload) + return + } + + log.Printf("Received payload for unknown client %+v from %s", payload, c.url) +} + +func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) { + event := msg.Event + if event.Type == "backend-disconnected" { + log.Printf("Upstream backend at %s got disconnected, reset MCU objects", c.url) + c.clearPublishers() + c.clearSubscribers() + c.clearCallbacks() + // TODO: Should we also reconnect? + return + } else if event.Type == "backend-connected" { + log.Printf("Upstream backend at %s is connected", c.url) + return + } else if event.Type == "update-load" { + if proxyDebugMessages { + log.Printf("Load of %s now at %d", c.url, event.Load) + } + atomic.StoreInt64(&c.load, event.Load) + return + } + + if proxyDebugMessages { + log.Printf("Process event from %s: %+v", c.url, event) + } + c.publishersLock.RLock() + publisher, found := c.publishers[event.ClientId] + c.publishersLock.RUnlock() + if found { + publisher.ProcessEvent(event) + return + } + + c.subscribersLock.RLock() + subscriber, found := c.subscribers[event.ClientId] + c.subscribersLock.RUnlock() + if found { + subscriber.ProcessEvent(event) + return + } + + log.Printf("Received event for unknown client %+v from %s", event, c.url) +} + +func (c *mcuProxyConnection) sendHello() error { + c.helloMsgId = strconv.FormatInt(atomic.AddInt64(&c.msgId, 1), 10) + msg := &ProxyClientMessage{ + Id: c.helloMsgId, + Type: "hello", + Hello: &HelloProxyClientMessage{ + Version: "1.0", + }, + } + if c.sessionId != "" { + msg.Hello.ResumeId = c.sessionId + } else { + claims := &TokenClaims{ + jwt.StandardClaims{ + IssuedAt: time.Now().Unix(), + Issuer: c.proxy.tokenId, + }, + } + token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims) + tokenString, err := token.SignedString(c.proxy.tokenKey) + if err != nil { + return err + } + + msg.Hello.Token = tokenString + } + return c.sendMessage(msg) +} + +func (c *mcuProxyConnection) sendMessage(msg *ProxyClientMessage) error { + c.mu.Lock() + defer c.mu.Unlock() + + return c.sendMessageLocked(msg) +} + +func (c *mcuProxyConnection) sendMessageLocked(msg *ProxyClientMessage) error { + if proxyDebugMessages { + log.Printf("Send message to %s: %+v", c.url, msg) + } + if c.conn == nil { + return ErrNotConnected + } + return c.conn.WriteJSON(msg) +} + +func (c *mcuProxyConnection) performAsyncRequest(ctx context.Context, msg *ProxyClientMessage, callback func(err error, response *ProxyServerMessage)) { + msgId := strconv.FormatInt(atomic.AddInt64(&c.msgId, 1), 10) + msg.Id = msgId + + c.mu.Lock() + defer c.mu.Unlock() + c.callbacks[msgId] = func(msg *ProxyServerMessage) { + callback(nil, msg) + } + if err := c.sendMessageLocked(msg); err != nil { + delete(c.callbacks, msgId) + go callback(err, nil) + return + } +} + +func (c *mcuProxyConnection) performSyncRequest(ctx context.Context, msg *ProxyClientMessage) (*ProxyServerMessage, error) { + errChan := make(chan error, 1) + responseChan := make(chan *ProxyServerMessage, 1) + c.performAsyncRequest(ctx, msg, func(err error, response *ProxyServerMessage) { + if err != nil { + errChan <- err + } else { + responseChan <- response + } + }) + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-errChan: + return nil, err + case response := <-responseChan: + return response, nil + } +} + +func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) { + msg := &ProxyClientMessage{ + Type: "command", + Command: &CommandProxyClientMessage{ + Type: "create-publisher", + StreamType: streamType, + }, + } + + response, err := c.performSyncRequest(ctx, msg) + if err != nil { + // TODO: Cancel request + return nil, err + } + + proxyId := response.Command.Id + log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c.url, id) + publisher := newMcuProxyPublisher(id, streamType, proxyId, c, listener) + c.publishersLock.Lock() + c.publishers[proxyId] = publisher + c.publisherIds[id+"|"+streamType] = proxyId + c.publishersLock.Unlock() + return publisher, nil +} + +func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) { + c.publishersLock.Lock() + id, found := c.publisherIds[publisher+"|"+streamType] + c.publishersLock.Unlock() + if !found { + return nil, fmt.Errorf("Unknown publisher %s", publisher) + } + + msg := &ProxyClientMessage{ + Type: "command", + Command: &CommandProxyClientMessage{ + Type: "create-subscriber", + StreamType: streamType, + PublisherId: id, + }, + } + + response, err := c.performSyncRequest(ctx, msg) + if err != nil { + // TODO: Cancel request + return nil, err + } + + proxyId := response.Command.Id + log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c.url, publisher) + subscriber := newMcuProxySubscriber(publisher, streamType, proxyId, c, listener) + c.subscribersLock.Lock() + c.subscribers[proxyId] = subscriber + c.subscribersLock.Unlock() + return subscriber, nil +} + +type mcuProxy struct { + tokenId string + tokenKey *rsa.PrivateKey + + connections atomic.Value + connRequests int64 + nextSort int64 + + mu sync.RWMutex + publishers map[string]*mcuProxyConnection + + publisherWaitersId uint64 + publisherWaiters map[uint64]chan bool +} + +func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) { + var connections []*mcuProxyConnection + + tokenId, _ := config.GetString("mcu", "token_id") + if tokenId == "" { + return nil, fmt.Errorf("No token id configured") + } + tokenKeyFilename, _ := config.GetString("mcu", "token_key") + if tokenKeyFilename == "" { + return nil, fmt.Errorf("No token key configured") + } + tokenKeyData, err := ioutil.ReadFile(tokenKeyFilename) + if err != nil { + return nil, fmt.Errorf("Could not read private key from %s: %s", tokenKeyFilename, err) + } + tokenKey, err := jwt.ParseRSAPrivateKeyFromPEM(tokenKeyData) + if err != nil { + return nil, fmt.Errorf("Could not parse private key from %s: %s", tokenKeyFilename, err) + } + + mcu := &mcuProxy{ + tokenId: tokenId, + tokenKey: tokenKey, + + publishers: make(map[string]*mcuProxyConnection), + + publisherWaiters: make(map[uint64]chan bool), + } + + for _, u := range strings.Split(baseUrl, " ") { + conn, err := newMcuProxyConnection(mcu, u) + if err != nil { + return nil, err + } + + connections = append(connections, conn) + } + if len(connections) == 0 { + return nil, fmt.Errorf("No MCU proxy connections configured") + } + + mcu.setConnections(connections) + return mcu, nil +} + +func (m *mcuProxy) setConnections(connections []*mcuProxyConnection) { + m.connections.Store(connections) +} + +func (m *mcuProxy) getConnections() []*mcuProxyConnection { + return m.connections.Load().([]*mcuProxyConnection) +} + +func (m *mcuProxy) Start() error { + for _, c := range m.getConnections() { + if err := c.start(); err != nil { + return err + } + } + return nil +} + +func (m *mcuProxy) Stop() { + for _, c := range m.getConnections() { + ctx, cancel := context.WithTimeout(context.Background(), closeTimeout) + defer cancel() + c.stop(ctx) + } +} + +func (m *mcuProxy) SetOnConnected(f func()) { + // Not supported. +} + +func (m *mcuProxy) SetOnDisconnected(f func()) { + // Not supported. +} + +type mcuProxyStats struct { + Publishers int64 `json:"publishers"` + Clients int64 `json:"clients"` + Details map[string]*mcuProxyConnectionStats `json:"details"` +} + +func (m *mcuProxy) GetStats() interface{} { + details := make(map[string]*mcuProxyConnectionStats) + result := &mcuProxyStats{ + Details: details, + } + for _, conn := range m.getConnections() { + stats := conn.GetStats() + result.Publishers += stats.Publishers + result.Clients += stats.Clients + details[stats.Url] = stats + } + return result +} + +type mcuProxyConnectionsList []*mcuProxyConnection + +func (l mcuProxyConnectionsList) Len() int { + return len(l) +} + +func (l mcuProxyConnectionsList) Less(i, j int) bool { + return l[i].Load() < l[j].Load() +} + +func (l mcuProxyConnectionsList) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} + +func (l mcuProxyConnectionsList) Sort() { + sort.Sort(l) +} + +func (m *mcuProxy) getSortedConnections() []*mcuProxyConnection { + connections := m.getConnections() + if len(connections) < 2 { + return connections + } + + // Connections are re-sorted every requests or + // every . + now := time.Now().UnixNano() + if atomic.AddInt64(&m.connRequests, 1)%connectionSortRequests == 0 || atomic.LoadInt64(&m.nextSort) <= now { + atomic.StoreInt64(&m.nextSort, now+int64(connectionSortInterval)) + + sorted := make(mcuProxyConnectionsList, len(connections)) + copy(sorted, connections) + + sorted.Sort() + + m.setConnections(sorted) + connections = sorted + } + + return connections +} + +func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.publishers, publisher.id+"|"+publisher.StreamType()) +} + +func (m *mcuProxy) wakeupWaiters() { + m.mu.RLock() + defer m.mu.RUnlock() + for _, ch := range m.publisherWaiters { + ch <- true + } +} + +func (m *mcuProxy) addWaiter(ch chan bool) uint64 { + id := m.publisherWaitersId + 1 + m.publisherWaitersId = id + m.publisherWaiters[id] = ch + return id +} + +func (m *mcuProxy) removeWaiter(id uint64) { + delete(m.publisherWaiters, id) +} + +func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) { + connections := m.getSortedConnections() + for _, conn := range connections { + publisher, err := conn.newPublisher(ctx, listener, id, streamType) + if err != nil { + log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn.url, err) + continue + } + + m.mu.Lock() + m.publishers[id+"|"+streamType] = conn + m.mu.Unlock() + m.wakeupWaiters() + return publisher, nil + } + + return nil, fmt.Errorf("No MCU connection available") +} + +func (m *mcuProxy) getPublisherConnection(ctx context.Context, publisher string, streamType string) *mcuProxyConnection { + m.mu.RLock() + conn := m.publishers[publisher+"|"+streamType] + m.mu.RUnlock() + if conn != nil { + return conn + } + + log.Printf("No %s publisher %s found yet, deferring", streamType, publisher) + m.mu.Lock() + defer m.mu.Unlock() + + conn = m.publishers[publisher+"|"+streamType] + if conn != nil { + return conn + } + + ch := make(chan bool, 1) + id := m.addWaiter(ch) + defer m.removeWaiter(id) + + for { + m.mu.Unlock() + select { + case <-ch: + m.mu.Lock() + conn = m.publishers[publisher+"|"+streamType] + if conn != nil { + return conn + } + case <-ctx.Done(): + m.mu.Lock() + return nil + } + } +} + +func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) { + conn := m.getPublisherConnection(ctx, publisher, streamType) + if conn == nil { + return nil, fmt.Errorf("No %s publisher %s found", streamType, publisher) + } + + return conn.newSubscriber(ctx, listener, publisher, streamType) +}