Browse Source

Add proxy service implementation.

pull/36/head
Joachim Bauch 7 months ago
parent
commit
b7c258b459
Failed to extract signature
9 changed files with 1506 additions and 1 deletions
  1. +2
    -0
      .gitignore
  2. +5
    -1
      Makefile
  3. +1
    -0
      dependencies.tsv
  4. +55
    -0
      proxy.conf.in
  5. +153
    -0
      src/proxy/main.go
  6. +55
    -0
      src/proxy/proxy_client.go
  7. +962
    -0
      src/proxy/proxy_server.go
  8. +272
    -0
      src/proxy/proxy_session.go
  9. +1
    -0
      src/signaling/api_signaling.go

+ 2
- 0
.gitignore View File

@ -2,9 +2,11 @@ bin/
vendor/
*_easyjson.go
*.pem
*.prof
*.socket
*.tar.gz
cover.out
proxy.conf
server.conf

+ 5
- 1
Makefile View File

@ -109,10 +109,14 @@ server: dependencies common
mkdir -p $(BINDIR)
GOPATH=$(GOPATH) $(GO) build $(BUILDARGS) -ldflags '$(INTERNALLDFLAGS)' -o $(BINDIR)/signaling ./src/server/...
proxy: dependencies common
mkdir -p $(BINDIR)
GOPATH=$(GOPATH) $(GO) build $(BUILDARGS) -ldflags '$(INTERNALLDFLAGS)' -o $(BINDIR)/proxy ./src/proxy/...
clean:
rm -f src/signaling/*_easyjson.go
build: server
build: server proxy
tarball:
git archive \


+ 1
- 0
dependencies.tsv View File

@ -1,4 +1,5 @@
github.com/dlintw/goconf git dcc070983490608a14480e3bf943bad464785df5 2012-02-28T08:26:10Z
github.com/google/uuid git 0e4e31197428a347842d152773b4cace4645ca25 2020-07-02T18:56:42Z
github.com/gorilla/context git 08b5f424b9271eedf6f9f0ce86cb9396ed337a42 2016-08-17T18:46:32Z
github.com/gorilla/mux git ac112f7d75a0714af1bd86ab17749b31f7809640 2017-07-04T07:43:45Z
github.com/gorilla/securecookie git e59506cc896acb7f7bf732d4fdf5e25f7ccd8983 2017-02-24T19:38:04Z


+ 55
- 0
proxy.conf.in View File

@ -0,0 +1,55 @@
[http]
# IP and port to listen on for HTTP requests.
# Comment line to disable the listener.
#listen = 127.0.0.1:9090
[app]
# Set to "true" to install pprof debug handlers.
# See "https://golang.org/pkg/net/http/pprof/" for further information.
#debug = false
# ISO 3166 country this proxy is located at. This will be used by the signaling
# servers to determine the closest proxy for publishers.
#country = DE
[sessions]
# Secret value used to generate checksums of sessions. This should be a random
# string of 32 or 64 bytes.
hashkey = secret-for-session-checksums
# Optional key for encrypting data in the sessions. Must be either 16, 24 or
# 32 bytes.
# If no key is specified, data will not be encrypted (not recommended).
blockkey = -encryption-key-
[nats]
# Url of NATS backend to use. This can also be a list of URLs to connect to
# multiple backends. For local development, this can be set to ":loopback:"
# to process NATS messages internally instead of sending them through an
# external NATS backend.
#url = nats://localhost:4222
[tokens]
# Mapping of <tokenid> = <publickey> of signaling servers allowed to connect.
#server1 = pubkey1.pem
#server2 = pubkey2.pem
[mcu]
# The type of the MCU to use. Currently only "janus" is supported.
type = janus
# The URL to the websocket endpoint of the MCU server.
url = ws://localhost:8188/
# The maximum bitrate per publishing stream (in bits per second).
# Defaults to 1 mbit/sec.
#maxstreambitrate = 1048576
# The maximum bitrate per screensharing stream (in bits per second).
# Default is 2 mbit/sec.
#maxscreenbitrate = 2097152
[stats]
# Comma-separated list of IP addresses that are allowed to access the stats
# endpoint. Leave empty (or commented) to only allow access from "127.0.0.1".
#allowed_ips =

+ 153
- 0
src/proxy/main.go View File

@ -0,0 +1,153 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"github.com/dlintw/goconf"
"github.com/gorilla/mux"
"github.com/nats-io/go-nats"
"signaling"
)
var (
version = "unreleased"
configFlag = flag.String("config", "proxy.conf", "config file to use")
showVersion = flag.Bool("version", false, "show version and quit")
)
const (
defaultReadTimeout = 15
defaultWriteTimeout = 15
proxyDebugMessages = false
)
func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds | log.Lshortfile)
flag.Parse()
if *showVersion {
fmt.Printf("nextcloud-spreed-signaling-proxy version %s/%s\n", version, runtime.Version())
os.Exit(0)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
signal.Notify(sigChan, syscall.SIGHUP)
log.Printf("Starting up version %s/%s as pid %d", version, runtime.Version(), os.Getpid())
config, err := goconf.ReadConfigFile(*configFlag)
if err != nil {
log.Fatal("Could not read configuration: ", err)
}
cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
log.Printf("Using a maximum of %d CPUs\n", cpus)
natsUrl, _ := config.GetString("nats", "url")
if natsUrl == "" {
natsUrl = nats.DefaultURL
}
nats, err := signaling.NewNatsClient(natsUrl)
if err != nil {
log.Fatal("Could not create NATS client: ", err)
}
r := mux.NewRouter()
proxy, err := NewProxyServer(r, version, config, nats)
if err != nil {
log.Fatal(err)
}
if err := proxy.Start(config); err != nil {
log.Fatal(err)
}
defer proxy.Stop()
if addr, _ := config.GetString("http", "listen"); addr != "" {
readTimeout, _ := config.GetInt("http", "readtimeout")
if readTimeout <= 0 {
readTimeout = defaultReadTimeout
}
writeTimeout, _ := config.GetInt("http", "writetimeout")
if writeTimeout <= 0 {
writeTimeout = defaultWriteTimeout
}
for _, address := range strings.Split(addr, " ") {
go func(address string) {
log.Println("Listening on", address)
listener, err := net.Listen("tcp", address)
if err != nil {
log.Fatal("Could not start listening: ", err)
}
srv := &http.Server{
Handler: r,
Addr: addr,
ReadTimeout: time.Duration(readTimeout) * time.Second,
WriteTimeout: time.Duration(writeTimeout) * time.Second,
}
if err := srv.Serve(listener); err != nil {
log.Fatal("Could not start server: ", err)
}
}(address)
}
}
loop:
for {
switch sig := <-sigChan; sig {
case os.Interrupt:
log.Println("Interrupted")
break loop
case syscall.SIGHUP:
log.Printf("Received SIGHUP, reloading %s", *configFlag)
config, err := goconf.ReadConfigFile(*configFlag)
if err != nil {
log.Printf("Could not read configuration from %s: %s", *configFlag, err)
continue
}
proxy.Reload(config)
}
}
}

+ 55
- 0
src/proxy/proxy_client.go View File

@ -0,0 +1,55 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"sync/atomic"
"unsafe"
"github.com/gorilla/websocket"
"signaling"
)
type ProxyClient struct {
signaling.Client
proxy *ProxyServer
session unsafe.Pointer
}
func NewProxyClient(proxy *ProxyServer, conn *websocket.Conn, addr string) (*ProxyClient, error) {
client := &ProxyClient{
proxy: proxy,
}
client.SetConn(conn, addr)
return client, nil
}
func (c *ProxyClient) GetSession() *ProxySession {
return (*ProxySession)(atomic.LoadPointer(&c.session))
}
func (c *ProxyClient) SetSession(session *ProxySession) {
atomic.StorePointer(&c.session, unsafe.Pointer(session))
}

+ 962
- 0
src/proxy/proxy_server.go View File

@ -0,0 +1,962 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"crypto/rsa"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
runtimepprof "runtime/pprof"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/dlintw/goconf"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/gorilla/securecookie"
"github.com/gorilla/websocket"
"golang.org/x/net/context"
"gopkg.in/dgrijalva/jwt-go.v3"
"signaling"
)
const (
// Buffer sizes when reading/writing websocket connections.
websocketReadBufferSize = 4096
websocketWriteBufferSize = 4096
initialMcuRetry = time.Second
maxMcuRetry = time.Second * 16
updateLoadInterval = time.Second
expireSessionsInterval = 10 * time.Second
// Maximum age a token may have to prevent reuse of old tokens.
maxTokenAge = 5 * time.Minute
)
type ContextKey string
var (
ContextKeySession = ContextKey("session")
TimeoutCreatingPublisher = signaling.NewError("timeout", "Timeout creating publisher.")
TimeoutCreatingSubscriber = signaling.NewError("timeout", "Timeout creating subscriber.")
TokenAuthFailed = signaling.NewError("auth_failed", "The token could not be authenticated.")
TokenExpired = signaling.NewError("token_expired", "The token is expired.")
UnknownClient = signaling.NewError("unknown_client", "Unknown client id given.")
UnsupportedCommand = signaling.NewError("bad_request", "Unsupported command received.")
UnsupportedMessage = signaling.NewError("bad_request", "Unsupported message received.")
UnsupportedPayload = signaling.NewError("unsupported_payload", "Unsupported payload type.")
)
type ProxyServer struct {
version string
country string
url string
nats signaling.NatsClient
mcu signaling.Mcu
stopped uint32
load int64
upgrader websocket.Upgrader
tokenKeys atomic.Value
statsAllowedIps map[string]bool
sid uint64
cookie *securecookie.SecureCookie
sessions map[uint64]*ProxySession
sessionsLock sync.RWMutex
clients map[string]signaling.McuClient
clientIds map[string]string
clientsLock sync.RWMutex
}
func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile, nats signaling.NatsClient) (*ProxyServer, error) {
hashKey, _ := config.GetString("sessions", "hashkey")
switch len(hashKey) {
case 32:
case 64:
default:
log.Printf("WARNING: The sessions hash key should be 32 or 64 bytes but is %d bytes", len(hashKey))
}
blockKey, _ := config.GetString("sessions", "blockkey")
blockBytes := []byte(blockKey)
switch len(blockKey) {
case 0:
blockBytes = nil
case 16:
case 24:
case 32:
default:
return nil, fmt.Errorf("The sessions block key must be 16, 24 or 32 bytes but is %d bytes", len(blockKey))
}
tokenKeys := make(map[string]*rsa.PublicKey)
options, _ := config.GetOptions("tokens")
for _, id := range options {
filename, _ := config.GetString("tokens", id)
if filename == "" {
return nil, fmt.Errorf("No filename given for token %s", id)
}
keyData, err := ioutil.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("Could not read public key from %s: %s", filename, err)
}
key, err := jwt.ParseRSAPublicKeyFromPEM(keyData)
if err != nil {
return nil, fmt.Errorf("Could not parse public key from %s: %s", filename, err)
}
tokenKeys[id] = key
}
var keyIds []string
for k, _ := range tokenKeys {
keyIds = append(keyIds, k)
}
sort.Strings(keyIds)
log.Printf("Enabled token keys: %v", keyIds)
statsAllowed, _ := config.GetString("stats", "allowed_ips")
var statsAllowedIps map[string]bool
if statsAllowed == "" {
log.Printf("No IPs configured for the stats endpoint, only allowing access from 127.0.0.1")
statsAllowedIps = map[string]bool{
"127.0.0.1": true,
}
} else {
log.Printf("Only allowing access to the stats endpoing from %s", statsAllowed)
statsAllowedIps = make(map[string]bool)
for _, ip := range strings.Split(statsAllowed, ",") {
ip = strings.TrimSpace(ip)
if ip != "" {
statsAllowedIps[ip] = true
}
}
}
country, _ := config.GetString("app", "country")
if country != "" {
log.Printf("Sending %s as country information", country)
} else {
log.Printf("Not sending country information")
}
result := &ProxyServer{
version: version,
country: country,
nats: nats,
upgrader: websocket.Upgrader{
ReadBufferSize: websocketReadBufferSize,
WriteBufferSize: websocketWriteBufferSize,
},
statsAllowedIps: statsAllowedIps,
cookie: securecookie.New([]byte(hashKey), blockBytes).MaxAge(0),
sessions: make(map[uint64]*ProxySession),
clients: make(map[string]signaling.McuClient),
clientIds: make(map[string]string),
}
result.setTokenKeys(tokenKeys)
result.upgrader.CheckOrigin = result.checkOrigin
if debug, _ := config.GetBool("app", "debug"); debug {
log.Println("Installing debug handlers in \"/debug/pprof\"")
r.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
r.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
r.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
r.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
r.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
for _, profile := range runtimepprof.Profiles() {
name := profile.Name()
r.Handle("/debug/pprof/"+name, pprof.Handler(name))
}
}
r.HandleFunc("/proxy", result.setCommonHeaders(result.proxyHandler)).Methods("GET")
r.HandleFunc("/stats", result.setCommonHeaders(result.validateStatsRequest(result.statsHandler))).Methods("GET")
return result, nil
}
func (s *ProxyServer) checkOrigin(r *http.Request) bool {
// We allow any Origin to connect to the service.
return true
}
func (s *ProxyServer) setTokenKeys(keys map[string]*rsa.PublicKey) {
s.tokenKeys.Store(keys)
}
func (s *ProxyServer) getTokenKeys() map[string]*rsa.PublicKey {
return s.tokenKeys.Load().(map[string]*rsa.PublicKey)
}
func (s *ProxyServer) Start(config *goconf.ConfigFile) error {
s.url, _ = config.GetString("mcu", "url")
if s.url == "" {
return fmt.Errorf("No MCU server url configured")
}
mcuType, _ := config.GetString("mcu", "type")
if mcuType == "" {
mcuType = signaling.McuTypeDefault
}
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
defer signal.Stop(interrupt)
var err error
var mcu signaling.Mcu
mcuRetry := initialMcuRetry
mcuRetryTimer := time.NewTimer(mcuRetry)
for {
switch mcuType {
case signaling.McuTypeJanus:
mcu, err = signaling.NewMcuJanus(s.url, config, s.nats)
default:
return fmt.Errorf("Unsupported MCU type: %s", mcuType)
}
if err == nil {
mcu.SetOnConnected(s.onMcuConnected)
mcu.SetOnDisconnected(s.onMcuDisconnected)
err = mcu.Start()
if err != nil {
log.Printf("Could not create %s MCU at %s: %s", mcuType, s.url, err)
}
}
if err == nil {
break
}
log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, mcuRetry)
mcuRetryTimer.Reset(mcuRetry)
select {
case <-interrupt:
return fmt.Errorf("Cancelled")
case <-mcuRetryTimer.C:
// Retry connection
mcuRetry = mcuRetry * 2
if mcuRetry > maxMcuRetry {
mcuRetry = maxMcuRetry
}
}
}
s.mcu = mcu
go s.run()
return nil
}
func (s *ProxyServer) run() {
updateLoadTicker := time.NewTicker(updateLoadInterval)
expireSessionsTicker := time.NewTicker(expireSessionsInterval)
loop:
for {
select {
case <-updateLoadTicker.C:
if atomic.LoadUint32(&s.stopped) != 0 {
break loop
}
s.updateLoad()
case <-expireSessionsTicker.C:
if atomic.LoadUint32(&s.stopped) != 0 {
break loop
}
s.expireSessions()
}
}
}
func (s *ProxyServer) updateLoad() {
// TODO: Take maximum bandwidth of clients into account when calculating
// load (screensharing requires more than regular audio/video).
load := s.GetClientCount()
if load == atomic.LoadInt64(&s.load) {
return
}
atomic.StoreInt64(&s.load, load)
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "update-load",
Load: load,
},
}
s.IterateSessions(func(session *ProxySession) {
session.sendMessage(msg)
})
}
func (s *ProxyServer) getExpiredSessions() []*ProxySession {
var expired []*ProxySession
s.IterateSessions(func(session *ProxySession) {
if session.IsExpired() {
expired = append(expired, session)
}
})
return expired
}
func (s *ProxyServer) expireSessions() {
expired := s.getExpiredSessions()
if len(expired) == 0 {
return
}
s.sessionsLock.Lock()
defer s.sessionsLock.Unlock()
for _, session := range expired {
if !session.IsExpired() {
// Session was used while waiting for the lock.
continue
}
log.Printf("Delete expired session %s", session.PublicId())
s.deleteSessionLocked(session.Sid())
}
}
func (s *ProxyServer) Stop() {
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
return
}
s.mcu.Stop()
}
func (s *ProxyServer) Reload(config *goconf.ConfigFile) {
tokenKeys := make(map[string]*rsa.PublicKey)
options, _ := config.GetOptions("tokens")
for _, id := range options {
filename, _ := config.GetString("tokens", id)
if filename == "" {
log.Printf("No filename given for token %s, ignoring", id)
continue
}
keyData, err := ioutil.ReadFile(filename)
if err != nil {
log.Printf("Could not read public key from %s, ignoring: %s", filename, err)
continue
}
key, err := jwt.ParseRSAPublicKeyFromPEM(keyData)
if err != nil {
log.Printf("Could not parse public key from %s, ignoring: %s", filename, err)
continue
}
tokenKeys[id] = key
}
if len(tokenKeys) == 0 {
log.Printf("No token keys loaded")
} else {
var keyIds []string
for k, _ := range tokenKeys {
keyIds = append(keyIds, k)
}
sort.Strings(keyIds)
log.Printf("Enabled token keys: %v", keyIds)
}
s.setTokenKeys(tokenKeys)
}
func (s *ProxyServer) setCommonHeaders(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "nextcloud-spreed-signaling-proxy/"+s.version)
f(w, r)
}
}
func getRealUserIP(r *http.Request) string {
// Note this function assumes it is running behind a trusted proxy, so
// the headers can be trusted.
if ip := r.Header.Get("X-Real-IP"); ip != "" {
return ip
}
if ip := r.Header.Get("X-Forwarded-For"); ip != "" {
// Result could be a list "clientip, proxy1, proxy2", so only use first element.
if pos := strings.Index(ip, ","); pos >= 0 {
ip = strings.TrimSpace(ip[:pos])
}
return ip
}
return r.RemoteAddr
}
func (s *ProxyServer) proxyHandler(w http.ResponseWriter, r *http.Request) {
addr := getRealUserIP(r)
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Could not upgrade request from %s: %s", addr, err)
return
}
client, err := NewProxyClient(s, conn, addr)
if err != nil {
log.Printf("Could not create client for %s: %s", addr, err)
return
}
client.OnClosed = s.clientClosed
client.OnMessageReceived = func(c *signaling.Client, data []byte) {
s.processMessage(client, data)
}
client.OnRTTReceived = func(c *signaling.Client, rtt time.Duration) {
if session := client.GetSession(); session != nil {
session.MarkUsed()
}
}
go client.WritePump()
go client.ReadPump()
}
func (s *ProxyServer) clientClosed(client *signaling.Client) {
log.Printf("Connection from %s closed", client.RemoteAddr())
}
func (s *ProxyServer) onMcuConnected() {
log.Printf("Connection to %s established", s.url)
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "backend-connected",
},
}
s.IterateSessions(func(session *ProxySession) {
session.sendMessage(msg)
})
}
func (s *ProxyServer) onMcuDisconnected() {
if atomic.LoadUint32(&s.stopped) != 0 {
// Shutting down, no need to notify.
return
}
log.Printf("Connection to %s lost", s.url)
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "backend-disconnected",
},
}
s.IterateSessions(func(session *ProxySession) {
session.sendMessage(msg)
session.NotifyDisconnected()
})
}
func (s *ProxyServer) sendCurrentLoad(session *ProxySession) {
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "update-load",
Load: atomic.LoadInt64(&s.load),
},
}
session.sendMessage(msg)
}
func (s *ProxyServer) processMessage(client *ProxyClient, data []byte) {
if proxyDebugMessages {
log.Printf("Message: %s", string(data))
}
var message signaling.ProxyClientMessage
if err := message.UnmarshalJSON(data); err != nil {
if session := client.GetSession(); session != nil {
log.Printf("Error decoding message from client %s: %v", session.PublicId(), err)
} else {
log.Printf("Error decoding message from %s: %v", client.RemoteAddr(), err)
}
client.SendError(signaling.InvalidFormat)
return
}
if err := message.CheckValid(); err != nil {
if session := client.GetSession(); session != nil {
log.Printf("Invalid message %+v from client %s: %v", message, session.PublicId(), err)
} else {
log.Printf("Invalid message %+v from %s: %v", message, client.RemoteAddr(), err)
}
client.SendMessage(message.NewErrorServerMessage(signaling.InvalidFormat))
return
}
session := client.GetSession()
if session == nil {
if message.Type != "hello" {
client.SendMessage(message.NewErrorServerMessage(signaling.HelloExpected))
return
}
var session *ProxySession
if resumeId := message.Hello.ResumeId; resumeId != "" {
var data signaling.SessionIdData
if s.cookie.Decode("session", resumeId, &data) == nil {
session = s.GetSession(data.Sid)
}
if session == nil {
client.SendMessage(message.NewErrorServerMessage(signaling.NoSuchSession))
return
}
log.Printf("Resumed session %s", session.PublicId())
s.sendCurrentLoad(session)
} else {
var err error
if session, err = s.NewSession(message.Hello); err != nil {
if e, ok := err.(*signaling.Error); ok {
client.SendMessage(message.NewErrorServerMessage(e))
} else {
client.SendMessage(message.NewWrappedErrorServerMessage(err))
}
return
}
}
prev := session.SetClient(client)
if prev != nil {
msg := &signaling.ProxyServerMessage{
Type: "bye",
Bye: &signaling.ByeProxyServerMessage{
Reason: "session_resumed",
},
}
prev.SendMessage(msg)
}
response := &signaling.ProxyServerMessage{
Id: message.Id,
Type: "hello",
Hello: &signaling.HelloProxyServerMessage{
Version: signaling.HelloVersion,
SessionId: session.PublicId(),
Server: &signaling.HelloServerMessageServer{
Version: s.version,
Country: s.country,
},
},
}
client.SendMessage(response)
s.sendCurrentLoad(session)
return
}
ctx := context.WithValue(context.Background(), ContextKeySession, session)
session.MarkUsed()
switch message.Type {
case "command":
s.processCommand(ctx, client, session, &message)
case "payload":
s.processPayload(ctx, client, session, &message)
default:
session.sendMessage(message.NewErrorServerMessage(UnsupportedMessage))
}
}
func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) {
cmd := message.Command
switch cmd.Type {
case "create-publisher":
id := uuid.New().String()
publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType)
if err == context.DeadlineExceeded {
log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId())
session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher))
return
} else if err != nil {
log.Printf("Error while creating %s publisher %s for %s: %s", cmd.StreamType, id, session.PublicId(), err)
session.sendMessage(message.NewWrappedErrorServerMessage(err))
return
}
log.Printf("Created %s publisher %s as %s", cmd.StreamType, publisher.Id(), id)
session.StorePublisher(ctx, id, publisher)
s.StoreClient(id, publisher)
response := &signaling.ProxyServerMessage{
Id: message.Id,
Type: "command",
Command: &signaling.CommandProxyServerMessage{
Id: id,
},
}
session.sendMessage(response)
case "create-subscriber":
id := uuid.New().String()
publisherId := cmd.PublisherId
subscriber, err := s.mcu.NewSubscriber(ctx, session, publisherId, cmd.StreamType)
if err == context.DeadlineExceeded {
log.Printf("Timeout while creating %s subscriber on %s for %s", cmd.StreamType, publisherId, session.PublicId())
session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingSubscriber))
return
} else if err != nil {
log.Printf("Error while creating %s subscriber on %s for %s: %s", cmd.StreamType, publisherId, session.PublicId(), err)
session.sendMessage(message.NewWrappedErrorServerMessage(err))
return
}
log.Printf("Created %s subscriber %s as %s", cmd.StreamType, subscriber.Id(), id)
session.StoreSubscriber(ctx, id, subscriber)
s.StoreClient(id, subscriber)
response := &signaling.ProxyServerMessage{
Id: message.Id,
Type: "command",
Command: &signaling.CommandProxyServerMessage{
Id: id,
},
}
session.sendMessage(response)
case "delete-publisher":
client := s.GetClient(cmd.ClientId)
if client == nil {
session.sendMessage(message.NewErrorServerMessage(UnknownClient))
return
}
if session.DeletePublisher(client) == "" {
session.sendMessage(message.NewErrorServerMessage(UnknownClient))
return
}
s.DeleteClient(cmd.ClientId, client)
go func() {
log.Printf("Closing %s publisher %s as %s", client.StreamType(), client.Id(), cmd.ClientId)
client.Close(context.Background())
}()
response := &signaling.ProxyServerMessage{
Id: message.Id,
Type: "command",
Command: &signaling.CommandProxyServerMessage{
Id: cmd.ClientId,
},
}
session.sendMessage(response)
case "delete-subscriber":
client := s.GetClient(cmd.ClientId)
if client == nil {
session.sendMessage(message.NewErrorServerMessage(UnknownClient))
return
}
subscriber, ok := client.(signaling.McuSubscriber)
if !ok {
session.sendMessage(message.NewErrorServerMessage(UnknownClient))
return
}
if session.DeleteSubscriber(subscriber) == "" {
session.sendMessage(message.NewErrorServerMessage(UnknownClient))
return
}
s.DeleteClient(cmd.ClientId, client)
go func() {
log.Printf("Closing %s subscriber %s as %s", client.StreamType(), client.Id(), cmd.ClientId)
client.Close(context.Background())
}()
response := &signaling.ProxyServerMessage{
Id: message.Id,
Type: "command",
Command: &signaling.CommandProxyServerMessage{
Id: cmd.ClientId,
},
}
session.sendMessage(response)
default:
log.Printf("Unsupported command %+v", message.Command)
session.sendMessage(message.NewErrorServerMessage(UnsupportedCommand))
}
}
func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) {
payload := message.Payload
mcuClient := s.GetClient(payload.ClientId)
if mcuClient == nil {
session.sendMessage(message.NewErrorServerMessage(UnknownClient))
return
}
var mcuData *signaling.MessageClientMessageData
switch payload.Type {
case "offer":
fallthrough
case "answer":
fallthrough
case "candidate":
mcuData = &signaling.MessageClientMessageData{
Type: payload.Type,
Payload: payload.Payload,
}
case "endOfCandidates":
// Ignore but confirm, not passed along to Janus anyway.
session.sendMessage(&signaling.ProxyServerMessage{
Id: message.Id,
Type: "payload",
Payload: &signaling.PayloadProxyServerMessage{
Type: payload.Type,
ClientId: payload.ClientId,
},
})
return
case "requestoffer":
fallthrough
case "sendoffer":
mcuData = &signaling.MessageClientMessageData{
Type: payload.Type,
}
default:
session.sendMessage(message.NewErrorServerMessage(UnsupportedPayload))
return
}
mcuClient.SendMessage(ctx, nil, mcuData, func(err error, response map[string]interface{}) {
var responseMsg *signaling.ProxyServerMessage
if err != nil {
log.Printf("Error sending %s to %s client %s: %s", mcuData, mcuClient.StreamType(), payload.ClientId, err)
responseMsg = message.NewWrappedErrorServerMessage(err)
} else {
responseMsg = &signaling.ProxyServerMessage{
Id: message.Id,
Type: "payload",
Payload: &signaling.PayloadProxyServerMessage{
Type: payload.Type,
ClientId: payload.ClientId,
Payload: response,
},
}
}
session.sendMessage(responseMsg)
})
}
func (s *ProxyServer) NewSession(hello *signaling.HelloProxyClientMessage) (*ProxySession, error) {
if proxyDebugMessages {
log.Printf("Hello: %+v", hello)
}
token, err := jwt.ParseWithClaims(hello.Token, &signaling.TokenClaims{}, func(token *jwt.Token) (interface{}, error) {
// Don't forget to validate the alg is what you expect:
if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok {
log.Printf("Unexpected signing method: %v", token.Header["alg"])
return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
}
claims, ok := token.Claims.(*signaling.TokenClaims)
if !ok {
log.Printf("Unsupported claims type: %+v", token.Claims)
return nil, fmt.Errorf("Unsupported claims type")
}
tokenKeys := s.getTokenKeys()
publicKey := tokenKeys[claims.Issuer]
if publicKey == nil {
log.Printf("Issuer %s is not supported", claims.Issuer)
return nil, fmt.Errorf("No key found for issuer")
}
return publicKey, nil
})
if err != nil {
return nil, TokenAuthFailed
}
claims, ok := token.Claims.(*signaling.TokenClaims)
if !ok || !token.Valid {
return nil, TokenAuthFailed
}
minIssuedAt := time.Now().Add(-maxTokenAge)
if issuedAt := time.Unix(claims.IssuedAt, 0); issuedAt.Before(minIssuedAt) {
return nil, TokenExpired
}
sid := atomic.AddUint64(&s.sid, 1)
for sid == 0 {
sid = atomic.AddUint64(&s.sid, 1)
}
sessionIdData := &signaling.SessionIdData{
Sid: sid,
Created: time.Now(),
}
encoded, err := s.cookie.Encode("session", sessionIdData)
if err != nil {
return nil, err
}
log.Printf("Created session %s for %+v", encoded, claims)
session := NewProxySession(s, sid, encoded)
s.StoreSession(sid, session)
return session, nil
}
func (s *ProxyServer) StoreSession(id uint64, session *ProxySession) {
s.sessionsLock.Lock()
defer s.sessionsLock.Unlock()
s.sessions[id] = session
}
func (s *ProxyServer) GetSession(id uint64) *ProxySession {
s.sessionsLock.RLock()
defer s.sessionsLock.RUnlock()
return s.sessions[id]
}
func (s *ProxyServer) GetSessionsCount() int64 {
s.sessionsLock.RLock()
defer s.sessionsLock.RUnlock()
return int64(len(s.sessions))
}
func (s *ProxyServer) IterateSessions(f func(*ProxySession)) {
s.sessionsLock.RLock()
defer s.sessionsLock.RUnlock()
for _, session := range s.sessions {
f(session)
}
}
func (s *ProxyServer) DeleteSession(id uint64) {
s.sessionsLock.Lock()
defer s.sessionsLock.Unlock()
s.deleteSessionLocked(id)
}
func (s *ProxyServer) deleteSessionLocked(id uint64) {
delete(s.sessions, id)
}
func (s *ProxyServer) StoreClient(id string, client signaling.McuClient) {
s.clientsLock.Lock()
defer s.clientsLock.Unlock()
s.clients[id] = client
s.clientIds[client.Id()] = id
}
func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) {
s.clientsLock.Lock()
defer s.clientsLock.Unlock()
delete(s.clients, id)
delete(s.clientIds, client.Id())
}
func (s *ProxyServer) GetClientCount() int64 {
s.clientsLock.RLock()
defer s.clientsLock.RUnlock()
return int64(len(s.clients))
}
func (s *ProxyServer) GetClient(id string) signaling.McuClient {
s.clientsLock.RLock()
defer s.clientsLock.RUnlock()
return s.clients[id]
}
func (s *ProxyServer) GetClientId(client signaling.McuClient) string {
s.clientsLock.RLock()
defer s.clientsLock.RUnlock()
return s.clientIds[client.Id()]
}
func (s *ProxyServer) getStats() map[string]interface{} {
result := map[string]interface{}{
"sessions": s.GetSessionsCount(),
"mcu": s.mcu.GetStats(),
}
return result
}
func (s *ProxyServer) validateStatsRequest(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
addr := getRealUserIP(r)
if strings.Contains(addr, ":") {
if host, _, err := net.SplitHostPort(addr); err == nil {
addr = host
}
}
if !s.statsAllowedIps[addr] {
http.Error(w, "Authentication check failed", http.StatusForbidden)
return
}
f(w, r)
}
}
func (s *ProxyServer) statsHandler(w http.ResponseWriter, r *http.Request) {
stats := s.getStats()
statsData, err := json.MarshalIndent(stats, "", " ")
if err != nil {
log.Printf("Could not serialize stats %+v: %s", stats, err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
w.Write(statsData)
}

+ 272
- 0
src/proxy/proxy_session.go View File

@ -0,0 +1,272 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"log"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/context"
"signaling"
)
const (
// Sessions expire if they have not been used for one minute.
sessionExpirationTime = time.Minute
)
type ProxySession struct {
proxy *ProxyServer
id string
sid uint64
lastUsed int64
clientLock sync.Mutex
client *ProxyClient
pendingMessages []*signaling.ProxyServerMessage
publishersLock sync.Mutex
publishers map[string]signaling.McuPublisher
publisherIds map[string]string
subscribersLock sync.Mutex
subscribers map[string]signaling.McuSubscriber
subscriberIds map[string]string
}
func NewProxySession(proxy *ProxyServer, sid uint64, id string) *ProxySession {
return &ProxySession{
proxy: proxy,
id: id,
sid: sid,
lastUsed: time.Now().UnixNano(),
publishers: make(map[string]signaling.McuPublisher),
publisherIds: make(map[string]string),
subscribers: make(map[string]signaling.McuSubscriber),
subscriberIds: make(map[string]string),
}
}
func (s *ProxySession) PublicId() string {
return s.id
}
func (s *ProxySession) Sid() uint64 {
return s.sid
}
func (s *ProxySession) LastUsed() time.Time {
lastUsed := atomic.LoadInt64(&s.lastUsed)
return time.Unix(0, lastUsed)
}
func (s *ProxySession) IsExpired() bool {
expiresAt := s.LastUsed().Add(sessionExpirationTime)
return expiresAt.Before(time.Now())
}
func (s *ProxySession) MarkUsed() {
now := time.Now()
atomic.StoreInt64(&s.lastUsed, now.UnixNano())
}
func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient {
s.clientLock.Lock()
prev := s.client
s.client = client
var messages []*signaling.ProxyServerMessage
if client != nil {
messages, s.pendingMessages = s.pendingMessages, nil
}
s.clientLock.Unlock()
if prev != nil {
prev.SetSession(nil)
}
if client != nil {
s.MarkUsed()
client.SetSession(s)
for _, msg := range messages {
client.SendMessage(msg)
}
}
return prev
}
func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate interface{}) {
id := s.proxy.GetClientId(client)
if id == "" {
log.Printf("Received candidate %+v from unknown %s client %s (%+v)", candidate, client.StreamType(), client.Id(), client)
return
}
msg := &signaling.ProxyServerMessage{
Type: "payload",
Payload: &signaling.PayloadProxyServerMessage{
Type: "candidate",
ClientId: id,
Payload: map[string]interface{}{
"candidate": candidate,
},
},
}
s.sendMessage(msg)
}
func (s *ProxySession) sendMessage(message *signaling.ProxyServerMessage) {
var client *ProxyClient
s.clientLock.Lock()
client = s.client
if client == nil {
s.pendingMessages = append(s.pendingMessages, message)
}
s.clientLock.Unlock()
if client != nil {
client.SendMessage(message)
}
}
func (s *ProxySession) OnIceCompleted(client signaling.McuClient) {
id := s.proxy.GetClientId(client)
if id == "" {
log.Printf("Received ice completed event from unknown %s client %s (%+v)", client.StreamType(), client.Id(), client)
return
}
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "ice-completed",
ClientId: id,
},
}
s.sendMessage(msg)
}
func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) {
if id := s.DeletePublisher(publisher); id != "" {
s.proxy.DeleteClient(id, publisher)
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "publisher-closed",
ClientId: id,
},
}
s.sendMessage(msg)
}
}
func (s *ProxySession) SubscriberClosed(subscriber signaling.McuSubscriber) {
if id := s.DeleteSubscriber(subscriber); id != "" {
s.proxy.DeleteClient(id, subscriber)
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "subscriber-closed",
ClientId: id,
},
}
s.sendMessage(msg)
}
}
func (s *ProxySession) StorePublisher(ctx context.Context, id string, publisher signaling.McuPublisher) {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
s.publishers[id] = publisher
s.publisherIds[publisher.Id()] = id
}
func (s *ProxySession) DeletePublisher(publisher signaling.McuPublisher) string {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
id, found := s.publisherIds[publisher.Id()]
if !found {
return ""
}
delete(s.publishers, id)
delete(s.publisherIds, publisher.Id())
return id
}
func (s *ProxySession) StoreSubscriber(ctx context.Context, id string, subscriber signaling.McuSubscriber) {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
s.subscribers[id] = subscriber
s.subscriberIds[subscriber.Id()] = id
}
func (s *ProxySession) DeleteSubscriber(subscriber signaling.McuSubscriber) string {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
id, found := s.subscriberIds[subscriber.Id()]
if !found {
return ""
}
delete(s.subscribers, id)
delete(s.subscriberIds, subscriber.Id())
return id
}
func (s *ProxySession) clearPublishers() {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
go func(publishers map[string]signaling.McuPublisher) {
for _, publisher := range publishers {
publisher.Close(context.Background())
}
}(s.publishers)
s.publishers = make(map[string]signaling.McuPublisher)
s.publisherIds = make(map[string]string)
}
func (s *ProxySession) clearSubscribers() {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
go func(subscribers map[string]signaling.McuSubscriber) {
for _, subscriber := range subscribers {
subscriber.Close(context.Background())
}
}(s.subscribers)
s.subscribers = make(map[string]signaling.McuSubscriber)
s.subscriberIds = make(map[string]string)
}
func (s *ProxySession) NotifyDisconnected() {
s.clearPublishers()
s.clearSubscribers()
}

+ 1
- 0
src/signaling/api_signaling.go View File

@ -278,6 +278,7 @@ const (
type HelloServerMessageServer struct {
Version string `json:"version"`
Features []string `json:"features,omitempty"`
Country string `json:"country,omitempty"`
}
type HelloServerMessage struct {


Loading…
Cancel
Save