Merge pull request #1102 from strukturag/janus-events

Expose real bandwidth usage through metrics.
This commit is contained in:
Joachim Bauch 2025-11-05 12:25:38 +01:00 committed by GitHub
commit 9e2633e99c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 1958 additions and 51 deletions

View file

@ -139,14 +139,23 @@ server.
A Janus server (from https://github.com/meetecho/janus-gateway) can be used to
act as a WebRTC gateway. See the documentation of Janus on how to configure and
run the server. At least the `VideoRoom` plugin and the websocket transport of
Janus must be enabled.
run the server. At least the `VideoRoom` plugin, the websocket transport and the
websocket events handler of Janus must be enabled. Also broadcasting of events
must be enabled.
The signaling server uses the `VideoRoom` plugin of Janus to manage sessions.
All gateway details are hidden from the clients, all messages are sent through
the signaling server. Only WebRTC media is exchanged directly between the
gateway and the clients.
To enable sending of events from Janus, the option `broadcast` must be set to
`true` in the block `events` of `janus.jcfg`. In the configuration of the
websocket events handler (`janus.eventhandler.wsevh.jcfg`), the module must be
enabled by setting `enabled` to `true`, the `backend` must be set to the
websocket url of the signaling server (or signaling proxy) and `subprotocol`
must be set to `janus-events`.
At least events of type `media` must be subscribed.
Edit the `server.conf` and enter the URL to the websocket endpoint of Janus in
the section `[mcu]` and key `url`. During startup, the signaling server will
connect to Janus and log information of the gateway.

View file

@ -546,7 +546,7 @@ type BackendServerInfoSfuProxy struct {
Features []string `json:"features,omitempty"`
Country string `json:"country,omitempty"`
Load *int64 `json:"load,omitempty"`
Load *uint64 `json:"load,omitempty"`
Bandwidth *EventProxyServerBandwidth `json:"bandwidth,omitempty"`
}

View file

@ -1018,12 +1018,12 @@ func easyjson4354c623DecodeGithubComStrukturagNextcloudSpreedSignaling8(in *jlex
out.Load = nil
} else {
if out.Load == nil {
out.Load = new(int64)
out.Load = new(uint64)
}
if in.IsNull() {
in.Skip()
} else {
*out.Load = int64(in.Int64())
*out.Load = uint64(in.Uint64())
}
}
case "bandwidth":
@ -1111,7 +1111,7 @@ func easyjson4354c623EncodeGithubComStrukturagNextcloudSpreedSignaling8(out *jwr
if in.Load != nil {
const prefix string = ",\"load\":"
out.RawString(prefix)
out.Int64(int64(*in.Load))
out.Uint64(uint64(*in.Load))
}
if in.Bandwidth != nil {
const prefix string = ",\"bandwidth\":"

View file

@ -321,6 +321,11 @@ type EventProxyServerBandwidth struct {
Incoming *float64 `json:"incoming,omitempty"`
// Outgoing is the bandwidth utilization for subscribers in percent.
Outgoing *float64 `json:"outgoing,omitempty"`
// Received is the incoming bandwidth in bytes per second.
Received uint64 `json:"received,omitempty"`
// Sent is the outgoing bandwidth in bytes per second.
Sent uint64 `json:"sent,omitempty"`
}
func (b *EventProxyServerBandwidth) String() string {
@ -347,7 +352,7 @@ type EventProxyServerMessage struct {
Type string `json:"type"`
ClientId string `json:"clientId,omitempty"`
Load int64 `json:"load,omitempty"`
Load uint64 `json:"load,omitempty"`
Sid string `json:"sid,omitempty"`
Bandwidth *EventProxyServerBandwidth `json:"bandwidth,omitempty"`

View file

@ -1294,7 +1294,7 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling9(in *jlex
if in.IsNull() {
in.Skip()
} else {
out.Load = int64(in.Int64())
out.Load = uint64(in.Uint64())
}
case "sid":
if in.IsNull() {
@ -1343,7 +1343,7 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling9(out *jwr
if in.Load != 0 {
const prefix string = ",\"load\":"
out.RawString(prefix)
out.Int64(int64(in.Load))
out.Uint64(uint64(in.Load))
}
if in.Sid != "" {
const prefix string = ",\"sid\":"
@ -1423,6 +1423,18 @@ func easyjson1c8542dbDecodeGithubComStrukturagNextcloudSpreedSignaling10(in *jle
*out.Outgoing = float64(in.Float64())
}
}
case "received":
if in.IsNull() {
in.Skip()
} else {
out.Received = uint64(in.Uint64())
}
case "sent":
if in.IsNull() {
in.Skip()
} else {
out.Sent = uint64(in.Uint64())
}
default:
in.SkipRecursive()
}
@ -1453,6 +1465,26 @@ func easyjson1c8542dbEncodeGithubComStrukturagNextcloudSpreedSignaling10(out *jw
}
out.Float64(float64(*in.Outgoing))
}
if in.Received != 0 {
const prefix string = ",\"received\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.Uint64(uint64(in.Received))
}
if in.Sent != 0 {
const prefix string = ",\"sent\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.Uint64(uint64(in.Sent))
}
out.RawByte('}')
}

View file

@ -55,3 +55,7 @@ The following metrics are available:
| `signaling_backend_client_requests_total` | Counter | 2.0.3 | The total number of backend client requests | `backend` |
| `signaling_backend_client_requests_duration` | Histogram | 2.0.3 | The duration of backend client requests in seconds | `backend` |
| `signaling_backend_client_requests_errors_total` | Counter | 2.0.3 | The total number of backend client requests that had an error | `backend`, `error` |
| `signaling_mcu_bandwidth` | Gauge | 2.0.5 | The current bandwidth in bytes per second | `direction` |
| `signaling_mcu_backend_usage` | Gauge | 2.0.5 | The current usage of signaling proxy backends in percent | `url`, `direction` |
| `signaling_mcu_backend_bandwidth` | Gauge | 2.0.5 | The current bandwidth of signaling proxy backends in bytes per second | `url`, `direction` |
| `signaling_proxy_load` | Gauge | 2.0.5 | The current load of the signaling proxy | |

8
hub.go
View file

@ -355,6 +355,9 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer
ReadBufferSize: websocketReadBufferSize,
WriteBufferSize: websocketWriteBufferSize,
WriteBufferPool: websocketWriteBufferPool,
Subprotocols: []string{
JanusEventsSubprotocol,
},
},
cookie: NewSessionIdCodec([]byte(hashKey), blockBytes),
info: NewWelcomeServerMessage(version, DefaultFeatures...),
@ -3094,6 +3097,11 @@ func (h *Hub) serveWs(w http.ResponseWriter, r *http.Request) {
return
}
if conn.Subprotocol() == JanusEventsSubprotocol {
RunJanusEventsHandler(r.Context(), h.mcu, conn, addr, agent)
return
}
client, err := NewClient(r.Context(), conn, addr, agent, h)
if err != nil {
log.Printf("Could not create client for %s: %s", addr, err)

44
internal/ips.go Normal file
View file

@ -0,0 +1,44 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package internal
import (
"net"
)
func IsLoopbackIP(addr string) bool {
ip := net.ParseIP(addr)
if len(ip) == 0 {
return false
}
return ip.IsLoopback()
}
func IsPrivateIP(addr string) bool {
ip := net.ParseIP(addr)
if len(ip) == 0 {
return false
}
return ip.IsPrivate()
}

78
internal/ips_test.go Normal file
View file

@ -0,0 +1,78 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package internal
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestIsLoopbackIP(t *testing.T) {
loopback := []string{
"127.0.0.1",
"127.1.0.1",
"::1",
"::ffff:127.0.0.1",
}
nonLoopback := []string{
"",
"invalid",
"1.2.3.4",
"::0",
"::2",
}
assert := assert.New(t)
for _, ip := range loopback {
assert.True(IsLoopbackIP(ip), "should be loopback: %s", ip)
}
for _, ip := range nonLoopback {
assert.False(IsLoopbackIP(ip), "should not be loopback: %s", ip)
}
}
func TestIsPrivateIP(t *testing.T) {
private := []string{
"10.1.2.3",
"172.20.21.22",
"192.168.10.20",
"fdea:aef9:06e3:bb24:1234:1234:1234:1234",
"fd12:3456:789a:1::1",
}
nonPrivate := []string{
"",
"invalid",
"127.0.0.1",
"1.2.3.4",
"::0",
"::1",
"::2",
"1234:3456:789a:1::1",
}
assert := assert.New(t)
for _, ip := range private {
assert.True(IsPrivateIP(ip), "should be private: %s", ip)
}
for _, ip := range nonPrivate {
assert.False(IsPrivateIP(ip), "should not be private: %s", ip)
}
}

View file

@ -142,18 +142,30 @@ var msgtypes = map[string]func() any{
"trickle": func() any { return &TrickleMsg{} },
}
type InfoDependencies struct {
Glib2 string `json:"glib2"`
Jansson string `json:"jansson"`
Libnice string `json:"libnice"`
Libsrtp string `json:"libsrtp"`
Libcurl string `json:"libcurl,omitempty"`
Crypto string `json:"crypto"`
}
type InfoMsg struct {
Name string
Version int
VersionString string `json:"version_string"`
Author string
DataChannels bool `json:"data_channels"`
EventHandlers bool `json:"event_handlers"`
IPv6 bool `json:"ipv6"`
LocalIP string `json:"local-ip"`
ICE_TCP bool `json:"ice-tcp"`
FullTrickle bool `json:"full-trickle"`
Transports map[string]janus.PluginInfo
Plugins map[string]janus.PluginInfo
Events map[string]janus.PluginInfo
Dependencies InfoDependencies
}
type TrickleMsg struct {

View file

@ -200,10 +200,18 @@ func IsValidStreamType(s string) bool {
}
}
type McuClientBandwidthInfo struct {
// Sent is the outgoing bandwidth in bytes per second.
Sent uint64
// Received is the incoming bandwidth in bytes per second.
Received uint64
}
type McuClient interface {
Id() string
Sid() string
StreamType() StreamType
// MaxBitrate is the maximum allowed bitrate in bits per second.
MaxBitrate() int
Close(ctx context.Context)
@ -211,6 +219,12 @@ type McuClient interface {
SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, api.StringMap))
}
type McuClientWithBandwidth interface {
McuClient
Bandwidth() *McuClientBandwidthInfo
}
type McuPublisher interface {
McuClient

View file

@ -41,8 +41,10 @@ import (
const (
pluginVideoRoom = "janus.plugin.videoroom"
eventWebsocket = "janus.eventhandler.wsevh"
keepaliveInterval = 30 * time.Second
bandwidthInterval = time.Second
videoPublisherUserId = 1
screenPublisherUserId = 2
@ -137,7 +139,12 @@ func getPluginStringValue(data janus.PluginData, pluginName string, key string)
// TODO(jojo): Lots of error handling still missing.
type clientInterface interface {
Handle() uint64
NotifyReconnected()
Bandwidth() *McuClientBandwidthInfo
UpdateBandwidth(media string, sent uint32, received uint32)
}
type mcuJanusSettings struct {
@ -220,9 +227,9 @@ type mcuJanus struct {
closeChan chan struct{}
muClients sync.Mutex
muClients sync.RWMutex
// +checklocks:muClients
clients map[clientInterface]bool
clients map[uint64]clientInterface
clientId atomic.Uint64
// +checklocks:mu
@ -253,7 +260,7 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc
url: url,
settings: settings,
closeChan: make(chan struct{}, 1),
clients: make(map[clientInterface]bool),
clients: make(map[uint64]clientInterface),
publishers: make(map[StreamId]*mcuJanusPublisher),
remotePublishers: make(map[StreamId]*mcuJanusRemotePublisher),
@ -304,6 +311,44 @@ func (m *mcuJanus) GetBandwidthLimits() (int, int) {
return int(m.settings.MaxStreamBitrate()), int(m.settings.MaxScreenBitrate())
}
func (m *mcuJanus) Bandwidth() (result *McuClientBandwidthInfo) {
m.muClients.RLock()
defer m.muClients.RUnlock()
for _, client := range m.clients {
if bandwidth := client.Bandwidth(); bandwidth != nil {
if result == nil {
result = &McuClientBandwidthInfo{}
}
result.Received += bandwidth.Received
result.Sent += bandwidth.Sent
}
}
return
}
func (m *mcuJanus) updateBandwidthStats() {
if info := m.info.Load(); info != nil {
if !info.EventHandlers {
// Event handlers are disabled, no stats will be available.
return
}
if _, found := info.Events[eventWebsocket]; !found {
// Event handler plugin not found, no stats will be available.
return
}
}
if bandwidth := m.Bandwidth(); bandwidth != nil {
statsJanusBandwidthCurrent.WithLabelValues("incoming").Set(float64(bandwidth.Received))
statsJanusBandwidthCurrent.WithLabelValues("outgoing").Set(float64(bandwidth.Sent))
} else {
statsJanusBandwidthCurrent.WithLabelValues("incoming").Set(0)
statsJanusBandwidthCurrent.WithLabelValues("outgoing").Set(0)
}
}
func (m *mcuJanus) reconnect(ctx context.Context) error {
m.disconnect()
gw, err := m.createJanusGateway(ctx, m.url, m)
@ -334,11 +379,27 @@ func (m *mcuJanus) doReconnect(ctx context.Context) {
m.reconnectInterval = initialReconnectInterval
m.mu.Unlock()
m.muClients.Lock()
for client := range m.clients {
go client.NotifyReconnected()
m.notifyClientsReconnected()
}
func (m *mcuJanus) notifyClientsReconnected() {
m.muClients.RLock()
defer m.muClients.RUnlock()
for oldHandle, client := range m.clients {
go func(oldHandle uint64, client clientInterface) {
client.NotifyReconnected()
newHandle := client.Handle()
if oldHandle != newHandle {
m.muClients.Lock()
defer m.muClients.Unlock()
delete(m.clients, oldHandle)
m.clients[newHandle] = client
}
}(oldHandle, client)
}
m.muClients.Unlock()
}
func (m *mcuJanus) scheduleReconnect(err error) {
@ -379,14 +440,25 @@ func (m *mcuJanus) Start(ctx context.Context) error {
}
log.Printf("Connected to %s %s by %s", info.Name, info.VersionString, info.Author)
plugin, found := info.Plugins[pluginVideoRoom]
if !found {
m.version = info.Version
if plugin, found := info.Plugins[pluginVideoRoom]; found {
log.Printf("Found %s %s by %s", plugin.Name, plugin.VersionString, plugin.Author)
} else {
return fmt.Errorf("plugin %s is not supported", pluginVideoRoom)
}
m.version = info.Version
if plugin, found := info.Events[eventWebsocket]; found {
if !info.EventHandlers {
log.Printf("Found %s %s by %s but event handlers are disabled, realtime usage will not be available", plugin.Name, plugin.VersionString, plugin.Author)
} else {
log.Printf("Found %s %s by %s", plugin.Name, plugin.VersionString, plugin.Author)
}
} else {
log.Printf("Plugin %s not found, realtime usage will not be available", eventWebsocket)
}
log.Printf("Found %s %s by %s", plugin.Name, plugin.VersionString, plugin.Author)
log.Printf("Used dependencies: %+v", info.Dependencies)
if !info.DataChannels {
return fmt.Errorf("data channels are not supported")
}
@ -421,25 +493,32 @@ func (m *mcuJanus) Start(ctx context.Context) error {
func (m *mcuJanus) registerClient(client clientInterface) {
m.muClients.Lock()
m.clients[client] = true
m.muClients.Unlock()
defer m.muClients.Unlock()
m.clients[client.Handle()] = client
}
func (m *mcuJanus) unregisterClient(client clientInterface) {
m.muClients.Lock()
delete(m.clients, client)
m.muClients.Unlock()
defer m.muClients.Unlock()
delete(m.clients, client.Handle())
}
func (m *mcuJanus) run() {
ticker := time.NewTicker(keepaliveInterval)
defer ticker.Stop()
bandwidthTicker := time.NewTicker(bandwidthInterval)
defer bandwidthTicker.Stop()
loop:
for {
select {
case <-ticker.C:
m.sendKeepalive(context.Background())
case <-bandwidthTicker.C:
m.updateBandwidthStats()
case <-m.closeChan:
break loop
}
@ -975,3 +1054,15 @@ func (m *mcuJanus) NewRemoteSubscriber(ctx context.Context, listener McuListener
statsSubscribersTotal.WithLabelValues(string(publisher.StreamType())).Inc()
return client, nil
}
func (m *mcuJanus) UpdateBandwidth(handle uint64, media string, sent uint32, received uint32) {
m.muClients.RLock()
defer m.muClients.RUnlock()
client, found := m.clients[handle]
if !found {
return
}
client.UpdateBandwidth(media, sent, received)
}

View file

@ -37,7 +37,7 @@ import (
type mcuJanusClient struct {
mcu *mcuJanus
listener McuListener
mu sync.Mutex // nolint
mu sync.Mutex
id uint64
session uint64
@ -46,6 +46,9 @@ type mcuJanusClient struct {
streamType StreamType
maxBitrate int
// +checklocks:mu
bandwidth map[string]*McuClientBandwidthInfo
handle atomic.Pointer[JanusHandle]
handleId atomic.Uint64
closeChan chan struct{}
@ -67,6 +70,10 @@ func (c *mcuJanusClient) Sid() string {
return c.sid
}
func (c *mcuJanusClient) Handle() uint64 {
return c.handleId.Load()
}
func (c *mcuJanusClient) StreamType() StreamType {
return c.streamType
}
@ -81,6 +88,40 @@ func (c *mcuJanusClient) Close(ctx context.Context) {
func (c *mcuJanusClient) SendMessage(ctx context.Context, message *MessageClientMessage, data *MessageClientMessageData, callback func(error, api.StringMap)) {
}
func (c *mcuJanusClient) UpdateBandwidth(media string, sent uint32, received uint32) {
c.mu.Lock()
defer c.mu.Unlock()
if c.bandwidth == nil {
c.bandwidth = make(map[string]*McuClientBandwidthInfo)
}
info, found := c.bandwidth[media]
if !found {
info = &McuClientBandwidthInfo{}
c.bandwidth[media] = info
}
info.Sent = uint64(sent)
info.Received = uint64(received)
}
func (c *mcuJanusClient) Bandwidth() *McuClientBandwidthInfo {
c.mu.Lock()
defer c.mu.Unlock()
if c.bandwidth == nil {
return nil
}
result := &McuClientBandwidthInfo{}
for _, info := range c.bandwidth {
result.Received += info.Received
result.Sent += info.Sent
}
return result
}
func (c *mcuJanusClient) closeClient(ctx context.Context) bool {
if handle := c.handle.Swap(nil); handle != nil {
close(c.closeChan)

742
mcu_janus_events_handler.go Normal file
View file

@ -0,0 +1,742 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
)
const (
JanusEventsSubprotocol = "janus-events"
JanusEventTypeSession = 1
JanusEventTypeHandle = 2
JanusEventTypeExternal = 4
JanusEventTypeJSEP = 8
JanusEventTypeWebRTC = 16
JanusEventSubTypeWebRTCICE = 1
JanusEventSubTypeWebRTCLocalCandidate = 2
JanusEventSubTypeWebRTCRemoteCandidate = 3
JanusEventSubTypeWebRTCSelectedPair = 4
JanusEventSubTypeWebRTCDTLS = 5
JanusEventSubTypeWebRTCPeerConnection = 6
JanusEventTypeMedia = 32
JanusEventSubTypeMediaState = 1
JanusEventSubTypeMediaSlowLink = 2
JanusEventSubTypeMediaStats = 3
JanusEventTypePlugin = 64
JanusEventTypeTransport = 128
JanusEventTypeCore = 256
JanusEventSubTypeCoreStatusStartup = 1
JanusEventSubTypeCoreStatusShutdown = 2
)
func unmarshalEvent[T any](data json.RawMessage) (*T, error) {
var e T
if err := json.Unmarshal(data, &e); err != nil {
return nil, err
}
return &e, nil
}
func marshalEvent[T any](e T) string {
data, err := json.Marshal(e)
if err != nil {
return fmt.Sprintf("Could not serialize %#v: %s", e, err)
}
return string(data)
}
type JanusEvent struct {
Emitter string `json:"emitter"`
Type int `json:"type"`
SubType int `json:"subtype,omitempty"`
Timestamp uint64 `json:"timestamp"`
SessionId uint64 `json:"session_id,omitempty"`
HandleId uint64 `json:"handle_id,omitempty"`
OpaqueId uint64 `json:"opaque_id,omitempty"`
Event json.RawMessage `json:"event"`
}
func (e JanusEvent) String() string {
return marshalEvent(e)
}
func (e JanusEvent) Decode() (any, error) {
switch e.Type {
case JanusEventTypeSession:
return unmarshalEvent[JanusEventSession](e.Event)
case JanusEventTypeHandle:
return unmarshalEvent[JanusEventHandle](e.Event)
case JanusEventTypeExternal:
return unmarshalEvent[JanusEventExternal](e.Event)
case JanusEventTypeJSEP:
return unmarshalEvent[JanusEventJSEP](e.Event)
case JanusEventTypeWebRTC:
switch e.SubType {
case JanusEventSubTypeWebRTCICE:
return unmarshalEvent[JanusEventWebRTCICE](e.Event)
case JanusEventSubTypeWebRTCLocalCandidate:
return unmarshalEvent[JanusEventWebRTCLocalCandidate](e.Event)
case JanusEventSubTypeWebRTCRemoteCandidate:
return unmarshalEvent[JanusEventWebRTCRemoteCandidate](e.Event)
case JanusEventSubTypeWebRTCSelectedPair:
return unmarshalEvent[JanusEventWebRTCSelectedPair](e.Event)
case JanusEventSubTypeWebRTCDTLS:
return unmarshalEvent[JanusEventWebRTCDTLS](e.Event)
case JanusEventSubTypeWebRTCPeerConnection:
return unmarshalEvent[JanusEventWebRTCPeerConnection](e.Event)
}
case JanusEventTypeMedia:
switch e.SubType {
case JanusEventSubTypeMediaState:
return unmarshalEvent[JanusEventMediaState](e.Event)
case JanusEventSubTypeMediaSlowLink:
return unmarshalEvent[JanusEventMediaSlowLink](e.Event)
case JanusEventSubTypeMediaStats:
return unmarshalEvent[JanusEventMediaStats](e.Event)
}
case JanusEventTypePlugin:
return unmarshalEvent[JanusEventPlugin](e.Event)
case JanusEventTypeTransport:
return unmarshalEvent[JanusEventTransport](e.Event)
case JanusEventTypeCore:
switch e.SubType {
case JanusEventSubTypeCoreStatusStartup:
event, err := unmarshalEvent[JanusEventCoreStartup](e.Event)
if err != nil {
return nil, err
}
switch event.Status {
case "started":
return unmarshalEvent[JanusEventStatusStartupInfo](event.Info)
case "update":
return unmarshalEvent[JanusEventStatusUpdateInfo](event.Info)
}
return event, nil
case JanusEventSubTypeCoreStatusShutdown:
return unmarshalEvent[JanusEventCoreShutdown](e.Event)
}
}
return nil, fmt.Errorf("unsupported event type %d", e.Type)
}
type JanusEventSessionTransport struct {
Transport string `json:"transport"`
ID string `json:"id"`
}
// type=1
type JanusEventSession struct {
Name string `json:"name"` // "created", "destroyed", "timeout"
Transport *JanusEventSessionTransport `json:"transport,omitempty"`
}
func (e JanusEventSession) String() string {
return marshalEvent(e)
}
// type=2
type JanusEventHandle struct {
Name string `json:"name"` // "attached", "detached"
Plugin string `json:"plugin"`
Token string `json:"token,omitempty"`
// Deprecated
OpaqueId string `json:"opaque_id,omitempty"`
}
func (e JanusEventHandle) String() string {
return marshalEvent(e)
}
// type=4
type JanusEventExternal struct {
Schema string `json:"schema"`
Data json.RawMessage `json:"data"`
}
func (e JanusEventExternal) String() string {
return marshalEvent(e)
}
// type=8
type JanusEventJSEP struct {
Owner string `json:"owner"`
Jsep struct {
Type string `json:"type"`
SDP string `json:"sdp"`
} `json:"jsep"`
}
func (e JanusEventJSEP) String() string {
return marshalEvent(e)
}
// type=16, subtype=1
type JanusEventWebRTCICE struct {
ICE string `json:"ice"` // "gathering", "connecting", "connected", "ready"
StreamID int `json:"stream_id"`
ComponentID int `json:"component_id"`
}
func (e JanusEventWebRTCICE) String() string {
return marshalEvent(e)
}
// type=16, subtype=2
type JanusEventWebRTCLocalCandidate struct {
LocalCandidate string `json:"local-candidate"`
StreamID int `json:"stream_id"`
ComponentID int `json:"component_id"`
}
func (e JanusEventWebRTCLocalCandidate) String() string {
return marshalEvent(e)
}
// type=16, subtype=3
type JanusEventWebRTCRemoteCandidate struct {
RemoteCandidate string `json:"remote-candidate"`
StreamID int `json:"stream_id"`
ComponentID int `json:"component_id"`
}
func (e JanusEventWebRTCRemoteCandidate) String() string {
return marshalEvent(e)
}
type JanusEventCandidate struct {
Address string `json:"address"`
Port int `json:"port"`
Type string `json:"type"`
Transport string `json:"transport"`
Family int `json:"family"`
}
func (e JanusEventCandidate) String() string {
return marshalEvent(e)
}
type JanusEventCandidates struct {
Local JanusEventCandidate `json:"local"`
Remote JanusEventCandidate `json:"remote"`
}
func (e JanusEventCandidates) String() string {
return marshalEvent(e)
}
// type=16, subtype=4
type JanusEventWebRTCSelectedPair struct {
StreamID int `json:"stream_id"`
ComponentID int `json:"component_id"`
SelectedPair string `json:"selected-pair"`
Candidates JanusEventCandidates `json:"candidates"`
}
func (e JanusEventWebRTCSelectedPair) String() string {
return marshalEvent(e)
}
// type=16, subtype=5
type JanusEventWebRTCDTLS struct {
DTLS string `json:"dtls"` // "trying", "connected"
StreamID int `json:"stream_id"`
ComponentID int `json:"component_id"`
Retransmissions int `json:"retransmissions"`
}
func (e JanusEventWebRTCDTLS) String() string {
return marshalEvent(e)
}
// type=16, subtype=6
type JanusEventWebRTCPeerConnection struct {
Connection string `json:"connection"` // "webrtcup"
}
func (e JanusEventWebRTCPeerConnection) String() string {
return marshalEvent(e)
}
// type=32, subtype=1
type JanusEventMediaState struct {
Media string `json:"media"` // "audio", "video"
MID string `json:"mid"`
SubStream *int `json:"substream,omitempty"`
Receiving bool `json:"receiving"`
Seconds int `json:"seconds"`
}
func (e JanusEventMediaState) String() string {
return marshalEvent(e)
}
// type=32, subtype=2
type JanusEventMediaSlowLink struct {
Media string `json:"media"` // "audio", "video"
MID string `json:"mid"`
SlowLink string `json:"slow_link"` // "uplink", "downlink"
LostLastSec int `json:"lost_lastsec"`
}
func (e JanusEventMediaSlowLink) String() string {
return marshalEvent(e)
}
type JanusMediaStatsRTTValues struct {
NTP uint32 `json:"ntp"`
LSR uint32 `json:"lsr"`
DLSR uint32 `json:"dlsr"`
}
func (e JanusMediaStatsRTTValues) String() string {
return marshalEvent(e)
}
// type=32, subtype=3
type JanusEventMediaStats struct {
MID string `json:"mid"`
MIndex int `json:"mindex"`
Media string `json:"media"` // "audio", "video", "video-sim1", "video-sim2"
// Audio / video only
Codec string `json:"codec,omitempty"`
Base uint32 `json:"base"`
Lost int32 `json:"lost"`
LostByRemote int32 `json:"lost-by-remote"`
JitterLocal uint32 `json:"jitter-local"`
JitterRemote uint32 `json:"jitter-remote"`
InLinkQuality uint32 `json:"in-link-quality"`
InMediaLinkQuality uint32 `json:"in-media-link-quality"`
OutLinkQuality uint32 `json:"out-link-quality"`
OutMediaLinkQuality uint32 `json:"out-media-link-quality"`
BytesReceivedLastSec uint32 `json:"bytes-received-lastsec"`
BytesSentLastSec uint32 `json:"bytes-sent-lastsec"`
NacksReceived uint32 `json:"nacks-received"`
NacksSent uint32 `json:"nacks-sent"`
RetransmissionsReceived uint32 `json:"retransmissions-received"`
// Only for audio / video on layer 0
RTT uint32 `json:"rtt,omitempty"`
// Only for audio / video on layer 0 if RTCP is active
RTTValues *JanusMediaStatsRTTValues `json:"rtt-values,omitempty"`
// For all media on all layers
PacketsReceived int32 `json:"packets-received"`
PacketsSent int32 `json:"packets-sent"`
BytesReceived int64 `json:"bytes-received"`
BytesSent int64 `json:"bytes-sent"`
// For layer 0 if REMB is enabled
REMBBitrate uint32 `json:"remb-bitrate"`
}
func (e JanusEventMediaStats) String() string {
return marshalEvent(e)
}
// type=64
type JanusEventPlugin struct {
Plugin string `json:"plugin"`
Data json.RawMessage `json:"data"`
}
func (e JanusEventPlugin) String() string {
return marshalEvent(e)
}
type JanusEventTransportWebsocket struct {
Event string `json:"event"`
AdminApi bool `json:"admin_api,omitempty"`
IP string `json:"ip,omitempty"`
}
// type=128
type JanusEventTransport struct {
Transport string `json:"transport"`
Id string `json:"id"`
Data JanusEventTransportWebsocket `json:"data"`
}
func (e JanusEventTransport) String() string {
return marshalEvent(e)
}
type JanusEventDependenciesInfo struct {
Glib2 string `json:"glib2"`
Jansson string `json:"jansson"`
Libnice string `json:"libnice"`
Libsrtp string `json:"libsrtp"`
Libcurl string `json:"libcurl,omitempty"`
Crypto string `json:"crypto"`
}
func (e JanusEventDependenciesInfo) String() string {
return marshalEvent(e)
}
type JanusEventPluginInfo struct {
Name string `json:"name"`
Author string `json:"author"`
Description string `json:"description"`
VersionString string `json:"version_string"`
Version int `json:"version"`
}
func (e JanusEventPluginInfo) String() string {
return marshalEvent(e)
}
// type=256, subtype=1, status="startup"
type JanusEventStatusStartupInfo struct {
Janus string `json:"janus"`
Version int `json:"version"`
VersionString string `json:"version_string"`
Author string `json:"author"`
CommitHash string `json:"commit-hash"`
CompileTime string `json:"compile-time"`
LogToStdout bool `json:"log-to-stdout"`
LogToFile bool `json:"log-to-file"`
LogPath string `json:"log-path,omitempty"`
DataChannels bool `json:"data_channels"`
AcceptingNewSessions bool `json:"accepting-new-sessions"`
SessionTimeout int `json:"session-timeout"`
ReclaimSessionTimeout int `json:"reclaim-session-timeout"`
CandidatesTimeout int `json:"candidates-timeout"`
ServerName string `json:"server-name"`
LocalIP string `json:"local-ip"`
PublicIP string `json:"public-ip,omitempty"`
PublicIPs []string `json:"public-ips,omitempty"`
IPv6 bool `json:"ipv6"`
IPv6LinkLocal bool `json:"ipv6-link-local,omitempty"`
ICELite bool `json:"ice-lite"`
ICETCP bool `json:"ice-tcp"`
ICENomination string `json:"ice-nomination,omitempty"`
ICEConsentFreshness bool `json:"ice-consent-freshness"`
ICEKeepaliveConncheck bool `json:"ice-keepalive-conncheck"`
HangupOnFailed bool `json:"hangup-on-failed"`
FullTrickle bool `json:"full-trickle"`
MDNSEnabled bool `json:"mdns-enabled"`
MinNACKQueue int `json:"min-nack-queue"`
NACKOptimizations bool `json:"nack-optimizations"`
TWCCPeriod int `json:"twcc-period"`
DSCP int `json:"dscp,omitempty"`
DTLSMCU int `json:"dtls-mcu"`
STUNServer string `json:"stun-server,omitempty"`
TURNServer string `json:"turn-server,omitempty"`
AllowForceRelay bool `json:"allow-force-relay,omitempty"`
StaticEventLoops int `json:"static-event-loops"`
LoopIndication bool `json:"loop-indication,omitempty"`
APISecret bool `json:"api_secret"`
AuthToken bool `json:"auth_token"`
EventHandlers bool `json:"event_handlers"`
OpaqueIdInAPI bool `json:"opaqueid_in_api"`
WebRTCEncryption bool `json:"webrtc_encryption"`
Dependencies *JanusEventDependenciesInfo `json:"dependencies,omitempty"`
Transports map[string]JanusEventPluginInfo `json:"transports,omitempty"`
Events map[string]JanusEventPluginInfo `json:"events,omitempty"`
Loggers map[string]JanusEventPluginInfo `json:"loggers,omitempty"`
Plugins map[string]JanusEventPluginInfo `json:"plugins,omitempty"`
}
func (e JanusEventStatusStartupInfo) String() string {
return marshalEvent(e)
}
// type=256, subtype=1, status="update"
type JanusEventStatusUpdateInfo struct {
Sessions int `json:"sessions"`
Handles int `json:"handles"`
PeerConnections int `json:"peerconnections"`
StatsPeriod int `json:"stats-period"`
}
func (e JanusEventStatusUpdateInfo) String() string {
return marshalEvent(e)
}
// type=256, subtype=1
type JanusEventCoreStartup struct {
Status string `json:"status"`
Info json.RawMessage `json:"info"`
}
func (e JanusEventCoreStartup) String() string {
return marshalEvent(e)
}
// type=256, subtype=2
type JanusEventCoreShutdown struct {
Status string `json:"status"`
Signum int `json:"signum"`
}
func (e JanusEventCoreShutdown) String() string {
return marshalEvent(e)
}
type McuEventHandler interface {
UpdateBandwidth(handle uint64, media string, sent uint32, received uint32)
}
type JanusEventsHandler struct {
mu sync.Mutex
ctx context.Context
mcu McuEventHandler
// +checklocks:mu
conn *websocket.Conn
addr string
agent string
events chan JanusEvent
}
func RunJanusEventsHandler(ctx context.Context, mcu Mcu, conn *websocket.Conn, addr string, agent string) {
deadline := time.Now().Add(time.Second)
if mcu == nil {
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "no mcu configured"), deadline) // nolint
return
}
m, ok := mcu.(McuEventHandler)
if !ok {
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "mcu does not support events"), deadline) // nolint
return
}
if !internal.IsLoopbackIP(addr) && !internal.IsPrivateIP(addr) {
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "only loopback and private connections allowed"), deadline) // nolint
return
}
client, err := NewJanusEventsHandler(ctx, m, conn, addr, agent)
if err != nil {
log.Printf("Could not create Janus events handler for %s: %s", addr, err)
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "error creating handler"), deadline) // nolint
return
}
client.Run()
}
func NewJanusEventsHandler(ctx context.Context, mcu McuEventHandler, conn *websocket.Conn, addr string, agent string) (*JanusEventsHandler, error) {
handler := &JanusEventsHandler{
ctx: ctx,
mcu: mcu,
conn: conn,
addr: addr,
agent: agent,
events: make(chan JanusEvent, 1),
}
return handler, nil
}
func (h *JanusEventsHandler) Run() {
log.Printf("Processing Janus events from %s", h.addr)
go h.writePump()
go h.processEvents()
h.readPump()
}
func (h *JanusEventsHandler) close() {
h.mu.Lock()
conn := h.conn
h.conn = nil
h.mu.Unlock()
if conn != nil {
if err := conn.Close(); err != nil {
log.Printf("Error closing %s", err)
}
}
}
func (h *JanusEventsHandler) readPump() {
h.mu.Lock()
conn := h.conn
h.mu.Unlock()
if conn == nil {
log.Printf("Connection from %s closed while starting readPump", h.addr)
return
}
conn.SetReadLimit(maxMessageSize)
conn.SetPongHandler(func(msg string) error {
now := time.Now()
conn.SetReadDeadline(now.Add(pongWait)) // nolint
return nil
})
for {
conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint
messageType, reader, err := conn.NextReader()
if err != nil {
// Gorilla websocket hides the original net.Error, so also compare error messages
if errors.Is(err, net.ErrClosed) || errors.Is(err, websocket.ErrCloseSent) || strings.Contains(err.Error(), net.ErrClosed.Error()) {
break
} else if _, ok := err.(*websocket.CloseError); !ok || websocket.IsUnexpectedCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived) {
log.Printf("Error reading from %s: %v", h.addr, err)
}
break
}
if messageType != websocket.TextMessage {
log.Printf("Unsupported message type %v from %s", messageType, h.addr)
continue
}
decodeBuffer, err := bufferPool.ReadAll(reader)
if err != nil {
log.Printf("Error reading message from %s: %v", h.addr, err)
break
}
if decodeBuffer.Len() == 0 {
log.Printf("Received empty message from %s", h.addr)
bufferPool.Put(decodeBuffer)
break
}
var events []JanusEvent
if data := decodeBuffer.Bytes(); data[0] != '[' {
var event JanusEvent
if err := json.Unmarshal(data, &event); err != nil {
log.Printf("Error decoding message %s from %s: %v", decodeBuffer.String(), h.addr, err)
bufferPool.Put(decodeBuffer)
break
}
events = append(events, event)
} else {
if err := json.Unmarshal(data, &events); err != nil {
log.Printf("Error decoding message %s from %s: %v", decodeBuffer.String(), h.addr, err)
bufferPool.Put(decodeBuffer)
break
}
}
bufferPool.Put(decodeBuffer)
for _, e := range events {
h.events <- e
}
}
}
func (h *JanusEventsHandler) sendPing() bool {
h.mu.Lock()
defer h.mu.Unlock()
if h.conn == nil {
return false
}
now := time.Now().UnixNano()
msg := strconv.FormatInt(now, 10)
h.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
if err := h.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil {
log.Printf("Could not send ping to %s: %v", h.addr, err)
return false
}
return true
}
func (h *JanusEventsHandler) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
h.close()
}()
for {
select {
case <-ticker.C:
if !h.sendPing() {
return
}
case <-h.ctx.Done():
return
}
}
}
func (h *JanusEventsHandler) processEvents() {
for {
select {
case event := <-h.events:
h.processEvent(event)
case <-h.ctx.Done():
return
}
}
}
func (h *JanusEventsHandler) processEvent(event JanusEvent) {
evt, err := event.Decode()
if err != nil {
log.Printf("Error decoding event %s (%s)", event, err)
return
}
switch evt := evt.(type) {
case *JanusEventMediaStats:
h.mcu.UpdateBandwidth(event.HandleId, evt.Media, evt.BytesSentLastSec, evt.BytesReceivedLastSec)
}
}

View file

@ -0,0 +1,585 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type TestJanusEventsServerHandler struct {
t *testing.T
upgrader websocket.Upgrader
mcu Mcu
addr string
}
func (h *TestJanusEventsServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.t.Helper()
require := require.New(h.t)
conn, err := h.upgrader.Upgrade(w, r, nil)
require.NoError(err)
if conn.Subprotocol() == JanusEventsSubprotocol {
addr := h.addr
if addr == "" {
addr = r.RemoteAddr
}
if host, _, err := net.SplitHostPort(addr); err == nil {
addr = host
}
RunJanusEventsHandler(r.Context(), h.mcu, conn, addr, r.Header.Get("User-Agent"))
return
}
deadline := time.Now().Add(time.Second)
require.NoError(conn.SetWriteDeadline(deadline))
require.NoError(conn.WriteJSON(map[string]string{"error": "invalid_subprotocol"}))
require.NoError(conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseProtocolError, "invalid_subprotocol"), deadline))
require.NoError(conn.Close())
}
func NewTestJanusEventsHandlerServer(t *testing.T) (*httptest.Server, string, *TestJanusEventsServerHandler) {
t.Helper()
handler := &TestJanusEventsServerHandler{
t: t,
upgrader: websocket.Upgrader{
Subprotocols: []string{
JanusEventsSubprotocol,
},
},
}
server := httptest.NewServer(handler)
t.Cleanup(func() {
server.Close()
})
url := strings.ReplaceAll(server.URL, "http://", "ws://")
url = strings.ReplaceAll(url, "https://", "wss://")
return server, url, handler
}
func TestJanusEventsHandlerNoMcu(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, _ := NewTestJanusEventsHandlerServer(t)
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var ce *websocket.CloseError
require.NoError(conn.SetReadDeadline(time.Now().Add(testTimeout)))
if mt, msg, err := conn.ReadMessage(); err == nil {
assert.Fail("connection was not closed", "expected close error, got message %s with type %d", string(msg), mt)
} else if assert.ErrorAs(err, &ce) {
assert.EqualValues(websocket.CloseInternalServerErr, ce.Code)
assert.Equal("no mcu configured", ce.Text)
}
}
func TestJanusEventsHandlerInvalidMcu(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, handler := NewTestJanusEventsHandlerServer(t)
handler.mcu = &mcuProxy{}
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var ce *websocket.CloseError
require.NoError(conn.SetReadDeadline(time.Now().Add(testTimeout)))
if mt, msg, err := conn.ReadMessage(); err == nil {
assert.Fail("connection was not closed", "expected close error, got message %s with type %d", string(msg), mt)
} else if assert.ErrorAs(err, &ce) {
assert.EqualValues(websocket.CloseInternalServerErr, ce.Code)
assert.Equal("mcu does not support events", ce.Text)
}
}
func TestJanusEventsHandlerPublicIP(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, handler := NewTestJanusEventsHandlerServer(t)
handler.mcu = &mcuJanus{}
handler.addr = "1.2.3.4"
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var ce *websocket.CloseError
require.NoError(conn.SetReadDeadline(time.Now().Add(testTimeout)))
if mt, msg, err := conn.ReadMessage(); err == nil {
assert.Fail("connection was not closed", "expected close error, got message %s with type %d", string(msg), mt)
} else if assert.ErrorAs(err, &ce) {
assert.EqualValues(websocket.ClosePolicyViolation, ce.Code)
assert.Equal("only loopback and private connections allowed", ce.Text)
}
}
type TestMcuWithEvents struct {
TestMCU
t *testing.T
mu sync.Mutex
// +checklocks:mu
idx int
}
func (m *TestMcuWithEvents) UpdateBandwidth(handle uint64, media string, sent uint32, received uint32) {
assert := assert.New(m.t)
m.mu.Lock()
defer m.mu.Unlock()
m.idx++
switch m.idx {
case 1:
assert.EqualValues(1, handle)
assert.EqualValues("audio", media)
assert.EqualValues(100, sent)
assert.EqualValues(200, received)
case 2:
assert.EqualValues(1, handle)
assert.EqualValues("video", media)
assert.EqualValues(200, sent)
assert.EqualValues(300, received)
default:
assert.Fail("too many updates", "received update %d (handle=%d, media=%s, sent=%d, received=%d)", m.idx, handle, media, sent, received)
}
}
func (m *TestMcuWithEvents) WaitForUpdates(ctx context.Context, waitForIdx int) error {
for {
if err := ctx.Err(); err != nil {
return err
}
m.mu.Lock()
idx := m.idx
m.mu.Unlock()
if idx == waitForIdx {
return nil
}
time.Sleep(time.Millisecond)
}
}
type janusEventSender struct {
events []JanusEvent
}
func (s *janusEventSender) SendSingle(t *testing.T, conn *websocket.Conn) {
t.Helper()
require := require.New(t)
require.Len(s.events, 1)
require.NoError(conn.WriteJSON(s.events[0]))
}
func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) {
t.Helper()
require := require.New(t)
require.NoError(conn.WriteJSON(s.events))
}
func (s *janusEventSender) AddEvent(t *testing.T, eventType int, eventSubtype int, handleId uint64, event any) {
t.Helper()
require := require.New(t)
assert := assert.New(t)
data, err := json.Marshal(event)
require.NoError(err)
if s, ok := event.(fmt.Stringer); assert.True(ok, "%T should implement fmt.Stringer", event) {
assert.Equal(s.String(), string(data))
}
message := JanusEvent{
Type: eventType,
SubType: eventSubtype,
HandleId: handleId,
Event: data,
}
s.events = append(s.events, message)
}
func TestJanusEventsHandlerDifferentTypes(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, handler := NewTestJanusEventsHandlerServer(t)
mcu := &TestMcuWithEvents{
t: t,
}
handler.mcu = mcu
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var sender janusEventSender
sender.AddEvent(
t,
JanusEventTypeSession,
0,
1,
JanusEventSession{
Name: "created",
},
)
sender.AddEvent(
t,
JanusEventTypeHandle,
0,
1,
JanusEventHandle{
Name: "attached",
},
)
sender.AddEvent(
t,
JanusEventTypeExternal,
0,
0,
JanusEventExternal{
Schema: "test-external",
},
)
sender.AddEvent(
t,
JanusEventTypeJSEP,
0,
1,
JanusEventJSEP{
Owner: "testing",
},
)
sender.AddEvent(
t,
JanusEventTypeWebRTC,
JanusEventSubTypeWebRTCICE,
1,
JanusEventWebRTCICE{
ICE: "gathering",
},
)
sender.AddEvent(
t,
JanusEventTypeWebRTC,
JanusEventSubTypeWebRTCLocalCandidate,
1,
JanusEventWebRTCLocalCandidate{
LocalCandidate: "invalid-candidate",
},
)
sender.AddEvent(
t,
JanusEventTypeWebRTC,
JanusEventSubTypeWebRTCRemoteCandidate,
1,
JanusEventWebRTCRemoteCandidate{
RemoteCandidate: "invalid-candidate",
},
)
sender.AddEvent(
t,
JanusEventTypeWebRTC,
JanusEventSubTypeWebRTCSelectedPair,
1,
JanusEventWebRTCSelectedPair{
SelectedPair: "invalid-pair",
},
)
sender.AddEvent(
t,
JanusEventTypeWebRTC,
JanusEventSubTypeWebRTCDTLS,
1,
JanusEventWebRTCDTLS{
DTLS: "trying",
},
)
sender.AddEvent(
t,
JanusEventTypeWebRTC,
JanusEventSubTypeWebRTCPeerConnection,
1,
JanusEventWebRTCPeerConnection{
Connection: "webrtcup",
},
)
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaState,
1,
JanusEventMediaState{
Media: "audio",
},
)
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaSlowLink,
1,
JanusEventMediaSlowLink{
Media: "audio",
},
)
sender.AddEvent(
t,
JanusEventTypePlugin,
0,
1,
JanusEventPlugin{
Plugin: "test-plugin",
},
)
sender.AddEvent(
t,
JanusEventTypeTransport,
0,
1,
JanusEventTransport{
Transport: "test-transport",
},
)
sender.AddEvent(
t,
JanusEventTypeCore,
JanusEventSubTypeCoreStatusStartup,
0,
JanusEventCoreStartup{
Status: "started",
},
)
sender.AddEvent(
t,
JanusEventTypeCore,
JanusEventSubTypeCoreStatusStartup,
0,
JanusEventCoreStartup{
Status: "update",
},
)
sender.AddEvent(
t,
JanusEventTypeCore,
JanusEventSubTypeCoreStatusShutdown,
0,
JanusEventCoreShutdown{
Status: "shutdown",
},
)
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaStats,
1,
JanusEventMediaStats{
Media: "audio",
BytesSentLastSec: 100,
BytesReceivedLastSec: 200,
},
)
sender.Send(t, conn)
// Wait until all events are processed.
assert.NoError(mcu.WaitForUpdates(ctx, 1))
}
func TestJanusEventsHandlerNotGrouped(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, handler := NewTestJanusEventsHandlerServer(t)
mcu := &TestMcuWithEvents{
t: t,
}
handler.mcu = mcu
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var sender janusEventSender
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaStats,
1,
JanusEventMediaStats{
Media: "audio",
BytesSentLastSec: 100,
BytesReceivedLastSec: 200,
},
)
sender.SendSingle(t, conn)
assert.NoError(mcu.WaitForUpdates(ctx, 1))
}
func TestJanusEventsHandlerGrouped(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, handler := NewTestJanusEventsHandlerServer(t)
mcu := &TestMcuWithEvents{
t: t,
}
handler.mcu = mcu
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var sender janusEventSender
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaStats,
1,
JanusEventMediaStats{
Media: "audio",
BytesSentLastSec: 100,
BytesReceivedLastSec: 200,
},
)
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaStats,
1,
JanusEventMediaStats{
Media: "video",
BytesSentLastSec: 200,
BytesReceivedLastSec: 300,
},
)
sender.Send(t, conn)
assert.NoError(mcu.WaitForUpdates(ctx, 2))
}

View file

@ -122,6 +122,7 @@ func (g *TestJanusGateway) Info(ctx context.Context) (*InfoMsg, error) {
VersionString: "1.4.0",
Author: "struktur AG",
DataChannels: true,
EventHandlers: true,
FullTrickle: true,
Plugins: map[string]janus.PluginInfo{
pluginVideoRoom: {
@ -130,6 +131,13 @@ func (g *TestJanusGateway) Info(ctx context.Context) (*InfoMsg, error) {
Author: "struktur AG",
},
},
Events: map[string]janus.PluginInfo{
eventWebsocket: {
Name: "Test Websocket events",
VersionString: "0.0.0",
Author: "struktur AG",
},
},
}, nil
}
@ -1071,9 +1079,12 @@ func Test_JanusPublisherGetStreamsAudioVideo(t *testing.T) {
}
func Test_JanusPublisherSubscriber(t *testing.T) {
ResetStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"))
ResetStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"))
CatchLogForTest(t)
t.Parallel()
require := require.New(t)
assert := assert.New(t)
mcu, gateway := newMcuJanusForTesting(t)
gateway.registerHandlers(map[string]TestJanusHandler{})
@ -1081,6 +1092,12 @@ func Test_JanusPublisherSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
// Bandwidth for unknown handles is ignored.
mcu.UpdateBandwidth(1234, "video", 100, 200)
mcu.updateBandwidthStats()
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"), 0)
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"), 0)
pubId := PublicSessionId("publisher-id")
listener1 := &TestMcuListener{
id: pubId,
@ -1095,6 +1112,24 @@ func Test_JanusPublisherSubscriber(t *testing.T) {
require.NoError(err)
defer pub.Close(context.Background())
janusPub, ok := pub.(*mcuJanusPublisher)
require.True(ok)
assert.Nil(mcu.Bandwidth())
assert.Nil(janusPub.Bandwidth())
mcu.UpdateBandwidth(janusPub.Handle(), "video", 1000, 2000)
if bw := janusPub.Bandwidth(); assert.NotNil(bw) {
assert.EqualValues(1000, bw.Sent)
assert.EqualValues(2000, bw.Received)
}
if bw := mcu.Bandwidth(); assert.NotNil(bw) {
assert.EqualValues(1000, bw.Sent)
assert.EqualValues(2000, bw.Received)
}
mcu.updateBandwidthStats()
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"), 2000)
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"), 1000)
listener2 := &TestMcuListener{
id: pubId,
}
@ -1105,6 +1140,25 @@ func Test_JanusPublisherSubscriber(t *testing.T) {
sub, err := mcu.NewSubscriber(ctx, listener2, pubId, StreamTypeVideo, initiator2)
require.NoError(err)
defer sub.Close(context.Background())
janusSub, ok := sub.(*mcuJanusSubscriber)
require.True(ok)
assert.Nil(janusSub.Bandwidth())
mcu.UpdateBandwidth(janusSub.Handle(), "video", 3000, 4000)
if bw := janusSub.Bandwidth(); assert.NotNil(bw) {
assert.EqualValues(3000, bw.Sent)
assert.EqualValues(4000, bw.Received)
}
if bw := mcu.Bandwidth(); assert.NotNil(bw) {
assert.EqualValues(4000, bw.Sent)
assert.EqualValues(6000, bw.Received)
}
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"), 2000)
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"), 1000)
mcu.updateBandwidthStats()
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("incoming"), 6000)
checkStatsValue(t, statsJanusBandwidthCurrent.WithLabelValues("outgoing"), 4000)
}
func Test_JanusSubscriberPublisher(t *testing.T) {

View file

@ -345,7 +345,7 @@ type mcuProxyConnection struct {
ip net.IP
connectToken string
load atomic.Int64
load atomic.Uint64
bandwidth atomic.Pointer[EventProxyServerBandwidth]
mu sync.Mutex
closer *Closer
@ -413,6 +413,11 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP, token str
conn.country.Store("")
conn.version.Store("")
conn.features.Store([]string{})
statsProxyBackendLoadCurrent.WithLabelValues(conn.url.String()).Set(0)
statsProxyUsageCurrent.WithLabelValues(conn.url.String(), "incoming").Set(0)
statsProxyUsageCurrent.WithLabelValues(conn.url.String(), "outgoing").Set(0)
statsProxyBandwidthCurrent.WithLabelValues(conn.url.String(), "incoming").Set(0)
statsProxyBandwidthCurrent.WithLabelValues(conn.url.String(), "outgoing").Set(0)
return conn, nil
}
@ -478,7 +483,7 @@ type mcuProxyConnectionStats struct {
Connected bool `json:"connected"`
Publishers int64 `json:"publishers"`
Clients int64 `json:"clients"`
Load *int64 `json:"load,omitempty"`
Load *uint64 `json:"load,omitempty"`
Shutdown *bool `json:"shutdown,omitempty"`
Temporary *bool `json:"temporary,omitempty"`
Uptime *time.Time `json:"uptime,omitempty"`
@ -514,7 +519,7 @@ func (c *mcuProxyConnection) GetStats() *mcuProxyConnectionStats {
return result
}
func (c *mcuProxyConnection) Load() int64 {
func (c *mcuProxyConnection) Load() uint64 {
return c.load.Load()
}
@ -748,6 +753,12 @@ func (c *mcuProxyConnection) closeIfEmpty() bool {
log.Printf("All clients disconnected, closing connection to %s", c)
c.stop(ctx)
statsProxyBackendLoadCurrent.DeleteLabelValues(c.url.String())
statsProxyUsageCurrent.DeleteLabelValues(c.url.String(), "incoming")
statsProxyUsageCurrent.DeleteLabelValues(c.url.String(), "outgoing")
statsProxyBandwidthCurrent.DeleteLabelValues(c.url.String(), "incoming")
statsProxyBandwidthCurrent.DeleteLabelValues(c.url.String(), "outgoing")
c.proxy.removeConnection(c)
}()
return true
@ -1082,6 +1093,20 @@ func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) {
c.load.Store(event.Load)
c.bandwidth.Store(event.Bandwidth)
statsProxyBackendLoadCurrent.WithLabelValues(c.url.String()).Set(float64(event.Load))
if bw := event.Bandwidth; bw != nil {
statsProxyBandwidthCurrent.WithLabelValues(c.url.String(), "incoming").Set(float64(bw.Received))
statsProxyBandwidthCurrent.WithLabelValues(c.url.String(), "outgoing").Set(float64(bw.Sent))
if bw.Incoming != nil {
statsProxyUsageCurrent.WithLabelValues(c.url.String(), "incoming").Set(*bw.Incoming)
} else {
statsProxyUsageCurrent.WithLabelValues(c.url.String(), "incoming").Set(0)
}
if bw.Outgoing != nil {
statsProxyUsageCurrent.WithLabelValues(c.url.String(), "outgoing").Set(*bw.Outgoing)
} else {
statsProxyUsageCurrent.WithLabelValues(c.url.String(), "outgoing").Set(0)
}
}
return
case "shutdown-scheduled":
log.Printf("Proxy %s is scheduled to shutdown", c)

View file

@ -551,7 +551,7 @@ type TestProxyServerHandler struct {
country string
mu sync.Mutex
load atomic.Int64
load atomic.Uint64
incoming atomic.Pointer[float64]
outgoing atomic.Pointer[float64]
// +checklocks:mu
@ -666,7 +666,7 @@ func (h *TestProxyServerHandler) Clear(incoming bool, outgoing bool) {
}
}
func (h *TestProxyServerHandler) getLoadMessage(load int64) *ProxyServerMessage {
func (h *TestProxyServerHandler) getLoadMessage(load uint64) *ProxyServerMessage {
msg := &ProxyServerMessage{
Type: "event",
Event: &EventProxyServerMessage{
@ -691,7 +691,12 @@ func (h *TestProxyServerHandler) updateLoad(delta int64) {
return
}
load := h.load.Add(delta)
var load uint64
if delta > 0 {
load = h.load.Add(uint64(delta))
} else {
load = h.load.Add(^uint64(delta - 1))
}
h.mu.Lock()
defer h.mu.Unlock()

View file

@ -86,6 +86,17 @@ var (
statsMcuPublisherStreamTypesCurrent,
}
statsJanusBandwidthCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "bandwidth",
Help: "The current bandwidth in bytes per second",
}, []string{"direction"})
janusMcuStats = []prometheus.Collector{
statsJanusBandwidthCurrent,
}
statsConnectedProxyBackendsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "mcu",
@ -98,6 +109,18 @@ var (
Name: "backend_load",
Help: "Current load of signaling proxy backends",
}, []string{"url"})
statsProxyUsageCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "backend_usage",
Help: "The current usage of signaling proxy backends in percent",
}, []string{"url", "direction"})
statsProxyBandwidthCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "mcu",
Name: "backend_bandwidth",
Help: "The current bandwidth of signaling proxy backends in bytes per second",
}, []string{"url", "direction"})
statsProxyNobackendAvailableTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "mcu",
@ -108,16 +131,20 @@ var (
proxyMcuStats = []prometheus.Collector{
statsConnectedProxyBackendsCurrent,
statsProxyBackendLoadCurrent,
statsProxyUsageCurrent,
statsProxyBandwidthCurrent,
statsProxyNobackendAvailableTotal,
}
)
func RegisterJanusMcuStats() {
registerAll(commonMcuStats...)
registerAll(janusMcuStats...)
}
func UnregisterJanusMcuStats() {
unregisterAll(commonMcuStats...)
unregisterAll(janusMcuStats...)
}
func RegisterProxyMcuStats() {

View file

@ -115,12 +115,12 @@ type ProxyServer struct {
url string
mcu signaling.Mcu
stopped atomic.Bool
load atomic.Int64
load atomic.Uint64
maxIncoming atomic.Int64
currentIncoming atomic.Int64
maxOutgoing atomic.Int64
currentOutgoing atomic.Int64
maxIncoming atomic.Uint64
currentIncoming atomic.Uint64
maxOutgoing atomic.Uint64
currentOutgoing atomic.Uint64
shutdownChannel chan struct{}
shutdownScheduled atomic.Bool
@ -356,6 +356,9 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*
upgrader: websocket.Upgrader{
ReadBufferSize: websocketReadBufferSize,
WriteBufferSize: websocketWriteBufferSize,
Subprotocols: []string{
signaling.JanusEventsSubprotocol,
},
},
tokens: tokens,
@ -374,12 +377,14 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*
remotePublishers: make(map[string]map[*proxyRemotePublisher]bool),
}
result.maxIncoming.Store(int64(maxIncoming) * 1024 * 1024)
result.maxOutgoing.Store(int64(maxOutgoing) * 1024 * 1024)
result.maxIncoming.Store(uint64(maxIncoming) * 1024 * 1024)
result.maxOutgoing.Store(uint64(maxOutgoing) * 1024 * 1024)
result.statsAllowedIps.Store(statsAllowedIps)
result.trustedProxies.Store(trustedProxiesIps)
result.upgrader.CheckOrigin = result.checkOrigin
statsLoadCurrent.Set(0)
if debug, _ := config.GetBool("app", "debug"); debug {
log.Println("Installing debug handlers in \"/debug/pprof\"")
s := r.PathPrefix("/debug/pprof").Subrouter()
@ -488,7 +493,7 @@ loop:
}
}
func (s *ProxyServer) newLoadEvent(load int64, incoming int64, outgoing int64) *signaling.ProxyServerMessage {
func (s *ProxyServer) newLoadEvent(load uint64, incoming uint64, outgoing uint64) *signaling.ProxyServerMessage {
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
@ -498,8 +503,12 @@ func (s *ProxyServer) newLoadEvent(load int64, incoming int64, outgoing int64) *
}
maxIncoming := s.maxIncoming.Load()
maxOutgoing := s.maxOutgoing.Load()
if maxIncoming > 0 || maxOutgoing > 0 {
msg.Event.Bandwidth = &signaling.EventProxyServerBandwidth{}
if maxIncoming > 0 || maxOutgoing > 0 || incoming != 0 || outgoing != 0 {
// Values should be sent in bytes per second.
msg.Event.Bandwidth = &signaling.EventProxyServerBandwidth{
Received: incoming / 8,
Sent: outgoing / 8,
}
if maxIncoming > 0 {
value := float64(incoming) / float64(maxIncoming) * 100
msg.Event.Bandwidth.Incoming = &value
@ -521,10 +530,11 @@ func (s *ProxyServer) updateLoad() {
return
}
statsLoadCurrent.Set(float64(load))
s.sendLoadToAll(load, incoming, outgoing)
}
func (s *ProxyServer) sendLoadToAll(load int64, incoming int64, outgoing int64) {
func (s *ProxyServer) sendLoadToAll(load uint64, incoming uint64, outgoing uint64) {
if s.shutdownScheduled.Load() {
// Server is scheduled to shutdown, no need to update clients with current load.
return
@ -628,9 +638,9 @@ func (s *ProxyServer) Reload(config *goconf.ConfigFile) {
}
maxIncoming, maxOutgoing := getTargetBandwidths(config)
oldIncoming := s.maxIncoming.Swap(int64(maxIncoming))
oldOutgoing := s.maxOutgoing.Swap(int64(maxOutgoing))
if oldIncoming != int64(maxIncoming) || oldOutgoing != int64(maxOutgoing) {
oldIncoming := s.maxIncoming.Swap(uint64(maxIncoming))
oldOutgoing := s.maxOutgoing.Swap(uint64(maxOutgoing))
if oldIncoming != uint64(maxIncoming) || oldOutgoing != uint64(maxOutgoing) {
// Notify sessions about updated load / bandwidth usage.
go s.sendLoadToAll(s.load.Load(), s.currentIncoming.Load(), s.currentOutgoing.Load())
}
@ -664,6 +674,12 @@ func (s *ProxyServer) proxyHandler(w http.ResponseWriter, r *http.Request) {
return
}
if conn.Subprotocol() == signaling.JanusEventsSubprotocol {
agent := r.Header.Get("User-Agent")
signaling.RunJanusEventsHandler(r.Context(), s.mcu, conn, addr, agent)
return
}
client, err := NewProxyClient(r.Context(), s, conn, addr)
if err != nil {
log.Printf("Could not create client for %s: %s", addr, err)
@ -1562,20 +1578,29 @@ func (s *ProxyServer) HasClients() bool {
return len(s.clients) > 0
}
func (s *ProxyServer) GetClientsLoad() (load int64, incoming int64, outgoing int64) {
func (s *ProxyServer) GetClientsLoad() (load uint64, incoming uint64, outgoing uint64) {
s.clientsLock.RLock()
defer s.clientsLock.RUnlock()
for _, c := range s.clients {
bitrate := int64(c.MaxBitrate())
load += bitrate
// Use "current" bandwidth usage if supported.
if bw, ok := c.(signaling.McuClientWithBandwidth); ok {
if bandwidth := bw.Bandwidth(); bandwidth != nil {
incoming += bandwidth.Received * 8
outgoing += bandwidth.Sent * 8
continue
}
}
bitrate := uint64(c.MaxBitrate())
if _, ok := c.(signaling.McuPublisher); ok {
incoming += bitrate
} else if _, ok := c.(signaling.McuSubscriber); ok {
outgoing += bitrate
}
}
load = load / 1024
load = incoming + outgoing
load = min(uint64(len(s.clients)), load/1024)
return
}

View file

@ -446,6 +446,105 @@ func (p *TestMCUPublisher) UnpublishRemote(ctx context.Context, remoteId signali
return errors.New("not implemented")
}
type PublisherTestMCU struct {
TestMCU
}
type TestPublisherWithBandwidth struct {
TestMCUPublisher
bandwidth *signaling.McuClientBandwidthInfo
}
func (p *TestPublisherWithBandwidth) Bandwidth() *signaling.McuClientBandwidthInfo {
return p.bandwidth
}
func (m *PublisherTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id signaling.PublicSessionId, sid string, streamType signaling.StreamType, settings signaling.NewPublisherSettings, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
publisher := &TestPublisherWithBandwidth{
TestMCUPublisher: TestMCUPublisher{
id: id,
sid: sid,
streamType: streamType,
},
bandwidth: &signaling.McuClientBandwidthInfo{
Sent: 1000,
Received: 2000,
},
}
return publisher, nil
}
func NewPublisherTestMCU(t *testing.T) *PublisherTestMCU {
return &PublisherTestMCU{
TestMCU: TestMCU{
t: t,
},
}
}
func TestProxyPublisherBandwidth(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)
// Values are in bits per second.
proxy.maxIncoming.Store(10000 * 8)
proxy.maxOutgoing.Store(10000 * 8)
mcu := NewPublisherTestMCU(t)
proxy.mcu = mcu
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()
require.NoError(client.SendHello(key))
if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}
_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-publisher",
StreamType: signaling.StreamTypeVideo,
},
}))
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal("2345", message.Id)
if err := checkMessageType(message, "command"); assert.NoError(err) {
assert.NotEmpty(message.Command.Id)
}
}
proxy.updateLoad()
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
if err := checkMessageType(message, "event"); assert.NoError(err) && assert.Equal("update-load", message.Event.Type) {
assert.EqualValues(1, message.Event.Load)
if bw := message.Event.Bandwidth; assert.NotNil(bw) {
if assert.NotNil(bw.Incoming) {
assert.EqualValues(20, *bw.Incoming)
}
if assert.NotNil(bw.Outgoing) {
assert.EqualValues(10, *bw.Outgoing)
}
}
}
}
}
type HangingTestMCU struct {
TestMCU
ctx context.Context

View file

@ -86,6 +86,12 @@ var (
Name: "token_errors_total",
Help: "The total number of token errors",
}, []string{"reason"})
statsLoadCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "load",
Help: "The current load of the signaling proxy",
})
)
func init() {
@ -99,4 +105,5 @@ func init() {
prometheus.MustRegister(statsCommandMessagesTotal)
prometheus.MustRegister(statsPayloadMessagesTotal)
prometheus.MustRegister(statsTokenErrorsTotal)
prometheus.MustRegister(statsLoadCurrent)
}

View file

@ -226,7 +226,7 @@ func (c *ProxyTestClient) RunUntilHello(ctx context.Context) (message *signaling
return message, nil
}
func (c *ProxyTestClient) RunUntilLoad(ctx context.Context, load int64) (message *signaling.ProxyServerMessage, err error) {
func (c *ProxyTestClient) RunUntilLoad(ctx context.Context, load uint64) (message *signaling.ProxyServerMessage, err error) {
if message, err = c.RunUntilMessage(ctx); err != nil {
return nil, err
}