Add tests for Janus events handler.

This commit is contained in:
Joachim Bauch 2025-11-05 10:29:48 +01:00
commit 9b79aac1cf
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
4 changed files with 418 additions and 29 deletions

13
hub.go
View file

@ -3098,18 +3098,7 @@ func (h *Hub) serveWs(w http.ResponseWriter, r *http.Request) {
}
if conn.Subprotocol() == JanusEventsSubprotocol {
if h.mcu == nil {
log.Printf("Could not create Janus events handler for %s: no MCU configured", addr)
return
}
client, err := NewJanusEventsHandler(r.Context(), h.mcu, conn, addr, agent)
if err != nil {
log.Printf("Could not create Janus events handler for %s: %s", addr, err)
return
}
client.Run()
RunJanusEventsHandler(r.Context(), h.mcu, conn, addr, agent)
return
}

View file

@ -523,11 +523,15 @@ func (e JanusEventCoreShutdown) String() string {
return marshalEvent(e)
}
type McuEventHandler interface {
UpdateBandwidth(handle uint64, media string, sent uint32, received uint32)
}
type JanusEventsHandler struct {
mu sync.Mutex
ctx context.Context
mcu *mcuJanus
mcu McuEventHandler
// +checklocks:mu
conn *websocket.Conn
addr string
@ -536,19 +540,38 @@ type JanusEventsHandler struct {
events chan JanusEvent
}
func NewJanusEventsHandler(ctx context.Context, mcu Mcu, conn *websocket.Conn, addr string, agent string) (*JanusEventsHandler, error) {
if !internal.IsLoopbackIP(addr) && !internal.IsPrivateIP(addr) {
return nil, errors.New("only loopback and private connections allowed")
func RunJanusEventsHandler(ctx context.Context, mcu Mcu, conn *websocket.Conn, addr string, agent string) {
deadline := time.Now().Add(time.Second)
if mcu == nil {
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "no mcu configured"), deadline) // nolint
return
}
m, ok := mcu.(*mcuJanus)
m, ok := mcu.(McuEventHandler)
if !ok {
return nil, errors.New("need a Janus MCU")
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "mcu does not support events"), deadline) // nolint
return
}
if !internal.IsLoopbackIP(addr) && !internal.IsPrivateIP(addr) {
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "only loopback and private connections allowed"), deadline) // nolint
return
}
client, err := NewJanusEventsHandler(ctx, m, conn, addr, agent)
if err != nil {
log.Printf("Could not create Janus events handler for %s: %s", addr, err)
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "error creating handler"), deadline) // nolint
return
}
client.Run()
}
func NewJanusEventsHandler(ctx context.Context, mcu McuEventHandler, conn *websocket.Conn, addr string, agent string) (*JanusEventsHandler, error) {
handler := &JanusEventsHandler{
ctx: ctx,
mcu: m,
mcu: mcu,
conn: conn,
addr: addr,
agent: agent,
@ -623,13 +646,30 @@ func (h *JanusEventsHandler) readPump() {
break
}
var events []JanusEvent
if err := json.Unmarshal(decodeBuffer.Bytes(), &events); err != nil {
log.Printf("Error decoding message %s from %s: %v", decodeBuffer.String(), h.addr, err)
if decodeBuffer.Len() == 0 {
log.Printf("Received empty message from %s", h.addr)
bufferPool.Put(decodeBuffer)
break
}
var events []JanusEvent
if data := decodeBuffer.Bytes(); data[0] != '[' {
var event JanusEvent
if err := json.Unmarshal(data, &event); err != nil {
log.Printf("Error decoding message %s from %s: %v", decodeBuffer.String(), h.addr, err)
bufferPool.Put(decodeBuffer)
break
}
events = append(events, event)
} else {
if err := json.Unmarshal(data, &events); err != nil {
log.Printf("Error decoding message %s from %s: %v", decodeBuffer.String(), h.addr, err)
bufferPool.Put(decodeBuffer)
break
}
}
bufferPool.Put(decodeBuffer)
for _, e := range events {
h.events <- e

View file

@ -0,0 +1,366 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"context"
"encoding/json"
"net"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type TestJanusEventsServerHandler struct {
t *testing.T
upgrader websocket.Upgrader
mcu Mcu
addr string
}
func (h *TestJanusEventsServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.t.Helper()
require := require.New(h.t)
conn, err := h.upgrader.Upgrade(w, r, nil)
require.NoError(err)
if conn.Subprotocol() == JanusEventsSubprotocol {
addr := h.addr
if addr == "" {
addr = r.RemoteAddr
}
if host, _, err := net.SplitHostPort(addr); err == nil {
addr = host
}
RunJanusEventsHandler(r.Context(), h.mcu, conn, addr, r.Header.Get("User-Agent"))
return
}
deadline := time.Now().Add(time.Second)
require.NoError(conn.SetWriteDeadline(deadline))
require.NoError(conn.WriteJSON(map[string]string{"error": "invalid_subprotocol"}))
require.NoError(conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseProtocolError, "invalid_subprotocol"), deadline))
require.NoError(conn.Close())
}
func NewTestJanusEventsHandlerServer(t *testing.T) (*httptest.Server, string, *TestJanusEventsServerHandler) {
t.Helper()
handler := &TestJanusEventsServerHandler{
t: t,
upgrader: websocket.Upgrader{
Subprotocols: []string{
JanusEventsSubprotocol,
},
},
}
server := httptest.NewServer(handler)
t.Cleanup(func() {
server.Close()
})
url := strings.ReplaceAll(server.URL, "http://", "ws://")
url = strings.ReplaceAll(url, "https://", "wss://")
return server, url, handler
}
func TestJanusEventsHandlerNoMcu(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, _ := NewTestJanusEventsHandlerServer(t)
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var ce *websocket.CloseError
require.NoError(conn.SetReadDeadline(time.Now().Add(testTimeout)))
if mt, msg, err := conn.ReadMessage(); err == nil {
assert.Fail("connection was not closed", "expected close error, got message %s with type %d", string(msg), mt)
} else if assert.ErrorAs(err, &ce) {
assert.EqualValues(websocket.CloseInternalServerErr, ce.Code)
assert.Equal("no mcu configured", ce.Text)
}
}
func TestJanusEventsHandlerInvalidMcu(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, handler := NewTestJanusEventsHandlerServer(t)
handler.mcu = &mcuProxy{}
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var ce *websocket.CloseError
require.NoError(conn.SetReadDeadline(time.Now().Add(testTimeout)))
if mt, msg, err := conn.ReadMessage(); err == nil {
assert.Fail("connection was not closed", "expected close error, got message %s with type %d", string(msg), mt)
} else if assert.ErrorAs(err, &ce) {
assert.EqualValues(websocket.CloseInternalServerErr, ce.Code)
assert.Equal("mcu does not support events", ce.Text)
}
}
func TestJanusEventsHandlerPublicIP(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, handler := NewTestJanusEventsHandlerServer(t)
handler.mcu = &mcuJanus{}
handler.addr = "1.2.3.4"
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var ce *websocket.CloseError
require.NoError(conn.SetReadDeadline(time.Now().Add(testTimeout)))
if mt, msg, err := conn.ReadMessage(); err == nil {
assert.Fail("connection was not closed", "expected close error, got message %s with type %d", string(msg), mt)
} else if assert.ErrorAs(err, &ce) {
assert.EqualValues(websocket.ClosePolicyViolation, ce.Code)
assert.Equal("only loopback and private connections allowed", ce.Text)
}
}
type TestMcuWithEvents struct {
TestMCU
t *testing.T
mu sync.Mutex
// +checklocks:mu
idx int
}
func (m *TestMcuWithEvents) UpdateBandwidth(handle uint64, media string, sent uint32, received uint32) {
assert := assert.New(m.t)
m.mu.Lock()
defer m.mu.Unlock()
m.idx++
switch m.idx {
case 1:
assert.EqualValues(1, handle)
assert.EqualValues("audio", media)
assert.EqualValues(100, sent)
assert.EqualValues(200, received)
case 2:
assert.EqualValues(1, handle)
assert.EqualValues("video", media)
assert.EqualValues(200, sent)
assert.EqualValues(300, received)
default:
assert.Fail("too many updates", "received update %d (handle=%d, media=%s, sent=%d, received=%d)", m.idx, handle, media, sent, received)
}
}
func (m *TestMcuWithEvents) WaitForUpdates(ctx context.Context, waitForIdx int) error {
for {
if err := ctx.Err(); err != nil {
return err
}
m.mu.Lock()
idx := m.idx
m.mu.Unlock()
if idx == waitForIdx {
return nil
}
time.Sleep(time.Millisecond)
}
}
type janusEventSender struct {
events []JanusEvent
}
func (s *janusEventSender) SendSingle(t *testing.T, conn *websocket.Conn) {
t.Helper()
require := require.New(t)
require.Len(s.events, 1)
require.NoError(conn.WriteJSON(s.events[0]))
}
func (s *janusEventSender) Send(t *testing.T, conn *websocket.Conn) {
t.Helper()
require := require.New(t)
require.NoError(conn.WriteJSON(s.events))
}
func (s *janusEventSender) AddEvent(t *testing.T, eventType int, eventSubtype int, handleId uint64, event any) {
t.Helper()
require := require.New(t)
data, err := json.Marshal(event)
require.NoError(err)
message := JanusEvent{
Type: eventType,
SubType: eventSubtype,
HandleId: handleId,
Event: data,
}
s.events = append(s.events, message)
}
func TestJanusEventsHandlerNotGrouped(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, handler := NewTestJanusEventsHandlerServer(t)
mcu := &TestMcuWithEvents{
t: t,
}
handler.mcu = mcu
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var sender janusEventSender
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaStats,
1,
JanusEventMediaStats{
Media: "audio",
BytesSentLastSec: 100,
BytesReceivedLastSec: 200,
},
)
sender.SendSingle(t, conn)
assert.NoError(mcu.WaitForUpdates(ctx, 1))
}
func TestJanusEventsHandlerGrouped(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
_, url, handler := NewTestJanusEventsHandlerServer(t)
mcu := &TestMcuWithEvents{
t: t,
}
handler.mcu = mcu
ctx, cancel := context.WithTimeout(t.Context(), testTimeout)
defer cancel()
dialer := websocket.Dialer{
Subprotocols: []string{
JanusEventsSubprotocol,
},
}
conn, response, err := dialer.DialContext(ctx, url, nil)
require.NoError(err)
assert.Equal(JanusEventsSubprotocol, response.Header.Get("Sec-WebSocket-Protocol"))
var sender janusEventSender
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaStats,
1,
JanusEventMediaStats{
Media: "audio",
BytesSentLastSec: 100,
BytesReceivedLastSec: 200,
},
)
sender.AddEvent(
t,
JanusEventTypeMedia,
JanusEventSubTypeMediaStats,
1,
JanusEventMediaStats{
Media: "video",
BytesSentLastSec: 200,
BytesReceivedLastSec: 300,
},
)
sender.Send(t, conn)
assert.NoError(mcu.WaitForUpdates(ctx, 2))
}

View file

@ -672,13 +672,7 @@ func (s *ProxyServer) proxyHandler(w http.ResponseWriter, r *http.Request) {
if conn.Subprotocol() == signaling.JanusEventsSubprotocol {
agent := r.Header.Get("User-Agent")
client, err := signaling.NewJanusEventsHandler(r.Context(), s.mcu, conn, addr, agent)
if err != nil {
log.Printf("Could not create Janus events handler for %s: %s", addr, err)
return
}
client.Run()
signaling.RunJanusEventsHandler(r.Context(), s.mcu, conn, addr, agent)
return
}