Add per-room bandwidth limits.

This commit is contained in:
Joachim Bauch 2025-11-05 16:46:09 +01:00
commit 90a29ba130
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
16 changed files with 568 additions and 14 deletions

View file

@ -132,10 +132,11 @@ type ProxyServer struct {
stopped atomic.Bool
load atomic.Uint64
maxIncoming api.AtomicBandwidth
currentIncoming api.AtomicBandwidth
maxOutgoing api.AtomicBandwidth
currentOutgoing api.AtomicBandwidth
maxIncoming api.AtomicBandwidth
currentIncoming api.AtomicBandwidth
maxOutgoing api.AtomicBandwidth
currentOutgoing api.AtomicBandwidth
currentBandwidths atomic.Pointer[map[string]*sfu.ClientBandwidthInfo]
shutdownChannel chan struct{}
shutdownScheduled atomic.Bool
@ -541,26 +542,32 @@ func (s *ProxyServer) newLoadEvent(load uint64, incoming api.Bandwidth, outgoing
}
func (s *ProxyServer) updateLoad() {
load, incoming, outgoing := s.GetClientsLoad()
load, incoming, outgoing, bandwidths := s.GetClientsLoad()
oldLoad := s.load.Swap(load)
oldIncoming := s.currentIncoming.Swap(incoming)
oldOutgoing := s.currentOutgoing.Swap(outgoing)
if oldLoad == load && oldIncoming == incoming && oldOutgoing == outgoing {
if len(bandwidths) == 0 {
s.currentBandwidths.Store(nil)
} else {
s.currentBandwidths.Store(&bandwidths)
}
if oldLoad == load && oldIncoming == incoming && oldOutgoing == outgoing && len(bandwidths) == 0 {
return
}
statsLoadCurrent.Set(float64(load))
s.sendLoadToAll(load, incoming, outgoing)
s.sendLoadToAll(load, incoming, outgoing, bandwidths)
}
func (s *ProxyServer) sendLoadToAll(load uint64, incoming api.Bandwidth, outgoing api.Bandwidth) {
func (s *ProxyServer) sendLoadToAll(load uint64, incoming api.Bandwidth, outgoing api.Bandwidth, bandwidths map[string]*sfu.ClientBandwidthInfo) {
if s.shutdownScheduled.Load() {
// Server is scheduled to shutdown, no need to update clients with current load.
return
}
msg := s.newLoadEvent(load, incoming, outgoing)
loadEvent := s.newLoadEvent(load, incoming, outgoing)
s.IterateSessions(func(session *ProxySession) {
msg := session.updateLoadEvent(loadEvent, bandwidths)
session.sendMessage(msg)
})
}
@ -661,7 +668,11 @@ func (s *ProxyServer) Reload(config *goconf.ConfigFile) {
oldOutgoing := s.maxOutgoing.Swap(maxOutgoing)
if oldIncoming != maxIncoming || oldOutgoing != maxOutgoing {
// Notify sessions about updated load / bandwidth usage.
go s.sendLoadToAll(s.load.Load(), s.currentIncoming.Load(), s.currentOutgoing.Load())
var bandwidths map[string]*sfu.ClientBandwidthInfo
if bw := s.currentBandwidths.Load(); bw != nil {
bandwidths = *bw
}
go s.sendLoadToAll(s.load.Load(), s.currentIncoming.Load(), s.currentOutgoing.Load(), bandwidths)
}
s.tokens.Reload(config)
@ -749,7 +760,12 @@ func (s *ProxyServer) onMcuDisconnected() {
}
func (s *ProxyServer) sendCurrentLoad(session *ProxySession) {
var bandwidths map[string]*sfu.ClientBandwidthInfo
if bw := s.currentBandwidths.Load(); bw != nil {
bandwidths = *bw
}
msg := s.newLoadEvent(s.load.Load(), s.currentIncoming.Load(), s.currentOutgoing.Load())
msg = session.updateLoadEvent(msg, bandwidths)
session.sendMessage(msg)
}
@ -1598,14 +1614,18 @@ func (s *ProxyServer) HasClients() bool {
return len(s.clients) > 0
}
func (s *ProxyServer) GetClientsLoad() (load uint64, incoming api.Bandwidth, outgoing api.Bandwidth) {
func (s *ProxyServer) GetClientsLoad() (load uint64, incoming api.Bandwidth, outgoing api.Bandwidth, bandwidths map[string]*sfu.ClientBandwidthInfo) {
s.clientsLock.RLock()
defer s.clientsLock.RUnlock()
for _, c := range s.clients {
for id, c := range s.clients {
// Use "current" bandwidth usage if supported.
if bw, ok := c.(sfu.ClientWithBandwidth); ok {
if bandwidth := bw.Bandwidth(); bandwidth != nil {
if bandwidths == nil {
bandwidths = make(map[string]*sfu.ClientBandwidthInfo)
}
bandwidths[id] = bandwidth
incoming += bandwidth.Received
outgoing += bandwidth.Sent
continue

View file

@ -468,6 +468,10 @@ func (p *TestPublisherWithBandwidth) Bandwidth() *sfu.ClientBandwidthInfo {
return p.bandwidth
}
func (p *TestPublisherWithBandwidth) SetBandwidth(ctx context.Context, bandwidth api.Bandwidth) error {
return errors.New("not implemented")
}
func (m *PublisherTestMCU) NewPublisher(ctx context.Context, listener sfu.Listener, id api.PublicSessionId, sid string, streamType sfu.StreamType, settings sfu.NewPublisherSettings, initiator sfu.Initiator) (sfu.Publisher, error) {
publisher := &TestPublisherWithBandwidth{
TestMCUPublisher: TestMCUPublisher{

View file

@ -484,3 +484,77 @@ func (s *ProxySession) OnRemotePublisherDeleted(publisherId api.PublicSessionId)
}
}
}
func cloneLoadMessageWithoutClientBandwidths(msg *proxy.ServerMessage) *proxy.ServerMessage {
return &proxy.ServerMessage{
Id: msg.Id,
Type: msg.Type,
Event: &proxy.EventServerMessage{
Type: msg.Event.Type,
ClientId: msg.Event.ClientId,
Load: msg.Event.Load,
Sid: msg.Event.Sid,
Bandwidth: msg.Event.Bandwidth,
},
}
}
func (s *ProxySession) updateLoadEventPublishers(msg *proxy.ServerMessage, bandwidths map[string]*sfu.ClientBandwidthInfo, needClone bool) (result *proxy.ServerMessage, cloned bool) {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
result = msg
for id := range s.publishers {
if bw, found := bandwidths[id]; found {
if needClone {
result = cloneLoadMessageWithoutClientBandwidths(msg)
needClone = false
cloned = true
}
if result.Event.ClientBandwidths == nil {
result.Event.ClientBandwidths = make(map[string]proxy.EventServerBandwidth)
}
result.Event.ClientBandwidths[id] = proxy.EventServerBandwidth{
Received: bw.Received,
Sent: bw.Sent,
}
}
}
return
}
func (s *ProxySession) updateLoadEventSubscribers(msg *proxy.ServerMessage, bandwidths map[string]*sfu.ClientBandwidthInfo, needClone bool) (result *proxy.ServerMessage, cloned bool) {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
result = msg
for id := range s.subscribers {
if bw, found := bandwidths[id]; found {
if needClone {
result = cloneLoadMessageWithoutClientBandwidths(msg)
needClone = false
cloned = true
}
if result.Event.ClientBandwidths == nil {
result.Event.ClientBandwidths = make(map[string]proxy.EventServerBandwidth)
}
result.Event.ClientBandwidths[id] = proxy.EventServerBandwidth{
Received: bw.Received,
Sent: bw.Sent,
}
}
}
return
}
func (s *ProxySession) updateLoadEvent(msg *proxy.ServerMessage, bandwidths map[string]*sfu.ClientBandwidthInfo) *proxy.ServerMessage {
if len(bandwidths) == 0 {
return msg
}
msg, cloned := s.updateLoadEventPublishers(msg, bandwidths, true)
msg, _ = s.updateLoadEventSubscribers(msg, bandwidths, !cloned)
return msg
}

View file

@ -80,6 +80,7 @@ type grpcClientImpl struct {
RpcInternalClient
RpcMcuClient
RpcSessionsClient
RpcRoomsClient
}
func newClientImpl(conn grpc.ClientConnInterface) *grpcClientImpl {
@ -88,6 +89,7 @@ func newClientImpl(conn grpc.ClientConnInterface) *grpcClientImpl {
RpcInternalClient: NewRpcInternalClient(conn),
RpcMcuClient: NewRpcMcuClient(conn),
RpcSessionsClient: NewRpcSessionsClient(conn),
RpcRoomsClient: NewRpcRoomsClient(conn),
}
}
@ -456,6 +458,28 @@ func (c *Client) ProxySession(ctx context.Context, sessionId api.PublicSessionId
return proxy, nil
}
func (c *Client) GetRoomBandwidth(ctx context.Context, roomId string, urls []string) (uint32, uint32, *sfu.ClientBandwidthInfo, error) {
statsGrpcClientCalls.WithLabelValues("GetRoomBandwidth").Inc()
response, err := c.impl.GetRoomBandwidth(ctx, &RoomBandwidthRequest{
RoomId: roomId,
BackendUrls: urls,
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return 0, 0, nil, nil
} else if err != nil {
return 0, 0, nil, err
}
var bandwidth *sfu.ClientBandwidthInfo
if response.GetIncoming() != 0 || response.GetOutgoing() != 0 {
bandwidth = &sfu.ClientBandwidthInfo{
Sent: api.BandwidthFromBits(response.GetOutgoing()),
Received: api.BandwidthFromBits(response.GetIncoming()),
}
}
return response.GetPublishers(), response.GetSubscribers(), bandwidth, nil
}
type clientsList struct {
clients []*Client
entry *dns.MonitorEntry

42
grpc/room.proto Normal file
View file

@ -0,0 +1,42 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
syntax = "proto3";
option go_package = "github.com/strukturag/nextcloud-spreed-signaling/grpc";
package grpc;
service RpcRooms {
rpc GetRoomBandwidth(RoomBandwidthRequest) returns (RoomBandwidthReply) {}
}
message RoomBandwidthRequest {
string roomId = 1;
repeated string backendUrls = 2;
}
message RoomBandwidthReply {
uint32 publishers = 1;
uint32 subscribers = 2;
uint64 incoming = 3;
uint64 outgoing = 4;
}

View file

@ -57,6 +57,7 @@ type ServerHub interface {
GetInternalSessions(roomId string, backend *talk.Backend) ([]*InternalSessionData, []*VirtualSessionData, bool)
GetTransientEntries(roomId string, backend *talk.Backend) (api.TransientDataEntries, bool)
GetPublisherIdForSessionId(ctx context.Context, sessionId api.PublicSessionId, streamType sfu.StreamType) (*GetPublisherIdReply, error)
GetRoomBandwidth(roomId string, backend *talk.Backend) (uint32, uint32, *sfu.ClientBandwidthInfo, bool)
ProxySession(request RpcSessions_ProxySessionServer) error
}
@ -66,6 +67,7 @@ type Server struct {
UnimplementedRpcInternalServer
UnimplementedRpcMcuServer
UnimplementedRpcSessionsServer
UnimplementedRpcRoomsServer
logger log.Logger
version string
@ -106,6 +108,7 @@ func NewServer(ctx context.Context, cfg *goconf.ConfigFile, version string) (*Se
RegisterRpcInternalServer(conn, result)
RegisterRpcSessionsServer(conn, result)
RegisterRpcMcuServer(conn, result)
RegisterRpcRoomsServer(conn, result)
return result, nil
}
@ -336,3 +339,53 @@ func (s *Server) ProxySession(request RpcSessions_ProxySessionServer) error {
return s.hub.ProxySession(request)
}
func (s *Server) GetRoomBandwidth(ctx context.Context, request *RoomBandwidthRequest) (*RoomBandwidthReply, error) {
statsGrpcServerCalls.WithLabelValues("GetRoomBandwidth").Inc()
var backendUrls []string
if len(request.BackendUrls) > 0 {
backendUrls = request.BackendUrls
} else {
// Only compat backend.
backendUrls = []string{""}
}
var result RoomBandwidthReply
processed := make(map[string]bool)
for _, bu := range backendUrls {
var parsed *url.URL
if bu != "" {
var err error
parsed, err = url.Parse(bu)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "invalid url")
}
}
backend := s.hub.GetBackend(parsed)
if backend == nil {
return nil, status.Error(codes.NotFound, "no such backend")
}
// Only process each backend once.
if processed[backend.Id()] {
continue
}
processed[backend.Id()] = true
publishers, subscribers, bandwidth, found := s.hub.GetRoomBandwidth(request.RoomId, backend)
if !found {
return nil, status.Error(codes.NotFound, "no such room")
}
result.Publishers += publishers
result.Subscribers += subscribers
if bandwidth != nil {
result.Incoming += bandwidth.Received.Bits()
result.Outgoing += bandwidth.Sent.Bits()
}
}
return &result, nil
}

View file

@ -503,6 +503,17 @@ func (h *testServerHub) ProxySession(request RpcSessions_ProxySessionServer) err
return nil
}
func (h *testServerHub) GetRoomBandwidth(roomId string, backend *talk.Backend) (uint32, uint32, *sfu.ClientBandwidthInfo, bool) {
if roomId == testRoomId && backend == h.backend {
return 1, 2, &sfu.ClientBandwidthInfo{
Sent: 1000,
Received: 2000,
}, true
}
return 0, 0, nil, false
}
func TestServer_GetSessionIdByResumeId(t *testing.T) {
t.Parallel()
@ -852,3 +863,43 @@ func TestServer_GetSessionCount(t *testing.T) {
}
}
}
func TestServer_GetRoomBandwidth(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
hub := newTestServerHub(t)
server, addr := NewServerForTest(t)
server.SetHub(hub)
clients, _ := NewClientsForTest(t, addr, nil)
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
require.NoError(clients.WaitForInitialized(ctx))
for _, client := range clients.GetClients() {
if publishers, subscribers, bandwidth, err := client.GetRoomBandwidth(ctx, testRoomId+"1", []string{testBackendUrl}); assert.NoError(err) {
assert.EqualValues(0, publishers)
assert.EqualValues(0, subscribers)
assert.Nil(bandwidth)
}
if publishers, subscribers, bandwidth, err := client.GetRoomBandwidth(ctx, testRoomId, []string{testBackendUrl + "1"}); assert.NoError(err) {
assert.EqualValues(0, publishers)
assert.EqualValues(0, subscribers)
assert.Nil(bandwidth)
}
if publishers, subscribers, bandwidth, err := client.GetRoomBandwidth(ctx, testRoomId, []string{testBackendUrl}); assert.NoError(err) {
assert.EqualValues(1, publishers)
assert.EqualValues(2, subscribers)
if assert.NotNil(bandwidth) {
assert.EqualValues(1000, bandwidth.Sent)
assert.EqualValues(2000, bandwidth.Received)
}
}
}
}

View file

@ -349,6 +349,8 @@ type EventServerMessage struct {
Sid string `json:"sid,omitempty"`
Bandwidth *EventServerBandwidth `json:"bandwidth,omitempty"`
ClientBandwidths map[string]EventServerBandwidth `json:"clientBandwidths,omitempty"`
}
// Information on a proxy in the etcd cluster.

View file

@ -1641,3 +1641,69 @@ func (s *ClientSession) ProcessResponse(message *api.ClientMessage) bool {
return cb(message)
}
func (s *ClientSession) Bandwidth() (uint32, uint32, *sfu.ClientBandwidthInfo) {
s.mu.Lock()
defer s.mu.Unlock()
var publishers uint32
var subscribers uint32
var bandwidth *sfu.ClientBandwidthInfo
for _, pub := range s.publishers {
if pub.StreamType() != sfu.StreamTypeVideo {
continue
}
if pub, ok := pub.(sfu.ClientWithBandwidth); ok {
if bw := pub.Bandwidth(); bw != nil {
if bandwidth == nil {
bandwidth = &sfu.ClientBandwidthInfo{}
}
bandwidth.Received += bw.Received
bandwidth.Sent += bw.Sent
publishers++
}
}
}
for _, sub := range s.subscribers {
if sub.StreamType() != sfu.StreamTypeVideo {
continue
}
if sub, ok := sub.(sfu.ClientWithBandwidth); ok {
if bw := sub.Bandwidth(); bw != nil {
if bandwidth == nil {
bandwidth = &sfu.ClientBandwidthInfo{}
}
bandwidth.Received += bw.Received
bandwidth.Sent += bw.Sent
subscribers++
}
}
}
return publishers, subscribers, bandwidth
}
func (s *ClientSession) UpdatePublisherBandwidth(ctx context.Context, streamType sfu.StreamType, bandwidth api.Bandwidth) error {
s.mu.Lock()
defer s.mu.Unlock()
for _, pub := range s.publishers {
if pub.StreamType() != streamType {
continue
}
if pub, ok := pub.(sfu.ClientWithBandwidth); ok {
s.mu.Unlock()
defer s.mu.Lock()
return pub.SetBandwidth(ctx, bandwidth)
}
}
return nil
}

View file

@ -2100,6 +2100,16 @@ func (h *Hub) GetTransientEntries(roomId string, backend *talk.Backend) (api.Tra
return entries, true
}
func (h *Hub) GetRoomBandwidth(roomId string, backend *talk.Backend) (uint32, uint32, *sfu.ClientBandwidthInfo, bool) {
room := h.GetRoomForBackend(roomId, backend)
if room == nil {
return 0, 0, nil, false
}
publishers, subscribers, bandwidth := room.Bandwidth()
return publishers, subscribers, bandwidth, true
}
func (h *Hub) removeRoom(room *Room) {
internalRoomId := getRoomIdForBackend(room.Id(), room.Backend())
h.ru.Lock()

View file

@ -141,6 +141,10 @@ func (h *mockGrpcServerHub) GetPublisherIdForSessionId(ctx context.Context, sess
return nil, status.Error(codes.NotFound, "no such publisher")
}
func (h *mockGrpcServerHub) GetRoomBandwidth(roomId string, backend *talk.Backend) (uint32, uint32, *sfu.ClientBandwidthInfo, bool) {
return 0, 0, nil, false
}
func (h *mockGrpcServerHub) ProxySession(request grpc.RpcSessions_ProxySessionServer) error {
return errors.New("not implemented")
}

View file

@ -31,6 +31,7 @@ import (
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -41,6 +42,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
"github.com/strukturag/nextcloud-spreed-signaling/sfu"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
@ -62,6 +64,7 @@ const (
var (
updateActiveSessionsInterval = 10 * time.Second
updateRoomBandwidthInterval = 1 * time.Second
)
func init() {
@ -102,6 +105,13 @@ type Room struct {
lastRoomRequests map[string]int64
transientData *api.TransientData
publishersCount atomic.Uint32
subscribersCount atomic.Uint32
bandwidth atomic.Pointer[sfu.ClientBandwidthInfo]
// bandwidthPerRoom is the maximum incoming bandwidth per room.
bandwidthPerRoom api.Bandwidth
}
func getRoomIdForBackend(id string, backend *talk.Backend) string {
@ -140,6 +150,9 @@ func NewRoom(roomId string, properties json.RawMessage, hub *Hub, asyncEvents ev
lastRoomRequests: make(map[string]int64),
transientData: api.NewTransientData(),
// TODO: Make configurable
bandwidthPerRoom: api.BandwidthFromMegabits(10),
}
if err := asyncEvents.RegisterBackendRoomListener(roomId, backend, room); err != nil {
@ -192,7 +205,9 @@ func (r *Room) AsyncChannel() events.AsyncChannel {
}
func (r *Room) run() {
ticker := time.NewTicker(updateActiveSessionsInterval)
sessionsTicker := time.NewTicker(updateActiveSessionsInterval)
bandwidtTicker := time.NewTicker(updateRoomBandwidthInterval)
loop:
for {
select {
@ -203,8 +218,10 @@ loop:
for count := len(r.asyncCh); count > 0; count-- {
r.processAsyncNatsMessage(<-r.asyncCh)
}
case <-ticker.C:
case <-sessionsTicker.C:
r.publishActiveSessions()
case <-bandwidtTicker.C:
r.updateBandwidth()
}
}
}
@ -1353,3 +1370,125 @@ func (r *Room) fetchInitialTransientData() {
r.transientData.SetInitial(initial)
}
}
func (r *Room) Bandwidth() (uint32, uint32, *sfu.ClientBandwidthInfo) {
return r.publishersCount.Load(), r.subscribersCount.Load(), r.bandwidth.Load()
}
func (r *Room) getLocalBandwidth() (uint32, uint32, *sfu.ClientBandwidthInfo, []SessionWithBandwidth) {
r.mu.RLock()
defer r.mu.RUnlock()
var publishers uint32
var subscribers uint32
var bandwidth *sfu.ClientBandwidthInfo
var publisherSessions []SessionWithBandwidth
for _, session := range r.sessions {
if s, ok := session.(SessionWithBandwidth); ok {
pub, sub, bw := s.Bandwidth()
if bw != nil {
if bandwidth == nil {
bandwidth = &sfu.ClientBandwidthInfo{}
}
bandwidth.Received += bw.Received
bandwidth.Sent += bw.Sent
}
publishers += pub
subscribers += sub
if pub > 0 {
publisherSessions = append(publisherSessions, s)
}
}
}
r.publishersCount.Store(publishers)
r.subscribersCount.Store(subscribers)
r.bandwidth.Store(bandwidth)
return publishers, subscribers, bandwidth, publisherSessions
}
func (r *Room) getRemoteBandwidth() (uint32, uint32, *sfu.ClientBandwidthInfo) {
if r.hub.rpcClients == nil {
return 0, 0, nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
var mu sync.Mutex
var wg sync.WaitGroup
var publishers atomic.Uint32
var subscribers atomic.Uint32
var bandwidth *sfu.ClientBandwidthInfo
for _, client := range r.hub.rpcClients.GetClients() {
wg.Add(1)
go func(c *grpc.Client) {
defer wg.Done()
pub, sub, bw, err := c.GetRoomBandwidth(ctx, r.id, r.backend.Urls())
if err != nil {
r.logger.Printf("Received error while getting bandwidth for %s@%s from %s: %s", r.Id(), r.Backend().Id(), c.Target(), err)
return
}
publishers.Add(pub)
subscribers.Add(sub)
if bw != nil {
mu.Lock()
defer mu.Unlock()
if bandwidth == nil {
bandwidth = bw
} else {
bandwidth.Received += bw.Received
bandwidth.Sent += bw.Sent
}
}
}(client)
}
wg.Wait()
return publishers.Load(), subscribers.Load(), bandwidth
}
func (r *Room) updateBandwidth() {
publishers, subscribers, bandwidth, publisherSessions := r.getLocalBandwidth()
if remotePublishers, remoteSubscribers, remote := r.getRemoteBandwidth(); remote != nil {
if bandwidth == nil {
bandwidth = &sfu.ClientBandwidthInfo{
Received: remote.Received,
Sent: remote.Sent,
}
} else {
bandwidth.Received += remote.Received
bandwidth.Sent += remote.Sent
}
publishers += remotePublishers
subscribers += remoteSubscribers
}
if publishers != 0 || subscribers != 0 || bandwidth != nil {
perPublisher := api.BandwidthFromBits(r.bandwidthPerRoom.Bits() / max(uint64(publishers), 2))
if maxBitrate := r.Backend().MaxStreamBitrate(); perPublisher < maxBitrate {
perPublisher = maxBitrate
}
r.logger.Printf("Bandwidth in room %s for %d pub / %d sub: %+v (max %s)", r.Id(), publishers, subscribers, bandwidth, perPublisher)
if perPublisher != 0 {
for _, session := range publisherSessions {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), r.hub.mcuTimeout)
defer cancel()
if err := session.UpdatePublisherBandwidth(ctx, sfu.StreamTypeVideo, perPublisher); err != nil {
r.logger.Printf("Could not update bandwidth of %s publisher in %s: %s", sfu.StreamTypeVideo, session.PublicId(), err)
}
}()
}
}
}
}

View file

@ -30,6 +30,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/session"
"github.com/strukturag/nextcloud-spreed-signaling/sfu"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
@ -65,6 +66,13 @@ type SessionWithInCall interface {
GetInCall() int
}
type SessionWithBandwidth interface {
Session
Bandwidth() (uint32, uint32, *sfu.ClientBandwidthInfo)
UpdatePublisherBandwidth(ctx context.Context, streamType sfu.StreamType, bandwidth api.Bandwidth) error
}
func parseUserData(data json.RawMessage) func() (api.StringMap, error) {
return sync.OnceValues(func() (api.StringMap, error) {
if len(data) == 0 {

View file

@ -202,6 +202,7 @@ type ClientWithBandwidth interface {
Client
Bandwidth() *ClientBandwidthInfo
SetBandwidth(ctx context.Context, bandwidth api.Bandwidth) error
}
type Publisher interface {

View file

@ -123,6 +123,24 @@ func (c *janusClient) Bandwidth() *sfu.ClientBandwidthInfo {
return result
}
func (c *janusClient) SetBandwidth(ctx context.Context, bandwidth api.Bandwidth) error {
handle := c.handle.Load()
if handle == nil {
return sfu.ErrNotConnected
}
configure_msg := api.StringMap{
"request": "configure",
"bitrate": bandwidth.Bits(),
}
_, err := handle.Message(ctx, configure_msg, nil)
if err != nil {
return err
}
return nil
}
func (c *janusClient) closeClient(ctx context.Context) bool {
if handle := c.handle.Swap(nil); handle != nil {
close(c.closeChan)

View file

@ -109,6 +109,7 @@ type proxyPubSubCommon struct {
proxyId string
conn *proxyConnection
listener sfu.Listener
bandwidth atomic.Pointer[sfu.ClientBandwidthInfo]
}
func (c *proxyPubSubCommon) Id() string {
@ -127,6 +128,17 @@ func (c *proxyPubSubCommon) MaxBitrate() api.Bandwidth {
return c.maxBitrate
}
func (c *proxyPubSubCommon) UpdateBandwidth(sent api.Bandwidth, received api.Bandwidth) {
c.bandwidth.Store(&sfu.ClientBandwidthInfo{
Sent: sent,
Received: received,
})
}
func (c *proxyPubSubCommon) Bandwidth() *sfu.ClientBandwidthInfo {
return c.bandwidth.Load()
}
func (c *proxyPubSubCommon) doSendMessage(ctx context.Context, msg *proxy.ClientMessage, callback func(error, api.StringMap)) {
c.conn.performAsyncRequest(ctx, msg, func(err error, response *proxy.ServerMessage) {
if err != nil {
@ -1110,6 +1122,28 @@ func (c *proxyConnection) processPayload(msg *proxy.ServerMessage) {
c.logger.Printf("Received payload for unknown client %+v from %s", payload, c)
}
func (c *proxyConnection) updatePublisherBandwidths(bandwidths map[string]proxy.EventServerBandwidth) {
c.publishersLock.RLock()
defer c.publishersLock.RUnlock()
for id, pub := range c.publishers {
if bw, ok := bandwidths[id]; ok {
pub.UpdateBandwidth(bw.Sent, bw.Received)
}
}
}
func (c *proxyConnection) updateSubscriberBandwidths(bandwidths map[string]proxy.EventServerBandwidth) {
c.subscribersLock.RLock()
defer c.subscribersLock.RUnlock()
for id, sub := range c.subscribers {
if bw, ok := bandwidths[id]; ok {
sub.UpdateBandwidth(bw.Sent, bw.Received)
}
}
}
func (c *proxyConnection) processEvent(msg *proxy.ServerMessage) {
event := msg.Event
switch event.Type {
@ -1144,6 +1178,10 @@ func (c *proxyConnection) processEvent(msg *proxy.ServerMessage) {
statsProxyUsageCurrent.WithLabelValues(c.url.String(), "outgoing").Set(0)
}
}
if len(event.ClientBandwidths) > 0 {
c.updatePublisherBandwidths(event.ClientBandwidths)
c.updateSubscriberBandwidths(event.ClientBandwidths)
}
return
case "shutdown-scheduled":
c.logger.Printf("Proxy %s is scheduled to shutdown", c)