2020-08-07 10:27:28 +02:00
|
|
|
/**
 * Standalone signaling server for the Nextcloud Spreed app.
 * Copyright (C) 2020 struktur AG
 *
 * @author Joachim Bauch <bauch@struktur.de>
 *
 * @license GNU AGPL version 3 or any later version
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
|
|
|
|
package signaling
|
|
|
|
|
|
|
|
import (
|
2020-08-31 13:58:28 +02:00
|
|
|
"context"
|
2020-08-07 10:27:28 +02:00
|
|
|
"crypto/rsa"
|
2020-12-16 15:17:44 +01:00
|
|
|
"crypto/tls"
|
2020-08-07 10:27:28 +02:00
|
|
|
"encoding/json"
|
2022-06-21 16:04:40 +02:00
|
|
|
"errors"
|
2020-08-07 10:27:28 +02:00
|
|
|
"fmt"
|
|
|
|
"log"
|
2022-04-04 15:39:49 +02:00
|
|
|
"net"
|
2020-12-11 15:58:13 +01:00
|
|
|
"net/http"
|
2020-08-07 10:27:28 +02:00
|
|
|
"net/url"
|
2022-04-05 12:48:27 +02:00
|
|
|
"os"
|
2020-08-07 10:27:28 +02:00
|
|
|
"sort"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/dlintw/goconf"
|
2022-07-07 17:04:34 +02:00
|
|
|
"github.com/golang-jwt/jwt/v4"
|
2020-08-07 10:27:28 +02:00
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Constants used by the proxy-based MCU implementation.
const (
	// closeTimeout is the timeout applied when stopping a connection that is
	// no longer used by any client (see closeIfEmpty).
	closeTimeout = time.Second

	// proxyDebugMessages enables logging of every response received from a
	// proxy.
	proxyDebugMessages = false

	// Very high value so the connections get sorted at the end.
	loadNotConnected = 1000000

	// Sort connections by load every 10 publishing requests or once per second.
	connectionSortRequests = 10
	connectionSortInterval = time.Second

	// Supported types of proxy url configuration.
	proxyUrlTypeStatic = "static"
	proxyUrlTypeEtcd   = "etcd"

	// NOTE(review): presumably the bounds of a wait/backoff delay; the users
	// of these values are not visible here.
	initialWaitDelay = time.Second
	maxWaitDelay     = 8 * time.Second

	defaultProxyTimeoutSeconds = 2

	// rttLogDuration is the minimum round-trip time of a ping that gets
	// logged by the pong handler.
	rttLogDuration = 500 * time.Millisecond
)
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
// McuProxy is the interface used to manage the set of proxy connections.
//
// NOTE(review): the semantics below are derived from the method names only;
// confirm against the implementation.
type McuProxy interface {
	// AddConnection presumably registers a connection to url, optionally
	// bound to the given ips.
	AddConnection(ignoreErrors bool, url string, ips ...net.IP) error

	// KeepConnection presumably marks an existing connection to url as
	// still wanted.
	KeepConnection(url string, ips ...net.IP)

	// RemoveConnection presumably drops the connection to url.
	RemoveConnection(url string, ips ...net.IP)
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
type mcuProxyPubSubCommon struct {
|
2022-04-13 03:03:36 +02:00
|
|
|
sid string
|
2024-02-27 13:52:59 +01:00
|
|
|
streamType StreamType
|
2024-02-27 14:23:52 +01:00
|
|
|
maxBitrate int
|
2020-08-07 10:27:28 +02:00
|
|
|
proxyId string
|
|
|
|
conn *mcuProxyConnection
|
|
|
|
listener McuListener
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyPubSubCommon) Id() string {
|
|
|
|
return c.proxyId
|
|
|
|
}
|
|
|
|
|
2022-04-13 03:03:36 +02:00
|
|
|
func (c *mcuProxyPubSubCommon) Sid() string {
|
|
|
|
return c.sid
|
|
|
|
}
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
func (c *mcuProxyPubSubCommon) StreamType() StreamType {
|
2020-08-07 10:27:28 +02:00
|
|
|
return c.streamType
|
|
|
|
}
|
|
|
|
|
2024-02-27 14:23:52 +01:00
|
|
|
func (c *mcuProxyPubSubCommon) MaxBitrate() int {
|
|
|
|
return c.maxBitrate
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClientMessage, callback func(error, map[string]interface{})) {
|
|
|
|
c.conn.performAsyncRequest(ctx, msg, func(err error, response *ProxyServerMessage) {
|
|
|
|
if err != nil {
|
|
|
|
callback(err, nil)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if proxyDebugMessages {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Response from %s: %+v", c.conn, response)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
if response.Type == "error" {
|
|
|
|
callback(response.Error, nil)
|
|
|
|
} else if response.Payload != nil {
|
|
|
|
callback(nil, response.Payload.Payload)
|
|
|
|
} else {
|
|
|
|
callback(nil, nil)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadProxyServerMessage) {
|
|
|
|
switch msg.Type {
|
2022-02-10 20:19:25 +01:00
|
|
|
case "offer":
|
|
|
|
c.listener.OnUpdateOffer(client, msg.Payload["offer"].(map[string]interface{}))
|
2020-08-07 10:27:28 +02:00
|
|
|
case "candidate":
|
|
|
|
c.listener.OnIceCandidate(client, msg.Payload["candidate"])
|
|
|
|
default:
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Unsupported payload from %s: %+v", c.conn, msg)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type mcuProxyPublisher struct {
|
|
|
|
mcuProxyPubSubCommon
|
|
|
|
|
2021-11-08 12:06:59 +01:00
|
|
|
id string
|
|
|
|
mediaTypes MediaType
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
2024-02-27 14:23:52 +01:00
|
|
|
func newMcuProxyPublisher(id string, sid string, streamType StreamType, maxBitrate int, mediaTypes MediaType, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxyPublisher {
|
2020-08-07 10:27:28 +02:00
|
|
|
return &mcuProxyPublisher{
|
|
|
|
mcuProxyPubSubCommon: mcuProxyPubSubCommon{
|
2022-04-13 03:03:36 +02:00
|
|
|
sid: sid,
|
2020-08-07 10:27:28 +02:00
|
|
|
streamType: streamType,
|
2024-02-27 14:23:52 +01:00
|
|
|
maxBitrate: maxBitrate,
|
2020-08-07 10:27:28 +02:00
|
|
|
proxyId: proxyId,
|
|
|
|
conn: conn,
|
|
|
|
listener: listener,
|
|
|
|
},
|
2021-11-08 12:06:59 +01:00
|
|
|
id: id,
|
|
|
|
mediaTypes: mediaTypes,
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-08 12:06:59 +01:00
|
|
|
func (p *mcuProxyPublisher) HasMedia(mt MediaType) bool {
|
|
|
|
return (p.mediaTypes & mt) == mt
|
|
|
|
}
|
|
|
|
|
2022-04-05 19:27:37 +02:00
|
|
|
func (p *mcuProxyPublisher) SetMedia(mt MediaType) {
|
|
|
|
// TODO: Also update mediaTypes on proxy.
|
|
|
|
p.mediaTypes = mt
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
func (p *mcuProxyPublisher) NotifyClosed() {
|
|
|
|
p.listener.PublisherClosed(p)
|
|
|
|
p.conn.removePublisher(p)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *mcuProxyPublisher) Close(ctx context.Context) {
|
|
|
|
p.NotifyClosed()
|
|
|
|
|
|
|
|
msg := &ProxyClientMessage{
|
|
|
|
Type: "command",
|
|
|
|
Command: &CommandProxyClientMessage{
|
|
|
|
Type: "delete-publisher",
|
|
|
|
ClientId: p.proxyId,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2022-08-30 16:02:07 +02:00
|
|
|
if response, err := p.conn.performSyncRequest(ctx, msg); err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not delete publisher %s at %s: %s", p.proxyId, p.conn, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
return
|
2022-08-30 16:02:07 +02:00
|
|
|
} else if response.Type == "error" {
|
|
|
|
log.Printf("Could not delete publisher %s at %s: %s", p.proxyId, p.conn, response.Error)
|
|
|
|
return
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
2020-09-24 12:43:27 +02:00
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Delete publisher %s at %s", p.proxyId, p.conn)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *mcuProxyPublisher) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
|
|
|
|
msg := &ProxyClientMessage{
|
|
|
|
Type: "payload",
|
|
|
|
Payload: &PayloadProxyClientMessage{
|
|
|
|
Type: data.Type,
|
|
|
|
ClientId: p.proxyId,
|
2022-04-13 03:05:50 +02:00
|
|
|
Sid: data.Sid,
|
2020-08-07 10:27:28 +02:00
|
|
|
Payload: data.Payload,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
p.doSendMessage(ctx, msg, callback)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *mcuProxyPublisher) ProcessPayload(msg *PayloadProxyServerMessage) {
|
|
|
|
p.doProcessPayload(p, msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *mcuProxyPublisher) ProcessEvent(msg *EventProxyServerMessage) {
|
|
|
|
switch msg.Type {
|
|
|
|
case "ice-completed":
|
|
|
|
p.listener.OnIceCompleted(p)
|
|
|
|
case "publisher-closed":
|
|
|
|
p.NotifyClosed()
|
|
|
|
default:
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Unsupported event from %s: %+v", p.conn, msg)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type mcuProxySubscriber struct {
|
|
|
|
mcuProxyPubSubCommon
|
|
|
|
|
|
|
|
publisherId string
|
|
|
|
}
|
|
|
|
|
2024-02-27 14:23:52 +01:00
|
|
|
func newMcuProxySubscriber(publisherId string, sid string, streamType StreamType, maxBitrate int, proxyId string, conn *mcuProxyConnection, listener McuListener) *mcuProxySubscriber {
|
2020-08-07 10:27:28 +02:00
|
|
|
return &mcuProxySubscriber{
|
|
|
|
mcuProxyPubSubCommon: mcuProxyPubSubCommon{
|
2022-04-13 03:03:36 +02:00
|
|
|
sid: sid,
|
2020-08-07 10:27:28 +02:00
|
|
|
streamType: streamType,
|
2024-02-27 14:23:52 +01:00
|
|
|
maxBitrate: maxBitrate,
|
2020-08-07 10:27:28 +02:00
|
|
|
proxyId: proxyId,
|
|
|
|
conn: conn,
|
|
|
|
listener: listener,
|
|
|
|
},
|
|
|
|
|
|
|
|
publisherId: publisherId,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *mcuProxySubscriber) Publisher() string {
|
|
|
|
return s.publisherId
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *mcuProxySubscriber) NotifyClosed() {
|
|
|
|
s.listener.SubscriberClosed(s)
|
|
|
|
s.conn.removeSubscriber(s)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *mcuProxySubscriber) Close(ctx context.Context) {
|
|
|
|
s.NotifyClosed()
|
|
|
|
|
|
|
|
msg := &ProxyClientMessage{
|
|
|
|
Type: "command",
|
|
|
|
Command: &CommandProxyClientMessage{
|
|
|
|
Type: "delete-subscriber",
|
|
|
|
ClientId: s.proxyId,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2022-08-30 16:02:07 +02:00
|
|
|
if response, err := s.conn.performSyncRequest(ctx, msg); err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not delete subscriber %s at %s: %s", s.proxyId, s.conn, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
return
|
2022-08-30 16:02:07 +02:00
|
|
|
} else if response.Type == "error" {
|
|
|
|
log.Printf("Could not delete subscriber %s at %s: %s", s.proxyId, s.conn, response.Error)
|
|
|
|
return
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
2020-09-24 12:43:27 +02:00
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Delete subscriber %s at %s", s.proxyId, s.conn)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *mcuProxySubscriber) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, map[string]interface{})) {
|
|
|
|
msg := &ProxyClientMessage{
|
|
|
|
Type: "payload",
|
|
|
|
Payload: &PayloadProxyClientMessage{
|
|
|
|
Type: data.Type,
|
|
|
|
ClientId: s.proxyId,
|
2022-04-13 03:05:50 +02:00
|
|
|
Sid: data.Sid,
|
2020-08-07 10:27:28 +02:00
|
|
|
Payload: data.Payload,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
s.doSendMessage(ctx, msg, callback)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *mcuProxySubscriber) ProcessPayload(msg *PayloadProxyServerMessage) {
|
|
|
|
s.doProcessPayload(s, msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *mcuProxySubscriber) ProcessEvent(msg *EventProxyServerMessage) {
|
|
|
|
switch msg.Type {
|
|
|
|
case "ice-completed":
|
|
|
|
s.listener.OnIceCompleted(s)
|
2022-04-13 03:03:36 +02:00
|
|
|
case "subscriber-sid-updated":
|
|
|
|
s.sid = msg.Sid
|
|
|
|
s.listener.SubscriberSidUpdated(s)
|
2020-08-07 10:27:28 +02:00
|
|
|
case "subscriber-closed":
|
|
|
|
s.NotifyClosed()
|
|
|
|
default:
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Unsupported event from %s: %+v", s.conn, msg)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type mcuProxyConnection struct {
|
2020-08-31 13:07:03 +02:00
|
|
|
proxy *mcuProxy
|
|
|
|
rawUrl string
|
|
|
|
url *url.URL
|
2022-04-04 15:39:49 +02:00
|
|
|
ip net.IP
|
2020-08-07 10:27:28 +02:00
|
|
|
|
2023-06-15 13:36:53 +02:00
|
|
|
load atomic.Int64
|
2020-08-07 10:27:28 +02:00
|
|
|
mu sync.Mutex
|
2023-01-19 14:51:37 +01:00
|
|
|
closer *Closer
|
|
|
|
closedDone *Closer
|
2023-06-15 13:36:53 +02:00
|
|
|
closed atomic.Bool
|
2020-08-07 10:27:28 +02:00
|
|
|
conn *websocket.Conn
|
|
|
|
|
|
|
|
connectedSince time.Time
|
|
|
|
reconnectTimer *time.Timer
|
2023-06-15 13:36:53 +02:00
|
|
|
reconnectInterval atomic.Int64
|
|
|
|
shutdownScheduled atomic.Bool
|
|
|
|
closeScheduled atomic.Bool
|
|
|
|
trackClose atomic.Bool
|
|
|
|
temporary atomic.Bool
|
2022-06-22 15:34:54 +02:00
|
|
|
|
|
|
|
connectedNotifier SingleNotifier
|
2020-08-07 10:27:28 +02:00
|
|
|
|
2023-06-15 13:36:53 +02:00
|
|
|
msgId atomic.Int64
|
2020-08-07 10:27:28 +02:00
|
|
|
helloMsgId string
|
|
|
|
sessionId string
|
2020-08-12 15:42:11 +02:00
|
|
|
country atomic.Value
|
2020-08-07 10:27:28 +02:00
|
|
|
|
|
|
|
callbacks map[string]func(*ProxyServerMessage)
|
|
|
|
|
|
|
|
publishersLock sync.RWMutex
|
|
|
|
publishers map[string]*mcuProxyPublisher
|
|
|
|
publisherIds map[string]string
|
|
|
|
|
|
|
|
subscribersLock sync.RWMutex
|
|
|
|
subscribers map[string]*mcuProxySubscriber
|
|
|
|
}
|
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP) (*mcuProxyConnection, error) {
|
2020-08-07 10:27:28 +02:00
|
|
|
parsed, err := url.Parse(baseUrl)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
conn := &mcuProxyConnection{
|
2023-06-15 13:36:53 +02:00
|
|
|
proxy: proxy,
|
|
|
|
rawUrl: baseUrl,
|
|
|
|
url: parsed,
|
|
|
|
ip: ip,
|
|
|
|
closer: NewCloser(),
|
|
|
|
closedDone: NewCloser(),
|
|
|
|
callbacks: make(map[string]func(*ProxyServerMessage)),
|
|
|
|
publishers: make(map[string]*mcuProxyPublisher),
|
|
|
|
publisherIds: make(map[string]string),
|
|
|
|
subscribers: make(map[string]*mcuProxySubscriber),
|
|
|
|
}
|
|
|
|
conn.reconnectInterval.Store(int64(initialReconnectInterval))
|
|
|
|
conn.load.Store(loadNotConnected)
|
2020-08-12 15:42:11 +02:00
|
|
|
conn.country.Store("")
|
2020-08-07 10:27:28 +02:00
|
|
|
return conn, nil
|
|
|
|
}
|
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
func (c *mcuProxyConnection) String() string {
|
|
|
|
if c.ip != nil {
|
|
|
|
return fmt.Sprintf("%s (%s)", c.rawUrl, c.ip)
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.rawUrl
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
// mcuProxyConnectionStats contains the JSON-serializable statistics of a
// proxy connection. The pointer fields are only set by GetStats while the
// connection is established.
type mcuProxyConnectionStats struct {
	Url        string `json:"url"`
	IP         net.IP `json:"ip,omitempty"`
	Connected  bool   `json:"connected"`
	Publishers int64  `json:"publishers"`
	// Clients is the number of subscribers plus the number of publishers.
	Clients   int64      `json:"clients"`
	Load      *int64     `json:"load,omitempty"`
	Shutdown  *bool      `json:"shutdown,omitempty"`
	Temporary *bool      `json:"temporary,omitempty"`
	Uptime    *time.Time `json:"uptime,omitempty"`
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) GetStats() *mcuProxyConnectionStats {
|
|
|
|
result := &mcuProxyConnectionStats{
|
|
|
|
Url: c.url.String(),
|
2022-04-04 15:39:49 +02:00
|
|
|
IP: c.ip,
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
c.mu.Lock()
|
|
|
|
if c.conn != nil {
|
|
|
|
result.Connected = true
|
|
|
|
result.Uptime = &c.connectedSince
|
2020-10-15 13:27:08 +02:00
|
|
|
load := c.Load()
|
|
|
|
result.Load = &load
|
|
|
|
shutdown := c.IsShutdownScheduled()
|
|
|
|
result.Shutdown = &shutdown
|
2022-06-22 15:34:54 +02:00
|
|
|
temporary := c.IsTemporary()
|
|
|
|
result.Temporary = &temporary
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
c.mu.Unlock()
|
|
|
|
c.publishersLock.RLock()
|
|
|
|
result.Publishers = int64(len(c.publishers))
|
|
|
|
c.publishersLock.RUnlock()
|
|
|
|
c.subscribersLock.RLock()
|
|
|
|
result.Clients = int64(len(c.subscribers))
|
|
|
|
c.subscribersLock.RUnlock()
|
|
|
|
result.Clients += result.Publishers
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) Load() int64 {
|
2023-06-15 13:36:53 +02:00
|
|
|
return c.load.Load()
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
2020-08-12 15:42:11 +02:00
|
|
|
func (c *mcuProxyConnection) Country() string {
|
|
|
|
return c.country.Load().(string)
|
|
|
|
}
|
|
|
|
|
2022-06-22 15:34:54 +02:00
|
|
|
func (c *mcuProxyConnection) IsTemporary() bool {
|
2023-06-15 13:36:53 +02:00
|
|
|
return c.temporary.Load()
|
2022-06-22 15:34:54 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) setTemporary() {
|
2023-06-15 13:36:53 +02:00
|
|
|
c.temporary.Store(true)
|
2022-06-22 15:34:54 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) clearTemporary() {
|
2023-06-15 13:36:53 +02:00
|
|
|
c.temporary.Store(false)
|
2022-06-22 15:34:54 +02:00
|
|
|
}
|
|
|
|
|
2020-08-07 15:16:13 +02:00
|
|
|
func (c *mcuProxyConnection) IsShutdownScheduled() bool {
|
2023-06-15 13:36:53 +02:00
|
|
|
return c.shutdownScheduled.Load() || c.closeScheduled.Load()
|
2020-08-07 15:16:13 +02:00
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
func (c *mcuProxyConnection) readPump() {
|
|
|
|
defer func() {
|
2023-06-15 13:36:53 +02:00
|
|
|
if !c.closed.Load() {
|
2020-08-07 10:27:28 +02:00
|
|
|
c.scheduleReconnect()
|
|
|
|
} else {
|
2023-01-19 14:51:37 +01:00
|
|
|
c.closedDone.Close()
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
defer c.close()
|
2023-06-15 13:36:53 +02:00
|
|
|
defer c.load.Store(loadNotConnected)
|
2020-08-07 10:27:28 +02:00
|
|
|
|
|
|
|
c.mu.Lock()
|
|
|
|
conn := c.conn
|
|
|
|
c.mu.Unlock()
|
|
|
|
|
2020-12-11 15:58:13 +01:00
|
|
|
conn.SetPongHandler(func(msg string) error {
|
|
|
|
now := time.Now()
|
2021-04-26 17:19:39 +02:00
|
|
|
conn.SetReadDeadline(now.Add(pongWait)) // nolint
|
2020-12-11 15:58:13 +01:00
|
|
|
if msg == "" {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
if ts, err := strconv.ParseInt(msg, 10, 64); err == nil {
|
|
|
|
rtt := now.Sub(time.Unix(0, ts))
|
2021-11-03 14:56:25 +01:00
|
|
|
if rtt >= rttLogDuration {
|
|
|
|
rtt_ms := rtt.Nanoseconds() / time.Millisecond.Nanoseconds()
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Proxy at %s has RTT of %d ms (%s)", c, rtt_ms, rtt)
|
2021-11-03 14:56:25 +01:00
|
|
|
}
|
2020-12-11 15:58:13 +01:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
for {
|
2021-04-26 17:19:39 +02:00
|
|
|
conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint
|
2020-08-07 10:27:28 +02:00
|
|
|
_, message, err := conn.ReadMessage()
|
|
|
|
if err != nil {
|
2023-12-21 13:23:14 +01:00
|
|
|
if errors.Is(err, websocket.ErrCloseSent) {
|
|
|
|
break
|
|
|
|
} else if _, ok := err.(*websocket.CloseError); !ok || websocket.IsUnexpectedCloseError(err,
|
2020-08-07 10:27:28 +02:00
|
|
|
websocket.CloseNormalClosure,
|
|
|
|
websocket.CloseGoingAway,
|
|
|
|
websocket.CloseNoStatusReceived) {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Error reading from %s: %v", c, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
var msg ProxyServerMessage
|
|
|
|
if err := json.Unmarshal(message, &msg); err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Error unmarshaling %s from %s: %s", string(message), c, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
c.processMessage(&msg)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-11 15:58:13 +01:00
|
|
|
func (c *mcuProxyConnection) sendPing() bool {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
if c.conn == nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
msg := strconv.FormatInt(now.UnixNano(), 10)
|
2021-04-26 17:19:39 +02:00
|
|
|
c.conn.SetWriteDeadline(now.Add(writeWait)) // nolint
|
2020-12-11 15:58:13 +01:00
|
|
|
if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not send ping to proxy at %s: %v", c, err)
|
2022-08-30 11:46:45 +02:00
|
|
|
go c.scheduleReconnect()
|
2020-12-11 15:58:13 +01:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
func (c *mcuProxyConnection) writePump() {
|
2020-12-11 15:58:13 +01:00
|
|
|
ticker := time.NewTicker(pingPeriod)
|
|
|
|
defer func() {
|
|
|
|
ticker.Stop()
|
|
|
|
}()
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
c.reconnectTimer = time.NewTimer(0)
|
2022-06-22 15:34:54 +02:00
|
|
|
defer c.reconnectTimer.Stop()
|
2020-08-07 10:27:28 +02:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-c.reconnectTimer.C:
|
|
|
|
c.reconnect()
|
2020-12-11 15:58:13 +01:00
|
|
|
case <-ticker.C:
|
|
|
|
c.sendPing()
|
2023-01-19 14:51:37 +01:00
|
|
|
case <-c.closer.C:
|
2020-08-07 10:27:28 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
func (c *mcuProxyConnection) start() {
|
2020-08-07 10:27:28 +02:00
|
|
|
go c.writePump()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) sendClose() error {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
if c.conn == nil {
|
|
|
|
return ErrNotConnected
|
|
|
|
}
|
|
|
|
|
2021-04-26 17:19:39 +02:00
|
|
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
|
2020-08-07 10:27:28 +02:00
|
|
|
return c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) stop(ctx context.Context) {
|
2023-06-15 13:36:53 +02:00
|
|
|
if !c.closed.CompareAndSwap(false, true) {
|
2020-08-07 10:27:28 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-01-19 14:51:37 +01:00
|
|
|
c.closer.Close()
|
2020-08-07 10:27:28 +02:00
|
|
|
if err := c.sendClose(); err != nil {
|
|
|
|
if err != ErrNotConnected {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not send close message to %s: %s", c, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
c.close()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
2023-01-19 14:51:37 +01:00
|
|
|
case <-c.closedDone.C:
|
2020-08-07 10:27:28 +02:00
|
|
|
case <-ctx.Done():
|
|
|
|
if err := ctx.Err(); err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Error waiting for connection to %s get closed: %s", c, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.close()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) close() {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2022-06-22 15:34:54 +02:00
|
|
|
c.connectedNotifier.Reset()
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
if c.conn != nil {
|
|
|
|
c.conn.Close()
|
|
|
|
c.conn = nil
|
2023-06-15 13:36:53 +02:00
|
|
|
if c.trackClose.CompareAndSwap(true, false) {
|
2021-04-20 17:12:28 +02:00
|
|
|
statsConnectedProxyBackendsCurrent.WithLabelValues(c.Country()).Dec()
|
|
|
|
}
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-31 13:07:03 +02:00
|
|
|
func (c *mcuProxyConnection) stopCloseIfEmpty() {
|
2023-06-15 13:36:53 +02:00
|
|
|
c.closeScheduled.Store(false)
|
2020-08-31 13:07:03 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) closeIfEmpty() bool {
|
2023-06-15 13:36:53 +02:00
|
|
|
c.closeScheduled.Store(true)
|
2020-08-31 13:07:03 +02:00
|
|
|
|
|
|
|
var total int64
|
|
|
|
c.publishersLock.RLock()
|
|
|
|
total += int64(len(c.publishers))
|
|
|
|
c.publishersLock.RUnlock()
|
|
|
|
c.subscribersLock.RLock()
|
|
|
|
total += int64(len(c.subscribers))
|
|
|
|
c.subscribersLock.RUnlock()
|
|
|
|
if total > 0 {
|
|
|
|
// Connection will be closed once all clients have disconnected.
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Connection to %s is still used by %d clients, defer closing", c, total)
|
2020-08-31 13:07:03 +02:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
|
|
|
|
defer cancel()
|
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("All clients disconnected, closing connection to %s", c)
|
2020-08-31 13:07:03 +02:00
|
|
|
c.stop(ctx)
|
|
|
|
|
|
|
|
c.proxy.removeConnection(c)
|
|
|
|
}()
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
func (c *mcuProxyConnection) scheduleReconnect() {
|
|
|
|
if err := c.sendClose(); err != nil && err != ErrNotConnected {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not send close message to %s: %s", c, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
2021-04-20 17:12:28 +02:00
|
|
|
c.close()
|
2020-08-07 10:27:28 +02:00
|
|
|
|
2022-06-22 15:34:54 +02:00
|
|
|
if c.IsShutdownScheduled() {
|
|
|
|
c.proxy.removeConnection(c)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-06-15 13:36:53 +02:00
|
|
|
interval := c.reconnectInterval.Load()
|
2020-08-07 10:27:28 +02:00
|
|
|
c.reconnectTimer.Reset(time.Duration(interval))
|
|
|
|
|
|
|
|
interval = interval * 2
|
|
|
|
if interval > int64(maxReconnectInterval) {
|
|
|
|
interval = int64(maxReconnectInterval)
|
|
|
|
}
|
2023-06-15 13:36:53 +02:00
|
|
|
c.reconnectInterval.Store(interval)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) reconnect() {
|
|
|
|
u, err := c.url.Parse("proxy")
|
|
|
|
if err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not resolve url to proxy at %s: %s", c, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.scheduleReconnect()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if u.Scheme == "http" {
|
|
|
|
u.Scheme = "ws"
|
|
|
|
} else if u.Scheme == "https" {
|
|
|
|
u.Scheme = "wss"
|
|
|
|
}
|
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
dialer := c.proxy.dialer
|
|
|
|
if c.ip != nil {
|
|
|
|
dialer = &websocket.Dialer{
|
|
|
|
Proxy: http.ProxyFromEnvironment,
|
|
|
|
HandshakeTimeout: c.proxy.dialer.HandshakeTimeout,
|
|
|
|
TLSClientConfig: c.proxy.dialer.TLSClientConfig,
|
|
|
|
|
|
|
|
// Override DNS lookup and connect to custom IP address.
|
|
|
|
NetDialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
|
|
|
if _, port, err := net.SplitHostPort(addr); err == nil {
|
|
|
|
addr = net.JoinHostPort(c.ip.String(), port)
|
|
|
|
}
|
|
|
|
|
|
|
|
return net.Dial(network, addr)
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
conn, _, err := dialer.Dial(u.String(), nil)
|
2020-08-07 10:27:28 +02:00
|
|
|
if err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not connect to %s: %s", c, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.scheduleReconnect()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-06-22 15:34:54 +02:00
|
|
|
if c.IsShutdownScheduled() {
|
|
|
|
c.proxy.removeConnection(c)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Connected to %s", c)
|
2023-06-15 13:36:53 +02:00
|
|
|
c.closed.Store(false)
|
2020-08-07 10:27:28 +02:00
|
|
|
|
|
|
|
c.mu.Lock()
|
|
|
|
c.connectedSince = time.Now()
|
|
|
|
c.conn = conn
|
|
|
|
c.mu.Unlock()
|
|
|
|
|
2023-06-15 13:36:53 +02:00
|
|
|
c.reconnectInterval.Store(int64(initialReconnectInterval))
|
|
|
|
c.shutdownScheduled.Store(false)
|
2020-08-07 10:27:28 +02:00
|
|
|
if err := c.sendHello(); err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not send hello request to %s: %s", c, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.scheduleReconnect()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-12-11 15:58:13 +01:00
|
|
|
if !c.sendPing() {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
go c.readPump()
|
|
|
|
}
|
|
|
|
|
2022-06-22 15:34:54 +02:00
|
|
|
func (c *mcuProxyConnection) waitUntilConnected(ctx context.Context) error {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
if c.conn != nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
waiter := c.connectedNotifier.NewWaiter()
|
|
|
|
defer c.connectedNotifier.Release(waiter)
|
|
|
|
|
|
|
|
c.mu.Unlock()
|
|
|
|
defer c.mu.Lock()
|
|
|
|
return waiter.Wait(ctx)
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
func (c *mcuProxyConnection) removePublisher(publisher *mcuProxyPublisher) {
|
|
|
|
c.proxy.removePublisher(publisher)
|
|
|
|
|
|
|
|
c.publishersLock.Lock()
|
|
|
|
defer c.publishersLock.Unlock()
|
|
|
|
|
2021-04-20 17:12:28 +02:00
|
|
|
if _, found := c.publishers[publisher.proxyId]; found {
|
|
|
|
delete(c.publishers, publisher.proxyId)
|
2024-02-27 13:52:59 +01:00
|
|
|
statsPublishersCurrent.WithLabelValues(string(publisher.StreamType())).Dec()
|
2021-04-20 17:12:28 +02:00
|
|
|
}
|
2024-02-27 13:52:59 +01:00
|
|
|
delete(c.publisherIds, getStreamId(publisher.id, publisher.StreamType()))
|
2020-08-31 13:07:03 +02:00
|
|
|
|
2023-06-15 13:36:53 +02:00
|
|
|
if len(c.publishers) == 0 && (c.closeScheduled.Load() || c.IsTemporary()) {
|
2020-08-31 13:07:03 +02:00
|
|
|
go c.closeIfEmpty()
|
|
|
|
}
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) clearPublishers() {
|
|
|
|
c.publishersLock.Lock()
|
|
|
|
defer c.publishersLock.Unlock()
|
|
|
|
|
|
|
|
go func(publishers map[string]*mcuProxyPublisher) {
|
|
|
|
for _, publisher := range publishers {
|
|
|
|
publisher.NotifyClosed()
|
|
|
|
}
|
|
|
|
}(c.publishers)
|
|
|
|
c.publishers = make(map[string]*mcuProxyPublisher)
|
|
|
|
c.publisherIds = make(map[string]string)
|
2020-08-31 13:07:03 +02:00
|
|
|
|
2023-06-15 13:36:53 +02:00
|
|
|
if c.closeScheduled.Load() || c.IsTemporary() {
|
2020-08-31 13:07:03 +02:00
|
|
|
go c.closeIfEmpty()
|
|
|
|
}
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) removeSubscriber(subscriber *mcuProxySubscriber) {
|
|
|
|
c.subscribersLock.Lock()
|
|
|
|
defer c.subscribersLock.Unlock()
|
|
|
|
|
2021-04-20 17:12:28 +02:00
|
|
|
if _, found := c.subscribers[subscriber.proxyId]; found {
|
|
|
|
delete(c.subscribers, subscriber.proxyId)
|
2024-02-27 13:52:59 +01:00
|
|
|
statsSubscribersCurrent.WithLabelValues(string(subscriber.StreamType())).Dec()
|
2021-04-20 17:12:28 +02:00
|
|
|
}
|
2020-08-31 13:07:03 +02:00
|
|
|
|
2023-06-15 13:36:53 +02:00
|
|
|
if len(c.subscribers) == 0 && (c.closeScheduled.Load() || c.IsTemporary()) {
|
2020-08-31 13:07:03 +02:00
|
|
|
go c.closeIfEmpty()
|
|
|
|
}
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) clearSubscribers() {
|
|
|
|
c.subscribersLock.Lock()
|
|
|
|
defer c.subscribersLock.Unlock()
|
|
|
|
|
|
|
|
go func(subscribers map[string]*mcuProxySubscriber) {
|
|
|
|
for _, subscriber := range subscribers {
|
|
|
|
subscriber.NotifyClosed()
|
|
|
|
}
|
|
|
|
}(c.subscribers)
|
|
|
|
c.subscribers = make(map[string]*mcuProxySubscriber)
|
2020-08-31 13:07:03 +02:00
|
|
|
|
2023-06-15 13:36:53 +02:00
|
|
|
if c.closeScheduled.Load() || c.IsTemporary() {
|
2020-08-31 13:07:03 +02:00
|
|
|
go c.closeIfEmpty()
|
|
|
|
}
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) clearCallbacks() {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
c.callbacks = make(map[string]func(*ProxyServerMessage))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) getCallback(id string) func(*ProxyServerMessage) {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
callback, found := c.callbacks[id]
|
|
|
|
if found {
|
|
|
|
delete(c.callbacks, id)
|
|
|
|
}
|
|
|
|
return callback
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) processMessage(msg *ProxyServerMessage) {
|
|
|
|
if c.helloMsgId != "" && msg.Id == c.helloMsgId {
|
|
|
|
c.helloMsgId = ""
|
|
|
|
switch msg.Type {
|
|
|
|
case "error":
|
|
|
|
if msg.Error.Code == "no_such_session" {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Session %s could not be resumed on %s, registering new", c.sessionId, c)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.clearPublishers()
|
|
|
|
c.clearSubscribers()
|
|
|
|
c.clearCallbacks()
|
|
|
|
c.sessionId = ""
|
|
|
|
if err := c.sendHello(); err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not send hello request to %s: %s", c, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.scheduleReconnect()
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Hello connection to %s failed with %+v, reconnecting", c, msg.Error)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.scheduleReconnect()
|
|
|
|
case "hello":
|
2020-09-07 09:12:32 +02:00
|
|
|
resumed := c.sessionId == msg.Hello.SessionId
|
2020-08-07 10:27:28 +02:00
|
|
|
c.sessionId = msg.Hello.SessionId
|
2020-08-12 15:42:11 +02:00
|
|
|
country := ""
|
|
|
|
if msg.Hello.Server != nil {
|
|
|
|
if country = msg.Hello.Server.Country; country != "" && !IsValidCountry(country) {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Proxy %s sent invalid country %s in hello response", c, country)
|
2020-08-12 15:42:11 +02:00
|
|
|
country = ""
|
|
|
|
}
|
|
|
|
}
|
|
|
|
c.country.Store(country)
|
2020-09-07 09:12:32 +02:00
|
|
|
if resumed {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Resumed session %s on %s", c.sessionId, c)
|
2020-09-07 09:12:32 +02:00
|
|
|
} else if country != "" {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Received session %s from %s (in %s)", c.sessionId, c, country)
|
2020-08-12 15:42:11 +02:00
|
|
|
} else {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Received session %s from %s", c.sessionId, c)
|
2020-08-12 15:42:11 +02:00
|
|
|
}
|
2023-06-15 13:36:53 +02:00
|
|
|
if c.trackClose.CompareAndSwap(false, true) {
|
2021-04-20 17:12:28 +02:00
|
|
|
statsConnectedProxyBackendsCurrent.WithLabelValues(c.Country()).Inc()
|
|
|
|
}
|
2022-06-22 15:34:54 +02:00
|
|
|
|
|
|
|
c.connectedNotifier.Notify()
|
2020-08-07 10:27:28 +02:00
|
|
|
default:
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.scheduleReconnect()
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if proxyDebugMessages {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Received from %s: %+v", c, msg)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
callback := c.getCallback(msg.Id)
|
|
|
|
if callback != nil {
|
|
|
|
callback(msg)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
switch msg.Type {
|
|
|
|
case "payload":
|
|
|
|
c.processPayload(msg)
|
|
|
|
case "event":
|
|
|
|
c.processEvent(msg)
|
2020-09-07 08:56:17 +02:00
|
|
|
case "bye":
|
|
|
|
c.processBye(msg)
|
2020-08-07 10:27:28 +02:00
|
|
|
default:
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Unsupported message received from %s: %+v", c, msg)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) processPayload(msg *ProxyServerMessage) {
|
|
|
|
payload := msg.Payload
|
|
|
|
c.publishersLock.RLock()
|
|
|
|
publisher, found := c.publishers[payload.ClientId]
|
|
|
|
c.publishersLock.RUnlock()
|
|
|
|
if found {
|
|
|
|
publisher.ProcessPayload(payload)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
c.subscribersLock.RLock()
|
|
|
|
subscriber, found := c.subscribers[payload.ClientId]
|
|
|
|
c.subscribersLock.RUnlock()
|
|
|
|
if found {
|
|
|
|
subscriber.ProcessPayload(payload)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Received payload for unknown client %+v from %s", payload, c)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) {
|
|
|
|
event := msg.Event
|
2020-08-07 15:16:13 +02:00
|
|
|
switch event.Type {
|
|
|
|
case "backend-disconnected":
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Upstream backend at %s got disconnected, reset MCU objects", c)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.clearPublishers()
|
|
|
|
c.clearSubscribers()
|
|
|
|
c.clearCallbacks()
|
|
|
|
// TODO: Should we also reconnect?
|
|
|
|
return
|
2020-08-07 15:16:13 +02:00
|
|
|
case "backend-connected":
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Upstream backend at %s is connected", c)
|
2020-08-07 10:27:28 +02:00
|
|
|
return
|
2020-08-07 15:16:13 +02:00
|
|
|
case "update-load":
|
2020-08-07 10:27:28 +02:00
|
|
|
if proxyDebugMessages {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Load of %s now at %d", c, event.Load)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
2023-06-15 13:36:53 +02:00
|
|
|
c.load.Store(event.Load)
|
2021-04-20 17:12:28 +02:00
|
|
|
statsProxyBackendLoadCurrent.WithLabelValues(c.url.String()).Set(float64(event.Load))
|
2020-08-07 10:27:28 +02:00
|
|
|
return
|
2020-08-07 15:16:13 +02:00
|
|
|
case "shutdown-scheduled":
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Proxy %s is scheduled to shutdown", c)
|
2023-06-15 13:36:53 +02:00
|
|
|
c.shutdownScheduled.Store(true)
|
2020-08-07 15:16:13 +02:00
|
|
|
return
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
if proxyDebugMessages {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Process event from %s: %+v", c, event)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
c.publishersLock.RLock()
|
|
|
|
publisher, found := c.publishers[event.ClientId]
|
|
|
|
c.publishersLock.RUnlock()
|
|
|
|
if found {
|
|
|
|
publisher.ProcessEvent(event)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
c.subscribersLock.RLock()
|
|
|
|
subscriber, found := c.subscribers[event.ClientId]
|
|
|
|
c.subscribersLock.RUnlock()
|
|
|
|
if found {
|
|
|
|
subscriber.ProcessEvent(event)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Received event for unknown client %+v from %s", event, c)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
2020-09-07 08:56:17 +02:00
|
|
|
func (c *mcuProxyConnection) processBye(msg *ProxyServerMessage) {
|
|
|
|
bye := msg.Bye
|
|
|
|
switch bye.Reason {
|
|
|
|
case "session_resumed":
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Session %s on %s was resumed by other client, resetting", c.sessionId, c)
|
2020-09-07 08:56:17 +02:00
|
|
|
c.sessionId = ""
|
|
|
|
default:
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Received bye with unsupported reason from %s %+v", c, bye)
|
2020-09-07 08:56:17 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
func (c *mcuProxyConnection) sendHello() error {
|
2023-06-15 13:36:53 +02:00
|
|
|
c.helloMsgId = strconv.FormatInt(c.msgId.Add(1), 10)
|
2020-08-07 10:27:28 +02:00
|
|
|
msg := &ProxyClientMessage{
|
|
|
|
Id: c.helloMsgId,
|
|
|
|
Type: "hello",
|
|
|
|
Hello: &HelloProxyClientMessage{
|
|
|
|
Version: "1.0",
|
|
|
|
},
|
|
|
|
}
|
|
|
|
if c.sessionId != "" {
|
|
|
|
msg.Hello.ResumeId = c.sessionId
|
|
|
|
} else {
|
|
|
|
claims := &TokenClaims{
|
2022-07-07 17:12:21 +02:00
|
|
|
jwt.RegisteredClaims{
|
|
|
|
IssuedAt: jwt.NewNumericDate(time.Now()),
|
2020-08-07 10:27:28 +02:00
|
|
|
Issuer: c.proxy.tokenId,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
|
|
|
|
tokenString, err := token.SignedString(c.proxy.tokenKey)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
msg.Hello.Token = tokenString
|
|
|
|
}
|
|
|
|
return c.sendMessage(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) sendMessage(msg *ProxyClientMessage) error {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
|
|
|
return c.sendMessageLocked(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) sendMessageLocked(msg *ProxyClientMessage) error {
|
|
|
|
if proxyDebugMessages {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Send message to %s: %+v", c, msg)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
if c.conn == nil {
|
|
|
|
return ErrNotConnected
|
|
|
|
}
|
2021-04-26 17:19:39 +02:00
|
|
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
|
2020-08-07 10:27:28 +02:00
|
|
|
return c.conn.WriteJSON(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) performAsyncRequest(ctx context.Context, msg *ProxyClientMessage, callback func(err error, response *ProxyServerMessage)) {
|
2023-06-15 13:36:53 +02:00
|
|
|
msgId := strconv.FormatInt(c.msgId.Add(1), 10)
|
2020-08-07 10:27:28 +02:00
|
|
|
msg.Id = msgId
|
|
|
|
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
c.callbacks[msgId] = func(msg *ProxyServerMessage) {
|
|
|
|
callback(nil, msg)
|
|
|
|
}
|
|
|
|
if err := c.sendMessageLocked(msg); err != nil {
|
|
|
|
delete(c.callbacks, msgId)
|
|
|
|
go callback(err, nil)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *mcuProxyConnection) performSyncRequest(ctx context.Context, msg *ProxyClientMessage) (*ProxyServerMessage, error) {
|
2020-09-16 09:23:36 +02:00
|
|
|
if err := ctx.Err(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
errChan := make(chan error, 1)
|
|
|
|
responseChan := make(chan *ProxyServerMessage, 1)
|
|
|
|
c.performAsyncRequest(ctx, msg, func(err error, response *ProxyServerMessage) {
|
|
|
|
if err != nil {
|
|
|
|
errChan <- err
|
|
|
|
} else {
|
|
|
|
responseChan <- response
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return nil, ctx.Err()
|
|
|
|
case err := <-errChan:
|
|
|
|
return nil, err
|
|
|
|
case response := <-responseChan:
|
|
|
|
return response, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType) (McuPublisher, error) {
|
2020-08-07 10:27:28 +02:00
|
|
|
msg := &ProxyClientMessage{
|
|
|
|
Type: "command",
|
|
|
|
Command: &CommandProxyClientMessage{
|
|
|
|
Type: "create-publisher",
|
2022-04-13 03:03:36 +02:00
|
|
|
Sid: sid,
|
2020-08-07 10:27:28 +02:00
|
|
|
StreamType: streamType,
|
2021-01-21 14:39:33 +01:00
|
|
|
Bitrate: bitrate,
|
2021-11-08 12:06:59 +01:00
|
|
|
MediaTypes: mediaTypes,
|
2020-08-07 10:27:28 +02:00
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
response, err := c.performSyncRequest(ctx, msg)
|
|
|
|
if err != nil {
|
|
|
|
// TODO: Cancel request
|
|
|
|
return nil, err
|
2022-08-30 11:49:08 +02:00
|
|
|
} else if response.Type == "error" {
|
|
|
|
return nil, fmt.Errorf("Error creating %s publisher for %s on %s: %+v", streamType, id, c, response.Error)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
proxyId := response.Command.Id
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Created %s publisher %s on %s for %s", streamType, proxyId, c, id)
|
2024-02-27 14:23:52 +01:00
|
|
|
publisher := newMcuProxyPublisher(id, sid, streamType, response.Command.Bitrate, mediaTypes, proxyId, c, listener)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.publishersLock.Lock()
|
|
|
|
c.publishers[proxyId] = publisher
|
2024-02-27 13:52:59 +01:00
|
|
|
c.publisherIds[getStreamId(id, streamType)] = proxyId
|
2020-08-07 10:27:28 +02:00
|
|
|
c.publishersLock.Unlock()
|
2024-02-27 13:52:59 +01:00
|
|
|
statsPublishersCurrent.WithLabelValues(string(streamType)).Inc()
|
|
|
|
statsPublishersTotal.WithLabelValues(string(streamType)).Inc()
|
2020-08-07 10:27:28 +02:00
|
|
|
return publisher, nil
|
|
|
|
}
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuListener, publisherId string, publisherSessionId string, streamType StreamType) (McuSubscriber, error) {
|
2020-08-07 10:27:28 +02:00
|
|
|
msg := &ProxyClientMessage{
|
|
|
|
Type: "command",
|
|
|
|
Command: &CommandProxyClientMessage{
|
|
|
|
Type: "create-subscriber",
|
|
|
|
StreamType: streamType,
|
2022-06-21 16:04:40 +02:00
|
|
|
PublisherId: publisherId,
|
2020-08-07 10:27:28 +02:00
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
response, err := c.performSyncRequest(ctx, msg)
|
|
|
|
if err != nil {
|
|
|
|
// TODO: Cancel request
|
|
|
|
return nil, err
|
2022-08-30 11:49:08 +02:00
|
|
|
} else if response.Type == "error" {
|
|
|
|
return nil, fmt.Errorf("Error creating %s subscriber for %s on %s: %+v", streamType, publisherSessionId, c, response.Error)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
proxyId := response.Command.Id
|
2022-06-21 16:04:40 +02:00
|
|
|
log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisherSessionId)
|
2024-02-27 14:23:52 +01:00
|
|
|
subscriber := newMcuProxySubscriber(publisherSessionId, response.Command.Sid, streamType, response.Command.Bitrate, proxyId, c, listener)
|
2020-08-07 10:27:28 +02:00
|
|
|
c.subscribersLock.Lock()
|
|
|
|
c.subscribers[proxyId] = subscriber
|
|
|
|
c.subscribersLock.Unlock()
|
2024-02-27 13:52:59 +01:00
|
|
|
statsSubscribersCurrent.WithLabelValues(string(streamType)).Inc()
|
|
|
|
statsSubscribersTotal.WithLabelValues(string(streamType)).Inc()
|
2020-08-07 10:27:28 +02:00
|
|
|
return subscriber, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type mcuProxy struct {
|
2022-04-04 15:39:49 +02:00
|
|
|
urlType string
|
2020-08-07 10:27:28 +02:00
|
|
|
tokenId string
|
|
|
|
tokenKey *rsa.PrivateKey
|
2023-12-01 23:42:59 +01:00
|
|
|
config ProxyConfig
|
2020-08-31 15:23:55 +02:00
|
|
|
|
2020-12-16 15:17:44 +01:00
|
|
|
dialer *websocket.Dialer
|
2020-08-31 13:07:03 +02:00
|
|
|
connections []*mcuProxyConnection
|
2022-04-04 15:39:49 +02:00
|
|
|
connectionsMap map[string][]*mcuProxyConnection
|
2020-08-31 13:07:03 +02:00
|
|
|
connectionsMu sync.RWMutex
|
2020-09-16 10:13:09 +02:00
|
|
|
proxyTimeout time.Duration
|
2023-06-15 13:36:53 +02:00
|
|
|
connRequests atomic.Int64
|
|
|
|
nextSort atomic.Int64
|
2020-08-07 10:27:28 +02:00
|
|
|
|
2021-01-21 14:39:33 +01:00
|
|
|
maxStreamBitrate int
|
|
|
|
maxScreenBitrate int
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
mu sync.RWMutex
|
|
|
|
publishers map[string]*mcuProxyConnection
|
|
|
|
|
2023-01-19 15:35:11 +01:00
|
|
|
publisherWaiters ChannelWaiters
|
2021-08-06 16:00:54 +02:00
|
|
|
|
|
|
|
continentsMap atomic.Value
|
2022-06-21 16:04:40 +02:00
|
|
|
|
|
|
|
rpcClients *GrpcClients
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
2023-12-22 21:49:46 +01:00
|
|
|
func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *GrpcClients, dnsMonitor *DnsMonitor) (Mcu, error) {
|
2020-08-31 15:23:55 +02:00
|
|
|
urlType, _ := config.GetString("mcu", "urltype")
|
2022-04-04 15:39:49 +02:00
|
|
|
if urlType == "" {
|
|
|
|
urlType = proxyUrlTypeStatic
|
|
|
|
}
|
2020-08-31 15:23:55 +02:00
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
tokenId, _ := config.GetString("mcu", "token_id")
|
|
|
|
if tokenId == "" {
|
|
|
|
return nil, fmt.Errorf("No token id configured")
|
|
|
|
}
|
|
|
|
tokenKeyFilename, _ := config.GetString("mcu", "token_key")
|
|
|
|
if tokenKeyFilename == "" {
|
|
|
|
return nil, fmt.Errorf("No token key configured")
|
|
|
|
}
|
2022-04-05 12:48:27 +02:00
|
|
|
tokenKeyData, err := os.ReadFile(tokenKeyFilename)
|
2020-08-07 10:27:28 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("Could not read private key from %s: %s", tokenKeyFilename, err)
|
|
|
|
}
|
|
|
|
tokenKey, err := jwt.ParseRSAPrivateKeyFromPEM(tokenKeyData)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("Could not parse private key from %s: %s", tokenKeyFilename, err)
|
|
|
|
}
|
|
|
|
|
2020-09-16 10:13:09 +02:00
|
|
|
proxyTimeoutSeconds, _ := config.GetInt("mcu", "proxytimeout")
|
|
|
|
if proxyTimeoutSeconds <= 0 {
|
|
|
|
proxyTimeoutSeconds = defaultProxyTimeoutSeconds
|
|
|
|
}
|
|
|
|
proxyTimeout := time.Duration(proxyTimeoutSeconds) * time.Second
|
|
|
|
log.Printf("Using a timeout of %s for proxy requests", proxyTimeout)
|
|
|
|
|
2021-01-21 14:39:33 +01:00
|
|
|
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
|
|
|
|
if maxStreamBitrate <= 0 {
|
|
|
|
maxStreamBitrate = defaultMaxStreamBitrate
|
|
|
|
}
|
|
|
|
maxScreenBitrate, _ := config.GetInt("mcu", "maxscreenbitrate")
|
|
|
|
if maxScreenBitrate <= 0 {
|
|
|
|
maxScreenBitrate = defaultMaxScreenBitrate
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
mcu := &mcuProxy{
|
2022-04-04 15:39:49 +02:00
|
|
|
urlType: urlType,
|
2020-08-07 10:27:28 +02:00
|
|
|
tokenId: tokenId,
|
|
|
|
tokenKey: tokenKey,
|
|
|
|
|
2020-12-16 15:17:44 +01:00
|
|
|
dialer: &websocket.Dialer{
|
|
|
|
Proxy: http.ProxyFromEnvironment,
|
|
|
|
HandshakeTimeout: proxyTimeout,
|
|
|
|
},
|
2022-04-04 15:39:49 +02:00
|
|
|
connectionsMap: make(map[string][]*mcuProxyConnection),
|
2020-09-16 10:13:09 +02:00
|
|
|
proxyTimeout: proxyTimeout,
|
2020-08-31 13:07:03 +02:00
|
|
|
|
2021-01-21 14:39:33 +01:00
|
|
|
maxStreamBitrate: maxStreamBitrate,
|
|
|
|
maxScreenBitrate: maxScreenBitrate,
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
publishers: make(map[string]*mcuProxyConnection),
|
|
|
|
|
2022-06-21 16:04:40 +02:00
|
|
|
rpcClients: rpcClients,
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
2021-08-06 16:00:54 +02:00
|
|
|
if err := mcu.loadContinentsMap(config); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-12-16 15:17:44 +01:00
|
|
|
skipverify, _ := config.GetBool("mcu", "skipverify")
|
|
|
|
if skipverify {
|
|
|
|
log.Println("WARNING: MCU verification is disabled!")
|
|
|
|
mcu.dialer.TLSClientConfig = &tls.Config{
|
|
|
|
InsecureSkipVerify: skipverify,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-31 15:23:55 +02:00
|
|
|
switch urlType {
|
|
|
|
case proxyUrlTypeStatic:
|
2023-12-22 21:49:46 +01:00
|
|
|
mcu.config, err = NewProxyConfigStatic(config, mcu, dnsMonitor)
|
2020-08-31 15:23:55 +02:00
|
|
|
case proxyUrlTypeEtcd:
|
2023-12-01 23:42:59 +01:00
|
|
|
mcu.config, err = NewProxyConfigEtcd(config, etcdClient, mcu)
|
2020-08-31 15:23:55 +02:00
|
|
|
default:
|
2023-12-01 23:42:59 +01:00
|
|
|
err = fmt.Errorf("Unsupported proxy URL type %s", urlType)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
return mcu, nil
|
|
|
|
}
|
|
|
|
|
2021-08-06 16:00:54 +02:00
|
|
|
func (m *mcuProxy) loadContinentsMap(config *goconf.ConfigFile) error {
|
2023-12-07 13:33:54 +01:00
|
|
|
options, err := GetStringOptions(config, "continent-overrides", false)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-08-06 16:00:54 +02:00
|
|
|
if len(options) == 0 {
|
|
|
|
m.setContinentsMap(nil)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
continentsMap := make(map[string][]string)
|
2023-12-07 13:33:54 +01:00
|
|
|
for option, value := range options {
|
2021-08-06 16:00:54 +02:00
|
|
|
option = strings.ToUpper(strings.TrimSpace(option))
|
|
|
|
if !IsValidContinent(option) {
|
|
|
|
log.Printf("Ignore unknown continent %s", option)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
var values []string
|
|
|
|
for _, v := range strings.Split(value, ",") {
|
|
|
|
v = strings.ToUpper(strings.TrimSpace(v))
|
|
|
|
if !IsValidContinent(v) {
|
|
|
|
log.Printf("Ignore unknown continent %s for override %s", v, option)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
values = append(values, v)
|
|
|
|
}
|
|
|
|
if len(values) == 0 {
|
|
|
|
log.Printf("No valid values found for continent override %s, ignoring", option)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
continentsMap[option] = values
|
|
|
|
log.Printf("Mapping users on continent %s to %s", option, values)
|
|
|
|
}
|
|
|
|
|
|
|
|
m.setContinentsMap(continentsMap)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
func (m *mcuProxy) Start() error {
|
2021-01-21 14:39:33 +01:00
|
|
|
log.Printf("Maximum bandwidth %d bits/sec per publishing stream", m.maxStreamBitrate)
|
|
|
|
log.Printf("Maximum bandwidth %d bits/sec per screensharing stream", m.maxScreenBitrate)
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
return m.config.Start()
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mcuProxy) Stop() {
|
2020-08-31 13:07:03 +02:00
|
|
|
m.connectionsMu.RLock()
|
|
|
|
defer m.connectionsMu.RUnlock()
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
|
|
|
|
defer cancel()
|
2020-08-31 13:07:03 +02:00
|
|
|
for _, c := range m.connections {
|
2020-08-07 10:27:28 +02:00
|
|
|
c.stop(ctx)
|
|
|
|
}
|
2022-04-04 15:39:49 +02:00
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
m.config.Stop()
|
2022-04-04 15:39:49 +02:00
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
func (m *mcuProxy) AddConnection(ignoreErrors bool, url string, ips ...net.IP) error {
|
2022-04-04 15:39:49 +02:00
|
|
|
m.connectionsMu.Lock()
|
|
|
|
defer m.connectionsMu.Unlock()
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
var conns []*mcuProxyConnection
|
|
|
|
if len(ips) == 0 {
|
|
|
|
conn, err := newMcuProxyConnection(m, url, nil)
|
2022-04-04 15:39:49 +02:00
|
|
|
if err != nil {
|
2023-12-01 23:42:59 +01:00
|
|
|
if ignoreErrors {
|
|
|
|
log.Printf("Could not create proxy connection to %s: %s", url, err)
|
|
|
|
return nil
|
2022-04-04 15:39:49 +02:00
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
return err
|
2022-04-04 15:39:49 +02:00
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
conns = append(conns, conn)
|
|
|
|
} else {
|
2022-04-04 15:39:49 +02:00
|
|
|
for _, ip := range ips {
|
2023-12-01 23:42:59 +01:00
|
|
|
conn, err := newMcuProxyConnection(m, url, ip)
|
2022-04-04 15:39:49 +02:00
|
|
|
if err != nil {
|
2023-12-01 23:42:59 +01:00
|
|
|
if ignoreErrors {
|
|
|
|
log.Printf("Could not create proxy connection to %s (%s): %s", url, ip, err)
|
2022-04-04 15:39:49 +02:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
return err
|
2022-04-04 15:39:49 +02:00
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
conns = append(conns, conn)
|
2022-04-04 15:39:49 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
for _, conn := range conns {
|
|
|
|
log.Printf("Adding new connection to %s", conn)
|
|
|
|
conn.start()
|
2022-04-04 15:39:49 +02:00
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
m.connections = append(m.connections, conn)
|
|
|
|
if existing, found := m.connectionsMap[url]; found {
|
|
|
|
m.connectionsMap[url] = append(existing, conn)
|
|
|
|
} else {
|
|
|
|
m.connectionsMap[url] = []*mcuProxyConnection{conn}
|
2022-04-04 15:39:49 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
m.nextSort.Store(0)
|
2022-04-04 15:39:49 +02:00
|
|
|
return nil
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
// containsIP reports whether ip is contained in ips. Addresses are compared
// with net.IP.Equal, so an IPv4 address matches its 16-byte form as well.
// It returns false for an empty list.
func containsIP(ips []net.IP, ip net.IP) bool {
	for idx := range ips {
		if ips[idx].Equal(ip) {
			return true
		}
	}
	return false
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
func (m *mcuProxy) iterateConnections(url string, ips []net.IP, f func(conn *mcuProxyConnection)) {
|
|
|
|
m.connectionsMu.Lock()
|
|
|
|
defer m.connectionsMu.Unlock()
|
2020-08-31 15:23:55 +02:00
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
conns, found := m.connectionsMap[url]
|
|
|
|
if !found {
|
2020-08-31 15:23:55 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
var toRemove []*mcuProxyConnection
|
|
|
|
if len(ips) == 0 {
|
|
|
|
toRemove = conns
|
2020-08-31 15:23:55 +02:00
|
|
|
} else {
|
2023-12-01 23:42:59 +01:00
|
|
|
for _, conn := range conns {
|
|
|
|
if containsIP(ips, conn.ip) {
|
|
|
|
toRemove = append(toRemove, conn)
|
|
|
|
}
|
2020-08-31 15:23:55 +02:00
|
|
|
}
|
2023-12-01 23:42:59 +01:00
|
|
|
}
|
2020-08-31 15:23:55 +02:00
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
for _, conn := range toRemove {
|
|
|
|
f(conn)
|
2020-08-31 15:23:55 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
func (m *mcuProxy) RemoveConnection(url string, ips ...net.IP) {
|
|
|
|
m.iterateConnections(url, ips, func(conn *mcuProxyConnection) {
|
|
|
|
log.Printf("Removing connection to %s", conn)
|
|
|
|
conn.closeIfEmpty()
|
|
|
|
})
|
|
|
|
}
|
2020-08-31 15:23:55 +02:00
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
func (m *mcuProxy) KeepConnection(url string, ips ...net.IP) {
|
|
|
|
m.iterateConnections(url, ips, func(conn *mcuProxyConnection) {
|
|
|
|
conn.stopCloseIfEmpty()
|
|
|
|
conn.clearTemporary()
|
|
|
|
})
|
2020-08-31 15:23:55 +02:00
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
|
|
|
|
if err := m.loadContinentsMap(config); err != nil {
|
|
|
|
log.Printf("Error loading continents map: %s", err)
|
2020-08-31 15:23:55 +02:00
|
|
|
}
|
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
if err := m.config.Reload(config); err != nil {
|
|
|
|
log.Printf("could not reload proxy configuration: %s", err)
|
2020-08-31 15:23:55 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-31 13:07:03 +02:00
|
|
|
func (m *mcuProxy) removeConnection(c *mcuProxyConnection) {
|
|
|
|
m.connectionsMu.Lock()
|
|
|
|
defer m.connectionsMu.Unlock()
|
|
|
|
|
2022-04-04 15:39:49 +02:00
|
|
|
if conns, found := m.connectionsMap[c.rawUrl]; found {
|
|
|
|
for idx, conn := range conns {
|
|
|
|
if conn == c {
|
|
|
|
conns = append(conns[:idx], conns[idx+1:]...)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(conns) == 0 {
|
|
|
|
delete(m.connectionsMap, c.rawUrl)
|
|
|
|
m.connections = nil
|
|
|
|
for _, conns := range m.connectionsMap {
|
|
|
|
m.connections = append(m.connections, conns...)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
m.connectionsMap[c.rawUrl] = conns
|
2020-08-31 13:07:03 +02:00
|
|
|
}
|
|
|
|
|
2023-06-15 13:36:53 +02:00
|
|
|
m.nextSort.Store(0)
|
2020-08-31 13:07:03 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
func (m *mcuProxy) SetOnConnected(f func()) {
|
|
|
|
// Not supported.
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mcuProxy) SetOnDisconnected(f func()) {
|
|
|
|
// Not supported.
|
|
|
|
}
|
|
|
|
|
|
|
|
type mcuProxyStats struct {
|
2022-04-04 15:39:49 +02:00
|
|
|
Publishers int64 `json:"publishers"`
|
|
|
|
Clients int64 `json:"clients"`
|
|
|
|
Details []*mcuProxyConnectionStats `json:"details"`
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mcuProxy) GetStats() interface{} {
|
2022-04-04 15:39:49 +02:00
|
|
|
result := &mcuProxyStats{}
|
2020-08-31 13:07:03 +02:00
|
|
|
|
|
|
|
m.connectionsMu.RLock()
|
|
|
|
defer m.connectionsMu.RUnlock()
|
|
|
|
|
|
|
|
for _, conn := range m.connections {
|
2020-08-07 10:27:28 +02:00
|
|
|
stats := conn.GetStats()
|
|
|
|
result.Publishers += stats.Publishers
|
|
|
|
result.Clients += stats.Clients
|
2022-04-04 15:39:49 +02:00
|
|
|
result.Details = append(result.Details, stats)
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
2021-08-06 16:00:54 +02:00
|
|
|
func (m *mcuProxy) getContinentsMap() map[string][]string {
|
|
|
|
continentsMap := m.continentsMap.Load()
|
|
|
|
if continentsMap == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return continentsMap.(map[string][]string)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mcuProxy) setContinentsMap(continentsMap map[string][]string) {
|
|
|
|
if continentsMap == nil {
|
|
|
|
continentsMap = make(map[string][]string)
|
|
|
|
}
|
|
|
|
m.continentsMap.Store(continentsMap)
|
|
|
|
}
|
|
|
|
|
2020-08-07 10:27:28 +02:00
|
|
|
// mcuProxyConnectionsList is a list of proxy connections that can be sorted
// by the load of the connections (lowest load first).
type mcuProxyConnectionsList []*mcuProxyConnection
|
|
|
|
|
|
|
|
func (l mcuProxyConnectionsList) Len() int {
|
|
|
|
return len(l)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l mcuProxyConnectionsList) Less(i, j int) bool {
|
|
|
|
return l[i].Load() < l[j].Load()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l mcuProxyConnectionsList) Swap(i, j int) {
|
|
|
|
l[i], l[j] = l[j], l[i]
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l mcuProxyConnectionsList) Sort() {
|
|
|
|
sort.Sort(l)
|
|
|
|
}
|
|
|
|
|
2020-08-12 15:42:11 +02:00
|
|
|
// ContinentsOverlap reports whether at least one entry is contained in both
// a and b. Entries are compared as-is (case-sensitive). It returns false if
// either list is empty.
func ContinentsOverlap(a, b []string) bool {
	// The lists are expected to be very short, so a nested scan is cheaper
	// than building a lookup set. Empty lists simply skip the loops.
	for _, fromA := range a {
		for _, fromB := range b {
			if fromA == fromB {
				return true
			}
		}
	}
	return false
}
|
|
|
|
|
2021-08-06 16:00:54 +02:00
|
|
|
func sortConnectionsForCountry(connections []*mcuProxyConnection, country string, continentMap map[string][]string) []*mcuProxyConnection {
|
2020-08-12 15:42:11 +02:00
|
|
|
// Move connections in the same country to the start of the list.
|
|
|
|
sorted := make(mcuProxyConnectionsList, 0, len(connections))
|
|
|
|
unprocessed := make(mcuProxyConnectionsList, 0, len(connections))
|
|
|
|
for _, conn := range connections {
|
|
|
|
if country == conn.Country() {
|
|
|
|
sorted = append(sorted, conn)
|
|
|
|
} else {
|
|
|
|
unprocessed = append(unprocessed, conn)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if continents, found := ContinentMap[country]; found && len(unprocessed) > 1 {
|
|
|
|
remaining := make(mcuProxyConnectionsList, 0, len(unprocessed))
|
2021-08-06 16:00:54 +02:00
|
|
|
// Map continents to other continents (e.g. use Europe for Africa).
|
|
|
|
for _, continent := range continents {
|
|
|
|
if toAdd, found := continentMap[continent]; found {
|
|
|
|
continents = append(continents, toAdd...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next up are connections on the same or mapped continent.
|
2020-08-12 15:42:11 +02:00
|
|
|
for _, conn := range unprocessed {
|
|
|
|
connCountry := conn.Country()
|
|
|
|
if IsValidCountry(connCountry) {
|
|
|
|
connContinents := ContinentMap[connCountry]
|
|
|
|
if ContinentsOverlap(continents, connContinents) {
|
|
|
|
sorted = append(sorted, conn)
|
|
|
|
} else {
|
|
|
|
remaining = append(remaining, conn)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
remaining = append(remaining, conn)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
unprocessed = remaining
|
|
|
|
}
|
|
|
|
// Add all other connections by load.
|
|
|
|
sorted = append(sorted, unprocessed...)
|
|
|
|
return sorted
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mcuProxy) getSortedConnections(initiator McuInitiator) []*mcuProxyConnection {
|
2020-08-31 13:07:03 +02:00
|
|
|
m.connectionsMu.RLock()
|
|
|
|
connections := m.connections
|
|
|
|
m.connectionsMu.RUnlock()
|
2020-08-07 10:27:28 +02:00
|
|
|
if len(connections) < 2 {
|
|
|
|
return connections
|
|
|
|
}
|
|
|
|
|
|
|
|
// Connections are re-sorted every <connectionSortRequests> requests or
|
|
|
|
// every <connectionSortInterval>.
|
|
|
|
now := time.Now().UnixNano()
|
2023-06-15 13:36:53 +02:00
|
|
|
if m.connRequests.Add(1)%connectionSortRequests == 0 || m.nextSort.Load() <= now {
|
|
|
|
m.nextSort.Store(now + int64(connectionSortInterval))
|
2020-08-07 10:27:28 +02:00
|
|
|
|
|
|
|
sorted := make(mcuProxyConnectionsList, len(connections))
|
|
|
|
copy(sorted, connections)
|
|
|
|
|
|
|
|
sorted.Sort()
|
|
|
|
|
2020-08-31 13:07:03 +02:00
|
|
|
m.connectionsMu.Lock()
|
|
|
|
m.connections = sorted
|
|
|
|
m.connectionsMu.Unlock()
|
2020-08-07 10:27:28 +02:00
|
|
|
connections = sorted
|
|
|
|
}
|
|
|
|
|
2020-08-12 15:42:11 +02:00
|
|
|
if initiator != nil {
|
|
|
|
if country := initiator.Country(); IsValidCountry(country) {
|
2021-08-06 16:00:54 +02:00
|
|
|
connections = sortConnectionsForCountry(connections, country, m.getContinentsMap())
|
2020-08-12 15:42:11 +02:00
|
|
|
}
|
|
|
|
}
|
2020-08-07 10:27:28 +02:00
|
|
|
return connections
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
delete(m.publishers, getStreamId(publisher.id, publisher.StreamType()))
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
|
2020-08-12 15:42:11 +02:00
|
|
|
connections := m.getSortedConnections(initiator)
|
2020-08-07 10:27:28 +02:00
|
|
|
for _, conn := range connections {
|
2022-06-22 15:34:54 +02:00
|
|
|
if conn.IsShutdownScheduled() || conn.IsTemporary() {
|
2020-08-07 15:16:13 +02:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2020-09-16 10:13:09 +02:00
|
|
|
subctx, cancel := context.WithTimeout(ctx, m.proxyTimeout)
|
|
|
|
defer cancel()
|
2021-01-21 14:39:33 +01:00
|
|
|
|
|
|
|
var maxBitrate int
|
2024-02-27 13:52:59 +01:00
|
|
|
if streamType == StreamTypeScreen {
|
2021-01-21 14:39:33 +01:00
|
|
|
maxBitrate = m.maxScreenBitrate
|
|
|
|
} else {
|
|
|
|
maxBitrate = m.maxStreamBitrate
|
|
|
|
}
|
|
|
|
if bitrate <= 0 {
|
|
|
|
bitrate = maxBitrate
|
|
|
|
} else {
|
|
|
|
bitrate = min(bitrate, maxBitrate)
|
|
|
|
}
|
2022-04-13 03:03:36 +02:00
|
|
|
publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes)
|
2020-08-07 10:27:28 +02:00
|
|
|
if err != nil {
|
2022-04-04 15:39:49 +02:00
|
|
|
log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err)
|
2020-08-07 10:27:28 +02:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
m.mu.Lock()
|
2024-02-27 13:52:59 +01:00
|
|
|
m.publishers[getStreamId(id, streamType)] = conn
|
2020-08-07 10:27:28 +02:00
|
|
|
m.mu.Unlock()
|
2023-01-19 15:35:11 +01:00
|
|
|
m.publisherWaiters.Wakeup()
|
2020-08-07 10:27:28 +02:00
|
|
|
return publisher, nil
|
|
|
|
}
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
statsProxyNobackendAvailableTotal.WithLabelValues(string(streamType)).Inc()
|
2020-08-07 10:27:28 +02:00
|
|
|
return nil, fmt.Errorf("No MCU connection available")
|
|
|
|
}
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
func (m *mcuProxy) getPublisherConnection(publisher string, streamType StreamType) *mcuProxyConnection {
|
2020-08-07 10:27:28 +02:00
|
|
|
m.mu.RLock()
|
2022-06-21 16:04:40 +02:00
|
|
|
defer m.mu.RUnlock()
|
2020-08-07 10:27:28 +02:00
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
return m.publishers[getStreamId(publisher, streamType)]
|
2022-06-21 16:04:40 +02:00
|
|
|
}
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
func (m *mcuProxy) waitForPublisherConnection(ctx context.Context, publisher string, streamType StreamType) *mcuProxyConnection {
|
2020-08-07 10:27:28 +02:00
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
conn := m.publishers[getStreamId(publisher, streamType)]
|
2020-08-07 10:27:28 +02:00
|
|
|
if conn != nil {
|
2021-04-20 17:12:28 +02:00
|
|
|
// Publisher was created while waiting for lock.
|
2020-08-07 10:27:28 +02:00
|
|
|
return conn
|
|
|
|
}
|
|
|
|
|
2023-01-19 15:35:11 +01:00
|
|
|
ch := make(chan struct{}, 1)
|
|
|
|
id := m.publisherWaiters.Add(ch)
|
|
|
|
defer m.publisherWaiters.Remove(id)
|
2020-08-07 10:27:28 +02:00
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
statsWaitingForPublisherTotal.WithLabelValues(string(streamType)).Inc()
|
2020-08-07 10:27:28 +02:00
|
|
|
for {
|
|
|
|
m.mu.Unlock()
|
|
|
|
select {
|
|
|
|
case <-ch:
|
|
|
|
m.mu.Lock()
|
2024-02-27 13:52:59 +01:00
|
|
|
conn = m.publishers[getStreamId(publisher, streamType)]
|
2020-08-07 10:27:28 +02:00
|
|
|
if conn != nil {
|
|
|
|
return conn
|
|
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
|
|
m.mu.Lock()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-02-27 13:52:59 +01:00
|
|
|
func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType StreamType) (McuSubscriber, error) {
|
2022-06-21 16:04:40 +02:00
|
|
|
if conn := m.getPublisherConnection(publisher, streamType); conn != nil {
|
|
|
|
// Fast common path: publisher is available locally.
|
|
|
|
conn.publishersLock.Lock()
|
2024-02-27 13:52:59 +01:00
|
|
|
id, found := conn.publisherIds[getStreamId(publisher, streamType)]
|
2022-06-21 16:04:40 +02:00
|
|
|
conn.publishersLock.Unlock()
|
|
|
|
if !found {
|
|
|
|
return nil, fmt.Errorf("Unknown publisher %s", publisher)
|
|
|
|
}
|
|
|
|
|
|
|
|
return conn.newSubscriber(ctx, listener, id, publisher, streamType)
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Printf("No %s publisher %s found yet, deferring", streamType, publisher)
|
|
|
|
ch := make(chan McuSubscriber)
|
|
|
|
getctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
// Wait for publisher to be created locally.
|
|
|
|
go func() {
|
|
|
|
if conn := m.waitForPublisherConnection(getctx, publisher, streamType); conn != nil {
|
|
|
|
cancel() // Cancel pending RPC calls.
|
|
|
|
|
|
|
|
conn.publishersLock.Lock()
|
2024-02-27 13:52:59 +01:00
|
|
|
id, found := conn.publisherIds[getStreamId(publisher, streamType)]
|
2022-06-21 16:04:40 +02:00
|
|
|
conn.publishersLock.Unlock()
|
|
|
|
if !found {
|
|
|
|
log.Printf("Unknown id for local %s publisher %s", streamType, publisher)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
subscriber, err := conn.newSubscriber(ctx, listener, id, publisher, streamType)
|
|
|
|
if subscriber != nil {
|
|
|
|
ch <- subscriber
|
|
|
|
} else if err != nil {
|
|
|
|
log.Printf("Error creating local subscriber for %s publisher %s: %s", streamType, publisher, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Wait for publisher to be created on one of the other servers in the cluster.
|
|
|
|
if clients := m.rpcClients.GetClients(); len(clients) > 0 {
|
|
|
|
for _, client := range clients {
|
|
|
|
go func(client *GrpcClient) {
|
|
|
|
id, url, ip, err := client.GetPublisherId(getctx, publisher, streamType)
|
|
|
|
if errors.Is(err, context.Canceled) {
|
|
|
|
return
|
|
|
|
} else if err != nil {
|
|
|
|
log.Printf("Error getting %s publisher id %s from %s: %s", streamType, publisher, client.Target(), err)
|
|
|
|
return
|
|
|
|
} else if id == "" {
|
|
|
|
// Publisher not found on other server
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
cancel() // Cancel pending RPC calls.
|
|
|
|
log.Printf("Found publisher id %s through %s on proxy %s", id, client.Target(), url)
|
|
|
|
|
|
|
|
m.connectionsMu.RLock()
|
|
|
|
connections := m.connections
|
|
|
|
m.connectionsMu.RUnlock()
|
2022-06-22 15:34:54 +02:00
|
|
|
var publisherConn *mcuProxyConnection
|
2022-06-21 16:04:40 +02:00
|
|
|
for _, conn := range connections {
|
|
|
|
if conn.rawUrl != url || !ip.Equal(conn.ip) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Simple case, signaling server has a connection to the same endpoint
|
2022-06-22 15:34:54 +02:00
|
|
|
publisherConn = conn
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
if publisherConn == nil {
|
|
|
|
publisherConn, err = newMcuProxyConnection(m, url, ip)
|
2022-06-21 16:04:40 +02:00
|
|
|
if err != nil {
|
2022-06-22 15:34:54 +02:00
|
|
|
log.Printf("Could not create temporary connection to %s for %s publisher %s: %s", url, streamType, publisher, err)
|
2022-06-21 16:04:40 +02:00
|
|
|
return
|
|
|
|
}
|
2022-06-22 15:34:54 +02:00
|
|
|
|
2023-12-01 23:42:59 +01:00
|
|
|
publisherConn.setTemporary()
|
|
|
|
publisherConn.start()
|
2022-06-22 15:34:54 +02:00
|
|
|
if err := publisherConn.waitUntilConnected(ctx); err != nil {
|
|
|
|
log.Printf("Could not establish new connection to %s: %s", publisherConn, err)
|
|
|
|
publisherConn.closeIfEmpty()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
m.connectionsMu.Lock()
|
|
|
|
m.connections = append(m.connections, publisherConn)
|
|
|
|
conns, found := m.connectionsMap[url]
|
|
|
|
if found {
|
|
|
|
conns = append(conns, publisherConn)
|
|
|
|
} else {
|
|
|
|
conns = []*mcuProxyConnection{publisherConn}
|
|
|
|
}
|
|
|
|
m.connectionsMap[url] = conns
|
|
|
|
m.connectionsMu.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
subscriber, err := publisherConn.newSubscriber(ctx, listener, id, publisher, streamType)
|
|
|
|
if err != nil {
|
|
|
|
if publisherConn.IsTemporary() {
|
|
|
|
publisherConn.closeIfEmpty()
|
|
|
|
}
|
|
|
|
log.Printf("Could not create subscriber for %s publisher %s: %s", streamType, publisher, err)
|
2022-06-21 16:04:40 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-06-22 15:34:54 +02:00
|
|
|
ch <- subscriber
|
2022-06-21 16:04:40 +02:00
|
|
|
}(client)
|
|
|
|
}
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|
|
|
|
|
2022-06-21 16:04:40 +02:00
|
|
|
select {
|
|
|
|
case subscriber := <-ch:
|
|
|
|
return subscriber, nil
|
|
|
|
case <-ctx.Done():
|
|
|
|
return nil, fmt.Errorf("No %s publisher %s found", streamType, publisher)
|
|
|
|
}
|
2020-08-07 10:27:28 +02:00
|
|
|
}
|