Add initial clustering support.

Signaling servers can now be clustered via GRPC: each server exposes an RPC service (see grpc_sessions.proto and grpc_mcu.proto) that allows peers to look up session ids by room session id, check whether a remote session is in a call, and query publisher information. Lookups that fail locally fall back to the configured GRPC targets.

Joachim Bauch 2022-06-21 16:04:40 +02:00
parent 7b24dc1d1d
commit a0d3af14e0
21 changed files with 809 additions and 48 deletions


@ -45,6 +45,7 @@ jobs:
- name: Install dependencies
run: |
sudo apt -y update && sudo apt -y install protobuf-compiler
make common
- name: lint


@ -50,6 +50,10 @@ jobs:
path: ${{ steps.go-cache-paths.outputs.go-mod }}
key: ${{ runner.os }}-${{ steps.go-cache-paths.outputs.go-version }}-mod-${{ hashFiles('**/go.mod', '**/go.sum') }}
- name: Install dependencies
run: |
sudo apt -y update && sudo apt -y install protobuf-compiler
- name: Build applications
run: |
echo "Building with $(nproc) threads"

.gitignore

@ -3,6 +3,7 @@ vendor/
*_easyjson.go
*.pem
*.pb.go
*.prof
*.socket
*.tar.gz


@ -3,6 +3,7 @@ FROM golang:1.18 AS builder
WORKDIR /workdir
COPY . .
RUN apt -y update && apt -y install protobuf-compiler
RUN make build
FROM alpine:3.15


@ -56,6 +56,14 @@ $(GOPATHBIN)/easyjson:
$(GO) get -u -d github.com/mailru/easyjson/...
$(GO) install github.com/mailru/easyjson/...
$(GOPATHBIN)/protoc-gen-go:
$(GO) get -u -d google.golang.org/protobuf/cmd/protoc-gen-go
$(GO) install google.golang.org/protobuf/cmd/protoc-gen-go
$(GOPATHBIN)/protoc-gen-go-grpc:
$(GO) get -u -d google.golang.org/grpc/cmd/protoc-gen-go-grpc
$(GO) install google.golang.org/grpc/cmd/protoc-gen-go-grpc
continentmap.go:
$(CURDIR)/scripts/get_continent_map.py $@
@ -70,7 +78,7 @@ check-continentmap:
get:
$(GO) get $(PACKAGE)
fmt: hook
fmt: hook | common_proto
$(GOFMT) -s -w *.go client proxy server
vet: common
@ -83,23 +91,36 @@ cover: vet common
rm -f cover.out && \
$(GO) test -v -timeout $(TIMEOUT) -coverprofile cover.out $(ALL_PACKAGES) && \
sed -i "/_easyjson/d" cover.out && \
sed -i "/\.pb\.go/d" cover.out && \
$(GO) tool cover -func=cover.out
coverhtml: vet common
rm -f cover.out && \
$(GO) test -v -timeout $(TIMEOUT) -coverprofile cover.out $(ALL_PACKAGES) && \
sed -i "/_easyjson/d" cover.out && \
sed -i "/\.pb\.go/d" cover.out && \
$(GO) tool cover -html=cover.out -o coverage.html
%_easyjson.go: %.go $(GOPATHBIN)/easyjson
%_easyjson.go: %.go $(GOPATHBIN)/easyjson | common_proto
PATH="$(GODIR)":$(PATH) "$(GOPATHBIN)/easyjson" -all $*.go
common: \
%.pb.go: %.proto $(GOPATHBIN)/protoc-gen-go $(GOPATHBIN)/protoc-gen-go-grpc
PATH="$(GODIR)":"$(GOPATHBIN)":$(PATH) protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
$*.proto
common: common_easyjson common_proto
common_easyjson: \
api_async_easyjson.go \
api_backend_easyjson.go \
api_proxy_easyjson.go \
api_signaling_easyjson.go
common_proto: \
grpc_mcu.pb.go \
grpc_sessions.pb.go
$(BINDIR):
mkdir -p $(BINDIR)
@ -115,6 +136,7 @@ proxy: common $(BINDIR)
clean:
rm -f *_easyjson.go
rm -f easyjson-bootstrap*.go
rm -f *.pb.go
build: server proxy


@ -19,6 +19,7 @@ The following tools are required for building the signaling server.
- git
- go >= 1.17
- make
- protobuf-compiler >= 3
All other dependencies are fetched automatically while building.
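For example, on Debian or Ubuntu the protobuf compiler can be installed the same way the CI workflows and the Dockerfile in this commit do (a sketch; package names can differ on other distributions):

    sudo apt -y update && sudo apt -y install protobuf-compiler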


@ -22,6 +22,7 @@
package signaling
import (
"context"
"crypto/hmac"
"crypto/rand"
"crypto/sha1"
@ -324,6 +325,8 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso
}
timeout := time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
var wg sync.WaitGroup
for _, sessionid := range sessionids {
if sessionid == sessionIdNotInMeeting {
@ -334,7 +337,7 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso
wg.Add(1)
go func(sessionid string) {
defer wg.Done()
if sid, err := b.lookupByRoomSessionId(sessionid, nil, timeout); err != nil {
if sid, err := b.lookupByRoomSessionId(ctx, sessionid, nil); err != nil {
log.Printf("Could not lookup by room session %s: %s", sessionid, err)
} else if sid != "" {
if err := b.events.PublishSessionMessage(sid, backend, msg); err != nil {
@ -377,7 +380,7 @@ func (b *BackendServer) sendRoomUpdate(roomid string, backend *Backend, notified
}
}
func (b *BackendServer) lookupByRoomSessionId(roomSessionId string, cache *ConcurrentStringStringMap, timeout time.Duration) (string, error) {
func (b *BackendServer) lookupByRoomSessionId(ctx context.Context, roomSessionId string, cache *ConcurrentStringStringMap) (string, error) {
if roomSessionId == sessionIdNotInMeeting {
log.Printf("Trying to lookup empty room session id: %s", roomSessionId)
return "", nil
@ -389,7 +392,7 @@ func (b *BackendServer) lookupByRoomSessionId(roomSessionId string, cache *Concu
}
}
sid, err := b.roomSessions.GetSessionId(roomSessionId)
sid, err := b.roomSessions.LookupSessionId(ctx, roomSessionId)
if err == ErrNoSuchRoomSession {
return "", nil
} else if err != nil {
@ -402,7 +405,7 @@ func (b *BackendServer) lookupByRoomSessionId(roomSessionId string, cache *Concu
return sid, nil
}
func (b *BackendServer) fixupUserSessions(cache *ConcurrentStringStringMap, users []map[string]interface{}, timeout time.Duration) []map[string]interface{} {
func (b *BackendServer) fixupUserSessions(ctx context.Context, cache *ConcurrentStringStringMap, users []map[string]interface{}) []map[string]interface{} {
if len(users) == 0 {
return users
}
@ -430,7 +433,7 @@ func (b *BackendServer) fixupUserSessions(cache *ConcurrentStringStringMap, user
wg.Add(1)
go func(roomSessionId string, u map[string]interface{}) {
defer wg.Done()
if sessionId, err := b.lookupByRoomSessionId(roomSessionId, cache, timeout); err != nil {
if sessionId, err := b.lookupByRoomSessionId(ctx, roomSessionId, cache); err != nil {
log.Printf("Could not lookup by room session %s: %s", roomSessionId, err)
delete(u, "sessionId")
} else if sessionId != "" {
@ -456,11 +459,13 @@ func (b *BackendServer) sendRoomIncall(roomid string, backend *Backend, request
if !request.InCall.All {
timeout := time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
var cache ConcurrentStringStringMap
// Convert (Nextcloud) session ids to signaling session ids.
request.InCall.Users = b.fixupUserSessions(&cache, request.InCall.Users, timeout)
request.InCall.Users = b.fixupUserSessions(ctx, &cache, request.InCall.Users)
// Entries in "Changed" are most likely already fetched through the "Users" list.
request.InCall.Changed = b.fixupUserSessions(&cache, request.InCall.Changed, timeout)
request.InCall.Changed = b.fixupUserSessions(ctx, &cache, request.InCall.Changed)
if len(request.InCall.Users) == 0 && len(request.InCall.Changed) == 0 {
return nil
@ -478,9 +483,11 @@ func (b *BackendServer) sendRoomParticipantsUpdate(roomid string, backend *Backe
timeout := time.Second
// Convert (Nextcloud) session ids to signaling session ids.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
var cache ConcurrentStringStringMap
request.Participants.Users = b.fixupUserSessions(&cache, request.Participants.Users, timeout)
request.Participants.Changed = b.fixupUserSessions(&cache, request.Participants.Changed, timeout)
request.Participants.Users = b.fixupUserSessions(ctx, &cache, request.Participants.Users)
request.Participants.Changed = b.fixupUserSessions(ctx, &cache, request.Participants.Changed)
if len(request.Participants.Users) == 0 && len(request.Participants.Changed) == 0 {
return nil


@ -88,7 +88,7 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil
config.AddOption("clients", "internalsecret", string(testInternalSecret))
config.AddOption("geoip", "url", "none")
events := getAsyncEventsForTest(t)
hub, err := NewHub(config, events, r, "no-version")
hub, err := NewHub(config, events, nil, r, "no-version")
if err != nil {
t.Fatal(err)
}


@ -75,6 +75,9 @@ type ClientSession struct {
room unsafe.Pointer
roomSessionId string
publisherWaitersId uint64
publisherWaiters map[uint64]chan bool
publishers map[string]McuPublisher
subscribers map[string]McuSubscriber
@ -803,6 +806,26 @@ func (s *ClientSession) checkOfferTypeLocked(streamType string, data *MessageCli
return 0, nil
}
func (s *ClientSession) wakeupPublisherWaiters() {
for _, ch := range s.publisherWaiters {
ch <- true
}
}
func (s *ClientSession) addPublisherWaiter(ch chan bool) uint64 {
if s.publisherWaiters == nil {
s.publisherWaiters = make(map[uint64]chan bool)
}
id := s.publisherWaitersId + 1
s.publisherWaitersId = id
s.publisherWaiters[id] = ch
return id
}
func (s *ClientSession) removePublisherWaiter(id uint64) {
delete(s.publisherWaiters, id)
}
func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, streamType string, data *MessageClientMessageData) (McuPublisher, error) {
s.mu.Lock()
defer s.mu.Unlock()
@ -851,6 +874,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
s.publishers[streamType] = publisher
}
log.Printf("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId())
s.wakeupPublisherWaiters()
} else {
publisher.SetMedia(mediaTypes)
}
@ -858,11 +882,44 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea
return publisher, nil
}
func (s *ClientSession) getPublisherLocked(streamType string) McuPublisher {
return s.publishers[streamType]
}
func (s *ClientSession) GetPublisher(streamType string) McuPublisher {
s.mu.Lock()
defer s.mu.Unlock()
return s.publishers[streamType]
return s.getPublisherLocked(streamType)
}
func (s *ClientSession) GetOrWaitForPublisher(ctx context.Context, streamType string) McuPublisher {
s.mu.Lock()
defer s.mu.Unlock()
publisher := s.getPublisherLocked(streamType)
if publisher != nil {
return publisher
}
ch := make(chan bool, 1)
id := s.addPublisherWaiter(ch)
defer s.removePublisherWaiter(id)
for {
s.mu.Unlock()
select {
case <-ch:
s.mu.Lock()
publisher := s.getPublisherLocked(streamType)
if publisher != nil {
return publisher
}
case <-ctx.Done():
s.mu.Lock()
return nil
}
}
}
func (s *ClientSession) GetOrCreateSubscriber(ctx context.Context, mcu Mcu, id string, streamType string) (McuSubscriber, error) {

grpc_client.go

@ -0,0 +1,225 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"context"
"log"
"net"
"strings"
"sync"
"github.com/dlintw/goconf"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
status "google.golang.org/grpc/status"
)
type grpcClientImpl struct {
RpcMcuClient
RpcSessionsClient
}
func newGrpcClientImpl(conn grpc.ClientConnInterface) *grpcClientImpl {
return &grpcClientImpl{
RpcMcuClient: NewRpcMcuClient(conn),
RpcSessionsClient: NewRpcSessionsClient(conn),
}
}
type GrpcClient struct {
conn *grpc.ClientConn
impl *grpcClientImpl
}
func NewGrpcClient(target string) (*GrpcClient, error) {
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
result := &GrpcClient{
conn: conn,
impl: newGrpcClientImpl(conn),
}
return result, nil
}
func (c *GrpcClient) Target() string {
return c.conn.Target()
}
func (c *GrpcClient) Close() error {
return c.conn.Close()
}
func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) {
// TODO: Remove debug logging
log.Printf("Lookup room session %s on %s", roomSessionId, c.Target())
response, err := c.impl.LookupSessionId(ctx, &LookupSessionIdRequest{
RoomSessionId: roomSessionId,
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return "", ErrNoSuchRoomSession
} else if err != nil {
return "", err
}
sessionId := response.GetSessionId()
if sessionId == "" {
return "", ErrNoSuchRoomSession
}
return sessionId, nil
}
func (c *GrpcClient) IsSessionInCall(ctx context.Context, sessionId string, room *Room) (bool, error) {
// TODO: Remove debug logging
log.Printf("Check if session %s is in call %s on %s", sessionId, room.Id(), c.Target())
response, err := c.impl.IsSessionInCall(ctx, &IsSessionInCallRequest{
SessionId: sessionId,
RoomId: room.Id(),
BackendUrl: room.Backend().url,
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return false, nil
} else if err != nil {
return false, err
}
return response.GetInCall(), nil
}
func (c *GrpcClient) GetPublisherId(ctx context.Context, sessionId string, streamType string) (string, string, net.IP, error) {
// TODO: Remove debug logging
log.Printf("Get %s publisher id %s on %s", streamType, sessionId, c.Target())
response, err := c.impl.GetPublisherId(ctx, &GetPublisherIdRequest{
SessionId: sessionId,
StreamType: streamType,
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return "", "", nil, nil
} else if err != nil {
return "", "", nil, err
}
return response.GetPublisherId(), response.GetProxyUrl(), net.ParseIP(response.GetIp()), nil
}
type GrpcClients struct {
mu sync.RWMutex
clientsMap map[string]*GrpcClient
clients []*GrpcClient
}
func NewGrpcClients(config *goconf.ConfigFile) (*GrpcClients, error) {
result := &GrpcClients{}
if err := result.load(config); err != nil {
return nil, err
}
return result, nil
}
func (c *GrpcClients) load(config *goconf.ConfigFile) error {
c.mu.Lock()
defer c.mu.Unlock()
targets, _ := config.GetString("grpc", "targets")
clientsMap := make(map[string]*GrpcClient)
var clients []*GrpcClient
removeTargets := make(map[string]bool, len(c.clientsMap))
for target, client := range c.clientsMap {
removeTargets[target] = true
clientsMap[target] = client
}
for _, target := range strings.Split(targets, ",") {
target = strings.TrimSpace(target)
if target == "" {
continue
}
if client, found := clientsMap[target]; found {
clients = append(clients, client)
delete(removeTargets, target)
continue
}
client, err := NewGrpcClient(target)
if err != nil {
for target, client := range clientsMap {
if closeerr := client.Close(); closeerr != nil {
log.Printf("Error closing client to %s: %s", target, closeerr)
}
}
return err
}
log.Printf("Adding %s as GRPC target", target)
clientsMap[target] = client
clients = append(clients, client)
}
for target := range removeTargets {
if client, found := clientsMap[target]; found {
log.Printf("Deleting GRPC target %s", target)
if err := client.Close(); err != nil {
log.Printf("Error closing client to %s: %s", target, err)
}
delete(clientsMap, target)
}
}
c.clients = clients
c.clientsMap = clientsMap
return nil
}
func (c *GrpcClients) Reload(config *goconf.ConfigFile) {
if err := c.load(config); err != nil {
log.Printf("Could not reload RPC clients: %s", err)
}
}
func (c *GrpcClients) Close() {
c.mu.Lock()
defer c.mu.Unlock()
for target, client := range c.clientsMap {
if err := client.Close(); err != nil {
log.Printf("Error closing client to %s: %s", target, err)
}
}
c.clients = nil
c.clientsMap = nil
}
func (c *GrpcClients) GetClients() []*GrpcClient {
c.mu.RLock()
defer c.mu.RUnlock()
return c.clients
}

grpc_mcu.proto

@ -0,0 +1,41 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
syntax = "proto3";
option go_package = "github.com/strukturag/nextcloud-spreed-signaling;signaling";
package signaling;
service RpcMcu {
rpc GetPublisherId(GetPublisherIdRequest) returns (GetPublisherIdReply) {}
}
message GetPublisherIdRequest {
string sessionId = 1;
string streamType = 2;
}
message GetPublisherIdReply {
string publisherId = 1;
string proxyUrl = 2;
string ip = 3;
}
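For reference, the new Makefile rule above generates the Go code for this file roughly as follows (assuming protoc and the protoc-gen-go / protoc-gen-go-grpc plugins installed by the Makefile are on PATH):

    protoc --go_out=. --go_opt=paths=source_relative \
        --go-grpc_out=. --go-grpc_opt=paths=source_relative \
        grpc_mcu.proto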

grpc_server.go

@ -0,0 +1,141 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling
import (
"context"
"errors"
"fmt"
"log"
"net"
"github.com/dlintw/goconf"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
type GrpcServer struct {
UnimplementedRpcMcuServer
UnimplementedRpcSessionsServer
conn *grpc.Server
listener net.Listener
hub *Hub
}
func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) {
var listener net.Listener
if addr, _ := config.GetString("grpc", "listen"); addr != "" {
var err error
listener, err = net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("could not create GRPC server: %w", err)
}
}
conn := grpc.NewServer()
result := &GrpcServer{
conn: conn,
listener: listener,
}
RegisterRpcSessionsServer(conn, result)
RegisterRpcMcuServer(conn, result)
return result, nil
}
func (s *GrpcServer) Run() error {
if s.listener == nil {
return nil
}
return s.conn.Serve(s.listener)
}
func (s *GrpcServer) Close() {
s.conn.GracefulStop()
}
func (s *GrpcServer) LookupSessionId(ctx context.Context, request *LookupSessionIdRequest) (*LookupSessionIdReply, error) {
// TODO: Remove debug logging
log.Printf("Lookup session id for room session id %s", request.RoomSessionId)
sid, err := s.hub.roomSessions.GetSessionId(request.RoomSessionId)
if errors.Is(err, ErrNoSuchRoomSession) {
return nil, status.Error(codes.NotFound, "no such room session id")
} else if err != nil {
return nil, err
}
return &LookupSessionIdReply{
SessionId: sid,
}, nil
}
func (s *GrpcServer) IsSessionInCall(ctx context.Context, request *IsSessionInCallRequest) (*IsSessionInCallReply, error) {
// TODO: Remove debug logging
log.Printf("Check if session %s is in call %s on %s", request.SessionId, request.RoomId, request.BackendUrl)
session := s.hub.GetSessionByPublicId(request.SessionId)
if session == nil {
return nil, status.Error(codes.NotFound, "no such session id")
}
result := &IsSessionInCallReply{}
room := session.GetRoom()
if room == nil || room.Id() != request.GetRoomId() || room.Backend().url != request.GetBackendUrl() ||
(session.ClientType() != HelloClientTypeInternal && !room.IsSessionInCall(session)) {
// Recipient is not in a room, a different room or not in the call.
result.InCall = false
} else {
result.InCall = true
}
return result, nil
}
func (s *GrpcServer) GetPublisherId(ctx context.Context, request *GetPublisherIdRequest) (*GetPublisherIdReply, error) {
// TODO: Remove debug logging
log.Printf("Get %s publisher id for session %s", request.StreamType, request.SessionId)
session := s.hub.GetSessionByPublicId(request.SessionId)
if session == nil {
return nil, status.Error(codes.NotFound, "no such session")
}
clientSession, ok := session.(*ClientSession)
if !ok {
return nil, status.Error(codes.NotFound, "no such session")
}
publisher := clientSession.GetOrWaitForPublisher(ctx, request.StreamType)
if publisher, ok := publisher.(*mcuProxyPublisher); ok {
reply := &GetPublisherIdReply{
PublisherId: publisher.Id(),
ProxyUrl: publisher.conn.rawUrl,
}
if ip := publisher.conn.ip; ip != nil {
reply.Ip = ip.String()
}
return reply, nil
}
return nil, status.Error(codes.NotFound, "no such publisher")
}

grpc_sessions.proto

@ -0,0 +1,49 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
syntax = "proto3";
option go_package = "github.com/strukturag/nextcloud-spreed-signaling;signaling";
package signaling;
service RpcSessions {
rpc LookupSessionId(LookupSessionIdRequest) returns (LookupSessionIdReply) {}
rpc IsSessionInCall(IsSessionInCallRequest) returns (IsSessionInCallReply) {}
}
message LookupSessionIdRequest {
string roomSessionId = 1;
}
message LookupSessionIdReply {
string sessionId = 1;
}
message IsSessionInCallRequest {
string sessionId = 1;
string roomId = 2;
string backendUrl = 3;
}
message IsSessionInCallReply {
bool inCall = 1;
}

hub.go

@ -28,6 +28,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"log"
@ -148,9 +149,12 @@ type Hub struct {
geoip *GeoLookup
geoipOverrides map[*net.IPNet]string
geoipUpdating int32
rpcServer *GrpcServer
rpcClients *GrpcClients
}
func NewHub(config *goconf.ConfigFile, events AsyncEvents, r *mux.Router, version string) (*Hub, error) {
func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcClients *GrpcClients, r *mux.Router, version string) (*Hub, error) {
hashKey, _ := config.GetString("sessions", "hashkey")
switch len(hashKey) {
case 32:
@ -210,7 +214,12 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, r *mux.Router, versio
decodeCaches = append(decodeCaches, NewLruCache(decodeCacheSize))
}
roomSessions, err := NewBuiltinRoomSessions()
rpcServer, err := NewGrpcServer(config)
if err != nil {
return nil, err
}
roomSessions, err := NewBuiltinRoomSessions(rpcClients)
if err != nil {
return nil, err
}
@ -338,8 +347,12 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, r *mux.Router, versio
geoip: geoip,
geoipOverrides: geoipOverrides,
rpcServer: rpcServer,
rpcClients: rpcClients,
}
backend.hub = hub
rpcServer.hub = hub
hub.upgrader.CheckOrigin = hub.checkOrigin
r.HandleFunc("/spreed", func(w http.ResponseWriter, r *http.Request) {
hub.serveWs(w, r)
@ -437,6 +450,12 @@ func (h *Hub) Run() {
go h.updateGeoDatabase()
h.roomPing.Start()
defer h.roomPing.Stop()
go func() {
if err := h.rpcServer.Run(); err != nil {
log.Fatalf("Could not start RPC server: %s", err)
}
}()
defer h.rpcServer.Close()
housekeeping := time.NewTicker(housekeepingInterval)
geoipUpdater := time.NewTicker(24 * time.Hour)
@ -480,6 +499,7 @@ func (h *Hub) Reload(config *goconf.ConfigFile) {
h.mcu.Reload(config)
}
h.backend.Reload(config)
h.rpcClients.Reload(config)
}
func reverseSessionId(s string) (string, error) {
@ -575,6 +595,10 @@ func (h *Hub) GetSessionByPublicId(sessionId string) Session {
h.mu.Lock()
session := h.sessions[data.Sid]
h.mu.Unlock()
if session != nil && session.PublicId() != sessionId {
// Session was created on different server.
return nil
}
return session
}
@ -1726,7 +1750,41 @@ func sendMcuProcessingFailed(session *ClientSession, message *ClientMessage) {
session.SendMessage(response)
}
func (h *Hub) isInSameCall(senderSession *ClientSession, recipientSessionId string) bool {
func (h *Hub) isInSameCallRemote(ctx context.Context, senderSession *ClientSession, senderRoom *Room, recipientSessionId string) bool {
clients := h.rpcClients.GetClients()
if len(clients) == 0 {
return false
}
var result int32
var wg sync.WaitGroup
rpcCtx, cancel := context.WithCancel(ctx)
defer cancel()
for _, client := range clients {
wg.Add(1)
go func(client *GrpcClient) {
defer wg.Done()
inCall, err := client.IsSessionInCall(rpcCtx, recipientSessionId, senderRoom)
if errors.Is(err, context.Canceled) {
return
} else if err != nil {
log.Printf("Error checking session %s in call on %s: %s", recipientSessionId, client.Target(), err)
return
} else if !inCall {
return
}
cancel()
atomic.StoreInt32(&result, 1)
}(client)
}
wg.Wait()
return atomic.LoadInt32(&result) != 0
}
func (h *Hub) isInSameCall(ctx context.Context, senderSession *ClientSession, recipientSessionId string) bool {
if senderSession.ClientType() == HelloClientTypeInternal {
// Internal clients may subscribe all streams.
return true
@ -1741,7 +1799,7 @@ func (h *Hub) isInSameCall(senderSession *ClientSession, recipientSessionId stri
recipientSession := h.GetSessionByPublicId(recipientSessionId)
if recipientSession == nil {
// Recipient session does not exist.
return false
return h.isInSameCallRemote(ctx, senderSession, senderRoom, recipientSessionId)
}
recipientRoom := recipientSession.GetRoom()
@ -1770,7 +1828,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes
// A user is only allowed to subscribe a stream if she is in the same room
// as the other user and both have their "inCall" flag set.
if !h.allowSubscribeAnyStream && !h.isInSameCall(senderSession, message.Recipient.SessionId) {
if !h.allowSubscribeAnyStream && !h.isInSameCall(ctx, senderSession, message.Recipient.SessionId) {
log.Printf("Session %s is not in the same call as session %s, not requesting offer", session.PublicId(), message.Recipient.SessionId)
sendNotAllowed(senderSession, client_message, "Not allowed to request offer.")
return


@ -111,7 +111,7 @@ func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Serve
if err != nil {
t.Fatal(err)
}
h, err := NewHub(config, events, r, "no-version")
h, err := NewHub(config, events, nil, r, "no-version")
if err != nil {
t.Fatal(err)
}


@ -26,6 +26,7 @@ import (
"crypto/rsa"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"log"
"net"
@ -1006,20 +1007,13 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe
return publisher, nil
}
func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) {
c.publishersLock.Lock()
id, found := c.publisherIds[publisher+"|"+streamType]
c.publishersLock.Unlock()
if !found {
return nil, fmt.Errorf("Unknown publisher %s", publisher)
}
func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuListener, publisherId string, publisherSessionId string, streamType string) (McuSubscriber, error) {
msg := &ProxyClientMessage{
Type: "command",
Command: &CommandProxyClientMessage{
Type: "create-subscriber",
StreamType: streamType,
PublisherId: id,
PublisherId: publisherId,
},
}
@ -1030,8 +1024,8 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList
}
proxyId := response.Command.Id
log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisher)
subscriber := newMcuProxySubscriber(publisher, response.Command.Sid, streamType, proxyId, c, listener)
log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisherSessionId)
subscriber := newMcuProxySubscriber(publisherSessionId, response.Command.Sid, streamType, proxyId, c, listener)
c.subscribersLock.Lock()
c.subscribers[proxyId] = subscriber
c.subscribersLock.Unlock()
@ -1075,9 +1069,11 @@ type mcuProxy struct {
publisherWaiters map[uint64]chan bool
continentsMap atomic.Value
rpcClients *GrpcClients
}
func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient) (Mcu, error) {
func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *GrpcClients) (Mcu, error) {
urlType, _ := config.GetString("mcu", "urltype")
if urlType == "" {
urlType = proxyUrlTypeStatic
@ -1139,6 +1135,8 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient) (Mcu, error)
publishers: make(map[string]*mcuProxyConnection),
publisherWaiters: make(map[uint64]chan bool),
rpcClients: rpcClients,
}
if err := mcu.loadContinentsMap(config); err != nil {
@ -1850,19 +1848,18 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
return nil, fmt.Errorf("No MCU connection available")
}
func (m *mcuProxy) getPublisherConnection(ctx context.Context, publisher string, streamType string) *mcuProxyConnection {
func (m *mcuProxy) getPublisherConnection(publisher string, streamType string) *mcuProxyConnection {
m.mu.RLock()
conn := m.publishers[publisher+"|"+streamType]
m.mu.RUnlock()
if conn != nil {
return conn
}
defer m.mu.RUnlock()
log.Printf("No %s publisher %s found yet, deferring", streamType, publisher)
return m.publishers[publisher+"|"+streamType]
}
func (m *mcuProxy) waitForPublisherConnection(ctx context.Context, publisher string, streamType string) *mcuProxyConnection {
m.mu.Lock()
defer m.mu.Unlock()
conn = m.publishers[publisher+"|"+streamType]
conn := m.publishers[publisher+"|"+streamType]
if conn != nil {
// Publisher was created while waiting for lock.
return conn
@ -1890,10 +1887,92 @@ func (m *mcuProxy) getPublisherConnection(ctx context.Context, publisher string,
}
func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) {
conn := m.getPublisherConnection(ctx, publisher, streamType)
if conn == nil {
return nil, fmt.Errorf("No %s publisher %s found", streamType, publisher)
if conn := m.getPublisherConnection(publisher, streamType); conn != nil {
// Fast common path: publisher is available locally.
conn.publishersLock.Lock()
id, found := conn.publisherIds[publisher+"|"+streamType]
conn.publishersLock.Unlock()
if !found {
return nil, fmt.Errorf("Unknown publisher %s", publisher)
}
return conn.newSubscriber(ctx, listener, id, publisher, streamType)
}
return conn.newSubscriber(ctx, listener, publisher, streamType)
log.Printf("No %s publisher %s found yet, deferring", streamType, publisher)
ch := make(chan McuSubscriber)
getctx, cancel := context.WithCancel(ctx)
defer cancel()
// Wait for publisher to be created locally.
go func() {
if conn := m.waitForPublisherConnection(getctx, publisher, streamType); conn != nil {
cancel() // Cancel pending RPC calls.
conn.publishersLock.Lock()
id, found := conn.publisherIds[publisher+"|"+streamType]
conn.publishersLock.Unlock()
if !found {
log.Printf("Unknown id for local %s publisher %s", streamType, publisher)
return
}
subscriber, err := conn.newSubscriber(ctx, listener, id, publisher, streamType)
if subscriber != nil {
ch <- subscriber
} else if err != nil {
log.Printf("Error creating local subscriber for %s publisher %s: %s", streamType, publisher, err)
}
}
}()
// Wait for publisher to be created on one of the other servers in the cluster.
if clients := m.rpcClients.GetClients(); len(clients) > 0 {
for _, client := range clients {
go func(client *GrpcClient) {
id, url, ip, err := client.GetPublisherId(getctx, publisher, streamType)
if errors.Is(err, context.Canceled) {
return
} else if err != nil {
log.Printf("Error getting %s publisher id %s from %s: %s", streamType, publisher, client.Target(), err)
return
} else if id == "" {
// Publisher not found on other server
return
}
cancel() // Cancel pending RPC calls.
log.Printf("Found publisher id %s through %s on proxy %s", id, client.Target(), url)
m.connectionsMu.RLock()
connections := m.connections
m.connectionsMu.RUnlock()
for _, conn := range connections {
if conn.rawUrl != url || !ip.Equal(conn.ip) {
continue
}
// Simple case, signaling server has a connection to the same endpoint
subscriber, err := conn.newSubscriber(ctx, listener, id, publisher, streamType)
if err != nil {
log.Printf("Could not create subscriber for %s publisher %s: %s", streamType, publisher, err)
return
}
ch <- subscriber
return
}
// TODO: Create temporary connection to new proxy and tear down when last subscriber left.
log.Printf("Not implemented yet: need new connection to %s (%s) for %s publisher %s (%s)", url, ip, streamType, publisher, id)
}(client)
}
}
select {
case subscriber := <-ch:
return subscriber, nil
case <-ctx.Done():
return nil, fmt.Errorf("No %s publisher %s found", streamType, publisher)
}
}


@ -22,6 +22,7 @@
package signaling
import (
"context"
"fmt"
)
@ -34,4 +35,5 @@ type RoomSessions interface {
DeleteRoomSession(session Session)
GetSessionId(roomSessionId string) (string, error)
LookupSessionId(ctx context.Context, roomSessionId string) (string, error)
}


@ -22,19 +22,27 @@
package signaling
import (
"context"
"errors"
"log"
"sync"
"sync/atomic"
)
type BuiltinRoomSessions struct {
sessionIdToRoomSession map[string]string
roomSessionToSessionid map[string]string
mu sync.RWMutex
clients *GrpcClients
}
func NewBuiltinRoomSessions() (RoomSessions, error) {
func NewBuiltinRoomSessions(clients *GrpcClients) (RoomSessions, error) {
return &BuiltinRoomSessions{
sessionIdToRoomSession: make(map[string]string),
roomSessionToSessionid: make(map[string]string),
clients: clients,
}, nil
}
@ -78,3 +86,53 @@ func (r *BuiltinRoomSessions) GetSessionId(roomSessionId string) (string, error)
return sid, nil
}
func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) {
sid, err := r.GetSessionId(roomSessionId)
if err == nil {
return sid, nil
}
if r.clients == nil {
return "", ErrNoSuchRoomSession
}
clients := r.clients.GetClients()
if len(clients) == 0 {
return "", ErrNoSuchRoomSession
}
lookupctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
var result atomic.Value
for _, client := range clients {
wg.Add(1)
go func(client *GrpcClient) {
defer wg.Done()
sid, err := client.LookupSessionId(lookupctx, roomSessionId)
if errors.Is(err, context.Canceled) {
return
} else if err != nil {
log.Printf("Received error while checking for room session id %s on %s: %s", roomSessionId, client.Target(), err)
return
} else if sid == "" {
log.Printf("Received empty session id for room session id %s from %s", roomSessionId, client.Target())
return
}
cancel() // Cancel pending RPC calls.
result.Store(sid)
}(client)
}
wg.Wait()
value := result.Load()
if value == nil {
return "", ErrNoSuchRoomSession
}
return value.(string), nil
}


@ -26,7 +26,7 @@ import (
)
func TestBuiltinRoomSessions(t *testing.T) {
sessions, err := NewBuiltinRoomSessions()
sessions, err := NewBuiltinRoomSessions(nil)
if err != nil {
t.Fatal(err)
}


@ -234,3 +234,11 @@ connectionsperhost = 8
#clientkey = /path/to/etcd-client.key
#clientcert = /path/to/etcd-client.crt
#cacert = /path/to/etcd-ca.crt
[grpc]
# IP and port to listen on for GRPC requests.
# Leave this line commented out to disable the listener.
#listen = 0.0.0.0:9090
# Comma-separated list of GRPC targets to connect to for clustering mode.
#targets = 192.168.0.1:9090, 192.168.0.1:9091
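As an illustrative sketch (addresses are placeholders, not part of this commit), a minimal two-server cluster would give each server its own listener and list the other server as target, e.g. on the first server:

    [grpc]
    listen = 0.0.0.0:9090
    targets = 192.168.0.2:9090

with the second server pointing its targets entry back at the first.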


@ -164,8 +164,14 @@ func main() {
}
}()
rpcClients, err := signaling.NewGrpcClients(config)
if err != nil {
log.Fatalf("Could not create RPC clients: %s", err)
}
defer rpcClients.Close()
r := mux.NewRouter()
hub, err := signaling.NewHub(config, events, r, version)
hub, err := signaling.NewHub(config, events, rpcClients, r, version)
if err != nil {
log.Fatal("Could not create hub: ", err)
}
@ -192,7 +198,7 @@ func main() {
signaling.UnregisterProxyMcuStats()
signaling.RegisterJanusMcuStats()
case signaling.McuTypeProxy:
mcu, err = signaling.NewMcuProxy(config, etcdClient)
mcu, err = signaling.NewMcuProxy(config, etcdClient, rpcClients)
signaling.UnregisterJanusMcuStats()
signaling.RegisterProxyMcuStats()
default: