Merge pull request #36 from strukturag/multiple-mcu-support

Support connecting to multiple Janus servers
This commit is contained in:
Joachim Bauch 2020-08-13 13:20:28 +02:00 committed by GitHub
commit 015fa3565d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 3319 additions and 121 deletions

2
.gitignore vendored
View file

@ -2,9 +2,11 @@ bin/
vendor/
*_easyjson.go
*.pem
*.prof
*.socket
*.tar.gz
cover.out
proxy.conf
server.conf

View file

@ -97,6 +97,7 @@ coverhtml: dependencies vet common
common: easyjson \
src/signaling/api_signaling_easyjson.go \
src/signaling/api_backend_easyjson.go \
src/signaling/api_proxy_easyjson.go \
src/signaling/natsclient_easyjson.go \
src/signaling/room_easyjson.go
@ -108,10 +109,14 @@ server: dependencies common
mkdir -p $(BINDIR)
GOPATH=$(GOPATH) $(GO) build $(BUILDARGS) -ldflags '$(INTERNALLDFLAGS)' -o $(BINDIR)/signaling ./src/server/...
proxy: dependencies common
mkdir -p $(BINDIR)
GOPATH=$(GOPATH) $(GO) build $(BUILDARGS) -ldflags '$(INTERNALLDFLAGS)' -o $(BINDIR)/proxy ./src/proxy/...
clean:
rm -f src/signaling/*_easyjson.go
build: server
build: server proxy
tarball:
git archive \

View file

@ -127,6 +127,33 @@ The maximum bandwidth per publishing stream can also be configured in the
section `[mcu]`, see properties `maxstreambitrate` and `maxscreenbitrate`.
### Use multiple Janus servers
To scale the setup and add high availability, a signaling server can connect to
one or multiple proxy servers that each provide access to a single Janus server.
For that, set the `type` key in section `[mcu]` to `proxy` and set `url` to a
space-separated list of URLs where a proxy server is running.
Each signaling server that connects to a proxy needs a unique token id and a
public / private RSA keypair. The token id must be configured as `token_id` in
section `[mcu]`, the path to the private key file as `token_key`.
### Setup of proxy server
The proxy server is built with the standard make command `make build` as
`bin/proxy` binary. Copy the `proxy.conf.in` as `proxy.conf` and edit section
`[tokens]` to the list of allowed token ids and filenames of the public keys
for each token id. See the comments in `proxy.conf.in` for other configuration
options.
When the proxy process receives a `SIGHUP` signal, the list of allowed token
ids / public keys is reloaded. A `SIGUSR1` signal can be used to shutdown a
proxy process gracefully after all clients have been disconnected. No new
publishers will be accepted in this case.
## Setup of frontend webserver
Usually the standalone signaling server is running behind a webserver that does

View file

@ -1,4 +1,5 @@
github.com/dlintw/goconf git dcc070983490608a14480e3bf943bad464785df5 2012-02-28T08:26:10Z
github.com/google/uuid git 0e4e31197428a347842d152773b4cace4645ca25 2020-07-02T18:56:42Z
github.com/gorilla/context git 08b5f424b9271eedf6f9f0ce86cb9396ed337a42 2016-08-17T18:46:32Z
github.com/gorilla/mux git ac112f7d75a0714af1bd86ab17749b31f7809640 2017-07-04T07:43:45Z
github.com/gorilla/securecookie git e59506cc896acb7f7bf732d4fdf5e25f7ccd8983 2017-02-24T19:38:04Z
@ -10,3 +11,4 @@ github.com/notedit/janus-go git 10eb8b95d1a0469ac8921c5ce5fb55b4c0d3ad7d 2020-05
github.com/oschwald/maxminddb-golang git 1960b16a5147df3a4c61ac83b2f31cd8f811d609 2019-05-23T23:57:38Z
golang.org/x/net git f01ecb60fe3835d80d9a0b7b2bf24b228c89260e 2017-07-11T18:12:19Z
golang.org/x/sys git ac767d655b305d4e9612f5f6e33120b9176c4ad4 2018-07-15T08:55:29Z
gopkg.in/dgrijalva/jwt-go.v3 git 06ea1031745cb8b3dab3f6a236daf2b0aa468b7e 2018-03-08T23:13:08Z

1 github.com/dlintw/goconf git dcc070983490608a14480e3bf943bad464785df5 2012-02-28T08:26:10Z
2 github.com/google/uuid git 0e4e31197428a347842d152773b4cace4645ca25 2020-07-02T18:56:42Z
3 github.com/gorilla/context git 08b5f424b9271eedf6f9f0ce86cb9396ed337a42 2016-08-17T18:46:32Z
4 github.com/gorilla/mux git ac112f7d75a0714af1bd86ab17749b31f7809640 2017-07-04T07:43:45Z
5 github.com/gorilla/securecookie git e59506cc896acb7f7bf732d4fdf5e25f7ccd8983 2017-02-24T19:38:04Z
11 github.com/oschwald/maxminddb-golang git 1960b16a5147df3a4c61ac83b2f31cd8f811d609 2019-05-23T23:57:38Z
12 golang.org/x/net git f01ecb60fe3835d80d9a0b7b2bf24b228c89260e 2017-07-11T18:12:19Z
13 golang.org/x/sys git ac767d655b305d4e9612f5f6e33120b9176c4ad4 2018-07-15T08:55:29Z
14 gopkg.in/dgrijalva/jwt-go.v3 git 06ea1031745cb8b3dab3f6a236daf2b0aa468b7e 2018-03-08T23:13:08Z

55
proxy.conf.in Normal file
View file

@ -0,0 +1,55 @@
[http]
# IP and port to listen on for HTTP requests.
# Comment line to disable the listener.
#listen = 127.0.0.1:9090
[app]
# Set to "true" to install pprof debug handlers.
# See "https://golang.org/pkg/net/http/pprof/" for further information.
#debug = false
# ISO 3166 country this proxy is located at. This will be used by the signaling
# servers to determine the closest proxy for publishers.
#country = DE
[sessions]
# Secret value used to generate checksums of sessions. This should be a random
# string of 32 or 64 bytes.
hashkey = secret-for-session-checksums
# Optional key for encrypting data in the sessions. Must be either 16, 24 or
# 32 bytes.
# If no key is specified, data will not be encrypted (not recommended).
blockkey = -encryption-key-
[nats]
# Url of NATS backend to use. This can also be a list of URLs to connect to
# multiple backends. For local development, this can be set to ":loopback:"
# to process NATS messages internally instead of sending them through an
# external NATS backend.
#url = nats://localhost:4222
[tokens]
# Mapping of <tokenid> = <publickey> of signaling servers allowed to connect.
#server1 = pubkey1.pem
#server2 = pubkey2.pem
[mcu]
# The type of the MCU to use. Currently only "janus" is supported.
type = janus
# The URL to the websocket endpoint of the MCU server.
url = ws://localhost:8188/
# The maximum bitrate per publishing stream (in bits per second).
# Defaults to 1 mbit/sec.
#maxstreambitrate = 1048576
# The maximum bitrate per screensharing stream (in bits per second).
# Default is 2 mbit/sec.
#maxscreenbitrate = 2097152
[stats]
# Comma-separated list of IP addresses that are allowed to access the stats
# endpoint. Leave empty (or commented) to only allow access from "127.0.0.1".
#allowed_ips =

View file

@ -98,21 +98,31 @@ connectionsperhost = 8
#url = nats://localhost:4222
[mcu]
# The type of the MCU to use. Currently only "janus" is supported.
# The type of the MCU to use. Currently only "janus" and "proxy" are supported.
type = janus
# The URL to the websocket endpoint of the MCU server. Leave empty to disable
# MCU functionality.
# For type "janus": the URL to the websocket endpoint of the MCU server.
# For type "proxy": a space-separated list of proxy URLs to connect to.
# Leave empty to disable MCU functionality.
url =
# The maximum bitrate per publishing stream (in bits per second).
# For type "janus": the maximum bitrate per publishing stream (in bits per
# second).
# Defaults to 1 mbit/sec.
#maxstreambitrate = 1048576
# The maximum bitrate per screensharing stream (in bits per second).
# For type "janus": the maximum bitrate per screensharing stream (in bits per
# second).
# Default is 2 mbit/sec.
#maxscreenbitrate = 2097152
# For type "proxy": the id of the token to use when connecting to proxy servers.
#token_id = server1
# For type "proxy": the private key for the configured token id to use when
# connecting to proxy servers.
#token_key = privkey.pem
[turn]
# API key that the MCU will need to send when requesting TURN credentials.
#apikey = the-api-key-for-the-rest-service

161
src/proxy/main.go Normal file
View file

@ -0,0 +1,161 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"github.com/dlintw/goconf"
"github.com/gorilla/mux"
"github.com/nats-io/go-nats"
"signaling"
)
var (
version = "unreleased"
configFlag = flag.String("config", "proxy.conf", "config file to use")
showVersion = flag.Bool("version", false, "show version and quit")
)
const (
defaultReadTimeout = 15
defaultWriteTimeout = 15
proxyDebugMessages = false
)
func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds | log.Lshortfile)
flag.Parse()
if *showVersion {
fmt.Printf("nextcloud-spreed-signaling-proxy version %s/%s\n", version, runtime.Version())
os.Exit(0)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
signal.Notify(sigChan, syscall.SIGHUP)
signal.Notify(sigChan, syscall.SIGUSR1)
log.Printf("Starting up version %s/%s as pid %d", version, runtime.Version(), os.Getpid())
config, err := goconf.ReadConfigFile(*configFlag)
if err != nil {
log.Fatal("Could not read configuration: ", err)
}
cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
log.Printf("Using a maximum of %d CPUs\n", cpus)
natsUrl, _ := config.GetString("nats", "url")
if natsUrl == "" {
natsUrl = nats.DefaultURL
}
nats, err := signaling.NewNatsClient(natsUrl)
if err != nil {
log.Fatal("Could not create NATS client: ", err)
}
r := mux.NewRouter()
proxy, err := NewProxyServer(r, version, config, nats)
if err != nil {
log.Fatal(err)
}
if err := proxy.Start(config); err != nil {
log.Fatal(err)
}
defer proxy.Stop()
if addr, _ := config.GetString("http", "listen"); addr != "" {
readTimeout, _ := config.GetInt("http", "readtimeout")
if readTimeout <= 0 {
readTimeout = defaultReadTimeout
}
writeTimeout, _ := config.GetInt("http", "writetimeout")
if writeTimeout <= 0 {
writeTimeout = defaultWriteTimeout
}
for _, address := range strings.Split(addr, " ") {
go func(address string) {
log.Println("Listening on", address)
listener, err := net.Listen("tcp", address)
if err != nil {
log.Fatal("Could not start listening: ", err)
}
srv := &http.Server{
Handler: r,
Addr: addr,
ReadTimeout: time.Duration(readTimeout) * time.Second,
WriteTimeout: time.Duration(writeTimeout) * time.Second,
}
if err := srv.Serve(listener); err != nil {
log.Fatal("Could not start server: ", err)
}
}(address)
}
}
loop:
for {
select {
case sig := <-sigChan:
switch sig {
case os.Interrupt:
log.Println("Interrupted")
break loop
case syscall.SIGHUP:
log.Printf("Received SIGHUP, reloading %s", *configFlag)
if config, err := goconf.ReadConfigFile(*configFlag); err != nil {
log.Printf("Could not read configuration from %s: %s", *configFlag, err)
} else {
proxy.Reload(config)
}
case syscall.SIGUSR1:
log.Printf("Received SIGUSR1, scheduling server to shutdown")
proxy.ScheduleShutdown()
}
case <-proxy.ShutdownChannel():
log.Printf("All clients disconnected, shutting down")
break loop
}
}
}

55
src/proxy/proxy_client.go Normal file
View file

@ -0,0 +1,55 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"sync/atomic"
"unsafe"
"github.com/gorilla/websocket"
"signaling"
)
type ProxyClient struct {
signaling.Client
proxy *ProxyServer
session unsafe.Pointer
}
func NewProxyClient(proxy *ProxyServer, conn *websocket.Conn, addr string) (*ProxyClient, error) {
client := &ProxyClient{
proxy: proxy,
}
client.SetConn(conn, addr)
return client, nil
}
func (c *ProxyClient) GetSession() *ProxySession {
return (*ProxySession)(atomic.LoadPointer(&c.session))
}
func (c *ProxyClient) SetSession(session *ProxySession) {
atomic.StorePointer(&c.session, unsafe.Pointer(session))
}

1037
src/proxy/proxy_server.go Normal file

File diff suppressed because it is too large Load diff

272
src/proxy/proxy_session.go Normal file
View file

@ -0,0 +1,272 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"log"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/context"
"signaling"
)
const (
// Sessions expire if they have not been used for one minute.
sessionExpirationTime = time.Minute
)
type ProxySession struct {
proxy *ProxyServer
id string
sid uint64
lastUsed int64
clientLock sync.Mutex
client *ProxyClient
pendingMessages []*signaling.ProxyServerMessage
publishersLock sync.Mutex
publishers map[string]signaling.McuPublisher
publisherIds map[string]string
subscribersLock sync.Mutex
subscribers map[string]signaling.McuSubscriber
subscriberIds map[string]string
}
func NewProxySession(proxy *ProxyServer, sid uint64, id string) *ProxySession {
return &ProxySession{
proxy: proxy,
id: id,
sid: sid,
lastUsed: time.Now().UnixNano(),
publishers: make(map[string]signaling.McuPublisher),
publisherIds: make(map[string]string),
subscribers: make(map[string]signaling.McuSubscriber),
subscriberIds: make(map[string]string),
}
}
func (s *ProxySession) PublicId() string {
return s.id
}
func (s *ProxySession) Sid() uint64 {
return s.sid
}
func (s *ProxySession) LastUsed() time.Time {
lastUsed := atomic.LoadInt64(&s.lastUsed)
return time.Unix(0, lastUsed)
}
func (s *ProxySession) IsExpired() bool {
expiresAt := s.LastUsed().Add(sessionExpirationTime)
return expiresAt.Before(time.Now())
}
func (s *ProxySession) MarkUsed() {
now := time.Now()
atomic.StoreInt64(&s.lastUsed, now.UnixNano())
}
func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient {
s.clientLock.Lock()
prev := s.client
s.client = client
var messages []*signaling.ProxyServerMessage
if client != nil {
messages, s.pendingMessages = s.pendingMessages, nil
}
s.clientLock.Unlock()
if prev != nil {
prev.SetSession(nil)
}
if client != nil {
s.MarkUsed()
client.SetSession(s)
for _, msg := range messages {
client.SendMessage(msg)
}
}
return prev
}
func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate interface{}) {
id := s.proxy.GetClientId(client)
if id == "" {
log.Printf("Received candidate %+v from unknown %s client %s (%+v)", candidate, client.StreamType(), client.Id(), client)
return
}
msg := &signaling.ProxyServerMessage{
Type: "payload",
Payload: &signaling.PayloadProxyServerMessage{
Type: "candidate",
ClientId: id,
Payload: map[string]interface{}{
"candidate": candidate,
},
},
}
s.sendMessage(msg)
}
func (s *ProxySession) sendMessage(message *signaling.ProxyServerMessage) {
var client *ProxyClient
s.clientLock.Lock()
client = s.client
if client == nil {
s.pendingMessages = append(s.pendingMessages, message)
}
s.clientLock.Unlock()
if client != nil {
client.SendMessage(message)
}
}
func (s *ProxySession) OnIceCompleted(client signaling.McuClient) {
id := s.proxy.GetClientId(client)
if id == "" {
log.Printf("Received ice completed event from unknown %s client %s (%+v)", client.StreamType(), client.Id(), client)
return
}
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "ice-completed",
ClientId: id,
},
}
s.sendMessage(msg)
}
func (s *ProxySession) PublisherClosed(publisher signaling.McuPublisher) {
if id := s.DeletePublisher(publisher); id != "" {
s.proxy.DeleteClient(id, publisher)
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "publisher-closed",
ClientId: id,
},
}
s.sendMessage(msg)
}
}
func (s *ProxySession) SubscriberClosed(subscriber signaling.McuSubscriber) {
if id := s.DeleteSubscriber(subscriber); id != "" {
s.proxy.DeleteClient(id, subscriber)
msg := &signaling.ProxyServerMessage{
Type: "event",
Event: &signaling.EventProxyServerMessage{
Type: "subscriber-closed",
ClientId: id,
},
}
s.sendMessage(msg)
}
}
func (s *ProxySession) StorePublisher(ctx context.Context, id string, publisher signaling.McuPublisher) {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
s.publishers[id] = publisher
s.publisherIds[publisher.Id()] = id
}
func (s *ProxySession) DeletePublisher(publisher signaling.McuPublisher) string {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
id, found := s.publisherIds[publisher.Id()]
if !found {
return ""
}
delete(s.publishers, id)
delete(s.publisherIds, publisher.Id())
return id
}
func (s *ProxySession) StoreSubscriber(ctx context.Context, id string, subscriber signaling.McuSubscriber) {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
s.subscribers[id] = subscriber
s.subscriberIds[subscriber.Id()] = id
}
func (s *ProxySession) DeleteSubscriber(subscriber signaling.McuSubscriber) string {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()
id, found := s.subscriberIds[subscriber.Id()]
if !found {
return ""
}
delete(s.subscribers, id)
delete(s.subscriberIds, subscriber.Id())
return id
}
func (s *ProxySession) clearPublishers() {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
go func(publishers map[string]signaling.McuPublisher) {
for _, publisher := range publishers {
publisher.Close(context.Background())
}
}(s.publishers)
s.publishers = make(map[string]signaling.McuPublisher)
s.publisherIds = make(map[string]string)
}
func (s *ProxySession) clearSubscribers() {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()
go func(subscribers map[string]signaling.McuSubscriber) {
for _, subscriber := range subscribers {
subscriber.Close(context.Background())
}
}(s.subscribers)
s.subscribers = make(map[string]signaling.McuSubscriber)
s.subscriberIds = make(map[string]string)
}
func (s *ProxySession) NotifyDisconnected() {
s.clearPublishers()
s.clearSubscribers()
}

View file

@ -166,6 +166,8 @@ func main() {
switch mcuType {
case signaling.McuTypeJanus:
mcu, err = signaling.NewMcuJanus(mcuUrl, config, nats)
case signaling.McuTypeProxy:
mcu, err = signaling.NewMcuProxy(mcuUrl, config)
default:
log.Fatal("Unsupported MCU type: ", mcuType)
}

254
src/signaling/api_proxy.go Normal file
View file

@ -0,0 +1,254 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"fmt"
"gopkg.in/dgrijalva/jwt-go.v3"
)
type ProxyClientMessage struct {
// The unique request id (optional).
Id string `json:"id,omitempty"`
// The type of the request.
Type string `json:"type"`
// Filled for type "hello"
Hello *HelloProxyClientMessage `json:"hello,omitempty"`
Bye *ByeProxyClientMessage `json:"bye,omitempty"`
Command *CommandProxyClientMessage `json:"command,omitempty"`
Payload *PayloadProxyClientMessage `json:"payload,omitempty"`
}
func (m *ProxyClientMessage) CheckValid() error {
switch m.Type {
case "":
return fmt.Errorf("type missing")
case "hello":
if m.Hello == nil {
return fmt.Errorf("hello missing")
} else if err := m.Hello.CheckValid(); err != nil {
return err
}
case "bye":
if m.Bye != nil {
// Bye contents are optional
if err := m.Bye.CheckValid(); err != nil {
return err
}
}
case "command":
if m.Command == nil {
return fmt.Errorf("command missing")
} else if err := m.Command.CheckValid(); err != nil {
return err
}
case "payload":
if m.Payload == nil {
return fmt.Errorf("payload missing")
} else if err := m.Payload.CheckValid(); err != nil {
return err
}
}
return nil
}
func (m *ProxyClientMessage) NewErrorServerMessage(e *Error) *ProxyServerMessage {
return &ProxyServerMessage{
Id: m.Id,
Type: "error",
Error: e,
}
}
func (m *ProxyClientMessage) NewWrappedErrorServerMessage(e error) *ProxyServerMessage {
return m.NewErrorServerMessage(NewError("internal_error", e.Error()))
}
// ProxyServerMessage is a message that is sent from the server to a client.
type ProxyServerMessage struct {
Id string `json:"id,omitempty"`
Type string `json:"type"`
Error *Error `json:"error,omitempty"`
Hello *HelloProxyServerMessage `json:"hello,omitempty"`
Bye *ByeProxyServerMessage `json:"bye,omitempty"`
Command *CommandProxyServerMessage `json:"command,omitempty"`
Payload *PayloadProxyServerMessage `json:"payload,omitempty"`
Event *EventProxyServerMessage `json:"event,omitempty"`
}
func (r *ProxyServerMessage) CloseAfterSend(session Session) bool {
if r.Type == "bye" {
return true
}
return false
}
// Type "hello"
type TokenClaims struct {
jwt.StandardClaims
}
type HelloProxyClientMessage struct {
Version string `json:"version"`
ResumeId string `json:"resumeid"`
Features []string `json:"features,omitempty"`
// The authentication credentials.
Token string `json:"token"`
}
func (m *HelloProxyClientMessage) CheckValid() error {
if m.Version != HelloVersion {
return fmt.Errorf("unsupported hello version: %s", m.Version)
}
if m.ResumeId == "" {
if m.Token == "" {
return fmt.Errorf("token missing")
}
}
return nil
}
type HelloProxyServerMessage struct {
Version string `json:"version"`
SessionId string `json:"sessionid"`
Server *HelloServerMessageServer `json:"server,omitempty"`
}
// Type "bye"
type ByeProxyClientMessage struct {
}
func (m *ByeProxyClientMessage) CheckValid() error {
// No additional validation required.
return nil
}
type ByeProxyServerMessage struct {
Reason string `json:"reason"`
}
// Type "command"
type CommandProxyClientMessage struct {
Type string `json:"type"`
StreamType string `json:"streamType,omitempty"`
PublisherId string `json:"publisherId,omitempty"`
ClientId string `json:"clientId,omitempty"`
}
func (m *CommandProxyClientMessage) CheckValid() error {
switch m.Type {
case "":
return fmt.Errorf("type missing")
case "create-publisher":
if m.StreamType == "" {
return fmt.Errorf("stream type missing")
}
case "create-subscriber":
if m.PublisherId == "" {
return fmt.Errorf("publisher id missing")
}
if m.StreamType == "" {
return fmt.Errorf("stream type missing")
}
case "delete-publisher":
fallthrough
case "delete-subscriber":
if m.ClientId == "" {
return fmt.Errorf("client id missing")
}
}
return nil
}
type CommandProxyServerMessage struct {
Id string `json:"id,omitempty"`
}
// Type "payload"
type PayloadProxyClientMessage struct {
Type string `json:"type"`
ClientId string `json:"clientId"`
Payload map[string]interface{} `json:"payload,omitempty"`
}
func (m *PayloadProxyClientMessage) CheckValid() error {
switch m.Type {
case "":
return fmt.Errorf("type missing")
case "offer":
fallthrough
case "answer":
fallthrough
case "candidate":
if len(m.Payload) == 0 {
return fmt.Errorf("payload missing")
}
case "endOfCandidates":
fallthrough
case "requestoffer":
// No payload required.
}
if m.ClientId == "" {
return fmt.Errorf("client id missing")
}
return nil
}
type PayloadProxyServerMessage struct {
Type string `json:"type"`
ClientId string `json:"clientId"`
Payload map[string]interface{} `json:"payload"`
}
// Type "event"
type EventProxyServerMessage struct {
Type string `json:"type"`
ClientId string `json:"clientId,omitempty"`
Load int64 `json:"load,omitempty"`
}

View file

@ -278,6 +278,7 @@ const (
type HelloServerMessageServer struct {
Version string `json:"version"`
Features []string `json:"features,omitempty"`
Country string `json:"country,omitempty"`
}
type HelloServerMessage struct {

View file

@ -25,7 +25,6 @@ import (
"bytes"
"encoding/json"
"log"
"net"
"strconv"
"strings"
"sync"
@ -52,16 +51,28 @@ const (
)
var (
_noCountry string = "no-country"
noCountry *string = &_noCountry
noCountry string = "no-country"
_loopback string = "loopback"
loopback *string = &_loopback
loopback string = "loopback"
_unknownCountry string = "unknown-country"
unknownCountry *string = &_unknownCountry
unknownCountry string = "unknown-country"
)
func IsValidCountry(country string) bool {
switch country {
case "":
fallthrough
case noCountry:
fallthrough
case loopback:
fallthrough
case unknownCountry:
return false
default:
return true
}
}
var (
InvalidFormat = NewError("invalid_format", "Invalid data format.")
@ -72,8 +83,13 @@ var (
}
)
type WritableClientMessage interface {
json.Marshaler
CloseAfterSend(session Session) bool
}
type Client struct {
hub *Hub
conn *websocket.Conn
addr string
agent string
@ -85,9 +101,14 @@ type Client struct {
mu sync.Mutex
closeChan chan bool
OnLookupCountry func(*Client) string
OnClosed func(*Client)
OnMessageReceived func(*Client, []byte)
OnRTTReceived func(*Client, time.Duration)
}
func NewClient(hub *Hub, conn *websocket.Conn, remoteAddress string, agent string) (*Client, error) {
func NewClient(conn *websocket.Conn, remoteAddress string, agent string) (*Client, error) {
remoteAddress = strings.TrimSpace(remoteAddress)
if remoteAddress == "" {
remoteAddress = "unknown remote address"
@ -97,15 +118,28 @@ func NewClient(hub *Hub, conn *websocket.Conn, remoteAddress string, agent strin
agent = "unknown user agent"
}
client := &Client{
hub: hub,
conn: conn,
addr: remoteAddress,
agent: agent,
closeChan: make(chan bool, 1),
OnLookupCountry: func(client *Client) string { return unknownCountry },
OnClosed: func(client *Client) {},
OnMessageReceived: func(client *Client, data []byte) {},
OnRTTReceived: func(client *Client, rtt time.Duration) {},
}
return client, nil
}
func (c *Client) SetConn(conn *websocket.Conn, remoteAddress string) {
c.conn = conn
c.addr = remoteAddress
c.closeChan = make(chan bool, 1)
c.OnLookupCountry = func(client *Client) string { return unknownCountry }
c.OnClosed = func(client *Client) {}
c.OnMessageReceived = func(client *Client, data []byte) {}
}
func (c *Client) IsConnected() bool {
return atomic.LoadUint32(&c.closed) == 0
}
@ -132,25 +166,7 @@ func (c *Client) UserAgent() string {
func (c *Client) Country() string {
if c.country == nil {
if c.hub.geoip == nil {
c.country = unknownCountry
return *c.country
}
ip := net.ParseIP(c.RemoteAddr())
if ip == nil {
c.country = noCountry
return *c.country
} else if ip.IsLoopback() {
c.country = loopback
return *c.country
}
country, err := c.hub.geoip.LookupCountry(ip)
if err != nil {
log.Printf("Could not lookup country for %s", ip)
c.country = unknownCountry
return *c.country
}
country := c.OnLookupCountry(c)
c.country = &country
}
@ -164,7 +180,7 @@ func (c *Client) Close() {
c.closeChan <- true
c.hub.processUnregister(c)
c.OnClosed(c)
c.SetSession(nil)
c.mu.Lock()
@ -183,41 +199,6 @@ func (c *Client) SendError(e *Error) bool {
return c.SendMessage(message)
}
func (c *Client) SendRoom(message *ClientMessage, room *Room) bool {
response := &ServerMessage{
Type: "room",
}
if message != nil {
response.Id = message.Id
}
if room == nil {
response.Room = &RoomServerMessage{
RoomId: "",
}
} else {
response.Room = &RoomServerMessage{
RoomId: room.id,
Properties: room.properties,
}
}
return c.SendMessage(response)
}
func (c *Client) SendHelloResponse(message *ClientMessage, session *ClientSession) bool {
response := &ServerMessage{
Id: message.Id,
Type: "hello",
Hello: &HelloServerMessage{
Version: HelloVersion,
SessionId: session.PublicId(),
ResumeId: session.PrivateId(),
UserId: session.UserId(),
Server: c.hub.GetServerInfo(),
},
}
return c.SendMessage(response)
}
func (c *Client) SendByeResponse(message *ClientMessage) bool {
return c.SendByeResponseWithReason(message, "")
}
@ -236,11 +217,11 @@ func (c *Client) SendByeResponseWithReason(message *ClientMessage, reason string
return c.SendMessage(response)
}
func (c *Client) SendMessage(message *ServerMessage) bool {
func (c *Client) SendMessage(message WritableClientMessage) bool {
return c.writeMessage(message)
}
func (c *Client) readPump() {
func (c *Client) ReadPump() {
defer func() {
c.Close()
}()
@ -270,6 +251,7 @@ func (c *Client) readPump() {
} else {
log.Printf("Client from %s has RTT of %d ms (%s)", addr, rtt_ms, rtt)
}
c.OnRTTReceived(c, rtt)
}
return nil
})
@ -312,28 +294,7 @@ func (c *Client) readPump() {
break
}
var message ClientMessage
if err := message.UnmarshalJSON(decodeBuffer.Bytes()); err != nil {
if session := c.GetSession(); session != nil {
log.Printf("Error decoding message from client %s: %v", session.PublicId(), err)
} else {
log.Printf("Error decoding message from %s: %v", addr, err)
}
c.SendError(InvalidFormat)
continue
}
if err := message.CheckValid(); err != nil {
if session := c.GetSession(); session != nil {
log.Printf("Invalid message %+v from client %s: %v", message, session.PublicId(), err)
} else {
log.Printf("Invalid message %+v from %s: %v", message, addr, err)
}
c.SendMessage(message.NewErrorServerMessage(InvalidFormat))
continue
}
c.hub.processMessage(c, &message)
c.OnMessageReceived(c, decodeBuffer.Bytes())
}
}
@ -407,7 +368,7 @@ func (c *Client) writeError(e error) bool {
return false
}
func (c *Client) writeMessage(message *ServerMessage) bool {
func (c *Client) writeMessage(message WritableClientMessage) bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
@ -417,7 +378,7 @@ func (c *Client) writeMessage(message *ServerMessage) bool {
return c.writeMessageLocked(message)
}
func (c *Client) writeMessageLocked(message *ServerMessage) bool {
func (c *Client) writeMessageLocked(message WritableClientMessage) bool {
if !c.writeInternal(message) {
return false
}
@ -458,7 +419,7 @@ func (c *Client) sendPing() bool {
return true
}
func (c *Client) writePump() {
func (c *Client) WritePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()

View file

@ -436,6 +436,10 @@ func (s *ClientSession) GetClient() *Client {
s.mu.Lock()
defer s.mu.Unlock()
return s.getClientUnlocked()
}
func (s *ClientSession) getClientUnlocked() *Client {
return s.client
}
@ -554,9 +558,10 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
publisher, found := s.publishers[streamType]
if !found {
client := s.getClientUnlocked()
s.mu.Unlock()
var err error
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType)
publisher, err = mcu.NewPublisher(ctx, s, s.PublicId(), streamType, client)
s.mu.Lock()
if err != nil {
return nil, err

View file

@ -30,6 +30,7 @@ import (
"fmt"
"hash/fnv"
"log"
"net"
"net/http"
"strings"
"sync"
@ -633,7 +634,7 @@ func (h *Hub) processRegister(client *Client, message *ClientMessage, backend *B
h.setDecodedSessionId(privateSessionId, privateSessionName, sessionIdData)
h.setDecodedSessionId(publicSessionId, publicSessionName, sessionIdData)
client.SendHelloResponse(message, session)
h.sendHelloResponse(client, message, session)
}
func (h *Hub) processUnregister(client *Client) *ClientSession {
@ -656,7 +657,28 @@ func (h *Hub) processUnregister(client *Client) *ClientSession {
return session
}
func (h *Hub) processMessage(client *Client, message *ClientMessage) {
func (h *Hub) processMessage(client *Client, data []byte) {
var message ClientMessage
if err := message.UnmarshalJSON(data); err != nil {
if session := client.GetSession(); session != nil {
log.Printf("Error decoding message from client %s: %v", session.PublicId(), err)
} else {
log.Printf("Error decoding message from %s: %v", client.RemoteAddr(), err)
}
client.SendError(InvalidFormat)
return
}
if err := message.CheckValid(); err != nil {
if session := client.GetSession(); session != nil {
log.Printf("Invalid message %+v from client %s: %v", message, session.PublicId(), err)
} else {
log.Printf("Invalid message %+v from %s: %v", message, client.RemoteAddr(), err)
}
client.SendMessage(message.NewErrorServerMessage(InvalidFormat))
return
}
session := client.GetSession()
if session == nil {
if message.Type != "hello" {
@ -664,19 +686,19 @@ func (h *Hub) processMessage(client *Client, message *ClientMessage) {
return
}
h.processHello(client, message)
h.processHello(client, &message)
return
}
switch message.Type {
case "room":
h.processRoom(client, message)
h.processRoom(client, &message)
case "message":
h.processMessageMsg(client, message)
h.processMessageMsg(client, &message)
case "control":
h.processControlMsg(client, message)
h.processControlMsg(client, &message)
case "bye":
h.processByeMsg(client, message)
h.processByeMsg(client, &message)
case "hello":
log.Printf("Ignore hello %+v for already authenticated connection %s", message.Hello, session.PublicId())
default:
@ -684,6 +706,21 @@ func (h *Hub) processMessage(client *Client, message *ClientMessage) {
}
}
func (h *Hub) sendHelloResponse(client *Client, message *ClientMessage, session *ClientSession) bool {
response := &ServerMessage{
Id: message.Id,
Type: "hello",
Hello: &HelloServerMessage{
Version: HelloVersion,
SessionId: session.PublicId(),
ResumeId: session.PrivateId(),
UserId: session.UserId(),
Server: h.GetServerInfo(),
},
}
return client.SendMessage(response)
}
func (h *Hub) processHello(client *Client, message *ClientMessage) {
resumeId := message.Hello.ResumeId
if resumeId != "" {
@ -728,7 +765,7 @@ func (h *Hub) processHello(client *Client, message *ClientMessage) {
log.Printf("Resume session from %s in %s (%s) %s (private=%s)", client.RemoteAddr(), client.Country(), client.UserAgent(), session.PublicId(), session.PrivateId())
client.SendHelloResponse(message, clientSession)
h.sendHelloResponse(client, message, clientSession)
clientSession.NotifySessionResumed(client)
return
}
@ -839,6 +876,26 @@ func (h *Hub) disconnectByRoomSessionId(roomSessionId string) {
session.Close()
}
func (h *Hub) sendRoom(client *Client, message *ClientMessage, room *Room) bool {
response := &ServerMessage{
Type: "room",
}
if message != nil {
response.Id = message.Id
}
if room == nil {
response.Room = &RoomServerMessage{
RoomId: "",
}
} else {
response.Room = &RoomServerMessage{
RoomId: room.id,
Properties: room.properties,
}
}
return client.SendMessage(response)
}
func (h *Hub) processRoom(client *Client, message *ClientMessage) {
session := client.GetSession()
roomId := message.Room.RoomId
@ -850,7 +907,7 @@ func (h *Hub) processRoom(client *Client, message *ClientMessage) {
// We can handle leaving a room directly.
if session.LeaveRoom(true) != nil {
// User was in a room before, so need to notify about leaving it.
client.SendRoom(message, nil)
h.sendRoom(client, message, nil)
}
if session.UserId() == "" && session.ClientType() != HelloClientTypeInternal {
h.startWaitAnonymousClientRoom(client)
@ -965,7 +1022,7 @@ func (h *Hub) processJoinRoom(client *Client, message *ClientMessage, room *Back
if err := session.SubscribeRoomNats(h.nats, roomId, message.Room.SessionId); err != nil {
client.SendMessage(message.NewWrappedErrorServerMessage(err))
// The client (implicitly) left the room due to an error.
client.SendRoom(nil, nil)
h.sendRoom(client, nil, nil)
return
}
@ -978,7 +1035,7 @@ func (h *Hub) processJoinRoom(client *Client, message *ClientMessage, room *Back
client.SendMessage(message.NewWrappedErrorServerMessage(err))
// The client (implicitly) left the room due to an error.
session.UnsubscribeRoomNats()
client.SendRoom(nil, nil)
h.sendRoom(client, nil, nil)
return
}
}
@ -992,7 +1049,7 @@ func (h *Hub) processJoinRoom(client *Client, message *ClientMessage, room *Back
if room.Room.Permissions != nil {
session.SetPermissions(*room.Room.Permissions)
}
client.SendRoom(message, r)
h.sendRoom(client, message, r)
h.notifyUserJoinedRoom(r, client, session, room.Room.Session)
}
@ -1427,7 +1484,7 @@ func (h *Hub) processRoomDeleted(message *BackendServerRoomRequest) {
switch sess := session.(type) {
case *ClientSession:
if client := sess.GetClient(); client != nil {
client.SendRoom(nil, nil)
h.sendRoom(client, nil, nil)
}
}
}
@ -1477,6 +1534,26 @@ func getRealUserIP(r *http.Request) string {
return r.RemoteAddr
}
func (h *Hub) lookupClientCountry(client *Client) string {
ip := net.ParseIP(client.RemoteAddr())
if ip == nil {
return noCountry
} else if ip.IsLoopback() {
return loopback
}
country, err := h.geoip.LookupCountry(ip)
if err != nil {
log.Printf("Could not lookup country for %s: %s", ip, err)
return unknownCountry
}
if country == "" {
return unknownCountry
}
return country
}
func (h *Hub) serveWs(w http.ResponseWriter, r *http.Request) {
addr := getRealUserIP(r)
agent := r.Header.Get("User-Agent")
@ -1487,13 +1564,21 @@ func (h *Hub) serveWs(w http.ResponseWriter, r *http.Request) {
return
}
client, err := NewClient(h, conn, addr, agent)
client, err := NewClient(conn, addr, agent)
if err != nil {
log.Printf("Could not create client for %s: %s", addr, err)
return
}
if h.geoip != nil {
client.OnLookupCountry = h.lookupClientCountry
}
client.OnMessageReceived = h.processMessage
client.OnClosed = func(client *Client) {
h.processUnregister(client)
}
h.processNewClient(client)
go client.writePump()
go client.readPump()
go client.WritePump()
go client.ReadPump()
}

View file

@ -22,17 +22,24 @@
package signaling
import (
"fmt"
"golang.org/x/net/context"
)
const (
McuTypeJanus = "janus"
McuTypeProxy = "proxy"
McuTypeDefault = McuTypeJanus
)
var (
ErrNotConnected = fmt.Errorf("Not connected")
)
type McuListener interface {
Session
PublicId() string
OnIceCandidate(client McuClient, candidate interface{})
OnIceCompleted(client McuClient)
@ -41,13 +48,20 @@ type McuListener interface {
SubscriberClosed(subscriber McuSubscriber)
}
type McuInitiator interface {
Country() string
}
type Mcu interface {
Start() error
Stop()
SetOnConnected(func())
SetOnDisconnected(func())
GetStats() interface{}
NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error)
NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error)
NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error)
}

View file

@ -28,6 +28,7 @@ import (
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/dlintw/goconf"
@ -64,8 +65,6 @@ var (
videoPublisherUserId: streamTypeVideo,
screenPublisherUserId: streamTypeScreen,
}
ErrNotConnected = fmt.Errorf("Not connected")
)
func getPluginValue(data janus.PluginData, pluginName string, key string) interface{} {
@ -161,8 +160,13 @@ type mcuJanus struct {
reconnectInterval time.Duration
connectedSince time.Time
onConnected atomic.Value
onDisconnected atomic.Value
}
func emptyOnConnected() {}
func emptyOnDisconnected() {}
func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, error) {
maxStreamBitrate, _ := config.GetInt("mcu", "maxstreambitrate")
if maxStreamBitrate <= 0 {
@ -190,6 +194,9 @@ func NewMcuJanus(url string, config *goconf.ConfigFile, nats NatsClient) (Mcu, e
reconnectInterval: initialReconnectInterval,
}
mcu.onConnected.Store(emptyOnConnected)
mcu.onDisconnected.Store(emptyOnDisconnected)
mcu.reconnectTimer = time.AfterFunc(mcu.reconnectInterval, mcu.doReconnect)
mcu.reconnectTimer.Stop()
if err := mcu.reconnect(); err != nil {
@ -269,6 +276,7 @@ func (m *mcuJanus) scheduleReconnect(err error) {
func (m *mcuJanus) ConnectionInterrupted() {
m.scheduleReconnect(nil)
m.notifyOnDisconnected()
}
func (m *mcuJanus) Start() error {
@ -314,6 +322,8 @@ func (m *mcuJanus) Start() error {
log.Println("Created Janus handle", m.handle.Id)
go m.run()
m.notifyOnConnected()
return nil
}
@ -349,6 +359,32 @@ func (m *mcuJanus) Stop() {
m.reconnectTimer.Stop()
}
func (m *mcuJanus) SetOnConnected(f func()) {
if f == nil {
f = emptyOnConnected
}
m.onConnected.Store(f)
}
func (m *mcuJanus) notifyOnConnected() {
f := m.onConnected.Load().(func())
f()
}
func (m *mcuJanus) SetOnDisconnected(f func()) {
if f == nil {
f = emptyOnDisconnected
}
m.onDisconnected.Store(f)
}
func (m *mcuJanus) notifyOnDisconnected() {
f := m.onDisconnected.Load().(func())
f()
}
type mcuJanusConnectionStats struct {
Url string `json:"url"`
Connected bool `json:"connected"`
@ -599,7 +635,7 @@ func (m *mcuJanus) getOrCreatePublisherHandle(ctx context.Context, id string, st
return handle, response.Session, roomId, nil
}
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) {
func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) {
if _, found := streamTypeUserIds[streamType]; !found {
return nil, fmt.Errorf("Unsupported stream type %s", streamType)
}

1122
src/signaling/mcu_proxy.go Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,86 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2020 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"testing"
)
func newProxyConnectionWithCountry(country string) *mcuProxyConnection {
conn := &mcuProxyConnection{}
conn.country.Store(country)
return conn
}
func Test_sortConnectionsForCountry(t *testing.T) {
conn_de := newProxyConnectionWithCountry("DE")
conn_at := newProxyConnectionWithCountry("AT")
conn_jp := newProxyConnectionWithCountry("JP")
conn_us := newProxyConnectionWithCountry("US")
testcases := map[string][][]*mcuProxyConnection{
// Direct country match
"DE": [][]*mcuProxyConnection{
[]*mcuProxyConnection{conn_at, conn_jp, conn_de},
[]*mcuProxyConnection{conn_de, conn_at, conn_jp},
},
// Direct country match
"AT": [][]*mcuProxyConnection{
[]*mcuProxyConnection{conn_at, conn_jp, conn_de},
[]*mcuProxyConnection{conn_at, conn_de, conn_jp},
},
// Continent match
"CH": [][]*mcuProxyConnection{
[]*mcuProxyConnection{conn_de, conn_jp, conn_at},
[]*mcuProxyConnection{conn_de, conn_at, conn_jp},
},
// Direct country match
"JP": [][]*mcuProxyConnection{
[]*mcuProxyConnection{conn_de, conn_jp, conn_at},
[]*mcuProxyConnection{conn_jp, conn_de, conn_at},
},
// Continent match
"CN": [][]*mcuProxyConnection{
[]*mcuProxyConnection{conn_de, conn_jp, conn_at},
[]*mcuProxyConnection{conn_jp, conn_de, conn_at},
},
// Partial continent match
"RU": [][]*mcuProxyConnection{
[]*mcuProxyConnection{conn_us, conn_de, conn_jp, conn_at},
[]*mcuProxyConnection{conn_de, conn_jp, conn_at, conn_us},
},
// No match
"AU": [][]*mcuProxyConnection{
[]*mcuProxyConnection{conn_us, conn_de, conn_jp, conn_at},
[]*mcuProxyConnection{conn_us, conn_de, conn_jp, conn_at},
},
}
for country, test := range testcases {
sorted := sortConnectionsForCountry(test[0], country)
for idx, conn := range sorted {
if test[1][idx] != conn {
t.Errorf("Index %d for %s: expected %s, got %s", idx, country, test[1][idx].Country(), conn.Country())
}
}
}
}

View file

@ -41,11 +41,17 @@ func (m *TestMCU) Start() error {
func (m *TestMCU) Stop() {
}
func (m *TestMCU) SetOnConnected(f func()) {
}
func (m *TestMCU) SetOnDisconnected(f func()) {
}
func (m *TestMCU) GetStats() interface{} {
return nil
}
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string) (McuPublisher, error) {
func (m *TestMCU) NewPublisher(ctx context.Context, listener McuListener, id string, streamType string, initiator McuInitiator) (McuPublisher, error) {
return nil, fmt.Errorf("Not implemented")
}