Move code for commandline applications to "cmd" folder.

This commit is contained in:
Joachim Bauch 2025-12-18 15:03:32 +01:00
commit 9bbc0588e3
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
19 changed files with 23 additions and 23 deletions

650
cmd/client/main.go Normal file
View file

@ -0,0 +1,650 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2017 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"log"
pseudorand "math/rand"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"sync"
"sync/atomic"
"time"
"github.com/dlintw/goconf"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/mailru/easyjson"
"github.com/mailru/easyjson/jlexer"
"github.com/mailru/easyjson/jwriter"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/config"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/talk"
)
var (
version = "unreleased"
showVersion = flag.Bool("version", false, "show version and quit")
addr = flag.String("addr", "localhost:28080", "http service address")
configFlag = flag.String("config", "server.conf", "config file to use")
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
memprofile = flag.String("memprofile", "", "write memory profile to file")
maxClients = flag.Int("maxClients", 100, "number of client connections")
backendSecret []byte
// Report messages that took more than 1 second.
messageReportDuration = 1000 * time.Millisecond
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 64 * 1024
)
type MessagePayload struct {
Now time.Time `json:"now"`
}
func (m *MessagePayload) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
w.RawByte('{')
w.RawString("\"now\":")
w.Raw(m.Now.MarshalJSON())
w.RawByte('}')
return w.Buffer.BuildBytes(), w.Error
}
func (m *MessagePayload) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
r.Delim('{')
for !r.IsDelim('}') {
key := r.UnsafeFieldName(false)
r.WantColon()
switch key {
case "now":
if r.IsNull() {
r.Skip()
} else {
if data := r.Raw(); r.Ok() {
r.AddError((m.Now).UnmarshalJSON(data))
}
}
default:
r.SkipRecursive()
}
r.WantComma()
}
r.Delim('}')
r.Consumed()
return r.Error()
}
type SignalingClient struct {
readyWg *sync.WaitGroup // +checklocksignore: Only written to from constructor.
conn *websocket.Conn
stats *Stats
closed atomic.Bool
stopChan chan struct{}
lock sync.Mutex
// +checklocks:lock
privateSessionId api.PrivateSessionId
// +checklocks:lock
publicSessionId api.PublicSessionId
// +checklocks:lock
userId string
}
func NewSignalingClient(url string, stats *Stats, readyWg *sync.WaitGroup, doneWg *sync.WaitGroup) (*SignalingClient, error) {
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
client := &SignalingClient{
readyWg: readyWg,
conn: conn,
stats: stats,
stopChan: make(chan struct{}),
}
doneWg.Add(2)
go func() {
defer doneWg.Done()
client.readPump()
}()
go func() {
defer doneWg.Done()
client.writePump()
}()
return client, nil
}
func (c *SignalingClient) Close() {
if !c.closed.CompareAndSwap(false, true) {
return
}
// Signal writepump to terminate
close(c.stopChan)
c.lock.Lock()
c.publicSessionId = ""
c.privateSessionId = ""
c.writeInternal(&api.ClientMessage{
Type: "bye",
Bye: &api.ByeClientMessage{},
})
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) // nolint
c.conn.Close()
c.conn = nil
c.lock.Unlock()
}
func (c *SignalingClient) Send(message *api.ClientMessage) {
c.lock.Lock()
if c.conn == nil {
c.lock.Unlock()
return
}
if !c.writeInternal(message) {
c.lock.Unlock()
c.Close()
return
}
c.lock.Unlock()
}
func (c *SignalingClient) processMessage(message *api.ServerMessage) {
c.stats.numRecvMessages.Add(1)
switch message.Type {
case "welcome":
// Ignore welcome message.
case "hello":
c.processHelloMessage(message)
case "message":
c.processMessageMessage(message)
case "bye":
log.Printf("Received bye: %+v", message.Bye)
c.Close()
case "error":
log.Printf("Received error: %+v", message.Error)
c.Close()
default:
log.Printf("Unsupported message type: %+v", *message)
}
}
func (c *SignalingClient) processHelloMessage(message *api.ServerMessage) {
c.lock.Lock()
defer c.lock.Unlock()
c.privateSessionId = message.Hello.ResumeId
c.publicSessionId = message.Hello.SessionId
c.userId = message.Hello.UserId
log.Printf("Registered as %s (userid %s)", c.privateSessionId, c.userId)
c.readyWg.Done()
}
func (c *SignalingClient) PublicSessionId() api.PublicSessionId {
c.lock.Lock()
defer c.lock.Unlock()
return c.publicSessionId
}
func (c *SignalingClient) processMessageMessage(message *api.ServerMessage) {
var msg MessagePayload
if err := msg.UnmarshalJSON(message.Message.Data); err != nil {
log.Println("Error in unmarshal", err)
return
}
now := time.Now()
duration := now.Sub(msg.Now)
if duration > messageReportDuration {
log.Printf("Message took %s", duration)
}
}
func (c *SignalingClient) readPump() {
conn := c.conn
defer func() {
conn.Close()
}()
conn.SetReadLimit(maxMessageSize)
conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint
return nil
})
var decodeBuffer bytes.Buffer
for {
conn.SetReadDeadline(time.Now().Add(pongWait)) // nolint
messageType, reader, err := conn.NextReader()
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived) {
log.Printf("Error: %v", err)
}
break
}
if messageType != websocket.TextMessage {
log.Println("Unsupported message type", messageType)
break
}
decodeBuffer.Reset()
if _, err := decodeBuffer.ReadFrom(reader); err != nil {
c.lock.Lock()
if c.conn != nil {
log.Println("Error reading message", err)
}
c.lock.Unlock()
break
}
c.stats.numRecvBytes.Add(uint64(decodeBuffer.Len()))
var message api.ServerMessage
if err := message.UnmarshalJSON(decodeBuffer.Bytes()); err != nil {
log.Printf("Error: %v", err)
break
}
c.processMessage(&message)
}
}
func (c *SignalingClient) writeInternal(message *api.ClientMessage) bool {
var closeData []byte
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
var written int
writer, err := c.conn.NextWriter(websocket.TextMessage)
if err == nil {
written, err = easyjson.MarshalToWriter(message, writer)
}
if err != nil {
if err == websocket.ErrCloseSent {
// Already sent a "close", won't be able to send anything else.
return false
}
log.Println("Could not send message", message, err)
// TODO(jojo): Differentiate between JSON encode errors and websocket errors.
closeData = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "")
goto close
}
writer.Close()
c.stats.numSentMessages.Add(1)
if written > 0 {
c.stats.numSentBytes.Add(uint64(written))
}
return true
close:
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
c.conn.WriteMessage(websocket.CloseMessage, closeData) // nolint
return false
}
func (c *SignalingClient) sendPing() bool {
c.lock.Lock()
defer c.lock.Unlock()
if c.conn == nil {
return false
}
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return false
}
return true
}
func (c *SignalingClient) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.Close()
}()
for {
select {
case <-ticker.C:
if !c.sendPing() {
return
}
case <-c.stopChan:
return
}
}
}
func (c *SignalingClient) SendMessages(clients []*SignalingClient) {
sessionIds := make(map[*SignalingClient]api.PublicSessionId)
for _, c := range clients {
sessionIds[c] = c.PublicSessionId()
}
for !c.closed.Load() {
now := time.Now()
sender := c
recipientIdx := pseudorand.Int() % len(clients)
// Make sure a client is not sending to himself
for clients[recipientIdx] == sender {
recipientIdx = pseudorand.Int() % len(clients)
}
recipient := clients[recipientIdx]
msgdata := MessagePayload{
Now: now,
}
data, _ := msgdata.MarshalJSON()
msg := &api.ClientMessage{
Type: "message",
Message: &api.MessageClientMessage{
Recipient: api.MessageClientMessageRecipient{
Type: "session",
SessionId: sessionIds[recipient],
},
Data: data,
},
}
sender.Send(msg)
// Give some time to other clients.
time.Sleep(1 * time.Millisecond)
}
}
func registerAuthHandler(router *mux.Router) {
router.HandleFunc("/ocs/v2.php/apps/spreed/api/v1/signaling/backend", func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
log.Println("Error reading body:", err)
return
}
rnd := r.Header.Get(talk.HeaderBackendSignalingRandom)
checksum := r.Header.Get(talk.HeaderBackendSignalingChecksum)
if rnd == "" || checksum == "" {
log.Println("No checksum headers found")
return
}
if verify := talk.CalculateBackendChecksum(rnd, body, backendSecret); verify != checksum {
log.Println("Backend checksum verification failed")
return
}
var request talk.BackendClientRequest
if err := request.UnmarshalJSON(body); err != nil {
log.Println(err)
return
}
response := &talk.BackendClientResponse{
Type: "auth",
Auth: &talk.BackendClientAuthResponse{
Version: talk.BackendVersion,
UserId: "sample-user",
},
}
data, err := response.MarshalJSON()
if err != nil {
log.Println(err)
return
}
rawdata := json.RawMessage(data)
payload := &talk.OcsResponse{
Ocs: &talk.OcsBody{
Meta: talk.OcsMeta{
Status: "ok",
StatusCode: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
Data: rawdata,
},
}
jsonpayload, err := payload.MarshalJSON()
if err != nil {
log.Println(err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(jsonpayload) // nolint
})
}
func getLocalIP() string {
interfaces, err := net.InterfaceAddrs()
if err != nil {
log.Fatal(err)
}
for _, intf := range interfaces {
switch t := intf.(type) {
case *net.IPNet:
if !t.IP.IsInterfaceLocalMulticast() && !t.IP.IsLoopback() {
return t.IP.String()
}
}
}
return ""
}
func main() {
flag.Parse()
log.SetFlags(0)
if *showVersion {
fmt.Printf("nextcloud-spreed-signaling-client version %s/%s\n", version, runtime.Version())
os.Exit(0)
}
cfg, err := goconf.ReadConfigFile(*configFlag)
if err != nil {
log.Fatal("Could not read configuration: ", err)
}
secret, _ := config.GetStringOptionWithEnv(cfg, "backend", "secret")
backendSecret = []byte(secret)
log.Printf("Using a maximum of %d CPUs", runtime.GOMAXPROCS(0))
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
}
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatalf("Error writing CPU profile to %s: %s", *cpuprofile, err)
}
log.Printf("Writing CPU profile to %s ...", *cpuprofile)
defer pprof.StopCPUProfile()
}
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
log.Fatal(err) // nolint (defer pprof.StopCPUProfile() will not run which is ok in case of errors)
}
defer func() {
log.Printf("Writing Memory profile to %s ...", *memprofile)
runtime.GC()
if err := pprof.WriteHeapProfile(f); err != nil {
log.Printf("Error writing Memory profile to %s: %s", *memprofile, err)
}
}()
}
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
r := mux.NewRouter()
registerAuthHandler(r)
localIP := getLocalIP()
listener, err := net.Listen("tcp", localIP+":0")
if err != nil {
log.Fatal(err)
}
server := http.Server{
Handler: r,
}
go func() {
server.Serve(listener) // nolint
}()
backendUrl := "http://" + listener.Addr().String()
log.Println("Backend server running on", backendUrl)
urls := make([]url.URL, 0)
urlstrings := make([]string, 0)
for host := range internal.SplitEntries(*addr, ",") {
u := url.URL{
Scheme: "ws",
Host: host,
Path: "/spreed",
}
urls = append(urls, u)
urlstrings = append(urlstrings, u.String())
}
log.Printf("Connecting to %s", urlstrings)
clients := make([]*SignalingClient, 0)
stats := &Stats{}
if *maxClients < 2 {
log.Fatalf("Need at least 2 clients, got %d", *maxClients)
}
log.Printf("Starting %d clients", *maxClients)
var doneWg sync.WaitGroup
var readyWg sync.WaitGroup
for i := 0; i < *maxClients; i++ {
client, err := NewSignalingClient(urls[i%len(urls)].String(), stats, &readyWg, &doneWg)
if err != nil {
log.Fatal(err)
}
defer client.Close()
readyWg.Add(1)
request := &api.ClientMessage{
Type: "hello",
Hello: &api.HelloClientMessage{
Version: api.HelloVersionV1,
Auth: &api.HelloClientMessageAuth{
Url: backendUrl,
Params: json.RawMessage("{}"),
},
},
}
client.Send(request)
clients = append(clients, client)
}
log.Println("Clients created")
readyWg.Wait()
log.Println("All connections established")
for _, c := range clients {
doneWg.Add(1)
go func(c *SignalingClient) {
defer doneWg.Done()
c.SendMessages(clients)
}(c)
}
stats.start = time.Now()
reportInterval := 10 * time.Second
report := time.NewTicker(reportInterval)
loop:
for {
select {
case <-interrupt:
log.Println("Interrupted")
break loop
case <-report.C:
stats.Log()
}
}
log.Println("Waiting for clients to terminate ...")
for _, c := range clients {
c.Close()
}
doneWg.Wait()
}

97
cmd/client/stats.go Normal file
View file

@ -0,0 +1,97 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"log"
"sync/atomic"
"time"
"github.com/strukturag/nextcloud-spreed-signaling/api"
)
type Stats struct {
numRecvMessages atomic.Int64
numSentMessages atomic.Int64
resetRecvMessages int64
resetSentMessages int64
numRecvBytes atomic.Uint64
numSentBytes atomic.Uint64
resetRecvBytes uint64
resetSentBytes uint64
start time.Time
}
func (s *Stats) reset(start time.Time) {
s.resetRecvMessages = s.numRecvMessages.Load()
s.resetSentMessages = s.numSentMessages.Load()
s.resetRecvBytes = s.numRecvBytes.Load()
s.resetSentBytes = s.numSentBytes.Load()
s.start = start
}
type statsLogEntries struct {
totalSentMessages int64
sentMessagesPerSec int64
sentBytesPerSec api.Bandwidth
totalRecvMessages int64
recvMessagesPerSec int64
recvBytesPerSec api.Bandwidth
}
func (s *Stats) getLogEntries(now time.Time) *statsLogEntries {
duration := now.Sub(s.start)
perSec := int64(duration / time.Second)
if perSec == 0 {
return nil
}
totalSentMessages := s.numSentMessages.Load()
sentMessages := totalSentMessages - s.resetSentMessages
sentBytes := api.BandwidthFromBytes(s.numSentBytes.Load() - s.resetSentBytes)
totalRecvMessages := s.numRecvMessages.Load()
recvMessages := totalRecvMessages - s.resetRecvMessages
recvBytes := api.BandwidthFromBytes(s.numRecvBytes.Load() - s.resetRecvBytes)
s.reset(now)
return &statsLogEntries{
totalSentMessages: totalSentMessages,
sentMessagesPerSec: sentMessages / perSec,
sentBytesPerSec: sentBytes,
totalRecvMessages: totalRecvMessages,
recvMessagesPerSec: recvMessages / perSec,
recvBytesPerSec: recvBytes,
}
}
func (s *Stats) Log() {
now := time.Now()
if entries := s.getLogEntries(now); entries != nil {
log.Printf("Stats: sent=%d (%d/sec, %s), recv=%d (%d/sec, %s), delta=%d",
entries.totalSentMessages, entries.sentMessagesPerSec, entries.sentBytesPerSec,
entries.totalRecvMessages, entries.recvMessagesPerSec, entries.recvBytesPerSec,
entries.totalSentMessages-entries.totalRecvMessages)
}
}

80
cmd/client/stats_test.go Normal file
View file

@ -0,0 +1,80 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/strukturag/nextcloud-spreed-signaling/api"
)
func TestStats(t *testing.T) {
t.Parallel()
assert := assert.New(t)
var stats Stats
assert.Nil(stats.getLogEntries(time.Time{}))
now := time.Now()
if entries := stats.getLogEntries(now); assert.NotNil(entries) {
assert.EqualValues(0, entries.totalSentMessages)
assert.EqualValues(0, entries.sentMessagesPerSec)
assert.EqualValues(0, entries.sentBytesPerSec)
assert.EqualValues(0, entries.totalRecvMessages)
assert.EqualValues(0, entries.recvMessagesPerSec)
assert.EqualValues(0, entries.recvBytesPerSec)
}
stats.numSentMessages.Add(10)
stats.numSentBytes.Add((api.Bandwidth(20) * api.Kilobit).Bits())
stats.numRecvMessages.Add(30)
stats.numRecvBytes.Add((api.Bandwidth(40) * api.Kilobit).Bits())
if entries := stats.getLogEntries(now.Add(time.Second)); assert.NotNil(entries) {
assert.EqualValues(10, entries.totalSentMessages)
assert.EqualValues(10, entries.sentMessagesPerSec)
assert.EqualValues(20*1024*8, entries.sentBytesPerSec)
assert.EqualValues(30, entries.totalRecvMessages)
assert.EqualValues(30, entries.recvMessagesPerSec)
assert.EqualValues(40*1024*8, entries.recvBytesPerSec)
}
stats.numSentMessages.Add(100)
stats.numSentBytes.Add((api.Bandwidth(200) * api.Kilobit).Bits())
stats.numRecvMessages.Add(300)
stats.numRecvBytes.Add((api.Bandwidth(400) * api.Kilobit).Bits())
if entries := stats.getLogEntries(now.Add(2 * time.Second)); assert.NotNil(entries) {
assert.EqualValues(110, entries.totalSentMessages)
assert.EqualValues(100, entries.sentMessagesPerSec)
assert.EqualValues(200*1024*8, entries.sentBytesPerSec)
assert.EqualValues(330, entries.totalRecvMessages)
assert.EqualValues(300, entries.recvMessagesPerSec)
assert.EqualValues(400*1024*8, entries.recvBytesPerSec)
}
}

155
cmd/proxy/main.go Normal file
View file

@ -0,0 +1,155 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"runtime"
"syscall"
"time"
"github.com/dlintw/goconf"
"github.com/gorilla/mux"
"github.com/strukturag/nextcloud-spreed-signaling/config"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
signalinglog "github.com/strukturag/nextcloud-spreed-signaling/log"
)
var (
version = "unreleased"
configFlag = flag.String("config", "proxy.conf", "config file to use")
showVersion = flag.Bool("version", false, "show version and quit")
)
const (
defaultReadTimeout = 15
defaultWriteTimeout = 15
proxyDebugMessages = false
)
func main() {
log.SetFlags(log.Lshortfile)
flag.Parse()
if *showVersion {
fmt.Printf("nextcloud-spreed-signaling-proxy version %s/%s\n", version, runtime.Version())
os.Exit(0)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
signal.Notify(sigChan, syscall.SIGUSR1)
stopCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
logger := log.Default()
stopCtx = signalinglog.NewLoggerContext(stopCtx, logger)
logger.Printf("Starting up version %s/%s as pid %d", version, runtime.Version(), os.Getpid())
cfg, err := goconf.ReadConfigFile(*configFlag)
if err != nil {
logger.Fatal("Could not read configuration: ", err)
}
logger.Printf("Using a maximum of %d CPUs", runtime.GOMAXPROCS(0))
r := mux.NewRouter()
proxy, err := NewProxyServer(stopCtx, r, version, cfg)
if err != nil {
logger.Fatal(err)
}
if err := proxy.Start(cfg); err != nil {
logger.Fatal(err)
}
defer proxy.Stop()
if addr, _ := config.GetStringOptionWithEnv(cfg, "http", "listen"); addr != "" {
readTimeout, _ := cfg.GetInt("http", "readtimeout")
if readTimeout <= 0 {
readTimeout = defaultReadTimeout
}
writeTimeout, _ := cfg.GetInt("http", "writetimeout")
if writeTimeout <= 0 {
writeTimeout = defaultWriteTimeout
}
for address := range internal.SplitEntries(addr, " ") {
go func(address string) {
logger.Println("Listening on", address)
listener, err := net.Listen("tcp", address)
if err != nil {
logger.Fatal("Could not start listening: ", err)
}
srv := &http.Server{
Handler: r,
Addr: addr,
ReadTimeout: time.Duration(readTimeout) * time.Second,
WriteTimeout: time.Duration(writeTimeout) * time.Second,
}
if err := srv.Serve(listener); err != nil {
logger.Fatal("Could not start server: ", err)
}
}(address)
}
}
loop:
for {
select {
case <-stopCtx.Done():
logger.Println("Interrupted")
break loop
case sig := <-sigChan:
switch sig {
case syscall.SIGHUP:
logger.Printf("Received SIGHUP, reloading %s", *configFlag)
if config, err := goconf.ReadConfigFile(*configFlag); err != nil {
logger.Printf("Could not read configuration from %s: %s", *configFlag, err)
} else {
proxy.Reload(config)
}
case syscall.SIGUSR1:
logger.Printf("Received SIGUSR1, scheduling server to shutdown")
proxy.ScheduleShutdown()
}
case <-proxy.ShutdownChannel():
logger.Printf("All clients disconnected, shutting down")
break loop
}
}
}

72
cmd/proxy/proxy_client.go Normal file
View file

@ -0,0 +1,72 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"context"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
)
type ProxyClient struct {
signaling.Client
proxy *ProxyServer
session atomic.Pointer[ProxySession]
}
func NewProxyClient(ctx context.Context, proxy *ProxyServer, conn *websocket.Conn, addr string) (*ProxyClient, error) {
client := &ProxyClient{
proxy: proxy,
}
client.SetConn(ctx, conn, addr, client)
return client, nil
}
func (c *ProxyClient) GetSession() *ProxySession {
return c.session.Load()
}
func (c *ProxyClient) SetSession(session *ProxySession) {
c.session.Store(session)
}
func (c *ProxyClient) OnClosed(client signaling.HandlerClient) {
if session := c.GetSession(); session != nil {
session.MarkUsed()
}
c.proxy.clientClosed(&c.Client)
}
func (c *ProxyClient) OnMessageReceived(client signaling.HandlerClient, data []byte) {
c.proxy.processMessage(c, data)
}
func (c *ProxyClient) OnRTTReceived(client signaling.HandlerClient, rtt time.Duration) {
if session := c.GetSession(); session != nil {
session.MarkUsed()
}
}

606
cmd/proxy/proxy_remote.go Normal file
View file

@ -0,0 +1,606 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2024 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"context"
"crypto/rsa"
"crypto/tls"
"encoding/json"
"errors"
"math/rand/v2"
"net"
"net/http"
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/gorilla/websocket"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/geoip"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
const (
initialReconnectInterval = 1 * time.Second
maxReconnectInterval = 16 * time.Second
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
var (
ErrNotConnected = errors.New("not connected") // +checklocksignore: Global readonly variable.
)
type RemoteConnection struct {
logger log.Logger
mu sync.Mutex
p *ProxyServer
url *url.URL
// +checklocks:mu
conn *websocket.Conn
closeCtx context.Context
closeFunc context.CancelFunc // +checklocksignore: Only written to from constructor.
tokenId string
tokenKey *rsa.PrivateKey
tlsConfig *tls.Config
// +checklocks:mu
connectedSince time.Time
reconnectTimer *time.Timer
reconnectInterval atomic.Int64
msgId atomic.Int64
// +checklocks:mu
helloMsgId string
// +checklocks:mu
sessionId api.PublicSessionId
// +checklocks:mu
helloReceived bool
// +checklocks:mu
pendingMessages []*signaling.ProxyClientMessage
// +checklocks:mu
messageCallbacks map[string]chan *signaling.ProxyServerMessage
}
func NewRemoteConnection(p *ProxyServer, proxyUrl string, tokenId string, tokenKey *rsa.PrivateKey, tlsConfig *tls.Config) (*RemoteConnection, error) {
u, err := url.Parse(proxyUrl)
if err != nil {
return nil, err
}
closeCtx, closeFunc := context.WithCancel(context.Background())
result := &RemoteConnection{
logger: p.logger,
p: p,
url: u,
closeCtx: closeCtx,
closeFunc: closeFunc,
tokenId: tokenId,
tokenKey: tokenKey,
tlsConfig: tlsConfig,
reconnectTimer: time.NewTimer(0),
messageCallbacks: make(map[string]chan *signaling.ProxyServerMessage),
}
result.reconnectInterval.Store(int64(initialReconnectInterval))
go result.writePump()
return result, nil
}
func (c *RemoteConnection) String() string {
return c.url.String()
}
func (c *RemoteConnection) SessionId() api.PublicSessionId {
c.mu.Lock()
defer c.mu.Unlock()
return c.sessionId
}
func (c *RemoteConnection) reconnect() {
u, err := c.url.Parse("proxy")
if err != nil {
c.logger.Printf("Could not resolve url to proxy at %s: %s", c, err)
c.scheduleReconnect()
return
}
switch u.Scheme {
case "http":
u.Scheme = "ws"
case "https":
u.Scheme = "wss"
}
dialer := websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: c.tlsConfig,
}
conn, _, err := dialer.DialContext(c.closeCtx, u.String(), nil)
if err != nil {
c.logger.Printf("Error connecting to proxy at %s: %s", c, err)
c.scheduleReconnect()
return
}
c.logger.Printf("Connected to %s", c)
c.mu.Lock()
if c.closeCtx.Err() != nil {
// Closed while waiting for lock.
c.mu.Unlock()
if err := conn.Close(); err != nil {
c.logger.Printf("Error closing connection to %s: %s", c, err)
}
return
}
c.connectedSince = time.Now()
c.conn = conn
c.mu.Unlock()
c.reconnectInterval.Store(int64(initialReconnectInterval))
if !c.sendReconnectHello() || !c.sendPing() {
c.scheduleReconnect()
return
}
go c.readPump(conn)
}
func (c *RemoteConnection) sendReconnectHello() bool {
c.mu.Lock()
defer c.mu.Unlock()
if err := c.sendHello(c.closeCtx); err != nil {
c.logger.Printf("Error sending hello request to proxy at %s: %s", c, err)
return false
}
return true
}
func (c *RemoteConnection) scheduleReconnect() {
c.mu.Lock()
defer c.mu.Unlock()
c.scheduleReconnectLocked()
}
// +checklocks:c.mu
func (c *RemoteConnection) scheduleReconnectLocked() {
if err := c.sendCloseLocked(); err != nil && err != ErrNotConnected {
c.logger.Printf("Could not send close message to %s: %s", c, err)
}
c.closeLocked()
interval := c.reconnectInterval.Load()
// Prevent all servers from reconnecting at the same time in case of an
// interrupted connection to the proxy or a restart.
jitter := rand.Int64N(interval) - (interval / 2)
c.reconnectTimer.Reset(time.Duration(interval + jitter))
interval = min(interval*2, int64(maxReconnectInterval))
c.reconnectInterval.Store(interval)
}
// +checklocks:c.mu
func (c *RemoteConnection) sendHello(ctx context.Context) error {
c.helloMsgId = strconv.FormatInt(c.msgId.Add(1), 10)
msg := &signaling.ProxyClientMessage{
Id: c.helloMsgId,
Type: "hello",
Hello: &signaling.HelloProxyClientMessage{
Version: "1.0",
},
}
if sessionId := c.sessionId; sessionId != "" {
msg.Hello.ResumeId = sessionId
} else {
tokenString, err := c.createToken("")
if err != nil {
return err
}
msg.Hello.Token = tokenString
}
return c.sendMessageLocked(ctx, msg)
}
// +checklocks:c.mu
func (c *RemoteConnection) sendCloseLocked() error {
if c.conn == nil {
return ErrNotConnected
}
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
return c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
}
func (c *RemoteConnection) close() {
c.mu.Lock()
defer c.mu.Unlock()
c.closeLocked()
}
// +checklocks:c.mu
func (c *RemoteConnection) closeLocked() {
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.connectedSince = time.Time{}
c.helloReceived = false
}
func (c *RemoteConnection) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
c.reconnectTimer.Stop()
if c.closeCtx.Err() != nil {
// Already closed
return nil
}
c.closeFunc()
var err1 error
var err2 error
if c.conn != nil {
err1 = c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Time{})
err2 = c.conn.Close()
c.conn = nil
}
c.connectedSince = time.Time{}
c.helloReceived = false
if err1 != nil {
return err1
}
return err2
}
func (c *RemoteConnection) createToken(subject string) (string, error) {
claims := &signaling.TokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
IssuedAt: jwt.NewNumericDate(time.Now()),
Issuer: c.tokenId,
Subject: subject,
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
tokenString, err := token.SignedString(c.tokenKey)
if err != nil {
return "", err
}
return tokenString, nil
}
func (c *RemoteConnection) SendMessage(msg *signaling.ProxyClientMessage) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.sendMessageLocked(c.closeCtx, msg)
}
// +checklocks:c.mu
func (c *RemoteConnection) deferMessage(ctx context.Context, msg *signaling.ProxyClientMessage) {
c.pendingMessages = append(c.pendingMessages, msg)
if ctx.Done() != nil {
go func() {
<-ctx.Done()
c.mu.Lock()
defer c.mu.Unlock()
for idx, m := range c.pendingMessages {
if m == msg {
c.pendingMessages[idx] = nil
break
}
}
}()
}
}
// +checklocks:c.mu
func (c *RemoteConnection) sendMessageLocked(ctx context.Context, msg *signaling.ProxyClientMessage) error {
if c.conn == nil {
// Defer until connected.
c.deferMessage(ctx, msg)
return nil
}
if c.helloMsgId != "" && c.helloMsgId != msg.Id {
// Hello request is still inflight, defer.
c.deferMessage(ctx, msg)
return nil
}
c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // nolint
return c.conn.WriteJSON(msg)
}
func (c *RemoteConnection) readPump(conn *websocket.Conn) {
defer func() {
if c.closeCtx.Err() == nil {
c.scheduleReconnect()
}
}()
defer c.close()
for {
msgType, msg, err := conn.ReadMessage()
if err != nil {
if errors.Is(err, websocket.ErrCloseSent) {
break
} else if _, ok := err.(*websocket.CloseError); !ok || websocket.IsUnexpectedCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived) {
if !errors.Is(err, net.ErrClosed) || c.closeCtx.Err() == nil {
c.logger.Printf("Error reading from %s: %v", c, err)
}
}
break
}
if msgType != websocket.TextMessage {
c.logger.Printf("unexpected message type %q (%s)", msgType, string(msg))
continue
}
var message signaling.ProxyServerMessage
if err := json.Unmarshal(msg, &message); err != nil {
c.logger.Printf("could not decode message %s: %s", string(msg), err)
continue
}
c.mu.Lock()
helloMsgId := c.helloMsgId
c.mu.Unlock()
if helloMsgId != "" && message.Id == helloMsgId {
c.processHello(&message)
} else {
c.processMessage(&message)
}
}
}
func (c *RemoteConnection) sendPing() bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
return false
}
now := time.Now()
msg := strconv.FormatInt(now.UnixNano(), 10)
c.conn.SetWriteDeadline(now.Add(writeWait)) // nolint
if err := c.conn.WriteMessage(websocket.PingMessage, []byte(msg)); err != nil {
c.logger.Printf("Could not send ping to proxy at %s: %v", c, err)
go c.scheduleReconnect()
return false
}
return true
}
func (c *RemoteConnection) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
}()
defer c.reconnectTimer.Stop()
for {
select {
case <-c.reconnectTimer.C:
c.reconnect()
case <-ticker.C:
c.sendPing()
case <-c.closeCtx.Done():
return
}
}
}
func (c *RemoteConnection) processHello(msg *signaling.ProxyServerMessage) {
c.mu.Lock()
defer c.mu.Unlock()
c.helloMsgId = ""
switch msg.Type {
case "error":
if msg.Error.Code == "no_such_session" {
c.logger.Printf("Session %s could not be resumed on %s, registering new", c.sessionId, c)
c.sessionId = ""
if err := c.sendHello(c.closeCtx); err != nil {
c.logger.Printf("Could not send hello request to %s: %s", c, err)
c.scheduleReconnectLocked()
}
return
}
c.logger.Printf("Hello connection to %s failed with %+v, reconnecting", c, msg.Error)
c.scheduleReconnectLocked()
case "hello":
resumed := c.sessionId == msg.Hello.SessionId
c.sessionId = msg.Hello.SessionId
c.helloReceived = true
var country geoip.Country
if msg.Hello.Server != nil {
if country = msg.Hello.Server.Country; country != "" && !geoip.IsValidCountry(country) {
c.logger.Printf("Proxy %s sent invalid country %s in hello response", c, country)
country = ""
}
}
if resumed {
c.logger.Printf("Resumed session %s on %s", c.sessionId, c)
} else if country != "" {
c.logger.Printf("Received session %s from %s (in %s)", c.sessionId, c, country)
} else {
c.logger.Printf("Received session %s from %s", c.sessionId, c)
}
pending := c.pendingMessages
c.pendingMessages = nil
for _, m := range pending {
if m == nil {
continue
}
if err := c.sendMessageLocked(c.closeCtx, m); err != nil {
c.logger.Printf("Could not send pending message %+v to %s: %s", m, c, err)
}
}
default:
c.logger.Printf("Received unsupported hello response %+v from %s, reconnecting", msg, c)
c.scheduleReconnectLocked()
}
}
func (c *RemoteConnection) handleCallback(msg *signaling.ProxyServerMessage) bool {
if msg.Id == "" {
return false
}
c.mu.Lock()
ch, found := c.messageCallbacks[msg.Id]
if !found {
c.mu.Unlock()
return false
}
delete(c.messageCallbacks, msg.Id)
c.mu.Unlock()
ch <- msg
return true
}
func (c *RemoteConnection) processMessage(msg *signaling.ProxyServerMessage) {
if c.handleCallback(msg) {
return
}
switch msg.Type {
case "event":
c.processEvent(msg)
case "bye":
c.logger.Printf("Connection to %s was closed: %s", c, msg.Bye.Reason)
if msg.Bye.Reason == "session_expired" {
// Don't try to resume expired session.
c.mu.Lock()
c.sessionId = ""
c.mu.Unlock()
}
c.scheduleReconnect()
default:
c.logger.Printf("Received unsupported message %+v from %s", msg, c)
}
}
func (c *RemoteConnection) processEvent(msg *signaling.ProxyServerMessage) {
switch msg.Event.Type {
case "update-load":
// Ignore
case "publisher-closed":
c.logger.Printf("Remote publisher %s was closed on %s", msg.Event.ClientId, c)
c.p.RemotePublisherDeleted(api.PublicSessionId(msg.Event.ClientId))
default:
c.logger.Printf("Received unsupported event %+v from %s", msg, c)
}
}
func (c *RemoteConnection) sendMessageWithCallbackLocked(ctx context.Context, msg *signaling.ProxyClientMessage) (string, <-chan *signaling.ProxyServerMessage, error) {
msg.Id = strconv.FormatInt(c.msgId.Add(1), 10)
c.mu.Lock()
defer c.mu.Unlock()
if err := c.sendMessageLocked(ctx, msg); err != nil {
msg.Id = ""
return "", nil, err
}
ch := make(chan *signaling.ProxyServerMessage, 1)
c.messageCallbacks[msg.Id] = ch
return msg.Id, ch, nil
}
func (c *RemoteConnection) RequestMessage(ctx context.Context, msg *signaling.ProxyClientMessage) (*signaling.ProxyServerMessage, error) {
id, ch, err := c.sendMessageWithCallbackLocked(ctx, msg)
if err != nil {
return nil, err
}
defer func() {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.messageCallbacks, id)
}()
select {
case <-ctx.Done():
// TODO: Cancel request.
return nil, ctx.Err()
case response := <-ch:
if response.Type == "error" {
return nil, response.Error
}
return response, nil
}
}
func (c *RemoteConnection) SendBye() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
return nil
}
return c.sendMessageLocked(c.closeCtx, &signaling.ProxyClientMessage{
Type: "bye",
})
}

View file

@ -0,0 +1,216 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"context"
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
)
func (c *RemoteConnection) WaitForConnection(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
// Only used in tests, so a busy-loop should be fine.
for c.conn == nil || c.connectedSince.IsZero() || !c.helloReceived {
if err := ctx.Err(); err != nil {
return err
}
c.mu.Unlock()
time.Sleep(time.Nanosecond)
c.mu.Lock()
}
return nil
}
func (c *RemoteConnection) WaitForDisconnect(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
initial := c.conn
if initial == nil {
return nil
}
// Only used in tests, so a busy-loop should be fine.
for c.conn == initial {
if err := ctx.Err(); err != nil {
return err
}
c.mu.Unlock()
time.Sleep(time.Nanosecond)
c.mu.Lock()
}
return nil
}
func Test_ProxyRemoteConnectionReconnect(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
server, key, httpserver := newProxyServerForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
conn, err := NewRemoteConnection(server, httpserver.URL, TokenIdForTest, key, nil)
require.NoError(err)
t.Cleanup(func() {
assert.NoError(conn.SendBye())
assert.NoError(conn.Close())
})
assert.NoError(conn.WaitForConnection(ctx))
// Closing the connection will reconnect automatically
conn.mu.Lock()
c := conn.conn
conn.mu.Unlock()
assert.NoError(c.Close())
assert.NoError(conn.WaitForDisconnect(ctx))
assert.NoError(conn.WaitForConnection(ctx))
}
func Test_ProxyRemoteConnectionReconnectUnknownSession(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
server, key, httpserver := newProxyServerForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
conn, err := NewRemoteConnection(server, httpserver.URL, TokenIdForTest, key, nil)
require.NoError(err)
t.Cleanup(func() {
assert.NoError(conn.SendBye())
assert.NoError(conn.Close())
})
assert.NoError(conn.WaitForConnection(ctx))
// Closing the connection will reconnect automatically
conn.mu.Lock()
c := conn.conn
sessionId := conn.sessionId
conn.mu.Unlock()
var sid uint64
server.IterateSessions(func(session *ProxySession) {
if session.PublicId() == sessionId {
sid = session.Sid()
}
})
require.NotEqualValues(0, sid)
server.DeleteSession(sid)
if err := c.Close(); err != nil {
// If an error occurs while closing, it may only be "use of closed network
// connection" because the "DeleteSession" might have already closed the
// socket.
assert.ErrorIs(err, net.ErrClosed)
}
assert.NoError(conn.WaitForDisconnect(ctx))
assert.NoError(conn.WaitForConnection(ctx))
assert.NotEqual(sessionId, conn.SessionId())
}
func Test_ProxyRemoteConnectionReconnectExpiredSession(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
server, key, httpserver := newProxyServerForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
conn, err := NewRemoteConnection(server, httpserver.URL, TokenIdForTest, key, nil)
require.NoError(err)
t.Cleanup(func() {
assert.NoError(conn.SendBye())
assert.NoError(conn.Close())
})
assert.NoError(conn.WaitForConnection(ctx))
// Closing the connection will reconnect automatically
conn.mu.Lock()
sessionId := conn.sessionId
conn.mu.Unlock()
var session *ProxySession
server.IterateSessions(func(sess *ProxySession) {
if sess.PublicId() == sessionId {
session = sess
}
})
require.NotNil(session)
session.Close()
assert.NoError(conn.WaitForDisconnect(ctx))
assert.NoError(conn.WaitForConnection(ctx))
assert.NotEqual(sessionId, conn.SessionId())
}
func Test_ProxyRemoteConnectionCreatePublisher(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
server, key, httpserver := newProxyServerForTest(t)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
conn, err := NewRemoteConnection(server, httpserver.URL, TokenIdForTest, key, nil)
require.NoError(err)
t.Cleanup(func() {
assert.NoError(conn.SendBye())
assert.NoError(conn.Close())
})
publisherId := "the-publisher"
hostname := "the-hostname"
port := 1234
rtcpPort := 2345
_, err = conn.RequestMessage(ctx, &signaling.ProxyClientMessage{
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "publish-remote",
ClientId: publisherId,
Hostname: hostname,
Port: port,
RtcpPort: rtcpPort,
},
})
assert.ErrorContains(err, UnknownClient.Error())
}

1729
cmd/proxy/proxy_server.go Normal file

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

485
cmd/proxy/proxy_session.go Normal file
View file

@ -0,0 +1,485 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
const (
// Sessions expire if they have not been used for one minute.
sessionExpirationTime = time.Minute
)
type remotePublisherData struct {
id api.PublicSessionId
hostname string
port int
rtcpPort int
}
type ProxySession struct {
logger log.Logger
proxy *ProxyServer
id api.PublicSessionId
sid uint64
lastUsed atomic.Int64
ctx context.Context
closeFunc context.CancelFunc
clientLock sync.Mutex
// +checklocks:clientLock
client *ProxyClient
// +checklocks:clientLock
pendingMessages []*signaling.ProxyServerMessage
publishersLock sync.Mutex
// +checklocks:publishersLock
publishers map[string]signaling.McuPublisher
// +checklocks:publishersLock
publisherIds map[signaling.McuPublisher]string
subscribersLock sync.Mutex
// +checklocks:subscribersLock
subscribers map[string]signaling.McuSubscriber
// +checklocks:subscribersLock
subscriberIds map[signaling.McuSubscriber]string
remotePublishersLock sync.Mutex
// +checklocks:remotePublishersLock
remotePublishers map[signaling.McuRemoteAwarePublisher]map[string]*remotePublisherData
}
func NewProxySession(proxy *ProxyServer, sid uint64, id api.PublicSessionId) *ProxySession {
ctx, closeFunc := context.WithCancel(context.Background())
result := &ProxySession{
logger: proxy.logger,
proxy: proxy,
id: id,
sid: sid,
ctx: ctx,
closeFunc: closeFunc,
publishers: make(map[string]signaling.McuPublisher),
publisherIds: make(map[signaling.McuPublisher]string),
subscribers: make(map[string]signaling.McuSubscriber),
subscriberIds: make(map[signaling.McuSubscriber]string),
}
result.MarkUsed()
return result
}
func (s *ProxySession) Context() context.Context {
return s.ctx
}
func (s *ProxySession) PublicId() api.PublicSessionId {
return s.id
}
func (s *ProxySession) Sid() uint64 {
return s.sid
}
func (s *ProxySession) LastUsed() time.Time {
lastUsed := s.lastUsed.Load()
return time.Unix(0, lastUsed)
}
func (s *ProxySession) IsExpired() bool {
expiresAt := s.LastUsed().Add(sessionExpirationTime)
return expiresAt.Before(time.Now())
}
func (s *ProxySession) MarkUsed() {
now := time.Now()
s.lastUsed.Store(now.UnixNano())
}
func (s *ProxySession) Close() {
prev := s.SetClient(nil)
if prev != nil {
reason := "session_closed"
if s.IsExpired() {
reason = "session_expired"
}
prev.SendMessage(&signaling.ProxyServerMessage{
Type: "bye",
Bye: &signaling.ByeProxyServerMessage{
Reason: reason,
},
})
}
s.closeFunc()
s.clearPublishers()
s.clearSubscribers()
s.clearRemotePublishers()
s.proxy.DeleteSession(s.Sid())
}
func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient {
s.clientLock.Lock()
prev := s.client
s.client = client
var messages []*signaling.ProxyServerMessage
if client != nil {
messages, s.pendingMessages = s.pendingMessages, nil
}
s.clientLock.Unlock()
if prev != nil {
prev.SetSession(nil)
}
if client != nil {
s.MarkUsed()
client.SetSession(s)
for _, msg := range messages {
client.SendMessage(msg)
}
}
return prev
}
func (s *ProxySession) OnUpdateOffer(client signaling.McuClient, offer api.StringMap) {
id := s.proxy.GetClientId(client)
if id == "" {
s.logger.Printf("Received offer %+v from unknown %s client %s (%+v)", offer, client.StreamType(), client.Id(), client)
return
}
msg := &signaling.ProxyServerMessage{
Type: "payload",
Payload: &signaling.PayloadProxyServerMessage{
Type: "offer",
ClientId: id,
Payload: api.StringMap{
"offer": offer,
},
},
}
s.sendMessage(msg)
}
func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate any) {
id := s.proxy.GetClientId(client)
if id == "" {
s.logger.Printf("Received candidate %+v from unknown %s client %s (%+v)", candidate, client.StreamType(), client.Id(), client)
return
}
msg := &signaling.ProxyServerMessage{
Type: "payload",
Payload: &signaling.PayloadProxyServerMessage{
Type: "candidate",
ClientId: id,
Payload: api.StringMap{
"candidate": candidate,
},
},
}
s.sendMessage(msg)
}
func (s *ProxySession) sendMessage(message *signaling.ProxyServerMessage) {
var client *ProxyClient
s.clientLock.Lock()
client = s.client
if client == nil {
s.pendingMessages = append(s.pendingMessages, message)
}
s.clientLock.Unlock()
if client != nil {
client.SendMessage(message)
}
}
func (s *ProxySession) OnIceCompleted(client signaling.McuClient) {
id := s.proxy.GetClientId(client)
if id == "" {
s.logger.Printf("Received ice completed event from unknown %s client %s (%+v)", client.StreamType(), client.Id(), client)
return
}
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "ice-completed",
ClientId: id,
},
}
s.sendMessage(msg)
}
func (s *ProxySession) SubscriberSidUpdated(subscriber signaling.McuSubscriber) {
id := s.proxy.GetClientId(subscriber)
if id == "" {
s.logger.Printf("Received subscriber sid updated event from unknown %s subscriber %s (%+v)", subscriber.StreamType(), subscriber.Id(), subscriber)
return
}
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "subscriber-sid-updated",
ClientId: id,
Sid: subscriber.Sid(),
},
}
s.sendMessage(msg)
}
func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) {
if id := s.DeletePublisher(publisher); id != "" {
if s.proxy.DeleteClient(id, publisher) {
statsPublishersCurrent.WithLabelValues(string(publisher.StreamType())).Dec()
}
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "publisher-closed",
ClientId: id,
},
}
s.sendMessage(msg)
}
}
func (s *ProxySession) SubscriberClosed(subscriber signaling.McuSubscriber) {
if id := s.DeleteSubscriber(subscriber); id != "" {
if s.proxy.DeleteClient(id, subscriber) {
statsSubscribersCurrent.WithLabelValues(string(subscriber.StreamType())).Dec()
}
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "subscriber-closed",
ClientId: id,
},
}
s.sendMessage(msg)
}
}
func (s *ProxySession) StorePublisher(ctx context.Context, id string, publisher signaling.McuPublisher) {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
s.publishers[id] = publisher
s.publisherIds[publisher] = id
}
func (s *ProxySession) DeletePublisher(publisher signaling.McuPublisher) string {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
id, found := s.publisherIds[publisher]
if !found {
return ""
}
delete(s.publishers, id)
delete(s.publisherIds, publisher)
if rp, ok := publisher.(signaling.McuRemoteAwarePublisher); ok {
s.remotePublishersLock.Lock()
defer s.remotePublishersLock.Unlock()
delete(s.remotePublishers, rp)
}
go s.proxy.PublisherDeleted(publisher)
return id
}
func (s *ProxySession) StoreSubscriber(ctx context.Context, id string, subscriber signaling.McuSubscriber) {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
s.subscribers[id] = subscriber
s.subscriberIds[subscriber] = id
}
func (s *ProxySession) DeleteSubscriber(subscriber signaling.McuSubscriber) string {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
id, found := s.subscriberIds[subscriber]
if !found {
return ""
}
delete(s.subscribers, id)
delete(s.subscriberIds, subscriber)
return id
}
func (s *ProxySession) clearPublishers() {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
go func(publishers map[string]signaling.McuPublisher) {
for id, publisher := range publishers {
if s.proxy.DeleteClient(id, publisher) {
statsPublishersCurrent.WithLabelValues(string(publisher.StreamType())).Dec()
}
publisher.Close(context.Background())
}
}(s.publishers)
// Can't use clear(...) here as the map is processed by the goroutine above.
s.publishers = make(map[string]signaling.McuPublisher)
clear(s.publisherIds)
}
func (s *ProxySession) clearRemotePublishers() {
s.remotePublishersLock.Lock()
defer s.remotePublishersLock.Unlock()
go func(remotePublishers map[signaling.McuRemoteAwarePublisher]map[string]*remotePublisherData) {
for publisher, entries := range remotePublishers {
for _, data := range entries {
if err := publisher.UnpublishRemote(context.Background(), s.PublicId(), data.hostname, data.port, data.rtcpPort); err != nil {
s.logger.Printf("Error unpublishing %s %s from remote %s: %s", publisher.StreamType(), publisher.Id(), data.hostname, err)
}
}
}
}(s.remotePublishers)
s.remotePublishers = nil
}
func (s *ProxySession) clearSubscribers() {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
go func(subscribers map[string]signaling.McuSubscriber) {
for id, subscriber := range subscribers {
if s.proxy.DeleteClient(id, subscriber) {
statsSubscribersCurrent.WithLabelValues(string(subscriber.StreamType())).Dec()
}
subscriber.Close(context.Background())
}
}(s.subscribers)
// Can't use clear(...) here as the map is processed by the goroutine above.
s.subscribers = make(map[string]signaling.McuSubscriber)
clear(s.subscriberIds)
}
func (s *ProxySession) NotifyDisconnected() {
s.clearPublishers()
s.clearSubscribers()
s.clearRemotePublishers()
}
func (s *ProxySession) AddRemotePublisher(publisher signaling.McuRemoteAwarePublisher, hostname string, port int, rtcpPort int) bool {
s.remotePublishersLock.Lock()
defer s.remotePublishersLock.Unlock()
remote, found := s.remotePublishers[publisher]
if !found {
remote = make(map[string]*remotePublisherData)
if s.remotePublishers == nil {
s.remotePublishers = make(map[signaling.McuRemoteAwarePublisher]map[string]*remotePublisherData)
}
s.remotePublishers[publisher] = remote
}
key := fmt.Sprintf("%s:%d%d", hostname, port, rtcpPort)
if _, found := remote[key]; found {
return false
}
data := &remotePublisherData{
id: publisher.PublisherId(),
hostname: hostname,
port: port,
rtcpPort: rtcpPort,
}
remote[key] = data
return true
}
func (s *ProxySession) RemoveRemotePublisher(publisher signaling.McuRemoteAwarePublisher, hostname string, port int, rtcpPort int) {
s.remotePublishersLock.Lock()
defer s.remotePublishersLock.Unlock()
remote, found := s.remotePublishers[publisher]
if !found {
return
}
key := fmt.Sprintf("%s:%d%d", hostname, port, rtcpPort)
delete(remote, key)
if len(remote) == 0 {
delete(s.remotePublishers, publisher)
if len(s.remotePublishers) == 0 {
s.remotePublishers = nil
}
}
}
func (s *ProxySession) OnPublisherDeleted(publisher signaling.McuPublisher) {
if publisher, ok := publisher.(signaling.McuRemoteAwarePublisher); ok {
s.OnRemoteAwarePublisherDeleted(publisher)
}
}
func (s *ProxySession) OnRemoteAwarePublisherDeleted(publisher signaling.McuRemoteAwarePublisher) {
s.remotePublishersLock.Lock()
defer s.remotePublishersLock.Unlock()
if entries, found := s.remotePublishers[publisher]; found {
delete(s.remotePublishers, publisher)
for _, entry := range entries {
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "publisher-closed",
ClientId: string(entry.id),
},
}
s.sendMessage(msg)
}
}
}
func (s *ProxySession) OnRemotePublisherDeleted(publisherId api.PublicSessionId) {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
for id, sub := range s.subscribers {
if sub.Publisher() == publisherId {
delete(s.subscribers, id)
delete(s.subscriberIds, sub)
s.logger.Printf("Remote subscriber %s was closed, closing %s subscriber %s", publisherId, sub.StreamType(), sub.Id())
go sub.Close(context.Background())
}
}
}

View file

@ -0,0 +1,109 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2021 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
statsSessionsCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "sessions",
Help: "The current number of sessions",
})
statsSessionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "sessions_total",
Help: "The total number of created sessions",
})
statsSessionsResumedTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "sessions_resumed_total",
Help: "The total number of resumed sessions",
})
statsPublishersCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "publishers",
Help: "The current number of publishers",
}, []string{"type"})
statsPublishersTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "publishers_total",
Help: "The total number of created publishers",
}, []string{"type"})
statsSubscribersCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "subscribers",
Help: "The current number of subscribers",
}, []string{"type"})
statsSubscribersTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "subscribers_total",
Help: "The total number of created subscribers",
}, []string{"type"})
statsCommandMessagesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "command_messages_total",
Help: "The total number of command messages",
}, []string{"type"})
statsPayloadMessagesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "payload_messages_total",
Help: "The total number of payload messages",
}, []string{"type"})
statsTokenErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "token_errors_total",
Help: "The total number of token errors",
}, []string{"reason"})
statsLoadCurrent = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "signaling",
Subsystem: "proxy",
Name: "load",
Help: "The current load of the signaling proxy",
})
)
func init() {
prometheus.MustRegister(statsSessionsCurrent)
prometheus.MustRegister(statsSessionsTotal)
prometheus.MustRegister(statsSessionsResumedTotal)
prometheus.MustRegister(statsPublishersCurrent)
prometheus.MustRegister(statsPublishersTotal)
prometheus.MustRegister(statsSubscribersCurrent)
prometheus.MustRegister(statsSubscribersTotal)
prometheus.MustRegister(statsCommandMessagesTotal)
prometheus.MustRegister(statsPayloadMessagesTotal)
prometheus.MustRegister(statsTokenErrorsTotal)
prometheus.MustRegister(statsLoadCurrent)
}

View file

@ -0,0 +1,257 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2024 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
"github.com/strukturag/nextcloud-spreed-signaling/api"
)
var (
ErrNoMessageReceived = errors.New("no message was received by the server")
)
type ProxyTestClient struct {
t *testing.T
assert *assert.Assertions // +checklocksignore: Only written to from constructor.
require *require.Assertions
mu sync.Mutex
// +checklocks:mu
conn *websocket.Conn
messageChan chan []byte
readErrorChan chan error
sessionId api.PublicSessionId
}
func NewProxyTestClient(ctx context.Context, t *testing.T, url string) *ProxyTestClient {
conn, _, err := websocket.DefaultDialer.DialContext(ctx, getWebsocketUrl(url), nil)
require.NoError(t, err)
messageChan := make(chan []byte)
readErrorChan := make(chan error, 1)
go func() {
for {
messageType, data, err := conn.ReadMessage()
if err != nil {
readErrorChan <- err
return
} else if !assert.Equal(t, websocket.TextMessage, messageType) {
return
}
messageChan <- data
}
}()
client := &ProxyTestClient{
t: t,
assert: assert.New(t),
require: require.New(t),
conn: conn,
messageChan: messageChan,
readErrorChan: readErrorChan,
}
return client
}
func (c *ProxyTestClient) CloseWithBye() {
c.SendBye() // nolint
c.Close()
}
func (c *ProxyTestClient) Close() {
c.mu.Lock()
defer c.mu.Unlock()
if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err == websocket.ErrCloseSent {
// Already closed
return
}
// Wait a bit for close message to be processed.
time.Sleep(100 * time.Millisecond)
c.assert.NoError(c.conn.Close())
// Drain any entries in the channels to terminate the read goroutine.
loop:
for {
select {
case <-c.readErrorChan:
case <-c.messageChan:
default:
break loop
}
}
}
func (c *ProxyTestClient) SendBye() error {
hello := &signaling.ProxyClientMessage{
Id: "9876",
Type: "bye",
Bye: &signaling.ByeProxyClientMessage{},
}
return c.WriteJSON(hello)
}
func (c *ProxyTestClient) WriteJSON(data any) error {
if msg, ok := data.(*signaling.ProxyClientMessage); ok {
if err := msg.CheckValid(); err != nil {
return err
}
}
c.mu.Lock()
defer c.mu.Unlock()
return c.conn.WriteJSON(data)
}
func (c *ProxyTestClient) RunUntilMessage(ctx context.Context) (message *signaling.ProxyServerMessage, err error) {
select {
case err = <-c.readErrorChan:
case msg := <-c.messageChan:
var m signaling.ProxyServerMessage
if err = json.Unmarshal(msg, &m); err == nil {
message = &m
}
case <-ctx.Done():
err = ctx.Err()
}
return
}
func checkUnexpectedClose(err error) error {
if err != nil && websocket.IsUnexpectedCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived) {
return fmt.Errorf("Connection was closed with unexpected error: %s", err)
}
return nil
}
func checkMessageType(message *signaling.ProxyServerMessage, expectedType string) error {
if message == nil {
return ErrNoMessageReceived
}
if message.Type != expectedType {
return fmt.Errorf("Expected \"%s\" message, got %+v", expectedType, message)
}
switch message.Type {
case "hello":
if message.Hello == nil {
return fmt.Errorf("Expected \"%s\" message, got %+v", expectedType, message)
}
case "command":
if message.Command == nil {
return fmt.Errorf("Expected \"%s\" message, got %+v", expectedType, message)
}
case "event":
if message.Event == nil {
return fmt.Errorf("Expected \"%s\" message, got %+v", expectedType, message)
}
}
return nil
}
func (c *ProxyTestClient) SendHello(key any) error {
claims := &signaling.TokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
IssuedAt: jwt.NewNumericDate(time.Now().Add(-maxTokenAge / 2)),
Issuer: TokenIdForTest,
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
tokenString, err := token.SignedString(key)
c.require.NoError(err)
hello := &signaling.ProxyClientMessage{
Id: "1234",
Type: "hello",
Hello: &signaling.HelloProxyClientMessage{
Version: "1.0",
Features: []string{},
Token: tokenString,
},
}
return c.WriteJSON(hello)
}
func (c *ProxyTestClient) RunUntilHello(ctx context.Context) (message *signaling.ProxyServerMessage, err error) {
if message, err = c.RunUntilMessage(ctx); err != nil {
return nil, err
}
if err := checkUnexpectedClose(err); err != nil {
return nil, err
}
if err := checkMessageType(message, "hello"); err != nil {
return nil, err
}
c.sessionId = message.Hello.SessionId
return message, nil
}
func (c *ProxyTestClient) RunUntilLoad(ctx context.Context, load uint64) (message *signaling.ProxyServerMessage, err error) {
if message, err = c.RunUntilMessage(ctx); err != nil {
return nil, err
}
if err := checkUnexpectedClose(err); err != nil {
return nil, err
}
if err := checkMessageType(message, "event"); err != nil {
return nil, err
}
if expectedType := "update-load"; message.Event.Type != expectedType {
return nil, fmt.Errorf("Expected \"%s\" event message, got %+v", expectedType, message)
}
if load != message.Event.Load {
return nil, fmt.Errorf("Expected load %d, got %+v", load, message)
}
return message, nil
}
func (c *ProxyTestClient) SendCommand(command *signaling.CommandProxyClientMessage) error {
message := &signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: command,
}
return c.WriteJSON(message)
}

47
cmd/proxy/proxy_tokens.go Normal file
View file

@ -0,0 +1,47 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"crypto/rsa"
"github.com/dlintw/goconf"
)
const (
TokenTypeEtcd = "etcd"
TokenTypeStatic = "static"
TokenTypeDefault = TokenTypeStatic
)
type ProxyToken struct {
id string
key *rsa.PublicKey
}
type ProxyTokens interface {
Get(id string) (*ProxyToken, error)
Reload(config *goconf.ConfigFile)
Close()
}

View file

@ -0,0 +1,172 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/dlintw/goconf"
"github.com/golang-jwt/jwt/v5"
"github.com/strukturag/nextcloud-spreed-signaling/container"
"github.com/strukturag/nextcloud-spreed-signaling/etcd"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
const (
tokenCacheSize = 4096
)
type tokenCacheEntry struct {
keyValue []byte
token *ProxyToken
}
type tokensEtcd struct {
logger log.Logger
client etcd.Client
tokenFormats atomic.Value
tokenCache *container.LruCache[*tokenCacheEntry]
}
func NewProxyTokensEtcd(logger log.Logger, config *goconf.ConfigFile) (ProxyTokens, error) {
client, err := etcd.NewClient(logger, config, "tokens")
if err != nil {
return nil, err
}
if !client.IsConfigured() {
return nil, errors.New("no etcd endpoints configured")
}
result := &tokensEtcd{
logger: logger,
client: client,
tokenCache: container.NewLruCache[*tokenCacheEntry](tokenCacheSize),
}
if err := result.load(config, false); err != nil {
return nil, err
}
return result, nil
}
func (t *tokensEtcd) getKeys(id string) []string {
format := t.tokenFormats.Load().([]string)
var result []string
for _, f := range format {
result = append(result, fmt.Sprintf(f, id))
}
return result
}
func (t *tokensEtcd) getByKey(id string, key string) (*ProxyToken, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := t.client.Get(ctx, key)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return nil, nil
} else if len(resp.Kvs) > 1 {
t.logger.Printf("Received multiple keys for %s, using last", key)
}
keyValue := resp.Kvs[len(resp.Kvs)-1].Value
cached := t.tokenCache.Get(key)
if cached == nil || !bytes.Equal(cached.keyValue, keyValue) {
// Parsed public keys are cached to avoid the parse overhead.
publicKey, err := jwt.ParseRSAPublicKeyFromPEM(keyValue)
if err != nil {
return nil, err
}
cached = &tokenCacheEntry{
keyValue: keyValue,
token: &ProxyToken{
id: id,
key: publicKey,
},
}
t.tokenCache.Set(key, cached)
}
return cached.token, nil
}
func (t *tokensEtcd) Get(id string) (*ProxyToken, error) {
for _, k := range t.getKeys(id) {
token, err := t.getByKey(id, k)
if err != nil {
t.logger.Printf("Could not get public key from %s for %s: %s", k, id, err)
continue
} else if token == nil {
continue
}
return token, nil
}
return nil, nil
}
func (t *tokensEtcd) load(config *goconf.ConfigFile, ignoreErrors bool) error {
tokenFormat, _ := config.GetString("tokens", "keyformat")
formats := strings.Split(tokenFormat, ",")
var tokenFormats []string
for _, f := range formats {
f = strings.TrimSpace(f)
if f != "" {
tokenFormats = append(tokenFormats, f)
}
}
if len(tokenFormats) == 0 {
tokenFormats = []string{"/%s"}
}
t.tokenFormats.Store(tokenFormats)
t.logger.Printf("Using %v as token formats", tokenFormats)
return nil
}
func (t *tokensEtcd) Reload(config *goconf.ConfigFile) {
if err := t.load(config, true); err != nil {
t.logger.Printf("Error reloading etcd tokens: %s", err)
}
}
func (t *tokensEtcd) Close() {
if err := t.client.Close(); err != nil {
t.logger.Printf("Error while closing etcd client: %s", err)
}
}

View file

@ -0,0 +1,155 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"crypto"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"net"
"net/url"
"os"
"strconv"
"testing"
"github.com/dlintw/goconf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/lease"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/test"
)
var (
etcdListenUrl = "http://localhost:8080"
)
func newEtcdForTesting(t *testing.T) *embed.Etcd {
cfg := embed.NewConfig()
cfg.Dir = t.TempDir()
os.Chmod(cfg.Dir, 0700) // nolint
cfg.LogLevel = "warn"
cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel)))
u, err := url.Parse(etcdListenUrl)
require.NoError(t, err)
// Find a free port to bind the server to.
var etcd *embed.Etcd
for port := 50000; port < 50100; port++ {
u.Host = net.JoinHostPort("localhost", strconv.Itoa(port))
cfg.ListenClientUrls = []url.URL{*u}
httpListener := u
httpListener.Host = net.JoinHostPort("localhost", strconv.Itoa(port+1))
cfg.ListenClientHttpUrls = []url.URL{*httpListener}
peerListener := u
peerListener.Host = net.JoinHostPort("localhost", strconv.Itoa(port+2))
cfg.ListenPeerUrls = []url.URL{*peerListener}
etcd, err = embed.StartEtcd(cfg)
if test.IsErrorAddressAlreadyInUse(err) {
continue
}
require.NoError(t, err)
break
}
require.NotNil(t, etcd, "could not find free port")
t.Cleanup(func() {
etcd.Close()
<-etcd.Server.StopNotify()
})
// Wait for server to be ready.
<-etcd.Server.ReadyNotify()
return etcd
}
func newTokensEtcdForTesting(t *testing.T) (*tokensEtcd, *embed.Etcd) {
etcd := newEtcdForTesting(t)
cfg := goconf.NewConfigFile()
cfg.AddOption("etcd", "endpoints", etcd.Config().ListenClientUrls[0].String())
cfg.AddOption("tokens", "keyformat", "/%s, /testing/%s/key")
logger := log.NewLoggerForTest(t)
tokens, err := NewProxyTokensEtcd(logger, cfg)
require.NoError(t, err)
t.Cleanup(func() {
tokens.Close()
})
return tokens.(*tokensEtcd), etcd
}
func storeKey(t *testing.T, etcd *embed.Etcd, key string, pubkey crypto.PublicKey) {
var data []byte
var err error
switch pubkey := pubkey.(type) {
case rsa.PublicKey:
data, err = x509.MarshalPKIXPublicKey(&pubkey)
require.NoError(t, err)
default:
require.Fail(t, "unknown key type", "type %T in %+v", pubkey, pubkey)
}
data = pem.EncodeToMemory(&pem.Block{
Type: "RSA PUBLIC KEY",
Bytes: data,
})
if kv := etcd.Server.KV(); kv != nil {
kv.Put([]byte(key), data, lease.NoLease)
kv.Commit()
}
}
func generateAndSaveKey(t *testing.T, etcd *embed.Etcd, name string) *rsa.PrivateKey {
key, err := rsa.GenerateKey(rand.Reader, 1024)
require.NoError(t, err)
storeKey(t, etcd, name, key.PublicKey)
return key
}
func TestProxyTokensEtcd(t *testing.T) {
t.Parallel()
assert := assert.New(t)
tokens, etcd := newTokensEtcdForTesting(t)
key1 := generateAndSaveKey(t, etcd, "/foo")
key2 := generateAndSaveKey(t, etcd, "/testing/bar/key")
if token, err := tokens.Get("foo"); assert.NoError(err) && assert.NotNil(token) {
assert.True(key1.PublicKey.Equal(token.key))
}
if token, err := tokens.Get("bar"); assert.NoError(err) && assert.NotNil(token) {
assert.True(key2.PublicKey.Equal(token.key))
}
}

View file

@ -0,0 +1,131 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"fmt"
"os"
"slices"
"sync/atomic"
"github.com/dlintw/goconf"
"github.com/golang-jwt/jwt/v5"
"github.com/strukturag/nextcloud-spreed-signaling/config"
"github.com/strukturag/nextcloud-spreed-signaling/log"
)
type tokensStatic struct {
logger log.Logger
tokenKeys atomic.Value
}
func NewProxyTokensStatic(logger log.Logger, config *goconf.ConfigFile) (ProxyTokens, error) {
result := &tokensStatic{
logger: logger,
}
if err := result.load(config, false); err != nil {
return nil, err
}
return result, nil
}
func (t *tokensStatic) setTokenKeys(keys map[string]*ProxyToken) {
t.tokenKeys.Store(keys)
}
func (t *tokensStatic) getTokenKeys() map[string]*ProxyToken {
return t.tokenKeys.Load().(map[string]*ProxyToken)
}
func (t *tokensStatic) Get(id string) (*ProxyToken, error) {
tokenKeys := t.getTokenKeys()
token := tokenKeys[id]
return token, nil
}
func (t *tokensStatic) load(cfg *goconf.ConfigFile, ignoreErrors bool) error {
options, err := config.GetStringOptions(cfg, "tokens", ignoreErrors)
if err != nil {
return err
}
tokenKeys := make(map[string]*ProxyToken)
for id, filename := range options {
if filename == "" {
if !ignoreErrors {
return fmt.Errorf("no filename given for token %s", id)
}
t.logger.Printf("No filename given for token %s, ignoring", id)
continue
}
keyData, err := os.ReadFile(filename)
if err != nil {
if !ignoreErrors {
return fmt.Errorf("could not read public key from %s: %s", filename, err)
}
t.logger.Printf("Could not read public key from %s, ignoring: %s", filename, err)
continue
}
key, err := jwt.ParseRSAPublicKeyFromPEM(keyData)
if err != nil {
if !ignoreErrors {
return fmt.Errorf("could not parse public key from %s: %s", filename, err)
}
t.logger.Printf("Could not parse public key from %s, ignoring: %s", filename, err)
continue
}
tokenKeys[id] = &ProxyToken{
id: id,
key: key,
}
}
if len(tokenKeys) == 0 {
t.logger.Printf("No token keys loaded")
} else {
var keyIds []string
for k := range tokenKeys {
keyIds = append(keyIds, k)
}
slices.Sort(keyIds)
t.logger.Printf("Enabled token keys: %v", keyIds)
}
t.setTokenKeys(tokenKeys)
return nil
}
func (t *tokensStatic) Reload(config *goconf.ConfigFile) {
if err := t.load(config, true); err != nil {
t.logger.Printf("Error reloading static tokens: %s", err)
}
}
func (t *tokensStatic) Close() {
t.setTokenKeys(map[string]*ProxyToken{})
}

433
cmd/server/main.go Normal file
View file

@ -0,0 +1,433 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2017 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"context"
"crypto/tls"
"errors"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"runtime"
runtimepprof "runtime/pprof"
"sync"
"syscall"
"time"
"github.com/dlintw/goconf"
"github.com/gorilla/mux"
signaling "github.com/strukturag/nextcloud-spreed-signaling"
"github.com/strukturag/nextcloud-spreed-signaling/async/events"
"github.com/strukturag/nextcloud-spreed-signaling/config"
"github.com/strukturag/nextcloud-spreed-signaling/dns"
"github.com/strukturag/nextcloud-spreed-signaling/etcd"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
signalinglog "github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/nats"
)
var (
version = "unreleased"
configFlag = flag.String("config", "server.conf", "config file to use")
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
memprofile = flag.String("memprofile", "", "write memory profile to file")
showVersion = flag.Bool("version", false, "show version and quit")
)
const (
defaultReadTimeout = 15
defaultWriteTimeout = 30
initialMcuRetry = time.Second
maxMcuRetry = time.Second * 16
dnsMonitorInterval = time.Second
)
func createListener(addr string) (net.Listener, error) {
if addr[0] == '/' {
os.Remove(addr)
return net.Listen("unix", addr)
}
return net.Listen("tcp", addr)
}
func createTLSListener(addr string, certFile, keyFile string) (net.Listener, error) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
config := tls.Config{
Certificates: []tls.Certificate{cert},
}
if addr[0] == '/' {
os.Remove(addr)
return tls.Listen("unix", addr, &config)
}
return tls.Listen("tcp", addr, &config)
}
type Listeners struct {
logger signalinglog.Logger // +checklocksignore
mu sync.Mutex
// +checklocks:mu
listeners []net.Listener
}
func (l *Listeners) Add(listener net.Listener) {
l.mu.Lock()
defer l.mu.Unlock()
l.listeners = append(l.listeners, listener)
}
func (l *Listeners) Close() {
l.mu.Lock()
defer l.mu.Unlock()
for _, listener := range l.listeners {
if err := listener.Close(); err != nil {
l.logger.Printf("Error closing listener %s: %s", listener.Addr(), err)
}
}
}
func main() {
log.SetFlags(log.Lshortfile)
flag.Parse()
if *showVersion {
fmt.Printf("nextcloud-spreed-signaling version %s/%s\n", version, runtime.Version())
os.Exit(0)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
signal.Notify(sigChan, syscall.SIGUSR1)
stopCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
logger := log.Default()
stopCtx = signalinglog.NewLoggerContext(stopCtx, logger)
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
logger.Fatal(err)
}
if err := runtimepprof.StartCPUProfile(f); err != nil {
logger.Fatalf("Error writing CPU profile to %s: %s", *cpuprofile, err)
}
logger.Printf("Writing CPU profile to %s ...", *cpuprofile)
defer runtimepprof.StopCPUProfile()
}
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
logger.Fatal(err)
}
defer func() {
logger.Printf("Writing Memory profile to %s ...", *memprofile)
runtime.GC()
if err := runtimepprof.WriteHeapProfile(f); err != nil {
logger.Printf("Error writing Memory profile to %s: %s", *memprofile, err)
}
}()
}
logger.Printf("Starting up version %s/%s as pid %d", version, runtime.Version(), os.Getpid())
cfg, err := goconf.ReadConfigFile(*configFlag)
if err != nil {
logger.Fatal("Could not read configuration: ", err)
}
logger.Printf("Using a maximum of %d CPUs", runtime.GOMAXPROCS(0))
signaling.RegisterStats()
natsUrl, _ := config.GetStringOptionWithEnv(cfg, "nats", "url")
if natsUrl == "" {
natsUrl = nats.DefaultURL
}
events, err := events.NewAsyncEvents(stopCtx, natsUrl)
if err != nil {
logger.Fatal("Could not create async events client: ", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := events.Close(ctx); err != nil {
logger.Printf("Error closing events handler: %s", err)
}
}()
dnsMonitor, err := dns.NewMonitor(logger, dnsMonitorInterval, nil)
if err != nil {
logger.Fatal("Could not create DNS monitor: ", err)
}
if err := dnsMonitor.Start(); err != nil {
logger.Fatal("Could not start DNS monitor: ", err)
}
defer dnsMonitor.Stop()
etcdClient, err := etcd.NewClient(logger, cfg, "mcu")
if err != nil {
logger.Fatalf("Could not create etcd client: %s", err)
}
defer func() {
if err := etcdClient.Close(); err != nil {
logger.Printf("Error while closing etcd client: %s", err)
}
}()
rpcServer, err := signaling.NewGrpcServer(stopCtx, cfg, version)
if err != nil {
logger.Fatalf("Could not create RPC server: %s", err)
}
go func() {
if err := rpcServer.Run(); err != nil {
logger.Fatalf("Could not start RPC server: %s", err)
}
}()
defer rpcServer.Close()
rpcClients, err := signaling.NewGrpcClients(stopCtx, cfg, etcdClient, dnsMonitor, version)
if err != nil {
logger.Fatalf("Could not create RPC clients: %s", err)
}
defer rpcClients.Close()
r := mux.NewRouter()
hub, err := signaling.NewHub(stopCtx, cfg, events, rpcServer, rpcClients, etcdClient, r, version)
if err != nil {
logger.Fatal("Could not create hub: ", err)
}
mcuUrl, _ := config.GetStringOptionWithEnv(cfg, "mcu", "url")
mcuType, _ := cfg.GetString("mcu", "type")
if mcuType == "" && mcuUrl != "" {
logger.Printf("WARNING: Old-style MCU configuration detected with url but no type, defaulting to type %s", signaling.McuTypeJanus)
mcuType = signaling.McuTypeJanus
} else if mcuType == signaling.McuTypeJanus && mcuUrl == "" {
logger.Printf("WARNING: Old-style MCU configuration detected with type but no url, disabling")
mcuType = ""
}
if mcuType != "" {
var mcu signaling.Mcu
mcuRetry := initialMcuRetry
mcuRetryTimer := time.NewTimer(mcuRetry)
mcuTypeLoop:
for {
// Context should be cancelled on signals but need a way to differentiate later.
ctx := context.TODO()
switch mcuType {
case signaling.McuTypeJanus:
mcu, err = signaling.NewMcuJanus(ctx, mcuUrl, cfg)
signaling.UnregisterProxyMcuStats()
signaling.RegisterJanusMcuStats()
case signaling.McuTypeProxy:
mcu, err = signaling.NewMcuProxy(ctx, cfg, etcdClient, rpcClients, dnsMonitor)
signaling.UnregisterJanusMcuStats()
signaling.RegisterProxyMcuStats()
default:
logger.Fatal("Unsupported MCU type: ", mcuType)
}
if err == nil {
err = mcu.Start(ctx)
if err != nil {
logger.Printf("Could not create %s MCU: %s", mcuType, err)
}
}
if err == nil {
break
}
logger.Printf("Could not initialize %s MCU (%s) will retry in %s", mcuType, err, mcuRetry)
mcuRetryTimer.Reset(mcuRetry)
select {
case <-stopCtx.Done():
logger.Fatalf("Cancelled")
case sig := <-sigChan:
switch sig {
case syscall.SIGHUP:
logger.Printf("Received SIGHUP, reloading %s", *configFlag)
if cfg, err = goconf.ReadConfigFile(*configFlag); err != nil {
logger.Printf("Could not read configuration from %s: %s", *configFlag, err)
} else {
mcuUrl, _ = config.GetStringOptionWithEnv(cfg, "mcu", "url")
mcuType, _ = cfg.GetString("mcu", "type")
if mcuType == "" && mcuUrl != "" {
logger.Printf("WARNING: Old-style MCU configuration detected with url but no type, defaulting to type %s", signaling.McuTypeJanus)
mcuType = signaling.McuTypeJanus
} else if mcuType == signaling.McuTypeJanus && mcuUrl == "" {
logger.Printf("WARNING: Old-style MCU configuration detected with type but no url, disabling")
mcuType = ""
break mcuTypeLoop
}
}
}
case <-mcuRetryTimer.C:
// Retry connection
mcuRetry = min(mcuRetry*2, maxMcuRetry)
}
}
if mcu != nil {
defer mcu.Stop()
logger.Printf("Using %s MCU", mcuType)
hub.SetMcu(mcu)
}
}
go hub.Run()
defer hub.Stop()
server, err := signaling.NewBackendServer(stopCtx, cfg, hub, version)
if err != nil {
logger.Fatal("Could not create backend server: ", err)
}
if err := server.Start(r); err != nil {
logger.Fatal("Could not start backend server: ", err)
}
listeners := Listeners{
logger: logger,
}
if saddr, _ := config.GetStringOptionWithEnv(cfg, "https", "listen"); saddr != "" {
cert, _ := cfg.GetString("https", "certificate")
key, _ := cfg.GetString("https", "key")
if cert == "" || key == "" {
logger.Fatal("Need a certificate and key for the HTTPS listener")
}
readTimeout, _ := cfg.GetInt("https", "readtimeout")
if readTimeout <= 0 {
readTimeout = defaultReadTimeout
}
writeTimeout, _ := cfg.GetInt("https", "writetimeout")
if writeTimeout <= 0 {
writeTimeout = defaultWriteTimeout
}
for address := range internal.SplitEntries(saddr, " ") {
go func(address string) {
logger.Println("Listening on", address)
listener, err := createTLSListener(address, cert, key)
if err != nil {
logger.Fatal("Could not start listening: ", err)
}
srv := &http.Server{
Handler: r,
ReadTimeout: time.Duration(readTimeout) * time.Second,
WriteTimeout: time.Duration(writeTimeout) * time.Second,
}
listeners.Add(listener)
if err := srv.Serve(listener); err != nil {
if !hub.IsShutdownScheduled() || !errors.Is(err, net.ErrClosed) {
logger.Fatal("Could not start server: ", err)
}
}
}(address)
}
}
if addr, _ := config.GetStringOptionWithEnv(cfg, "http", "listen"); addr != "" {
readTimeout, _ := cfg.GetInt("http", "readtimeout")
if readTimeout <= 0 {
readTimeout = defaultReadTimeout
}
writeTimeout, _ := cfg.GetInt("http", "writetimeout")
if writeTimeout <= 0 {
writeTimeout = defaultWriteTimeout
}
for address := range internal.SplitEntries(addr, " ") {
go func(address string) {
logger.Println("Listening on", address)
listener, err := createListener(address)
if err != nil {
logger.Fatal("Could not start listening: ", err)
}
srv := &http.Server{
Handler: r,
Addr: addr,
ReadTimeout: time.Duration(readTimeout) * time.Second,
WriteTimeout: time.Duration(writeTimeout) * time.Second,
}
listeners.Add(listener)
if err := srv.Serve(listener); err != nil {
if !hub.IsShutdownScheduled() || !errors.Is(err, net.ErrClosed) {
logger.Fatal("Could not start server: ", err)
}
}
}(address)
}
}
loop:
for {
select {
case <-stopCtx.Done():
logger.Println("Interrupted")
break loop
case sig := <-sigChan:
switch sig {
case syscall.SIGHUP:
logger.Printf("Received SIGHUP, reloading %s", *configFlag)
if config, err := goconf.ReadConfigFile(*configFlag); err != nil {
logger.Printf("Could not read configuration from %s: %s", *configFlag, err)
} else {
hub.Reload(stopCtx, config)
server.Reload(config)
}
case syscall.SIGUSR1:
logger.Printf("Received SIGUSR1, scheduling server to shutdown")
hub.ScheduleShutdown()
listeners.Close()
}
case <-hub.ShutdownChannel():
logger.Printf("All clients disconnected, shutting down")
break loop
}
}
}