From b869b6f24812a4e3ba93443ff1effd6871e4d99b Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 25 Apr 2024 11:14:52 +0200 Subject: [PATCH] Support bandwidth limits when selecting proxy to use. --- api_proxy.go | 29 +++ docker/README.md | 2 + docker/proxy/entrypoint.sh | 5 + mcu_proxy.go | 173 ++++++++++++++--- mcu_proxy_test.go | 369 +++++++++++++++++++++++++++++++++++-- proxy.conf.in | 14 ++ proxy/proxy_server.go | 91 ++++++--- slices_go120.go | 34 ++++ slices_go121.go | 32 ++++ 9 files changed, 677 insertions(+), 72 deletions(-) create mode 100644 slices_go120.go create mode 100644 slices_go121.go diff --git a/api_proxy.go b/api_proxy.go index 255c3c2..70e7637 100644 --- a/api_proxy.go +++ b/api_proxy.go @@ -299,12 +299,41 @@ type PayloadProxyServerMessage struct { // Type "event" +type EventProxyServerBandwidth struct { + // Incoming is the bandwidth utilization for publishers in percent. + Incoming *float64 `json:"incoming,omitempty"` + // Outgoing is the bandwidth utilization for subscribers in percent. + Outgoing *float64 `json:"outgoing,omitempty"` +} + +func (b *EventProxyServerBandwidth) String() string { + if b.Incoming != nil && b.Outgoing != nil { + return fmt.Sprintf("bandwidth: incoming=%.3f%%, outgoing=%.3f%%", *b.Incoming, *b.Outgoing) + } else if b.Incoming != nil { + return fmt.Sprintf("bandwidth: incoming=%.3f%%, outgoing=unlimited", *b.Incoming) + } else if b.Outgoing != nil { + return fmt.Sprintf("bandwidth: incoming=unlimited, outgoing=%.3f%%", *b.Outgoing) + } else { + return "bandwidth: incoming=unlimited, outgoing=unlimited" + } +} + +func (b EventProxyServerBandwidth) AllowIncoming() bool { + return b.Incoming == nil || *b.Incoming < 100 +} + +func (b EventProxyServerBandwidth) AllowOutgoing() bool { + return b.Outgoing == nil || *b.Outgoing < 100 +} + type EventProxyServerMessage struct { Type string `json:"type"` ClientId string `json:"clientId,omitempty"` Load int64 `json:"load,omitempty"` Sid string `json:"sid,omitempty"` + + Bandwidth *EventProxyServerBandwidth `json:"bandwidth,omitempty"` } // Information on a proxy in the etcd cluster. diff --git a/docker/README.md b/docker/README.md index fcbb0af..86c3eaa 100644 --- a/docker/README.md +++ b/docker/README.md @@ -102,6 +102,8 @@ The running container can be configured through different environment variables: - `EXTERNAL_HOSTNAME`: The external hostname for remote streams. Will try to autodetect if omitted. - `TOKEN_ID`: Id of the token to use when connecting remote streams. - `TOKEN_KEY`: Private key for the configured token id. +- `BANDWIDTH_INCOMING`: Optional incoming target bandwidth (in megabits per second). +- `BANDWIDTH_OUTGOING`: Optional outgoing target bandwidth (in megabits per second). - `JANUS_URL`: Url to Janus server. - `MAX_STREAM_BITRATE`: Optional maximum bitrate for audio/video streams. - `MAX_SCREEN_BITRATE`: Optional maximum bitrate for screensharing streams. diff --git a/docker/proxy/entrypoint.sh b/docker/proxy/entrypoint.sh index 6deb7df..d41bb4d 100755 --- a/docker/proxy/entrypoint.sh +++ b/docker/proxy/entrypoint.sh @@ -52,6 +52,11 @@ if [ ! -f "$CONFIG" ]; then fi if [ -n "$TOKEN_KEY" ]; then sed -i "s|#token_key =.*|token_key = $TOKEN_KEY|" "$CONFIG" + if [ -n "$BANDWIDTH_INCOMING" ]; then + sed -i "s|#incoming =.*|incoming = $BANDWIDTH_INCOMING|" "$CONFIG" + fi + if [ -n "$BANDWIDTH_OUTGOING" ]; then + sed -i "s|#outgoing =.*|outgoing = $BANDWIDTH_OUTGOING|" "$CONFIG" fi HAS_ETCD= diff --git a/mcu_proxy.go b/mcu_proxy.go index 0e22151..3d0c01a 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -334,6 +334,7 @@ type mcuProxyConnection struct { ip net.IP load atomic.Int64 + bandwidth atomic.Pointer[EventProxyServerBandwidth] mu sync.Mutex closer *Closer closedDone *Closer @@ -385,6 +386,7 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP) (*mcuProx } conn.reconnectInterval.Store(int64(initialReconnectInterval)) conn.load.Store(loadNotConnected) + conn.bandwidth.Store(nil) conn.country.Store("") return conn, nil } @@ -488,6 +490,10 @@ func (c *mcuProxyConnection) Load() int64 { return c.load.Load() } +func (c *mcuProxyConnection) Bandwidth() *EventProxyServerBandwidth { + return c.bandwidth.Load() +} + func (c *mcuProxyConnection) Country() string { return c.country.Load().(string) } @@ -532,7 +538,10 @@ func (c *mcuProxyConnection) readPump() { } }() defer c.close() - defer c.load.Store(loadNotConnected) + defer func() { + c.load.Store(loadNotConnected) + c.bandwidth.Store(nil) + }() c.mu.Lock() conn := c.conn @@ -996,9 +1005,10 @@ func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) { return case "update-load": if proxyDebugMessages { - log.Printf("Load of %s now at %d", c, event.Load) + log.Printf("Load of %s now at %d (%s)", c, event.Load, event.Bandwidth) } c.load.Store(event.Load) + c.bandwidth.Store(event.Bandwidth) statsProxyBackendLoadCurrent.WithLabelValues(c.url.String()).Set(float64(event.Load)) return case "shutdown-scheduled": @@ -1730,27 +1740,27 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) { delete(m.publishers, getStreamId(publisher.id, publisher.StreamType())) } -func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { - connections := m.getSortedConnections(initiator) +func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher { + var maxBitrate int + if streamType == StreamTypeScreen { + maxBitrate = m.maxScreenBitrate + } else { + maxBitrate = m.maxStreamBitrate + } + if bitrate <= 0 { + bitrate = maxBitrate + } else { + bitrate = min(bitrate, maxBitrate) + } + for _, conn := range connections { - if conn.IsShutdownScheduled() || conn.IsTemporary() { + if !isAllowed(conn) || conn.IsShutdownScheduled() || conn.IsTemporary() { continue } subctx, cancel := context.WithTimeout(ctx, m.proxyTimeout) defer cancel() - var maxBitrate int - if streamType == StreamTypeScreen { - maxBitrate = m.maxScreenBitrate - } else { - maxBitrate = m.maxStreamBitrate - } - if bitrate <= 0 { - bitrate = maxBitrate - } else { - bitrate = min(bitrate, maxBitrate) - } publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes) if err != nil { log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err) @@ -1761,11 +1771,61 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st m.publishers[getStreamId(id, streamType)] = conn m.mu.Unlock() m.publisherWaiters.Wakeup() - return publisher, nil + return publisher } - statsProxyNobackendAvailableTotal.WithLabelValues(string(streamType)).Inc() - return nil, fmt.Errorf("No MCU connection available") + return nil +} + +func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) { + connections := m.getSortedConnections(initiator) + publisher := m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections, func(c *mcuProxyConnection) bool { + bw := c.Bandwidth() + return bw == nil || bw.AllowIncoming() + }) + if publisher == nil { + // No proxy has available bandwidth, select one with the lowest currently used bandwidth. + connections2 := make([]*mcuProxyConnection, 0, len(connections)) + for _, c := range connections { + if c.Bandwidth() != nil { + connections2 = append(connections2, c) + } + } + SlicesSortFunc(connections2, func(a *mcuProxyConnection, b *mcuProxyConnection) int { + var incoming_a *float64 + if bw := a.Bandwidth(); bw != nil { + incoming_a = bw.Incoming + } + + var incoming_b *float64 + if bw := b.Bandwidth(); bw != nil { + incoming_b = bw.Incoming + } + + if incoming_a == nil && incoming_b == nil { + return 0 + } else if incoming_a == nil && incoming_b != nil { + return -1 + } else if incoming_a != nil && incoming_b == nil { + return -1 + } else if *incoming_a < *incoming_b { + return -1 + } else if *incoming_a > *incoming_b { + return 1 + } + return 0 + }) + publisher = m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections2, func(c *mcuProxyConnection) bool { + return true + }) + } + + if publisher == nil { + statsProxyNobackendAvailableTotal.WithLabelValues(string(streamType)).Inc() + return nil, fmt.Errorf("No MCU connection available") + } + + return publisher, nil } func (m *mcuProxy) getPublisherConnection(publisher string, streamType StreamType) *mcuProxyConnection { @@ -1812,6 +1872,30 @@ type proxyPublisherInfo struct { err error } +func (m *mcuProxy) createSubscriber(ctx context.Context, listener McuListener, id string, publisher string, streamType StreamType, publisherConn *mcuProxyConnection, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuSubscriber { + for _, conn := range connections { + if !isAllowed(conn) || conn.IsShutdownScheduled() || conn.IsTemporary() { + continue + } + + var subscriber McuSubscriber + var err error + if conn == publisherConn { + subscriber, err = conn.newSubscriber(ctx, listener, id, publisher, streamType) + } else { + subscriber, err = conn.newRemoteSubscriber(ctx, listener, id, publisher, streamType, publisherConn) + } + if err != nil { + log.Printf("Could not create subscriber for %s publisher %s on %s: %s", streamType, publisher, conn, err) + continue + } + + return subscriber + } + + return nil +} + func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType StreamType, initiator McuInitiator) (McuSubscriber, error) { var publisherInfo *proxyPublisherInfo if conn := m.getPublisherConnection(publisher, streamType); conn != nil { @@ -1948,21 +2032,52 @@ func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publ return nil, publisherInfo.err } - if !publisherInfo.conn.IsSameCountry(initiator) { + bw := publisherInfo.conn.Bandwidth() + allowOutgoing := bw == nil || bw.AllowOutgoing() + if !allowOutgoing || !publisherInfo.conn.IsSameCountry(initiator) { connections := m.getSortedConnections(initiator) - if len(connections) > 0 && !connections[0].IsSameCountry(publisherInfo.conn) { + if !allowOutgoing || len(connections) > 0 && !connections[0].IsSameCountry(publisherInfo.conn) { // Connect to remote publisher through "closer" gateway. - for _, conn := range connections { - if conn.IsShutdownScheduled() || conn.IsTemporary() || conn == publisherInfo.conn { - continue + subscriber := m.createSubscriber(ctx, listener, publisherInfo.id, publisher, streamType, publisherInfo.conn, connections, func(c *mcuProxyConnection) bool { + bw := c.Bandwidth() + return bw == nil || bw.AllowOutgoing() + }) + if subscriber == nil { + connections2 := make([]*mcuProxyConnection, 0, len(connections)) + for _, c := range connections { + if c.Bandwidth() != nil { + connections2 = append(connections2, c) + } } + SlicesSortFunc(connections2, func(a *mcuProxyConnection, b *mcuProxyConnection) int { + var outgoing_a *float64 + if bw := a.Bandwidth(); bw != nil { + outgoing_a = bw.Outgoing + } - subscriber, err := conn.newRemoteSubscriber(ctx, listener, publisherInfo.id, publisher, streamType, publisherInfo.conn) - if err != nil { - log.Printf("Could not create subscriber for %s publisher %s on %s: %s", streamType, publisher, conn, err) - continue - } + var outgoing_b *float64 + if bw := b.Bandwidth(); bw != nil { + outgoing_b = bw.Outgoing + } + if outgoing_a == nil && outgoing_b == nil { + return 0 + } else if outgoing_a == nil && outgoing_b != nil { + return -1 + } else if outgoing_a != nil && outgoing_b == nil { + return -1 + } else if *outgoing_a < *outgoing_b { + return -1 + } else if *outgoing_a > *outgoing_b { + return 1 + } + return 0 + }) + subscriber = m.createSubscriber(ctx, listener, publisherInfo.id, publisher, streamType, publisherInfo.conn, connections2, func(c *mcuProxyConnection) bool { + return true + }) + } + if subscriber != nil { return subscriber, nil } } diff --git a/mcu_proxy_test.go b/mcu_proxy_test.go index b7d0ae4..c197f36 100644 --- a/mcu_proxy_test.go +++ b/mcu_proxy_test.go @@ -445,9 +445,61 @@ type TestProxyServerHandler struct { upgrader *websocket.Upgrader country string - mu sync.Mutex - load atomic.Int64 - clients map[string]*testProxyServerClient + mu sync.Mutex + load atomic.Int64 + incoming atomic.Pointer[float64] + outgoing atomic.Pointer[float64] + clients map[string]*testProxyServerClient +} + +func (h *TestProxyServerHandler) UpdateBandwidth(incoming float64, outgoing float64) { + h.incoming.Store(&incoming) + h.outgoing.Store(&outgoing) + + h.mu.Lock() + defer h.mu.Unlock() + + msg := h.getLoadMessage(h.load.Load()) + for _, c := range h.clients { + c.sendMessage(msg) + } +} + +func (h *TestProxyServerHandler) Clear(incoming bool, outgoing bool) { + if incoming { + h.incoming.Store(nil) + } + if outgoing { + h.outgoing.Store(nil) + } + + h.mu.Lock() + defer h.mu.Unlock() + + msg := h.getLoadMessage(h.load.Load()) + for _, c := range h.clients { + c.sendMessage(msg) + } +} + +func (h *TestProxyServerHandler) getLoadMessage(load int64) *ProxyServerMessage { + msg := &ProxyServerMessage{ + Type: "event", + Event: &EventProxyServerMessage{ + Type: "update-load", + Load: load, + }, + } + + incoming := h.incoming.Load() + outgoing := h.outgoing.Load() + if incoming != nil || outgoing != nil { + msg.Event.Bandwidth = &EventProxyServerBandwidth{ + Incoming: incoming, + Outgoing: outgoing, + } + } + return msg } func (h *TestProxyServerHandler) updateLoad(delta int64) { @@ -455,31 +507,20 @@ func (h *TestProxyServerHandler) updateLoad(delta int64) { return } + load := h.load.Add(delta) + h.mu.Lock() defer h.mu.Unlock() - load := h.load.Add(delta) + msg := h.getLoadMessage(load) for _, c := range h.clients { - go func(c *testProxyServerClient, load int64) { - c.sendMessage(&ProxyServerMessage{ - Type: "event", - Event: &EventProxyServerMessage{ - Type: "update-load", - Load: load, - }, - }) - }(c, load) + go c.sendMessage(msg) } } func (h *TestProxyServerHandler) sendLoad(c *testProxyServerClient) { - c.sendMessage(&ProxyServerMessage{ - Type: "event", - Event: &EventProxyServerMessage{ - Type: "update-load", - Load: h.load.Load(), - }, - }) + msg := h.getLoadMessage(h.load.Load()) + c.sendMessage(msg) } func (h *TestProxyServerHandler) removeClient(client *testProxyServerClient) { @@ -705,6 +746,153 @@ func Test_ProxyWaitForPublisher(t *testing.T) { defer pub.Close(context.Background()) } +func Test_ProxyPublisherBandwidth(t *testing.T) { + CatchLogForTest(t) + t.Parallel() + server1 := NewProxyServerForTest(t, "DE") + server2 := NewProxyServerForTest(t, "DE") + mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ + server1, + server2, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + pub1Id := "the-publisher-1" + pub1Sid := "1234567890" + pub1Listener := &MockMcuListener{ + publicId: pub1Id + "-public", + } + pub1Initiator := &MockMcuInitiator{ + country: "DE", + } + pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator) + if err != nil { + t.Fatal(err) + } + + defer pub1.Close(context.Background()) + + if pub1.(*mcuProxyPublisher).conn.rawUrl == server1.URL { + server1.UpdateBandwidth(100, 0) + } else { + server2.UpdateBandwidth(100, 0) + } + + // Wait until proxy has been updated + for ctx.Err() == nil { + mcu.connectionsMu.RLock() + connections := mcu.connections + mcu.connectionsMu.RUnlock() + missing := true + for _, c := range connections { + if c.Bandwidth() != nil { + missing = false + break + } + } + if !missing { + break + } + time.Sleep(time.Millisecond) + } + + pub2Id := "the-publisher-2" + pub2id := "1234567890" + pub2Listener := &MockMcuListener{ + publicId: pub2Id + "-public", + } + pub2Initiator := &MockMcuInitiator{ + country: "DE", + } + pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator) + if err != nil { + t.Fatal(err) + } + + defer pub2.Close(context.Background()) + + if pub1.(*mcuProxyPublisher).conn.rawUrl == pub2.(*mcuProxyPublisher).conn.rawUrl { + t.Errorf("servers should be different, got %s", pub1.(*mcuProxyPublisher).conn.rawUrl) + } +} + +func Test_ProxyPublisherBandwidthOverload(t *testing.T) { + CatchLogForTest(t) + t.Parallel() + server1 := NewProxyServerForTest(t, "DE") + server2 := NewProxyServerForTest(t, "DE") + mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ + server1, + server2, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + pub1Id := "the-publisher-1" + pub1Sid := "1234567890" + pub1Listener := &MockMcuListener{ + publicId: pub1Id + "-public", + } + pub1Initiator := &MockMcuInitiator{ + country: "DE", + } + pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator) + if err != nil { + t.Fatal(err) + } + + defer pub1.Close(context.Background()) + + // If all servers are bandwidth loaded, select the one with the least usage. + if pub1.(*mcuProxyPublisher).conn.rawUrl == server1.URL { + server1.UpdateBandwidth(100, 0) + server2.UpdateBandwidth(102, 0) + } else { + server1.UpdateBandwidth(102, 0) + server2.UpdateBandwidth(100, 0) + } + + // Wait until proxy has been updated + for ctx.Err() == nil { + mcu.connectionsMu.RLock() + connections := mcu.connections + mcu.connectionsMu.RUnlock() + missing := false + for _, c := range connections { + if c.Bandwidth() == nil { + missing = true + break + } + } + if !missing { + break + } + time.Sleep(time.Millisecond) + } + + pub2Id := "the-publisher-2" + pub2id := "1234567890" + pub2Listener := &MockMcuListener{ + publicId: pub2Id + "-public", + } + pub2Initiator := &MockMcuInitiator{ + country: "DE", + } + pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator) + if err != nil { + t.Fatal(err) + } + + defer pub2.Close(context.Background()) + + if pub1.(*mcuProxyPublisher).conn.rawUrl != pub2.(*mcuProxyPublisher).conn.rawUrl { + t.Errorf("servers should be the same, got %s / %s", pub1.(*mcuProxyPublisher).conn.rawUrl, pub2.(*mcuProxyPublisher).conn.rawUrl) + } +} + func Test_ProxyPublisherLoad(t *testing.T) { CatchLogForTest(t) t.Parallel() @@ -910,3 +1098,144 @@ func Test_ProxySubscriberCountry(t *testing.T) { t.Errorf("expected server %s, go %s", serverUS.URL, sub.(*mcuProxySubscriber).conn.rawUrl) } } + +func Test_ProxySubscriberBandwidth(t *testing.T) { + CatchLogForTest(t) + t.Parallel() + serverDE := NewProxyServerForTest(t, "DE") + serverUS := NewProxyServerForTest(t, "US") + mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ + serverDE, + serverUS, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + pubId := "the-publisher" + pubSid := "1234567890" + pubListener := &MockMcuListener{ + publicId: pubId + "-public", + } + pubInitiator := &MockMcuInitiator{ + country: "DE", + } + pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) + if err != nil { + t.Fatal(err) + } + + defer pub.Close(context.Background()) + + if pub.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL { + t.Errorf("expected server %s, go %s", serverDE.URL, pub.(*mcuProxyPublisher).conn.rawUrl) + } + + serverDE.UpdateBandwidth(0, 100) + + // Wait until proxy has been updated + for ctx.Err() == nil { + mcu.connectionsMu.RLock() + connections := mcu.connections + mcu.connectionsMu.RUnlock() + missing := true + for _, c := range connections { + if c.Bandwidth() != nil { + missing = false + break + } + } + if !missing { + break + } + time.Sleep(time.Millisecond) + } + + subListener := &MockMcuListener{ + publicId: "subscriber-public", + } + subInitiator := &MockMcuInitiator{ + country: "US", + } + sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) + if err != nil { + t.Fatal(err) + } + + defer sub.Close(context.Background()) + + if sub.(*mcuProxySubscriber).conn.rawUrl != serverUS.URL { + t.Errorf("expected server %s, go %s", serverUS.URL, sub.(*mcuProxySubscriber).conn.rawUrl) + } +} + +func Test_ProxySubscriberBandwidthOverload(t *testing.T) { + CatchLogForTest(t) + t.Parallel() + serverDE := NewProxyServerForTest(t, "DE") + serverUS := NewProxyServerForTest(t, "US") + mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{ + serverDE, + serverUS, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + pubId := "the-publisher" + pubSid := "1234567890" + pubListener := &MockMcuListener{ + publicId: pubId + "-public", + } + pubInitiator := &MockMcuInitiator{ + country: "DE", + } + pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator) + if err != nil { + t.Fatal(err) + } + + defer pub.Close(context.Background()) + + if pub.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL { + t.Errorf("expected server %s, go %s", serverDE.URL, pub.(*mcuProxyPublisher).conn.rawUrl) + } + + serverDE.UpdateBandwidth(0, 100) + serverUS.UpdateBandwidth(0, 102) + + // Wait until proxy has been updated + for ctx.Err() == nil { + mcu.connectionsMu.RLock() + connections := mcu.connections + mcu.connectionsMu.RUnlock() + missing := false + for _, c := range connections { + if c.Bandwidth() == nil { + missing = true + break + } + } + if !missing { + break + } + time.Sleep(time.Millisecond) + } + + subListener := &MockMcuListener{ + publicId: "subscriber-public", + } + subInitiator := &MockMcuInitiator{ + country: "US", + } + sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator) + if err != nil { + t.Fatal(err) + } + + defer sub.Close(context.Background()) + + if sub.(*mcuProxySubscriber).conn.rawUrl != serverDE.URL { + t.Errorf("expected server %s, go %s", serverDE.URL, sub.(*mcuProxySubscriber).conn.rawUrl) + } +} diff --git a/proxy.conf.in b/proxy.conf.in index c011f77..f7fdf20 100644 --- a/proxy.conf.in +++ b/proxy.conf.in @@ -36,6 +36,20 @@ tokentype = static # self-signed certificates. #skipverify = false +[bandwidth] +# Target bandwidth limit for incoming streams (in megabits per second). +# Set to 0 to disable the limit. If the limit is reached, the proxy notifies +# the signaling servers that another proxy should be used for publishing if +# possible. +#incoming = 1024 + +# Target bandwidth limit for outgoing streams (in megabits per second). +# Set to 0 to disable the limit. If the limit is reached, the proxy notifies +# the signaling servers that another proxy should be used for subscribing if +# possible. Note that this might require additional outgoing bandwidth for the +# remote streams. +#outgoing = 1024 + [tokens] # For token type "static": Mapping of = of signaling # servers allowed to connect. diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index 2552774..83b5b55 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -100,6 +100,11 @@ type ProxyServer struct { stopped atomic.Bool load atomic.Int64 + maxIncoming int64 + currentIncoming atomic.Int64 + maxOutgoing int64 + currentOutgoing atomic.Int64 + shutdownChannel chan struct{} shutdownScheduled atomic.Bool @@ -266,11 +271,32 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (* log.Printf("No token id configured, remote streams will be disabled") } + maxIncoming, _ := config.GetInt("bandwidth", "incoming") + if maxIncoming < 0 { + maxIncoming = 0 + } + if maxIncoming > 0 { + log.Printf("Target bandwidth for incoming streams: %d MBit/s", maxIncoming) + } else { + log.Printf("Target bandwidth for incoming streams: unlimited") + } + maxOutgoing, _ := config.GetInt("bandwidth", "outgoing") + if maxOutgoing < 0 { + maxOutgoing = 0 + } + if maxIncoming > 0 { + log.Printf("Target bandwidth for outgoing streams: %d MBit/s", maxOutgoing) + } else { + log.Printf("Target bandwidth for outgoing streams: unlimited") + } + result := &ProxyServer{ version: version, country: country, welcomeMessage: string(welcomeMessage) + "\n", config: config, + maxIncoming: int64(maxIncoming) * 1024 * 1024, + maxOutgoing: int64(maxOutgoing) * 1024 * 1024, shutdownChannel: make(chan struct{}), @@ -398,18 +424,7 @@ loop: } } -func (s *ProxyServer) updateLoad() { - load := s.GetClientsLoad() - if load == s.load.Load() { - return - } - - s.load.Store(load) - if s.shutdownScheduled.Load() { - // Server is scheduled to shutdown, no need to update clients with current load. - return - } - +func (s *ProxyServer) newLoadEvent(load int64, incoming int64, outgoing int64) *signaling.ProxyServerMessage { msg := &signaling.ProxyServerMessage{ Type: "event", Event: &signaling.EventProxyServerMessage{ @@ -417,7 +432,37 @@ func (s *ProxyServer) updateLoad() { Load: load, }, } + if s.maxIncoming > 0 || s.maxOutgoing > 0 { + msg.Event.Bandwidth = &signaling.EventProxyServerBandwidth{} + if s.maxIncoming > 0 { + value := float64(incoming) / float64(s.maxIncoming) * 100 + msg.Event.Bandwidth.Incoming = &value + } + if s.maxOutgoing > 0 { + value := float64(outgoing) / float64(s.maxOutgoing) * 100 + msg.Event.Bandwidth.Outgoing = &value + } + } + return msg +} +func (s *ProxyServer) updateLoad() { + load, incoming, outgoing := s.GetClientsLoad() + if load == s.load.Load() && + incoming == s.currentIncoming.Load() && + outgoing == s.currentOutgoing.Load() { + return + } + + s.load.Store(load) + s.currentIncoming.Store(incoming) + s.currentOutgoing.Store(outgoing) + if s.shutdownScheduled.Load() { + // Server is scheduled to shutdown, no need to update clients with current load. + return + } + + msg := s.newLoadEvent(load, incoming, outgoing) s.IterateSessions(func(session *ProxySession) { session.sendMessage(msg) }) @@ -579,13 +624,7 @@ func (s *ProxyServer) onMcuDisconnected() { } func (s *ProxyServer) sendCurrentLoad(session *ProxySession) { - msg := &signaling.ProxyServerMessage{ - Type: "event", - Event: &signaling.EventProxyServerMessage{ - Type: "update-load", - Load: s.load.Load(), - }, - } + msg := s.newLoadEvent(s.load.Load(), s.currentIncoming.Load(), s.currentOutgoing.Load()) session.sendMessage(msg) } @@ -1257,15 +1296,21 @@ func (s *ProxyServer) HasClients() bool { return len(s.clients) > 0 } -func (s *ProxyServer) GetClientsLoad() int64 { +func (s *ProxyServer) GetClientsLoad() (load int64, incoming int64, outgoing int64) { s.clientsLock.RLock() defer s.clientsLock.RUnlock() - var load int64 for _, c := range s.clients { - load += int64(c.MaxBitrate()) + bitrate := int64(c.MaxBitrate()) + load += bitrate + if _, ok := c.(signaling.McuPublisher); ok { + incoming += bitrate + } else if _, ok := c.(signaling.McuSubscriber); ok { + outgoing += bitrate + } } - return load / 1024 + load = load / 1024 + return } func (s *ProxyServer) GetClient(id string) signaling.McuClient { diff --git a/slices_go120.go b/slices_go120.go new file mode 100644 index 0000000..de80826 --- /dev/null +++ b/slices_go120.go @@ -0,0 +1,34 @@ +//go:build !go1.21 + +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2024 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "sort" +) + +func SlicesSortFunc[T any](l []T, f func(a T, b T) int) { + sort.Slice(l, func(i, j int) bool { + return f(l[i], l[j]) < 0 + }) +} diff --git a/slices_go121.go b/slices_go121.go new file mode 100644 index 0000000..bc41535 --- /dev/null +++ b/slices_go121.go @@ -0,0 +1,32 @@ +//go:build go1.21 + +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2024 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "slices" +) + +func SlicesSortFunc[T any](l []T, f func(a T, b T) int) { + slices.SortFunc(l, f) +}