diff --git a/.gitignore b/.gitignore index ae1d4af..7296465 100644 --- a/.gitignore +++ b/.gitignore @@ -2,9 +2,11 @@ bin/ vendor/ *_easyjson.go +*.pem *.prof *.socket *.tar.gz cover.out +proxy.conf server.conf diff --git a/Makefile b/Makefile index 9d1cdcf..ea49861 100644 --- a/Makefile +++ b/Makefile @@ -109,10 +109,14 @@ server: dependencies common mkdir -p $(BINDIR) GOPATH=$(GOPATH) $(GO) build $(BUILDARGS) -ldflags '$(INTERNALLDFLAGS)' -o $(BINDIR)/signaling ./src/server/... +proxy: dependencies common + mkdir -p $(BINDIR) + GOPATH=$(GOPATH) $(GO) build $(BUILDARGS) -ldflags '$(INTERNALLDFLAGS)' -o $(BINDIR)/proxy ./src/proxy/... + clean: rm -f src/signaling/*_easyjson.go -build: server +build: server proxy tarball: git archive \ diff --git a/dependencies.tsv b/dependencies.tsv index 1a865cd..12f699b 100644 --- a/dependencies.tsv +++ b/dependencies.tsv @@ -1,4 +1,5 @@ github.com/dlintw/goconf git dcc070983490608a14480e3bf943bad464785df5 2012-02-28T08:26:10Z +github.com/google/uuid git 0e4e31197428a347842d152773b4cace4645ca25 2020-07-02T18:56:42Z github.com/gorilla/context git 08b5f424b9271eedf6f9f0ce86cb9396ed337a42 2016-08-17T18:46:32Z github.com/gorilla/mux git ac112f7d75a0714af1bd86ab17749b31f7809640 2017-07-04T07:43:45Z github.com/gorilla/securecookie git e59506cc896acb7f7bf732d4fdf5e25f7ccd8983 2017-02-24T19:38:04Z diff --git a/proxy.conf.in b/proxy.conf.in new file mode 100644 index 0000000..4770eda --- /dev/null +++ b/proxy.conf.in @@ -0,0 +1,55 @@ +[http] +# IP and port to listen on for HTTP requests. +# Comment line to disable the listener. +#listen = 127.0.0.1:9090 + +[app] +# Set to "true" to install pprof debug handlers. +# See "https://golang.org/pkg/net/http/pprof/" for further information. +#debug = false + +# ISO 3166 country this proxy is located at. This will be used by the signaling +# servers to determine the closest proxy for publishers. +#country = DE + +[sessions] +# Secret value used to generate checksums of sessions. This should be a random +# string of 32 or 64 bytes. +hashkey = secret-for-session-checksums + +# Optional key for encrypting data in the sessions. Must be either 16, 24 or +# 32 bytes. +# If no key is specified, data will not be encrypted (not recommended). +blockkey = -encryption-key- + +[nats] +# Url of NATS backend to use. This can also be a list of URLs to connect to +# multiple backends. For local development, this can be set to ":loopback:" +# to process NATS messages internally instead of sending them through an +# external NATS backend. +#url = nats://localhost:4222 + +[tokens] +# Mapping of = of signaling servers allowed to connect. +#server1 = pubkey1.pem +#server2 = pubkey2.pem + +[mcu] +# The type of the MCU to use. Currently only "janus" is supported. +type = janus + +# The URL to the websocket endpoint of the MCU server. +url = ws://localhost:8188/ + +# The maximum bitrate per publishing stream (in bits per second). +# Defaults to 1 mbit/sec. +#maxstreambitrate = 1048576 + +# The maximum bitrate per screensharing stream (in bits per second). +# Default is 2 mbit/sec. +#maxscreenbitrate = 2097152 + +[stats] +# Comma-separated list of IP addresses that are allowed to access the stats +# endpoint. Leave empty (or commented) to only allow access from "127.0.0.1". +#allowed_ips = diff --git a/src/proxy/main.go b/src/proxy/main.go new file mode 100644 index 0000000..de3ba1a --- /dev/null +++ b/src/proxy/main.go @@ -0,0 +1,153 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2020 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package main + +import ( + "flag" + "fmt" + "log" + "net" + "net/http" + "os" + "os/signal" + "runtime" + "strings" + "syscall" + "time" + + "github.com/dlintw/goconf" + "github.com/gorilla/mux" + "github.com/nats-io/go-nats" + + "signaling" +) + +var ( + version = "unreleased" + + configFlag = flag.String("config", "proxy.conf", "config file to use") + + showVersion = flag.Bool("version", false, "show version and quit") +) + +const ( + defaultReadTimeout = 15 + defaultWriteTimeout = 15 + + proxyDebugMessages = false +) + +func main() { + log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds | log.Lshortfile) + flag.Parse() + + if *showVersion { + fmt.Printf("nextcloud-spreed-signaling-proxy version %s/%s\n", version, runtime.Version()) + os.Exit(0) + } + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + signal.Notify(sigChan, syscall.SIGHUP) + + log.Printf("Starting up version %s/%s as pid %d", version, runtime.Version(), os.Getpid()) + + config, err := goconf.ReadConfigFile(*configFlag) + if err != nil { + log.Fatal("Could not read configuration: ", err) + } + + cpus := runtime.NumCPU() + runtime.GOMAXPROCS(cpus) + log.Printf("Using a maximum of %d CPUs\n", cpus) + + natsUrl, _ := config.GetString("nats", "url") + if natsUrl == "" { + natsUrl = nats.DefaultURL + } + + nats, err := signaling.NewNatsClient(natsUrl) + if err != nil { + log.Fatal("Could not create NATS client: ", err) + } + + r := mux.NewRouter() + + proxy, err := NewProxyServer(r, version, config, nats) + if err != nil { + log.Fatal(err) + } + + if err := proxy.Start(config); err != nil { + log.Fatal(err) + } + defer proxy.Stop() + + if addr, _ := config.GetString("http", "listen"); addr != "" { + readTimeout, _ := config.GetInt("http", "readtimeout") + if readTimeout <= 0 { + readTimeout = defaultReadTimeout + } + writeTimeout, _ := config.GetInt("http", "writetimeout") + if writeTimeout <= 0 { + writeTimeout = defaultWriteTimeout + } + + for _, address := range strings.Split(addr, " ") { + go func(address string) { + log.Println("Listening on", address) + listener, err := net.Listen("tcp", address) + if err != nil { + log.Fatal("Could not start listening: ", err) + } + srv := &http.Server{ + Handler: r, + Addr: addr, + + ReadTimeout: time.Duration(readTimeout) * time.Second, + WriteTimeout: time.Duration(writeTimeout) * time.Second, + } + if err := srv.Serve(listener); err != nil { + log.Fatal("Could not start server: ", err) + } + }(address) + } + } + +loop: + for { + switch sig := <-sigChan; sig { + case os.Interrupt: + log.Println("Interrupted") + break loop + case syscall.SIGHUP: + log.Printf("Received SIGHUP, reloading %s", *configFlag) + config, err := goconf.ReadConfigFile(*configFlag) + if err != nil { + log.Printf("Could not read configuration from %s: %s", *configFlag, err) + continue + } + + proxy.Reload(config) + } + } +} diff --git a/src/proxy/proxy_client.go b/src/proxy/proxy_client.go new file mode 100644 index 0000000..5cb8d6a --- /dev/null +++ b/src/proxy/proxy_client.go @@ -0,0 +1,55 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2020 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package main + +import ( + "sync/atomic" + "unsafe" + + "github.com/gorilla/websocket" + + "signaling" +) + +type ProxyClient struct { + signaling.Client + + proxy *ProxyServer + + session unsafe.Pointer +} + +func NewProxyClient(proxy *ProxyServer, conn *websocket.Conn, addr string) (*ProxyClient, error) { + client := &ProxyClient{ + proxy: proxy, + } + client.SetConn(conn, addr) + return client, nil +} + +func (c *ProxyClient) GetSession() *ProxySession { + return (*ProxySession)(atomic.LoadPointer(&c.session)) +} + +func (c *ProxyClient) SetSession(session *ProxySession) { + atomic.StorePointer(&c.session, unsafe.Pointer(session)) +} diff --git a/src/proxy/proxy_server.go b/src/proxy/proxy_server.go new file mode 100644 index 0000000..a82a42a --- /dev/null +++ b/src/proxy/proxy_server.go @@ -0,0 +1,962 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2020 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package main + +import ( + "crypto/rsa" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net" + "net/http" + "net/http/pprof" + "os" + "os/signal" + runtimepprof "runtime/pprof" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/dlintw/goconf" + "github.com/google/uuid" + "github.com/gorilla/mux" + "github.com/gorilla/securecookie" + "github.com/gorilla/websocket" + + "golang.org/x/net/context" + + "gopkg.in/dgrijalva/jwt-go.v3" + + "signaling" +) + +const ( + // Buffer sizes when reading/writing websocket connections. + websocketReadBufferSize = 4096 + websocketWriteBufferSize = 4096 + + initialMcuRetry = time.Second + maxMcuRetry = time.Second * 16 + + updateLoadInterval = time.Second + expireSessionsInterval = 10 * time.Second + + // Maximum age a token may have to prevent reuse of old tokens. + maxTokenAge = 5 * time.Minute +) + +type ContextKey string + +var ( + ContextKeySession = ContextKey("session") + + TimeoutCreatingPublisher = signaling.NewError("timeout", "Timeout creating publisher.") + TimeoutCreatingSubscriber = signaling.NewError("timeout", "Timeout creating subscriber.") + TokenAuthFailed = signaling.NewError("auth_failed", "The token could not be authenticated.") + TokenExpired = signaling.NewError("token_expired", "The token is expired.") + UnknownClient = signaling.NewError("unknown_client", "Unknown client id given.") + UnsupportedCommand = signaling.NewError("bad_request", "Unsupported command received.") + UnsupportedMessage = signaling.NewError("bad_request", "Unsupported message received.") + UnsupportedPayload = signaling.NewError("unsupported_payload", "Unsupported payload type.") +) + +type ProxyServer struct { + version string + country string + + url string + nats signaling.NatsClient + mcu signaling.Mcu + stopped uint32 + load int64 + + upgrader websocket.Upgrader + + tokenKeys atomic.Value + statsAllowedIps map[string]bool + + sid uint64 + cookie *securecookie.SecureCookie + sessions map[uint64]*ProxySession + sessionsLock sync.RWMutex + + clients map[string]signaling.McuClient + clientIds map[string]string + clientsLock sync.RWMutex +} + +func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile, nats signaling.NatsClient) (*ProxyServer, error) { + hashKey, _ := config.GetString("sessions", "hashkey") + switch len(hashKey) { + case 32: + case 64: + default: + log.Printf("WARNING: The sessions hash key should be 32 or 64 bytes but is %d bytes", len(hashKey)) + } + + blockKey, _ := config.GetString("sessions", "blockkey") + blockBytes := []byte(blockKey) + switch len(blockKey) { + case 0: + blockBytes = nil + case 16: + case 24: + case 32: + default: + return nil, fmt.Errorf("The sessions block key must be 16, 24 or 32 bytes but is %d bytes", len(blockKey)) + } + + tokenKeys := make(map[string]*rsa.PublicKey) + options, _ := config.GetOptions("tokens") + for _, id := range options { + filename, _ := config.GetString("tokens", id) + if filename == "" { + return nil, fmt.Errorf("No filename given for token %s", id) + } + + keyData, err := ioutil.ReadFile(filename) + if err != nil { + return nil, fmt.Errorf("Could not read public key from %s: %s", filename, err) + } + key, err := jwt.ParseRSAPublicKeyFromPEM(keyData) + if err != nil { + return nil, fmt.Errorf("Could not parse public key from %s: %s", filename, err) + } + + tokenKeys[id] = key + } + + var keyIds []string + for k, _ := range tokenKeys { + keyIds = append(keyIds, k) + } + sort.Strings(keyIds) + log.Printf("Enabled token keys: %v", keyIds) + + statsAllowed, _ := config.GetString("stats", "allowed_ips") + var statsAllowedIps map[string]bool + if statsAllowed == "" { + log.Printf("No IPs configured for the stats endpoint, only allowing access from 127.0.0.1") + statsAllowedIps = map[string]bool{ + "127.0.0.1": true, + } + } else { + log.Printf("Only allowing access to the stats endpoing from %s", statsAllowed) + statsAllowedIps = make(map[string]bool) + for _, ip := range strings.Split(statsAllowed, ",") { + ip = strings.TrimSpace(ip) + if ip != "" { + statsAllowedIps[ip] = true + } + } + } + + country, _ := config.GetString("app", "country") + if country != "" { + log.Printf("Sending %s as country information", country) + } else { + log.Printf("Not sending country information") + } + + result := &ProxyServer{ + version: version, + country: country, + + nats: nats, + + upgrader: websocket.Upgrader{ + ReadBufferSize: websocketReadBufferSize, + WriteBufferSize: websocketWriteBufferSize, + }, + + statsAllowedIps: statsAllowedIps, + + cookie: securecookie.New([]byte(hashKey), blockBytes).MaxAge(0), + sessions: make(map[uint64]*ProxySession), + + clients: make(map[string]signaling.McuClient), + clientIds: make(map[string]string), + } + + result.setTokenKeys(tokenKeys) + result.upgrader.CheckOrigin = result.checkOrigin + + if debug, _ := config.GetBool("app", "debug"); debug { + log.Println("Installing debug handlers in \"/debug/pprof\"") + r.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + r.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + r.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + r.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + r.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + for _, profile := range runtimepprof.Profiles() { + name := profile.Name() + r.Handle("/debug/pprof/"+name, pprof.Handler(name)) + } + } + + r.HandleFunc("/proxy", result.setCommonHeaders(result.proxyHandler)).Methods("GET") + r.HandleFunc("/stats", result.setCommonHeaders(result.validateStatsRequest(result.statsHandler))).Methods("GET") + return result, nil +} + +func (s *ProxyServer) checkOrigin(r *http.Request) bool { + // We allow any Origin to connect to the service. + return true +} + +func (s *ProxyServer) setTokenKeys(keys map[string]*rsa.PublicKey) { + s.tokenKeys.Store(keys) +} + +func (s *ProxyServer) getTokenKeys() map[string]*rsa.PublicKey { + return s.tokenKeys.Load().(map[string]*rsa.PublicKey) +} + +func (s *ProxyServer) Start(config *goconf.ConfigFile) error { + s.url, _ = config.GetString("mcu", "url") + if s.url == "" { + return fmt.Errorf("No MCU server url configured") + } + + mcuType, _ := config.GetString("mcu", "type") + if mcuType == "" { + mcuType = signaling.McuTypeDefault + } + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + defer signal.Stop(interrupt) + + var err error + var mcu signaling.Mcu + mcuRetry := initialMcuRetry + mcuRetryTimer := time.NewTimer(mcuRetry) + for { + switch mcuType { + case signaling.McuTypeJanus: + mcu, err = signaling.NewMcuJanus(s.url, config, s.nats) + default: + return fmt.Errorf("Unsupported MCU type: %s", mcuType) + } + if err == nil { + mcu.SetOnConnected(s.onMcuConnected) + mcu.SetOnDisconnected(s.onMcuDisconnected) + err = mcu.Start() + if err != nil { + log.Printf("Could not create %s MCU at %s: %s", mcuType, s.url, err) + } + } + if err == nil { + break + } + + log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, mcuRetry) + mcuRetryTimer.Reset(mcuRetry) + select { + case <-interrupt: + return fmt.Errorf("Cancelled") + case <-mcuRetryTimer.C: + // Retry connection + mcuRetry = mcuRetry * 2 + if mcuRetry > maxMcuRetry { + mcuRetry = maxMcuRetry + } + } + } + + s.mcu = mcu + + go s.run() + + return nil +} + +func (s *ProxyServer) run() { + updateLoadTicker := time.NewTicker(updateLoadInterval) + expireSessionsTicker := time.NewTicker(expireSessionsInterval) +loop: + for { + select { + case <-updateLoadTicker.C: + if atomic.LoadUint32(&s.stopped) != 0 { + break loop + } + s.updateLoad() + case <-expireSessionsTicker.C: + if atomic.LoadUint32(&s.stopped) != 0 { + break loop + } + s.expireSessions() + } + } +} + +func (s *ProxyServer) updateLoad() { + // TODO: Take maximum bandwidth of clients into account when calculating + // load (screensharing requires more than regular audio/video). + load := s.GetClientCount() + if load == atomic.LoadInt64(&s.load) { + return + } + + atomic.StoreInt64(&s.load, load) + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "update-load", + Load: load, + }, + } + + s.IterateSessions(func(session *ProxySession) { + session.sendMessage(msg) + }) +} + +func (s *ProxyServer) getExpiredSessions() []*ProxySession { + var expired []*ProxySession + s.IterateSessions(func(session *ProxySession) { + if session.IsExpired() { + expired = append(expired, session) + } + }) + return expired +} + +func (s *ProxyServer) expireSessions() { + expired := s.getExpiredSessions() + if len(expired) == 0 { + return + } + + s.sessionsLock.Lock() + defer s.sessionsLock.Unlock() + for _, session := range expired { + if !session.IsExpired() { + // Session was used while waiting for the lock. + continue + } + + log.Printf("Delete expired session %s", session.PublicId()) + s.deleteSessionLocked(session.Sid()) + } +} + +func (s *ProxyServer) Stop() { + if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { + return + } + + s.mcu.Stop() +} + +func (s *ProxyServer) Reload(config *goconf.ConfigFile) { + tokenKeys := make(map[string]*rsa.PublicKey) + options, _ := config.GetOptions("tokens") + for _, id := range options { + filename, _ := config.GetString("tokens", id) + if filename == "" { + log.Printf("No filename given for token %s, ignoring", id) + continue + } + + keyData, err := ioutil.ReadFile(filename) + if err != nil { + log.Printf("Could not read public key from %s, ignoring: %s", filename, err) + continue + } + key, err := jwt.ParseRSAPublicKeyFromPEM(keyData) + if err != nil { + log.Printf("Could not parse public key from %s, ignoring: %s", filename, err) + continue + } + + tokenKeys[id] = key + } + + if len(tokenKeys) == 0 { + log.Printf("No token keys loaded") + } else { + var keyIds []string + for k, _ := range tokenKeys { + keyIds = append(keyIds, k) + } + sort.Strings(keyIds) + log.Printf("Enabled token keys: %v", keyIds) + } + s.setTokenKeys(tokenKeys) +} + +func (s *ProxyServer) setCommonHeaders(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "nextcloud-spreed-signaling-proxy/"+s.version) + f(w, r) + } +} + +func getRealUserIP(r *http.Request) string { + // Note this function assumes it is running behind a trusted proxy, so + // the headers can be trusted. + if ip := r.Header.Get("X-Real-IP"); ip != "" { + return ip + } + + if ip := r.Header.Get("X-Forwarded-For"); ip != "" { + // Result could be a list "clientip, proxy1, proxy2", so only use first element. + if pos := strings.Index(ip, ","); pos >= 0 { + ip = strings.TrimSpace(ip[:pos]) + } + return ip + } + + return r.RemoteAddr +} + +func (s *ProxyServer) proxyHandler(w http.ResponseWriter, r *http.Request) { + addr := getRealUserIP(r) + conn, err := s.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("Could not upgrade request from %s: %s", addr, err) + return + } + + client, err := NewProxyClient(s, conn, addr) + if err != nil { + log.Printf("Could not create client for %s: %s", addr, err) + return + } + + client.OnClosed = s.clientClosed + client.OnMessageReceived = func(c *signaling.Client, data []byte) { + s.processMessage(client, data) + } + client.OnRTTReceived = func(c *signaling.Client, rtt time.Duration) { + if session := client.GetSession(); session != nil { + session.MarkUsed() + } + } + + go client.WritePump() + go client.ReadPump() +} + +func (s *ProxyServer) clientClosed(client *signaling.Client) { + log.Printf("Connection from %s closed", client.RemoteAddr()) +} + +func (s *ProxyServer) onMcuConnected() { + log.Printf("Connection to %s established", s.url) + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "backend-connected", + }, + } + + s.IterateSessions(func(session *ProxySession) { + session.sendMessage(msg) + }) +} + +func (s *ProxyServer) onMcuDisconnected() { + if atomic.LoadUint32(&s.stopped) != 0 { + // Shutting down, no need to notify. + return + } + + log.Printf("Connection to %s lost", s.url) + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "backend-disconnected", + }, + } + + s.IterateSessions(func(session *ProxySession) { + session.sendMessage(msg) + session.NotifyDisconnected() + }) +} + +func (s *ProxyServer) sendCurrentLoad(session *ProxySession) { + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "update-load", + Load: atomic.LoadInt64(&s.load), + }, + } + session.sendMessage(msg) +} + +func (s *ProxyServer) processMessage(client *ProxyClient, data []byte) { + if proxyDebugMessages { + log.Printf("Message: %s", string(data)) + } + var message signaling.ProxyClientMessage + if err := message.UnmarshalJSON(data); err != nil { + if session := client.GetSession(); session != nil { + log.Printf("Error decoding message from client %s: %v", session.PublicId(), err) + } else { + log.Printf("Error decoding message from %s: %v", client.RemoteAddr(), err) + } + client.SendError(signaling.InvalidFormat) + return + } + + if err := message.CheckValid(); err != nil { + if session := client.GetSession(); session != nil { + log.Printf("Invalid message %+v from client %s: %v", message, session.PublicId(), err) + } else { + log.Printf("Invalid message %+v from %s: %v", message, client.RemoteAddr(), err) + } + client.SendMessage(message.NewErrorServerMessage(signaling.InvalidFormat)) + return + } + + session := client.GetSession() + if session == nil { + if message.Type != "hello" { + client.SendMessage(message.NewErrorServerMessage(signaling.HelloExpected)) + return + } + + var session *ProxySession + if resumeId := message.Hello.ResumeId; resumeId != "" { + var data signaling.SessionIdData + if s.cookie.Decode("session", resumeId, &data) == nil { + session = s.GetSession(data.Sid) + } + + if session == nil { + client.SendMessage(message.NewErrorServerMessage(signaling.NoSuchSession)) + return + } + + log.Printf("Resumed session %s", session.PublicId()) + s.sendCurrentLoad(session) + } else { + var err error + if session, err = s.NewSession(message.Hello); err != nil { + if e, ok := err.(*signaling.Error); ok { + client.SendMessage(message.NewErrorServerMessage(e)) + } else { + client.SendMessage(message.NewWrappedErrorServerMessage(err)) + } + return + } + } + + prev := session.SetClient(client) + if prev != nil { + msg := &signaling.ProxyServerMessage{ + Type: "bye", + Bye: &signaling.ByeProxyServerMessage{ + Reason: "session_resumed", + }, + } + prev.SendMessage(msg) + } + response := &signaling.ProxyServerMessage{ + Id: message.Id, + Type: "hello", + Hello: &signaling.HelloProxyServerMessage{ + Version: signaling.HelloVersion, + SessionId: session.PublicId(), + Server: &signaling.HelloServerMessageServer{ + Version: s.version, + Country: s.country, + }, + }, + } + client.SendMessage(response) + s.sendCurrentLoad(session) + return + } + + ctx := context.WithValue(context.Background(), ContextKeySession, session) + session.MarkUsed() + + switch message.Type { + case "command": + s.processCommand(ctx, client, session, &message) + case "payload": + s.processPayload(ctx, client, session, &message) + default: + session.sendMessage(message.NewErrorServerMessage(UnsupportedMessage)) + } +} + +func (s *ProxyServer) processCommand(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) { + cmd := message.Command + switch cmd.Type { + case "create-publisher": + id := uuid.New().String() + publisher, err := s.mcu.NewPublisher(ctx, session, id, cmd.StreamType) + if err == context.DeadlineExceeded { + log.Printf("Timeout while creating %s publisher %s for %s", cmd.StreamType, id, session.PublicId()) + session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingPublisher)) + return + } else if err != nil { + log.Printf("Error while creating %s publisher %s for %s: %s", cmd.StreamType, id, session.PublicId(), err) + session.sendMessage(message.NewWrappedErrorServerMessage(err)) + return + } + + log.Printf("Created %s publisher %s as %s", cmd.StreamType, publisher.Id(), id) + session.StorePublisher(ctx, id, publisher) + s.StoreClient(id, publisher) + + response := &signaling.ProxyServerMessage{ + Id: message.Id, + Type: "command", + Command: &signaling.CommandProxyServerMessage{ + Id: id, + }, + } + session.sendMessage(response) + case "create-subscriber": + id := uuid.New().String() + publisherId := cmd.PublisherId + subscriber, err := s.mcu.NewSubscriber(ctx, session, publisherId, cmd.StreamType) + if err == context.DeadlineExceeded { + log.Printf("Timeout while creating %s subscriber on %s for %s", cmd.StreamType, publisherId, session.PublicId()) + session.sendMessage(message.NewErrorServerMessage(TimeoutCreatingSubscriber)) + return + } else if err != nil { + log.Printf("Error while creating %s subscriber on %s for %s: %s", cmd.StreamType, publisherId, session.PublicId(), err) + session.sendMessage(message.NewWrappedErrorServerMessage(err)) + return + } + + log.Printf("Created %s subscriber %s as %s", cmd.StreamType, subscriber.Id(), id) + session.StoreSubscriber(ctx, id, subscriber) + s.StoreClient(id, subscriber) + + response := &signaling.ProxyServerMessage{ + Id: message.Id, + Type: "command", + Command: &signaling.CommandProxyServerMessage{ + Id: id, + }, + } + session.sendMessage(response) + case "delete-publisher": + client := s.GetClient(cmd.ClientId) + if client == nil { + session.sendMessage(message.NewErrorServerMessage(UnknownClient)) + return + } + + if session.DeletePublisher(client) == "" { + session.sendMessage(message.NewErrorServerMessage(UnknownClient)) + return + } + + s.DeleteClient(cmd.ClientId, client) + + go func() { + log.Printf("Closing %s publisher %s as %s", client.StreamType(), client.Id(), cmd.ClientId) + client.Close(context.Background()) + }() + + response := &signaling.ProxyServerMessage{ + Id: message.Id, + Type: "command", + Command: &signaling.CommandProxyServerMessage{ + Id: cmd.ClientId, + }, + } + session.sendMessage(response) + case "delete-subscriber": + client := s.GetClient(cmd.ClientId) + if client == nil { + session.sendMessage(message.NewErrorServerMessage(UnknownClient)) + return + } + + subscriber, ok := client.(signaling.McuSubscriber) + if !ok { + session.sendMessage(message.NewErrorServerMessage(UnknownClient)) + return + } + + if session.DeleteSubscriber(subscriber) == "" { + session.sendMessage(message.NewErrorServerMessage(UnknownClient)) + return + } + + s.DeleteClient(cmd.ClientId, client) + + go func() { + log.Printf("Closing %s subscriber %s as %s", client.StreamType(), client.Id(), cmd.ClientId) + client.Close(context.Background()) + }() + + response := &signaling.ProxyServerMessage{ + Id: message.Id, + Type: "command", + Command: &signaling.CommandProxyServerMessage{ + Id: cmd.ClientId, + }, + } + session.sendMessage(response) + default: + log.Printf("Unsupported command %+v", message.Command) + session.sendMessage(message.NewErrorServerMessage(UnsupportedCommand)) + } +} + +func (s *ProxyServer) processPayload(ctx context.Context, client *ProxyClient, session *ProxySession, message *signaling.ProxyClientMessage) { + payload := message.Payload + mcuClient := s.GetClient(payload.ClientId) + if mcuClient == nil { + session.sendMessage(message.NewErrorServerMessage(UnknownClient)) + return + } + + var mcuData *signaling.MessageClientMessageData + switch payload.Type { + case "offer": + fallthrough + case "answer": + fallthrough + case "candidate": + mcuData = &signaling.MessageClientMessageData{ + Type: payload.Type, + Payload: payload.Payload, + } + case "endOfCandidates": + // Ignore but confirm, not passed along to Janus anyway. + session.sendMessage(&signaling.ProxyServerMessage{ + Id: message.Id, + Type: "payload", + Payload: &signaling.PayloadProxyServerMessage{ + Type: payload.Type, + ClientId: payload.ClientId, + }, + }) + return + case "requestoffer": + fallthrough + case "sendoffer": + mcuData = &signaling.MessageClientMessageData{ + Type: payload.Type, + } + default: + session.sendMessage(message.NewErrorServerMessage(UnsupportedPayload)) + return + } + + mcuClient.SendMessage(ctx, nil, mcuData, func(err error, response map[string]interface{}) { + var responseMsg *signaling.ProxyServerMessage + if err != nil { + log.Printf("Error sending %s to %s client %s: %s", mcuData, mcuClient.StreamType(), payload.ClientId, err) + responseMsg = message.NewWrappedErrorServerMessage(err) + } else { + responseMsg = &signaling.ProxyServerMessage{ + Id: message.Id, + Type: "payload", + Payload: &signaling.PayloadProxyServerMessage{ + Type: payload.Type, + ClientId: payload.ClientId, + Payload: response, + }, + } + } + + session.sendMessage(responseMsg) + }) +} + +func (s *ProxyServer) NewSession(hello *signaling.HelloProxyClientMessage) (*ProxySession, error) { + if proxyDebugMessages { + log.Printf("Hello: %+v", hello) + } + + token, err := jwt.ParseWithClaims(hello.Token, &signaling.TokenClaims{}, func(token *jwt.Token) (interface{}, error) { + // Don't forget to validate the alg is what you expect: + if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok { + log.Printf("Unexpected signing method: %v", token.Header["alg"]) + return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"]) + } + + claims, ok := token.Claims.(*signaling.TokenClaims) + if !ok { + log.Printf("Unsupported claims type: %+v", token.Claims) + return nil, fmt.Errorf("Unsupported claims type") + } + + tokenKeys := s.getTokenKeys() + publicKey := tokenKeys[claims.Issuer] + if publicKey == nil { + log.Printf("Issuer %s is not supported", claims.Issuer) + return nil, fmt.Errorf("No key found for issuer") + } + return publicKey, nil + }) + if err != nil { + return nil, TokenAuthFailed + } + + claims, ok := token.Claims.(*signaling.TokenClaims) + if !ok || !token.Valid { + return nil, TokenAuthFailed + } + + minIssuedAt := time.Now().Add(-maxTokenAge) + if issuedAt := time.Unix(claims.IssuedAt, 0); issuedAt.Before(minIssuedAt) { + return nil, TokenExpired + } + + sid := atomic.AddUint64(&s.sid, 1) + for sid == 0 { + sid = atomic.AddUint64(&s.sid, 1) + } + + sessionIdData := &signaling.SessionIdData{ + Sid: sid, + Created: time.Now(), + } + + encoded, err := s.cookie.Encode("session", sessionIdData) + if err != nil { + return nil, err + } + + log.Printf("Created session %s for %+v", encoded, claims) + session := NewProxySession(s, sid, encoded) + s.StoreSession(sid, session) + return session, nil +} + +func (s *ProxyServer) StoreSession(id uint64, session *ProxySession) { + s.sessionsLock.Lock() + defer s.sessionsLock.Unlock() + s.sessions[id] = session +} + +func (s *ProxyServer) GetSession(id uint64) *ProxySession { + s.sessionsLock.RLock() + defer s.sessionsLock.RUnlock() + return s.sessions[id] +} + +func (s *ProxyServer) GetSessionsCount() int64 { + s.sessionsLock.RLock() + defer s.sessionsLock.RUnlock() + return int64(len(s.sessions)) +} + +func (s *ProxyServer) IterateSessions(f func(*ProxySession)) { + s.sessionsLock.RLock() + defer s.sessionsLock.RUnlock() + + for _, session := range s.sessions { + f(session) + } +} + +func (s *ProxyServer) DeleteSession(id uint64) { + s.sessionsLock.Lock() + defer s.sessionsLock.Unlock() + s.deleteSessionLocked(id) +} + +func (s *ProxyServer) deleteSessionLocked(id uint64) { + delete(s.sessions, id) +} + +func (s *ProxyServer) StoreClient(id string, client signaling.McuClient) { + s.clientsLock.Lock() + defer s.clientsLock.Unlock() + s.clients[id] = client + s.clientIds[client.Id()] = id +} + +func (s *ProxyServer) DeleteClient(id string, client signaling.McuClient) { + s.clientsLock.Lock() + defer s.clientsLock.Unlock() + delete(s.clients, id) + delete(s.clientIds, client.Id()) +} + +func (s *ProxyServer) GetClientCount() int64 { + s.clientsLock.RLock() + defer s.clientsLock.RUnlock() + return int64(len(s.clients)) +} + +func (s *ProxyServer) GetClient(id string) signaling.McuClient { + s.clientsLock.RLock() + defer s.clientsLock.RUnlock() + return s.clients[id] +} + +func (s *ProxyServer) GetClientId(client signaling.McuClient) string { + s.clientsLock.RLock() + defer s.clientsLock.RUnlock() + return s.clientIds[client.Id()] +} + +func (s *ProxyServer) getStats() map[string]interface{} { + result := map[string]interface{}{ + "sessions": s.GetSessionsCount(), + "mcu": s.mcu.GetStats(), + } + return result +} + +func (s *ProxyServer) validateStatsRequest(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + addr := getRealUserIP(r) + if strings.Contains(addr, ":") { + if host, _, err := net.SplitHostPort(addr); err == nil { + addr = host + } + } + if !s.statsAllowedIps[addr] { + http.Error(w, "Authentication check failed", http.StatusForbidden) + return + } + + f(w, r) + } +} + +func (s *ProxyServer) statsHandler(w http.ResponseWriter, r *http.Request) { + stats := s.getStats() + statsData, err := json.MarshalIndent(stats, "", " ") + if err != nil { + log.Printf("Could not serialize stats %+v: %s", stats, err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(http.StatusOK) + w.Write(statsData) +} diff --git a/src/proxy/proxy_session.go b/src/proxy/proxy_session.go new file mode 100644 index 0000000..ebbb85d --- /dev/null +++ b/src/proxy/proxy_session.go @@ -0,0 +1,272 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2020 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package main + +import ( + "log" + "sync" + "sync/atomic" + "time" + + "golang.org/x/net/context" + + "signaling" +) + +const ( + // Sessions expire if they have not been used for one minute. + sessionExpirationTime = time.Minute +) + +type ProxySession struct { + proxy *ProxyServer + id string + sid uint64 + lastUsed int64 + + clientLock sync.Mutex + client *ProxyClient + pendingMessages []*signaling.ProxyServerMessage + + publishersLock sync.Mutex + publishers map[string]signaling.McuPublisher + publisherIds map[string]string + + subscribersLock sync.Mutex + subscribers map[string]signaling.McuSubscriber + subscriberIds map[string]string +} + +func NewProxySession(proxy *ProxyServer, sid uint64, id string) *ProxySession { + return &ProxySession{ + proxy: proxy, + id: id, + sid: sid, + lastUsed: time.Now().UnixNano(), + + publishers: make(map[string]signaling.McuPublisher), + publisherIds: make(map[string]string), + + subscribers: make(map[string]signaling.McuSubscriber), + subscriberIds: make(map[string]string), + } +} + +func (s *ProxySession) PublicId() string { + return s.id +} + +func (s *ProxySession) Sid() uint64 { + return s.sid +} + +func (s *ProxySession) LastUsed() time.Time { + lastUsed := atomic.LoadInt64(&s.lastUsed) + return time.Unix(0, lastUsed) +} + +func (s *ProxySession) IsExpired() bool { + expiresAt := s.LastUsed().Add(sessionExpirationTime) + return expiresAt.Before(time.Now()) +} + +func (s *ProxySession) MarkUsed() { + now := time.Now() + atomic.StoreInt64(&s.lastUsed, now.UnixNano()) +} + +func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient { + s.clientLock.Lock() + prev := s.client + s.client = client + var messages []*signaling.ProxyServerMessage + if client != nil { + messages, s.pendingMessages = s.pendingMessages, nil + } + s.clientLock.Unlock() + if prev != nil { + prev.SetSession(nil) + } + if client != nil { + s.MarkUsed() + client.SetSession(s) + for _, msg := range messages { + client.SendMessage(msg) + } + } + return prev +} + +func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate interface{}) { + id := s.proxy.GetClientId(client) + if id == "" { + log.Printf("Received candidate %+v from unknown %s client %s (%+v)", candidate, client.StreamType(), client.Id(), client) + return + } + + msg := &signaling.ProxyServerMessage{ + Type: "payload", + Payload: &signaling.PayloadProxyServerMessage{ + Type: "candidate", + ClientId: id, + Payload: map[string]interface{}{ + "candidate": candidate, + }, + }, + } + s.sendMessage(msg) +} + +func (s *ProxySession) sendMessage(message *signaling.ProxyServerMessage) { + var client *ProxyClient + s.clientLock.Lock() + client = s.client + if client == nil { + s.pendingMessages = append(s.pendingMessages, message) + } + s.clientLock.Unlock() + if client != nil { + client.SendMessage(message) + } +} + +func (s *ProxySession) OnIceCompleted(client signaling.McuClient) { + id := s.proxy.GetClientId(client) + if id == "" { + log.Printf("Received ice completed event from unknown %s client %s (%+v)", client.StreamType(), client.Id(), client) + return + } + + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "ice-completed", + ClientId: id, + }, + } + s.sendMessage(msg) +} + +func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) { + if id := s.DeletePublisher(publisher); id != "" { + s.proxy.DeleteClient(id, publisher) + + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "publisher-closed", + ClientId: id, + }, + } + s.sendMessage(msg) + } +} + +func (s *ProxySession) SubscriberClosed(subscriber signaling.McuSubscriber) { + if id := s.DeleteSubscriber(subscriber); id != "" { + s.proxy.DeleteClient(id, subscriber) + + msg := &signaling.ProxyServerMessage{ + Type: "event", + Event: &signaling.EventProxyServerMessage{ + Type: "subscriber-closed", + ClientId: id, + }, + } + s.sendMessage(msg) + } +} + +func (s *ProxySession) StorePublisher(ctx context.Context, id string, publisher signaling.McuPublisher) { + s.publishersLock.Lock() + defer s.publishersLock.Unlock() + + s.publishers[id] = publisher + s.publisherIds[publisher.Id()] = id +} + +func (s *ProxySession) DeletePublisher(publisher signaling.McuPublisher) string { + s.publishersLock.Lock() + defer s.publishersLock.Unlock() + + id, found := s.publisherIds[publisher.Id()] + if !found { + return "" + } + + delete(s.publishers, id) + delete(s.publisherIds, publisher.Id()) + return id +} + +func (s *ProxySession) StoreSubscriber(ctx context.Context, id string, subscriber signaling.McuSubscriber) { + s.subscribersLock.Lock() + defer s.subscribersLock.Unlock() + + s.subscribers[id] = subscriber + s.subscriberIds[subscriber.Id()] = id +} + +func (s *ProxySession) DeleteSubscriber(subscriber signaling.McuSubscriber) string { + s.subscribersLock.Lock() + defer s.subscribersLock.Unlock() + + id, found := s.subscriberIds[subscriber.Id()] + if !found { + return "" + } + + delete(s.subscribers, id) + delete(s.subscriberIds, subscriber.Id()) + return id +} + +func (s *ProxySession) clearPublishers() { + s.publishersLock.Lock() + defer s.publishersLock.Unlock() + + go func(publishers map[string]signaling.McuPublisher) { + for _, publisher := range publishers { + publisher.Close(context.Background()) + } + }(s.publishers) + s.publishers = make(map[string]signaling.McuPublisher) + s.publisherIds = make(map[string]string) +} + +func (s *ProxySession) clearSubscribers() { + s.publishersLock.Lock() + defer s.publishersLock.Unlock() + + go func(subscribers map[string]signaling.McuSubscriber) { + for _, subscriber := range subscribers { + subscriber.Close(context.Background()) + } + }(s.subscribers) + s.subscribers = make(map[string]signaling.McuSubscriber) + s.subscriberIds = make(map[string]string) +} + +func (s *ProxySession) NotifyDisconnected() { + s.clearPublishers() + s.clearSubscribers() +} diff --git a/src/signaling/api_signaling.go b/src/signaling/api_signaling.go index 5f43969..5762459 100644 --- a/src/signaling/api_signaling.go +++ b/src/signaling/api_signaling.go @@ -278,6 +278,7 @@ const ( type HelloServerMessageServer struct { Version string `json:"version"` Features []string `json:"features,omitempty"` + Country string `json:"country,omitempty"` } type HelloServerMessage struct {