diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 226720f..18f9840 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -45,6 +45,7 @@ jobs: - name: Install dependencies run: | + sudo apt -y update && sudo apt -y install protobuf-compiler make common - name: lint diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f62f95e..e61ebec 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -50,6 +50,10 @@ jobs: path: ${{ steps.go-cache-paths.outputs.go-mod }} key: ${{ runner.os }}-${{ steps.go-cache-paths.outputs.go-version }}-mod-${{ hashFiles('**/go.mod', '**/go.sum') }} + - name: Install dependencies + run: | + sudo apt -y update && sudo apt -y install protobuf-compiler + - name: Build applications run: | echo "Building with $(nproc) threads" diff --git a/.gitignore b/.gitignore index 7296465..9a623dc 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ vendor/ *_easyjson.go *.pem +*.pb.go *.prof *.socket *.tar.gz diff --git a/Dockerfile b/Dockerfile index 1418601..3887ba5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,6 +3,7 @@ FROM golang:1.18 AS builder WORKDIR /workdir COPY . . +RUN apt -y update && apt -y install protobuf-compiler RUN make build FROM alpine:3.15 diff --git a/Makefile b/Makefile index e1ca9ea..8cd998f 100644 --- a/Makefile +++ b/Makefile @@ -56,6 +56,14 @@ $(GOPATHBIN)/easyjson: $(GO) get -u -d github.com/mailru/easyjson/... $(GO) install github.com/mailru/easyjson/... +$(GOPATHBIN)/protoc-gen-go: + $(GO) get -u -d google.golang.org/protobuf/cmd/protoc-gen-go + $(GO) install google.golang.org/protobuf/cmd/protoc-gen-go + +$(GOPATHBIN)/protoc-gen-go-grpc: + $(GO) get -u -d google.golang.org/grpc/cmd/protoc-gen-go-grpc + $(GO) install google.golang.org/grpc/cmd/protoc-gen-go-grpc + continentmap.go: $(CURDIR)/scripts/get_continent_map.py $@ @@ -70,7 +78,7 @@ check-continentmap: get: $(GO) get $(PACKAGE) -fmt: hook +fmt: hook | common_proto $(GOFMT) -s -w *.go client proxy server vet: common @@ -83,23 +91,36 @@ cover: vet common rm -f cover.out && \ $(GO) test -v -timeout $(TIMEOUT) -coverprofile cover.out $(ALL_PACKAGES) && \ sed -i "/_easyjson/d" cover.out && \ + sed -i "/\.pb\.go/d" cover.out && \ $(GO) tool cover -func=cover.out coverhtml: vet common rm -f cover.out && \ $(GO) test -v -timeout $(TIMEOUT) -coverprofile cover.out $(ALL_PACKAGES) && \ sed -i "/_easyjson/d" cover.out && \ + sed -i "/\.pb\.go/d" cover.out && \ $(GO) tool cover -html=cover.out -o coverage.html -%_easyjson.go: %.go $(GOPATHBIN)/easyjson +%_easyjson.go: %.go $(GOPATHBIN)/easyjson | common_proto PATH="$(GODIR)":$(PATH) "$(GOPATHBIN)/easyjson" -all $*.go -common: \ +%.pb.go: %.proto $(GOPATHBIN)/protoc-gen-go $(GOPATHBIN)/protoc-gen-go-grpc + PATH="$(GODIR)":"$(GOPATHBIN)":$(PATH) protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + $*.proto + +common: common_easyjson common_proto + +common_easyjson: \ api_async_easyjson.go \ api_backend_easyjson.go \ api_proxy_easyjson.go \ api_signaling_easyjson.go +common_proto: \ + grpc_mcu.pb.go \ + grpc_sessions.pb.go + $(BINDIR): mkdir -p $(BINDIR) @@ -115,6 +136,7 @@ proxy: common $(BINDIR) clean: rm -f *_easyjson.go rm -f easyjson-bootstrap*.go + rm -f *.pb.go build: server proxy diff --git a/README.md b/README.md index 2b1afac..f9daabe 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ The following tools are required for building the signaling server. - git - go >= 1.17 - make +- protobuf-compiler >= 3 All other dependencies are fetched automatically while building. diff --git a/backend_server.go b/backend_server.go index 81120d2..9998c33 100644 --- a/backend_server.go +++ b/backend_server.go @@ -22,6 +22,7 @@ package signaling import ( + "context" "crypto/hmac" "crypto/rand" "crypto/sha1" @@ -324,6 +325,8 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso } timeout := time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() var wg sync.WaitGroup for _, sessionid := range sessionids { if sessionid == sessionIdNotInMeeting { @@ -334,7 +337,7 @@ func (b *BackendServer) sendRoomDisinvite(roomid string, backend *Backend, reaso wg.Add(1) go func(sessionid string) { defer wg.Done() - if sid, err := b.lookupByRoomSessionId(sessionid, nil, timeout); err != nil { + if sid, err := b.lookupByRoomSessionId(ctx, sessionid, nil); err != nil { log.Printf("Could not lookup by room session %s: %s", sessionid, err) } else if sid != "" { if err := b.events.PublishSessionMessage(sid, backend, msg); err != nil { @@ -377,7 +380,7 @@ func (b *BackendServer) sendRoomUpdate(roomid string, backend *Backend, notified } } -func (b *BackendServer) lookupByRoomSessionId(roomSessionId string, cache *ConcurrentStringStringMap, timeout time.Duration) (string, error) { +func (b *BackendServer) lookupByRoomSessionId(ctx context.Context, roomSessionId string, cache *ConcurrentStringStringMap) (string, error) { if roomSessionId == sessionIdNotInMeeting { log.Printf("Trying to lookup empty room session id: %s", roomSessionId) return "", nil @@ -389,7 +392,7 @@ func (b *BackendServer) lookupByRoomSessionId(roomSessionId string, cache *Concu } } - sid, err := b.roomSessions.GetSessionId(roomSessionId) + sid, err := b.roomSessions.LookupSessionId(ctx, roomSessionId) if err == ErrNoSuchRoomSession { return "", nil } else if err != nil { @@ -402,7 +405,7 @@ func (b *BackendServer) lookupByRoomSessionId(roomSessionId string, cache *Concu return sid, nil } -func (b *BackendServer) fixupUserSessions(cache *ConcurrentStringStringMap, users []map[string]interface{}, timeout time.Duration) []map[string]interface{} { +func (b *BackendServer) fixupUserSessions(ctx context.Context, cache *ConcurrentStringStringMap, users []map[string]interface{}) []map[string]interface{} { if len(users) == 0 { return users } @@ -430,7 +433,7 @@ func (b *BackendServer) fixupUserSessions(cache *ConcurrentStringStringMap, user wg.Add(1) go func(roomSessionId string, u map[string]interface{}) { defer wg.Done() - if sessionId, err := b.lookupByRoomSessionId(roomSessionId, cache, timeout); err != nil { + if sessionId, err := b.lookupByRoomSessionId(ctx, roomSessionId, cache); err != nil { log.Printf("Could not lookup by room session %s: %s", roomSessionId, err) delete(u, "sessionId") } else if sessionId != "" { @@ -456,11 +459,13 @@ func (b *BackendServer) sendRoomIncall(roomid string, backend *Backend, request if !request.InCall.All { timeout := time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() var cache ConcurrentStringStringMap // Convert (Nextcloud) session ids to signaling session ids. - request.InCall.Users = b.fixupUserSessions(&cache, request.InCall.Users, timeout) + request.InCall.Users = b.fixupUserSessions(ctx, &cache, request.InCall.Users) // Entries in "Changed" are most likely already fetched through the "Users" list. - request.InCall.Changed = b.fixupUserSessions(&cache, request.InCall.Changed, timeout) + request.InCall.Changed = b.fixupUserSessions(ctx, &cache, request.InCall.Changed) if len(request.InCall.Users) == 0 && len(request.InCall.Changed) == 0 { return nil @@ -478,9 +483,11 @@ func (b *BackendServer) sendRoomParticipantsUpdate(roomid string, backend *Backe timeout := time.Second // Convert (Nextcloud) session ids to signaling session ids. + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() var cache ConcurrentStringStringMap - request.Participants.Users = b.fixupUserSessions(&cache, request.Participants.Users, timeout) - request.Participants.Changed = b.fixupUserSessions(&cache, request.Participants.Changed, timeout) + request.Participants.Users = b.fixupUserSessions(ctx, &cache, request.Participants.Users) + request.Participants.Changed = b.fixupUserSessions(ctx, &cache, request.Participants.Changed) if len(request.Participants.Users) == 0 && len(request.Participants.Changed) == 0 { return nil diff --git a/backend_server_test.go b/backend_server_test.go index e14b740..bc0acfa 100644 --- a/backend_server_test.go +++ b/backend_server_test.go @@ -88,7 +88,7 @@ func CreateBackendServerForTestFromConfig(t *testing.T, config *goconf.ConfigFil config.AddOption("clients", "internalsecret", string(testInternalSecret)) config.AddOption("geoip", "url", "none") events := getAsyncEventsForTest(t) - hub, err := NewHub(config, events, r, "no-version") + hub, err := NewHub(config, events, nil, r, "no-version") if err != nil { t.Fatal(err) } diff --git a/clientsession.go b/clientsession.go index 3d447ef..c958937 100644 --- a/clientsession.go +++ b/clientsession.go @@ -75,6 +75,9 @@ type ClientSession struct { room unsafe.Pointer roomSessionId string + publisherWaitersId uint64 + publisherWaiters map[uint64]chan bool + publishers map[string]McuPublisher subscribers map[string]McuSubscriber @@ -803,6 +806,26 @@ func (s *ClientSession) checkOfferTypeLocked(streamType string, data *MessageCli return 0, nil } +func (s *ClientSession) wakeupPublisherWaiters() { + for _, ch := range s.publisherWaiters { + ch <- true + } +} + +func (s *ClientSession) addPublisherWaiter(ch chan bool) uint64 { + if s.publisherWaiters == nil { + s.publisherWaiters = make(map[uint64]chan bool) + } + id := s.publisherWaitersId + 1 + s.publisherWaitersId = id + s.publisherWaiters[id] = ch + return id +} + +func (s *ClientSession) removePublisherWaiter(id uint64) { + delete(s.publisherWaiters, id) +} + func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, streamType string, data *MessageClientMessageData) (McuPublisher, error) { s.mu.Lock() defer s.mu.Unlock() @@ -851,6 +874,7 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea s.publishers[streamType] = publisher } log.Printf("Publishing %s as %s for session %s", streamType, publisher.Id(), s.PublicId()) + s.wakeupPublisherWaiters() } else { publisher.SetMedia(mediaTypes) } @@ -858,11 +882,44 @@ func (s *ClientSession) GetOrCreatePublisher(ctx context.Context, mcu Mcu, strea return publisher, nil } +func (s *ClientSession) getPublisherLocked(streamType string) McuPublisher { + return s.publishers[streamType] +} + func (s *ClientSession) GetPublisher(streamType string) McuPublisher { s.mu.Lock() defer s.mu.Unlock() - return s.publishers[streamType] + return s.getPublisherLocked(streamType) +} + +func (s *ClientSession) GetOrWaitForPublisher(ctx context.Context, streamType string) McuPublisher { + s.mu.Lock() + defer s.mu.Unlock() + + publisher := s.getPublisherLocked(streamType) + if publisher != nil { + return publisher + } + + ch := make(chan bool, 1) + id := s.addPublisherWaiter(ch) + defer s.removePublisherWaiter(id) + + for { + s.mu.Unlock() + select { + case <-ch: + s.mu.Lock() + publisher := s.getPublisherLocked(streamType) + if publisher != nil { + return publisher + } + case <-ctx.Done(): + s.mu.Lock() + return nil + } + } } func (s *ClientSession) GetOrCreateSubscriber(ctx context.Context, mcu Mcu, id string, streamType string) (McuSubscriber, error) { diff --git a/grpc_client.go b/grpc_client.go new file mode 100644 index 0000000..ce32b89 --- /dev/null +++ b/grpc_client.go @@ -0,0 +1,225 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "log" + "net" + "strings" + "sync" + + "github.com/dlintw/goconf" + "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + status "google.golang.org/grpc/status" +) + +type grpcClientImpl struct { + RpcMcuClient + RpcSessionsClient +} + +func newGrpcClientImpl(conn grpc.ClientConnInterface) *grpcClientImpl { + return &grpcClientImpl{ + RpcMcuClient: NewRpcMcuClient(conn), + RpcSessionsClient: NewRpcSessionsClient(conn), + } +} + +type GrpcClient struct { + conn *grpc.ClientConn + impl *grpcClientImpl +} + +func NewGrpcClient(target string) (*GrpcClient, error) { + conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + + result := &GrpcClient{ + conn: conn, + impl: newGrpcClientImpl(conn), + } + return result, nil +} + +func (c *GrpcClient) Target() string { + return c.conn.Target() +} + +func (c *GrpcClient) Close() error { + return c.conn.Close() +} + +func (c *GrpcClient) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) { + // TODO: Remove debug logging + log.Printf("Lookup room session %s on %s", roomSessionId, c.Target()) + response, err := c.impl.LookupSessionId(ctx, &LookupSessionIdRequest{ + RoomSessionId: roomSessionId, + }, grpc.WaitForReady(true)) + if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { + return "", ErrNoSuchRoomSession + } else if err != nil { + return "", err + } + + sessionId := response.GetSessionId() + if sessionId == "" { + return "", ErrNoSuchRoomSession + } + + return sessionId, nil +} + +func (c *GrpcClient) IsSessionInCall(ctx context.Context, sessionId string, room *Room) (bool, error) { + // TODO: Remove debug logging + log.Printf("Check if session %s is in call %s on %s", sessionId, room.Id(), c.Target()) + response, err := c.impl.IsSessionInCall(ctx, &IsSessionInCallRequest{ + SessionId: sessionId, + RoomId: room.Id(), + BackendUrl: room.Backend().url, + }, grpc.WaitForReady(true)) + if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { + return false, nil + } else if err != nil { + return false, err + } + + return response.GetInCall(), nil +} + +func (c *GrpcClient) GetPublisherId(ctx context.Context, sessionId string, streamType string) (string, string, net.IP, error) { + // TODO: Remove debug logging + log.Printf("Get %s publisher id %s on %s", streamType, sessionId, c.Target()) + response, err := c.impl.GetPublisherId(ctx, &GetPublisherIdRequest{ + SessionId: sessionId, + StreamType: streamType, + }, grpc.WaitForReady(true)) + if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { + return "", "", nil, nil + } else if err != nil { + return "", "", nil, err + } + + return response.GetPublisherId(), response.GetProxyUrl(), net.ParseIP(response.GetIp()), nil +} + +type GrpcClients struct { + mu sync.RWMutex + + clientsMap map[string]*GrpcClient + clients []*GrpcClient +} + +func NewGrpcClients(config *goconf.ConfigFile) (*GrpcClients, error) { + result := &GrpcClients{} + if err := result.load(config); err != nil { + return nil, err + } + return result, nil +} + +func (c *GrpcClients) load(config *goconf.ConfigFile) error { + c.mu.Lock() + defer c.mu.Unlock() + + targets, _ := config.GetString("grpc", "targets") + + clientsMap := make(map[string]*GrpcClient) + var clients []*GrpcClient + removeTargets := make(map[string]bool, len(c.clientsMap)) + for target, client := range c.clientsMap { + removeTargets[target] = true + clientsMap[target] = client + } + + for _, target := range strings.Split(targets, ",") { + target = strings.TrimSpace(target) + if target == "" { + continue + } + + if client, found := clientsMap[target]; found { + clients = append(clients, client) + delete(removeTargets, target) + continue + } + + client, err := NewGrpcClient(target) + if err != nil { + for target, client := range clientsMap { + if closeerr := client.Close(); closeerr != nil { + log.Printf("Error closing client to %s: %s", target, closeerr) + } + } + return err + } + + log.Printf("Adding %s as GRPC target", target) + clientsMap[target] = client + clients = append(clients, client) + } + + for target := range removeTargets { + if client, found := clientsMap[target]; found { + log.Printf("Deleting GRPC target %s", target) + if err := client.Close(); err != nil { + log.Printf("Error closing client to %s: %s", target, err) + } + delete(clientsMap, target) + } + } + + c.clients = clients + c.clientsMap = clientsMap + return nil +} + +func (c *GrpcClients) Reload(config *goconf.ConfigFile) { + if err := c.load(config); err != nil { + log.Printf("Could not reload RPC clients: %s", err) + } +} + +func (c *GrpcClients) Close() { + c.mu.Lock() + defer c.mu.Unlock() + + for target, client := range c.clientsMap { + if err := client.Close(); err != nil { + log.Printf("Error closing client to %s: %s", target, err) + } + } + + c.clients = nil + c.clientsMap = nil +} + +func (c *GrpcClients) GetClients() []*GrpcClient { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.clients +} diff --git a/grpc_mcu.proto b/grpc_mcu.proto new file mode 100644 index 0000000..b2313d2 --- /dev/null +++ b/grpc_mcu.proto @@ -0,0 +1,41 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +syntax = "proto3"; + +option go_package = "github.com/strukturag/nextcloud-spreed-signaling;signaling"; + +package signaling; + +service RpcMcu { + rpc GetPublisherId(GetPublisherIdRequest) returns (GetPublisherIdReply) {} +} + +message GetPublisherIdRequest { + string sessionId = 1; + string streamType = 2; +} + +message GetPublisherIdReply { + string publisherId = 1; + string proxyUrl = 2; + string ip = 3; +} diff --git a/grpc_server.go b/grpc_server.go new file mode 100644 index 0000000..7d4102f --- /dev/null +++ b/grpc_server.go @@ -0,0 +1,141 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "errors" + "fmt" + "log" + "net" + + "github.com/dlintw/goconf" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +type GrpcServer struct { + UnimplementedRpcMcuServer + UnimplementedRpcSessionsServer + + conn *grpc.Server + listener net.Listener + + hub *Hub +} + +func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) { + var listener net.Listener + if addr, _ := config.GetString("grpc", "listen"); addr != "" { + var err error + listener, err = net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("could not create GRPC server: %w", err) + } + } + + conn := grpc.NewServer() + + result := &GrpcServer{ + conn: conn, + listener: listener, + } + RegisterRpcSessionsServer(conn, result) + RegisterRpcMcuServer(conn, result) + return result, nil +} + +func (s *GrpcServer) Run() error { + if s.listener == nil { + return nil + } + + return s.conn.Serve(s.listener) +} + +func (s *GrpcServer) Close() { + s.conn.GracefulStop() +} + +func (s *GrpcServer) LookupSessionId(ctx context.Context, request *LookupSessionIdRequest) (*LookupSessionIdReply, error) { + // TODO: Remove debug logging + log.Printf("Lookup session id for room session id %s", request.RoomSessionId) + sid, err := s.hub.roomSessions.GetSessionId(request.RoomSessionId) + if errors.Is(err, ErrNoSuchRoomSession) { + return nil, status.Error(codes.NotFound, "no such room session id") + } else if err != nil { + return nil, err + } + + return &LookupSessionIdReply{ + SessionId: sid, + }, nil +} + +func (s *GrpcServer) IsSessionInCall(ctx context.Context, request *IsSessionInCallRequest) (*IsSessionInCallReply, error) { + // TODO: Remove debug logging + log.Printf("Check if session %s is in call %s on %s", request.SessionId, request.RoomId, request.BackendUrl) + session := s.hub.GetSessionByPublicId(request.SessionId) + if session == nil { + return nil, status.Error(codes.NotFound, "no such session id") + } + + result := &IsSessionInCallReply{} + room := session.GetRoom() + if room == nil || room.Id() != request.GetRoomId() || room.Backend().url != request.GetBackendUrl() || + (session.ClientType() != HelloClientTypeInternal && !room.IsSessionInCall(session)) { + // Recipient is not in a room, a different room or not in the call. + result.InCall = false + } else { + result.InCall = true + } + return result, nil +} + +func (s *GrpcServer) GetPublisherId(ctx context.Context, request *GetPublisherIdRequest) (*GetPublisherIdReply, error) { + // TODO: Remove debug logging + log.Printf("Get %s publisher id for session %s", request.StreamType, request.SessionId) + session := s.hub.GetSessionByPublicId(request.SessionId) + if session == nil { + return nil, status.Error(codes.NotFound, "no such session") + } + + clientSession, ok := session.(*ClientSession) + if !ok { + return nil, status.Error(codes.NotFound, "no such session") + } + + publisher := clientSession.GetOrWaitForPublisher(ctx, request.StreamType) + if publisher, ok := publisher.(*mcuProxyPublisher); ok { + reply := &GetPublisherIdReply{ + PublisherId: publisher.Id(), + ProxyUrl: publisher.conn.rawUrl, + } + if ip := publisher.conn.ip; ip != nil { + reply.Ip = ip.String() + } + return reply, nil + } + + return nil, status.Error(codes.NotFound, "no such publisher") +} diff --git a/grpc_sessions.proto b/grpc_sessions.proto new file mode 100644 index 0000000..497a74f --- /dev/null +++ b/grpc_sessions.proto @@ -0,0 +1,49 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +syntax = "proto3"; + +option go_package = "github.com/strukturag/nextcloud-spreed-signaling;signaling"; + +package signaling; + +service RpcSessions { + rpc LookupSessionId(LookupSessionIdRequest) returns (LookupSessionIdReply) {} + rpc IsSessionInCall(IsSessionInCallRequest) returns (IsSessionInCallReply) {} +} + +message LookupSessionIdRequest { + string roomSessionId = 1; +} + +message LookupSessionIdReply { + string sessionId = 1; +} + +message IsSessionInCallRequest { + string sessionId = 1; + string roomId = 2; + string backendUrl = 3; +} + +message IsSessionInCallReply { + bool inCall = 1; +} diff --git a/hub.go b/hub.go index 403e38e..97b2a03 100644 --- a/hub.go +++ b/hub.go @@ -28,6 +28,7 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "errors" "fmt" "hash/fnv" "log" @@ -148,9 +149,12 @@ type Hub struct { geoip *GeoLookup geoipOverrides map[*net.IPNet]string geoipUpdating int32 + + rpcServer *GrpcServer + rpcClients *GrpcClients } -func NewHub(config *goconf.ConfigFile, events AsyncEvents, r *mux.Router, version string) (*Hub, error) { +func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcClients *GrpcClients, r *mux.Router, version string) (*Hub, error) { hashKey, _ := config.GetString("sessions", "hashkey") switch len(hashKey) { case 32: @@ -210,7 +214,12 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, r *mux.Router, versio decodeCaches = append(decodeCaches, NewLruCache(decodeCacheSize)) } - roomSessions, err := NewBuiltinRoomSessions() + rpcServer, err := NewGrpcServer(config) + if err != nil { + return nil, err + } + + roomSessions, err := NewBuiltinRoomSessions(rpcClients) if err != nil { return nil, err } @@ -338,8 +347,12 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, r *mux.Router, versio geoip: geoip, geoipOverrides: geoipOverrides, + + rpcServer: rpcServer, + rpcClients: rpcClients, } backend.hub = hub + rpcServer.hub = hub hub.upgrader.CheckOrigin = hub.checkOrigin r.HandleFunc("/spreed", func(w http.ResponseWriter, r *http.Request) { hub.serveWs(w, r) @@ -437,6 +450,12 @@ func (h *Hub) Run() { go h.updateGeoDatabase() h.roomPing.Start() defer h.roomPing.Stop() + go func() { + if err := h.rpcServer.Run(); err != nil { + log.Fatalf("Could not start RPC server: %s", err) + } + }() + defer h.rpcServer.Close() housekeeping := time.NewTicker(housekeepingInterval) geoipUpdater := time.NewTicker(24 * time.Hour) @@ -480,6 +499,7 @@ func (h *Hub) Reload(config *goconf.ConfigFile) { h.mcu.Reload(config) } h.backend.Reload(config) + h.rpcClients.Reload(config) } func reverseSessionId(s string) (string, error) { @@ -575,6 +595,10 @@ func (h *Hub) GetSessionByPublicId(sessionId string) Session { h.mu.Lock() session := h.sessions[data.Sid] h.mu.Unlock() + if session != nil && session.PublicId() != sessionId { + // Session was created on different server. + return nil + } return session } @@ -1726,7 +1750,41 @@ func sendMcuProcessingFailed(session *ClientSession, message *ClientMessage) { session.SendMessage(response) } -func (h *Hub) isInSameCall(senderSession *ClientSession, recipientSessionId string) bool { +func (h *Hub) isInSameCallRemote(ctx context.Context, senderSession *ClientSession, senderRoom *Room, recipientSessionId string) bool { + clients := h.rpcClients.GetClients() + if len(clients) == 0 { + return false + } + + var result int32 + var wg sync.WaitGroup + rpcCtx, cancel := context.WithCancel(ctx) + defer cancel() + for _, client := range clients { + wg.Add(1) + go func(client *GrpcClient) { + defer wg.Done() + + inCall, err := client.IsSessionInCall(rpcCtx, recipientSessionId, senderRoom) + if errors.Is(err, context.Canceled) { + return + } else if err != nil { + log.Printf("Error checking session %s in call on %s: %s", recipientSessionId, client.Target(), err) + return + } else if !inCall { + return + } + + cancel() + atomic.StoreInt32(&result, 1) + }(client) + } + wg.Wait() + + return atomic.LoadInt32(&result) != 0 +} + +func (h *Hub) isInSameCall(ctx context.Context, senderSession *ClientSession, recipientSessionId string) bool { if senderSession.ClientType() == HelloClientTypeInternal { // Internal clients may subscribe all streams. return true @@ -1741,7 +1799,7 @@ func (h *Hub) isInSameCall(senderSession *ClientSession, recipientSessionId stri recipientSession := h.GetSessionByPublicId(recipientSessionId) if recipientSession == nil { // Recipient session does not exist. - return false + return h.isInSameCallRemote(ctx, senderSession, senderRoom, recipientSessionId) } recipientRoom := recipientSession.GetRoom() @@ -1770,7 +1828,7 @@ func (h *Hub) processMcuMessage(senderSession *ClientSession, session *ClientSes // A user is only allowed to subscribe a stream if she is in the same room // as the other user and both have their "inCall" flag set. - if !h.allowSubscribeAnyStream && !h.isInSameCall(senderSession, message.Recipient.SessionId) { + if !h.allowSubscribeAnyStream && !h.isInSameCall(ctx, senderSession, message.Recipient.SessionId) { log.Printf("Session %s is not in the same call as session %s, not requesting offer", session.PublicId(), message.Recipient.SessionId) sendNotAllowed(senderSession, client_message, "Not allowed to request offer.") return diff --git a/hub_test.go b/hub_test.go index 39f9a50..8dc8b95 100644 --- a/hub_test.go +++ b/hub_test.go @@ -111,7 +111,7 @@ func CreateHubForTestWithConfig(t *testing.T, getConfigFunc func(*httptest.Serve if err != nil { t.Fatal(err) } - h, err := NewHub(config, events, r, "no-version") + h, err := NewHub(config, events, nil, r, "no-version") if err != nil { t.Fatal(err) } diff --git a/mcu_proxy.go b/mcu_proxy.go index d395c82..1252351 100644 --- a/mcu_proxy.go +++ b/mcu_proxy.go @@ -26,6 +26,7 @@ import ( "crypto/rsa" "crypto/tls" "encoding/json" + "errors" "fmt" "log" "net" @@ -1006,20 +1007,13 @@ func (c *mcuProxyConnection) newPublisher(ctx context.Context, listener McuListe return publisher, nil } -func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) { - c.publishersLock.Lock() - id, found := c.publisherIds[publisher+"|"+streamType] - c.publishersLock.Unlock() - if !found { - return nil, fmt.Errorf("Unknown publisher %s", publisher) - } - +func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuListener, publisherId string, publisherSessionId string, streamType string) (McuSubscriber, error) { msg := &ProxyClientMessage{ Type: "command", Command: &CommandProxyClientMessage{ Type: "create-subscriber", StreamType: streamType, - PublisherId: id, + PublisherId: publisherId, }, } @@ -1030,8 +1024,8 @@ func (c *mcuProxyConnection) newSubscriber(ctx context.Context, listener McuList } proxyId := response.Command.Id - log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisher) - subscriber := newMcuProxySubscriber(publisher, response.Command.Sid, streamType, proxyId, c, listener) + log.Printf("Created %s subscriber %s on %s for %s", streamType, proxyId, c, publisherSessionId) + subscriber := newMcuProxySubscriber(publisherSessionId, response.Command.Sid, streamType, proxyId, c, listener) c.subscribersLock.Lock() c.subscribers[proxyId] = subscriber c.subscribersLock.Unlock() @@ -1075,9 +1069,11 @@ type mcuProxy struct { publisherWaiters map[uint64]chan bool continentsMap atomic.Value + + rpcClients *GrpcClients } -func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient) (Mcu, error) { +func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient, rpcClients *GrpcClients) (Mcu, error) { urlType, _ := config.GetString("mcu", "urltype") if urlType == "" { urlType = proxyUrlTypeStatic @@ -1139,6 +1135,8 @@ func NewMcuProxy(config *goconf.ConfigFile, etcdClient *EtcdClient) (Mcu, error) publishers: make(map[string]*mcuProxyConnection), publisherWaiters: make(map[uint64]chan bool), + + rpcClients: rpcClients, } if err := mcu.loadContinentsMap(config); err != nil { @@ -1850,19 +1848,18 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st return nil, fmt.Errorf("No MCU connection available") } -func (m *mcuProxy) getPublisherConnection(ctx context.Context, publisher string, streamType string) *mcuProxyConnection { +func (m *mcuProxy) getPublisherConnection(publisher string, streamType string) *mcuProxyConnection { m.mu.RLock() - conn := m.publishers[publisher+"|"+streamType] - m.mu.RUnlock() - if conn != nil { - return conn - } + defer m.mu.RUnlock() - log.Printf("No %s publisher %s found yet, deferring", streamType, publisher) + return m.publishers[publisher+"|"+streamType] +} + +func (m *mcuProxy) waitForPublisherConnection(ctx context.Context, publisher string, streamType string) *mcuProxyConnection { m.mu.Lock() defer m.mu.Unlock() - conn = m.publishers[publisher+"|"+streamType] + conn := m.publishers[publisher+"|"+streamType] if conn != nil { // Publisher was created while waiting for lock. return conn @@ -1890,10 +1887,92 @@ func (m *mcuProxy) getPublisherConnection(ctx context.Context, publisher string, } func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType string) (McuSubscriber, error) { - conn := m.getPublisherConnection(ctx, publisher, streamType) - if conn == nil { - return nil, fmt.Errorf("No %s publisher %s found", streamType, publisher) + if conn := m.getPublisherConnection(publisher, streamType); conn != nil { + // Fast common path: publisher is available locally. + conn.publishersLock.Lock() + id, found := conn.publisherIds[publisher+"|"+streamType] + conn.publishersLock.Unlock() + if !found { + return nil, fmt.Errorf("Unknown publisher %s", publisher) + } + + return conn.newSubscriber(ctx, listener, id, publisher, streamType) } - return conn.newSubscriber(ctx, listener, publisher, streamType) + log.Printf("No %s publisher %s found yet, deferring", streamType, publisher) + ch := make(chan McuSubscriber) + getctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Wait for publisher to be created locally. + go func() { + if conn := m.waitForPublisherConnection(getctx, publisher, streamType); conn != nil { + cancel() // Cancel pending RPC calls. + + conn.publishersLock.Lock() + id, found := conn.publisherIds[publisher+"|"+streamType] + conn.publishersLock.Unlock() + if !found { + log.Printf("Unknown id for local %s publisher %s", streamType, publisher) + return + } + + subscriber, err := conn.newSubscriber(ctx, listener, id, publisher, streamType) + if subscriber != nil { + ch <- subscriber + } else if err != nil { + log.Printf("Error creating local subscriber for %s publisher %s: %s", streamType, publisher, err) + } + } + }() + + // Wait for publisher to be created on one of the other servers in the cluster. + if clients := m.rpcClients.GetClients(); len(clients) > 0 { + for _, client := range clients { + go func(client *GrpcClient) { + id, url, ip, err := client.GetPublisherId(getctx, publisher, streamType) + if errors.Is(err, context.Canceled) { + return + } else if err != nil { + log.Printf("Error getting %s publisher id %s from %s: %s", streamType, publisher, client.Target(), err) + return + } else if id == "" { + // Publisher not found on other server + return + } + + cancel() // Cancel pending RPC calls. + log.Printf("Found publisher id %s through %s on proxy %s", id, client.Target(), url) + + m.connectionsMu.RLock() + connections := m.connections + m.connectionsMu.RUnlock() + for _, conn := range connections { + if conn.rawUrl != url || !ip.Equal(conn.ip) { + continue + } + + // Simple case, signaling server has a connection to the same endpoint + subscriber, err := conn.newSubscriber(ctx, listener, id, publisher, streamType) + if err != nil { + log.Printf("Could not create subscriber for %s publisher %s: %s", streamType, publisher, err) + return + } + + ch <- subscriber + return + } + + // TODO: Create temporary connection to new proxy and tear down when last subscriber left. + log.Printf("Not implemented yet: need new connection to %s (%s) for %s publisher %s (%s)", url, ip, streamType, publisher, id) + }(client) + } + } + + select { + case subscriber := <-ch: + return subscriber, nil + case <-ctx.Done(): + return nil, fmt.Errorf("No %s publisher %s found", streamType, publisher) + } } diff --git a/roomsessions.go b/roomsessions.go index 2bf15bc..cb2c1f1 100644 --- a/roomsessions.go +++ b/roomsessions.go @@ -22,6 +22,7 @@ package signaling import ( + "context" "fmt" ) @@ -34,4 +35,5 @@ type RoomSessions interface { DeleteRoomSession(session Session) GetSessionId(roomSessionId string) (string, error) + LookupSessionId(ctx context.Context, roomSessionId string) (string, error) } diff --git a/roomsessions_builtin.go b/roomsessions_builtin.go index 7b65947..3c85984 100644 --- a/roomsessions_builtin.go +++ b/roomsessions_builtin.go @@ -22,19 +22,27 @@ package signaling import ( + "context" + "errors" + "log" "sync" + "sync/atomic" ) type BuiltinRoomSessions struct { sessionIdToRoomSession map[string]string roomSessionToSessionid map[string]string mu sync.RWMutex + + clients *GrpcClients } -func NewBuiltinRoomSessions() (RoomSessions, error) { +func NewBuiltinRoomSessions(clients *GrpcClients) (RoomSessions, error) { return &BuiltinRoomSessions{ sessionIdToRoomSession: make(map[string]string), roomSessionToSessionid: make(map[string]string), + + clients: clients, }, nil } @@ -78,3 +86,53 @@ func (r *BuiltinRoomSessions) GetSessionId(roomSessionId string) (string, error) return sid, nil } + +func (r *BuiltinRoomSessions) LookupSessionId(ctx context.Context, roomSessionId string) (string, error) { + sid, err := r.GetSessionId(roomSessionId) + if err == nil { + return sid, nil + } + + if r.clients == nil { + return "", ErrNoSuchRoomSession + } + + clients := r.clients.GetClients() + if len(clients) == 0 { + return "", ErrNoSuchRoomSession + } + + lookupctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + var result atomic.Value + for _, client := range clients { + wg.Add(1) + go func(client *GrpcClient) { + defer wg.Done() + + sid, err := client.LookupSessionId(lookupctx, roomSessionId) + if errors.Is(err, context.Canceled) { + return + } else if err != nil { + log.Printf("Received error while checking for room session id %s on %s: %s", roomSessionId, client.Target(), err) + return + } else if sid == "" { + log.Printf("Received empty session id for room session id %s from %s", roomSessionId, client.Target()) + return + } + + cancel() // Cancel pending RPC calls. + result.Store(sid) + }(client) + } + wg.Wait() + + value := result.Load() + if value == nil { + return "", ErrNoSuchRoomSession + } + + return value.(string), nil +} diff --git a/roomsessions_builtin_test.go b/roomsessions_builtin_test.go index 6f212b1..db1394c 100644 --- a/roomsessions_builtin_test.go +++ b/roomsessions_builtin_test.go @@ -26,7 +26,7 @@ import ( ) func TestBuiltinRoomSessions(t *testing.T) { - sessions, err := NewBuiltinRoomSessions() + sessions, err := NewBuiltinRoomSessions(nil) if err != nil { t.Fatal(err) } diff --git a/server.conf.in b/server.conf.in index 7542a55..acf3bed 100644 --- a/server.conf.in +++ b/server.conf.in @@ -234,3 +234,11 @@ connectionsperhost = 8 #clientkey = /path/to/etcd-client.key #clientcert = /path/to/etcd-client.crt #cacert = /path/to/etcd-ca.crt + +[grpc] +# IP and port to listen on for GRPC requests. +# Comment line to disable the listener. +#listen = 0.0.0.0:9090 + +# Comma-separated list of GRPC targets to connect to for clustering mode. +#targets = 192.168.0.1:9090, 192.168.0.1:9091 diff --git a/server/main.go b/server/main.go index 46fa2b6..ec9d84f 100644 --- a/server/main.go +++ b/server/main.go @@ -164,8 +164,14 @@ func main() { } }() + rpcClients, err := signaling.NewGrpcClients(config) + if err != nil { + log.Fatalf("Could not create RPC clients: %s", err) + } + defer rpcClients.Close() + r := mux.NewRouter() - hub, err := signaling.NewHub(config, events, r, version) + hub, err := signaling.NewHub(config, events, rpcClients, r, version) if err != nil { log.Fatal("Could not create hub: ", err) } @@ -192,7 +198,7 @@ func main() { signaling.UnregisterProxyMcuStats() signaling.RegisterJanusMcuStats() case signaling.McuTypeProxy: - mcu, err = signaling.NewMcuProxy(config, etcdClient) + mcu, err = signaling.NewMcuProxy(config, etcdClient, rpcClients) signaling.UnregisterJanusMcuStats() signaling.RegisterProxyMcuStats() default: