From 800ece1d90095c044603a9ea9c87466e1a5ba3ab Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 12 May 2022 12:13:08 +0200 Subject: [PATCH] Support combining ping requests of different rooms on the same backend. --- hub.go | 10 ++ hub_test.go | 39 ++++++++ room.go | 4 +- room_ping.go | 228 ++++++++++++++++++++++++++++++++++++++++++ room_ping_test.go | 249 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 527 insertions(+), 3 deletions(-) create mode 100644 room_ping.go create mode 100644 room_ping_test.go diff --git a/hub.go b/hub.go index a547200..c7e131c 100644 --- a/hub.go +++ b/hub.go @@ -127,6 +127,7 @@ type Hub struct { rooms map[string]*Room roomSessions RoomSessions + roomPing *RoomPing virtualSessions map[string]uint64 decodeCaches []*LruCache @@ -214,6 +215,11 @@ func NewHub(config *goconf.ConfigFile, nats NatsClient, r *mux.Router, version s return nil, err } + roomPing, err := NewRoomPing(backend, backend.capabilities) + if err != nil { + return nil, err + } + geoipUrl, _ := config.GetString("geoip", "url") if geoipUrl == "default" || geoipUrl == "none" { geoipUrl = "" @@ -313,6 +319,7 @@ func NewHub(config *goconf.ConfigFile, nats NatsClient, r *mux.Router, version s rooms: make(map[string]*Room), roomSessions: roomSessions, + roomPing: roomPing, virtualSessions: make(map[string]uint64), decodeCaches: decodeCaches, @@ -428,6 +435,8 @@ func (h *Hub) updateGeoDatabase() { func (h *Hub) Run() { go h.updateGeoDatabase() + h.roomPing.Start() + defer h.roomPing.Stop() housekeeping := time.NewTicker(housekeepingInterval) geoipUpdater := time.NewTicker(24 * time.Hour) @@ -1125,6 +1134,7 @@ func (h *Hub) removeRoom(room *Room) { statsHubRoomsCurrent.WithLabelValues(room.Backend().Id()).Dec() } h.ru.Unlock() + h.roomPing.DeleteRoom(room) } func (h *Hub) createRoom(id string, properties *json.RawMessage, backend *Backend) (*Room, error) { diff --git a/hub_test.go b/hub_test.go index a4e2e01..24e47f7 100644 --- a/hub_test.go +++ b/hub_test.go @@ -323,6 +323,32 @@ func processSessionRequest(t *testing.T, w http.ResponseWriter, r *http.Request, return response } +var pingRequests map[*testing.T][]*BackendClientRequest + +func getPingRequests(t *testing.T) []*BackendClientRequest { + return pingRequests[t] +} + +func clearPingRequests(t *testing.T) { + delete(pingRequests, t) +} + +func storePingRequest(t *testing.T, request *BackendClientRequest) { + if entries, found := pingRequests[t]; !found { + if pingRequests == nil { + pingRequests = make(map[*testing.T][]*BackendClientRequest) + } + pingRequests[t] = []*BackendClientRequest{ + request, + } + t.Cleanup(func() { + clearPingRequests(t) + }) + } else { + pingRequests[t] = append(entries, request) + } +} + func processPingRequest(t *testing.T, w http.ResponseWriter, r *http.Request, request *BackendClientRequest) *BackendClientResponse { if request.Type != "ping" || request.Ping == nil { t.Fatalf("Expected an ping backend request, got %+v", request) @@ -338,6 +364,8 @@ func processPingRequest(t *testing.T, w http.ResponseWriter, r *http.Request, re } } + storePingRequest(t, request) + response := &BackendClientResponse{ Type: "ping", Ping: &BackendClientRingResponse{ @@ -382,6 +410,16 @@ func registerBackendHandlerUrl(t *testing.T, router *mux.Router, url string) { if strings.Contains(t.Name(), "V3Api") { features = append(features, "signaling-v3") } + signaling := map[string]interface{}{ + "foo": "bar", + "baz": 42, + } + config := map[string]interface{}{ + "signaling": signaling, + } + if strings.Contains(t.Name(), "MultiRoom") { + signaling[ConfigKeySessionPingLimit] = 2 + } response := &CapabilitiesResponse{ Version: CapabilitiesVersion{ Major: 20, @@ -389,6 +427,7 @@ func registerBackendHandlerUrl(t *testing.T, router *mux.Router, url string) { Capabilities: map[string]map[string]interface{}{ "spreed": { "features": features, + "config": config, }, }, } diff --git a/room.go b/room.go index db00b14..1649123 100644 --- a/room.go +++ b/room.go @@ -829,9 +829,7 @@ func (r *Room) publishActiveSessions() (int, *sync.WaitGroup) { ctx, cancel := context.WithTimeout(context.Background(), r.hub.backendTimeout) defer cancel() - request := NewBackendClientPingRequest(r.id, entries) - var response BackendClientResponse - if err := r.hub.backend.PerformJSONRequest(ctx, url, request, &response); err != nil { + if err := r.hub.roomPing.SendPings(ctx, r, url, entries); err != nil { log.Printf("Error pinging room %s for active entries %+v: %s", r.id, entries, err) } }(urls[u], e) diff --git a/room_ping.go b/room_ping.go new file mode 100644 index 0000000..d3cf231 --- /dev/null +++ b/room_ping.go @@ -0,0 +1,228 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "log" + "net/url" + "sync" + "time" +) + +const ( + ConfigGroupSignaling = "signaling" + ConfigKeySessionPingLimit = "session-ping-limit" +) + +type pingEntries struct { + url *url.URL + + entries map[*Room][]BackendPingEntry +} + +func newPingEntries(url *url.URL, room *Room, entries []BackendPingEntry) *pingEntries { + return &pingEntries{ + url: url, + entries: map[*Room][]BackendPingEntry{ + room: entries, + }, + } +} + +func (e *pingEntries) Add(room *Room, entries []BackendPingEntry) { + if existing, found := e.entries[room]; found { + e.entries[room] = append(existing, entries...) + } else { + e.entries[room] = entries + } +} + +func (e *pingEntries) RemoveRoom(room *Room) { + delete(e.entries, room) +} + +// RoomPing sends ping requests for active sessions in rooms. It evaluates the +// capabilities of the Nextcloud server to determine if sessions from different +// rooms can be grouped together. +// +// For that, all ping requests across rooms of enabled instances are combined +// and sent out batched every "updateActiveSessionsInterval" seconds. +type RoomPing struct { + mu sync.Mutex + closeChan chan bool + + backend *BackendClient + capabilities *Capabilities + + entries map[string]*pingEntries +} + +func NewRoomPing(backend *BackendClient, capabilities *Capabilities) (*RoomPing, error) { + result := &RoomPing{ + closeChan: make(chan bool, 1), + backend: backend, + capabilities: capabilities, + } + + return result, nil +} + +func (p *RoomPing) Start() { + go p.run() +} + +func (p *RoomPing) Stop() { + select { + case p.closeChan <- true: + default: + } +} + +func (p *RoomPing) run() { + ticker := time.NewTicker(updateActiveSessionsInterval) +loop: + for { + select { + case <-p.closeChan: + break loop + case <-ticker.C: + p.publishActiveSessions() + } + } +} + +func (p *RoomPing) getAndClearEntries() map[string]*pingEntries { + p.mu.Lock() + defer p.mu.Unlock() + + entries := p.entries + p.entries = nil + return entries +} + +func (p *RoomPing) publishEntries(entries *pingEntries, timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + limit, found := p.capabilities.GetIntegerConfig(ctx, entries.url, ConfigGroupSignaling, ConfigKeySessionPingLimit) + if !found || limit <= 0 { + // Limit disabled while waiting for the next iteration, fallback to sending + // one request per room. + for room, e := range entries.entries { + ctx2, cancel2 := context.WithTimeout(context.Background(), timeout) + defer cancel2() + + if err := p.sendPingsDirect(ctx2, room, entries.url, e); err != nil { + log.Printf("Error pinging room %s for active entries %+v: %s", room.Id(), e, err) + } + } + return + } + + var allEntries []BackendPingEntry + for _, e := range entries.entries { + allEntries = append(allEntries, e...) + } + p.sendPingsCombined(entries.url, allEntries, limit, timeout) +} + +func (p *RoomPing) publishActiveSessions() { + var timeout time.Duration + if p.backend.hub != nil { + timeout = p.backend.hub.backendTimeout + } else { + // Running from tests. + timeout = time.Second * time.Duration(defaultBackendTimeoutSeconds) + } + entries := p.getAndClearEntries() + var wg sync.WaitGroup + wg.Add(len(entries)) + for _, e := range entries { + go func(e *pingEntries) { + defer wg.Done() + p.publishEntries(e, timeout) + }(e) + } + wg.Wait() +} + +func (p *RoomPing) sendPingsDirect(ctx context.Context, room *Room, url *url.URL, entries []BackendPingEntry) error { + request := NewBackendClientPingRequest(room.Id(), entries) + var response BackendClientResponse + return p.backend.PerformJSONRequest(ctx, url, request, &response) +} + +func (p *RoomPing) sendPingsCombined(url *url.URL, entries []BackendPingEntry, limit int, timeout time.Duration) { + total := len(entries) + for idx := 0; idx < total; idx += limit { + end := idx + limit + if end > total { + end = total + } + tosend := entries[idx:end] + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + request := NewBackendClientPingRequest("", tosend) + var response BackendClientResponse + if err := p.backend.PerformJSONRequest(ctx, url, request, &response); err != nil { + log.Printf("Error sending combined ping session entries %+v to %s: %s", tosend, url, err) + } + } +} + +func (p *RoomPing) SendPings(ctx context.Context, room *Room, url *url.URL, entries []BackendPingEntry) error { + limit, found := p.capabilities.GetIntegerConfig(ctx, url, ConfigGroupSignaling, ConfigKeySessionPingLimit) + if !found || limit <= 0 { + // Old-style Nextcloud or session limit not configured. Perform one request + // per room. Don't queue to avoid sending all ping requests to old-style + // instances at the same time but distribute across the interval. + return p.sendPingsDirect(ctx, room, url, entries) + } + + key := url.String() + + p.mu.Lock() + defer p.mu.Unlock() + if existing, found := p.entries[key]; found { + existing.Add(room, entries) + return nil + } + + if p.entries == nil { + p.entries = make(map[string]*pingEntries) + } + + p.entries[key] = newPingEntries(url, room, entries) + return nil +} + +func (p *RoomPing) DeleteRoom(room *Room) { + p.mu.Lock() + defer p.mu.Unlock() + + for _, entries := range p.entries { + entries.RemoveRoom(room) + } +} diff --git a/room_ping_test.go b/room_ping_test.go new file mode 100644 index 0000000..a58fb47 --- /dev/null +++ b/room_ping_test.go @@ -0,0 +1,249 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2022 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "context" + "net/http/httptest" + "net/url" + "testing" + + "github.com/gorilla/mux" +) + +func NewRoomPingForTest(t *testing.T) (*url.URL, *RoomPing) { + r := mux.NewRouter() + registerBackendHandler(t, r) + + server := httptest.NewServer(r) + t.Cleanup(func() { + server.Close() + }) + + config, err := getTestConfig(server) + if err != nil { + t.Fatal(err) + } + + backend, err := NewBackendClient(config, 1, "0.0") + if err != nil { + t.Fatal(err) + } + + p, err := NewRoomPing(backend, backend.capabilities) + if err != nil { + t.Fatal(err) + } + + u, err := url.Parse(server.URL) + if err != nil { + t.Fatal(err) + } + + return u, p +} + +func TestSingleRoomPing(t *testing.T) { + u, ping := NewRoomPingForTest(t) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + room1 := &Room{ + id: "sample-room-1", + } + entries1 := []BackendPingEntry{ + { + UserId: "foo", + SessionId: "123", + }, + } + if err := ping.SendPings(ctx, room1, u, entries1); err != nil { + t.Error(err) + } + if requests := getPingRequests(t); len(requests) != 1 { + t.Errorf("expected one ping request, got %+v", requests) + } else if len(requests[0].Ping.Entries) != 1 { + t.Errorf("expected one entry, got %+v", requests[0].Ping.Entries) + } + clearPingRequests(t) + + room2 := &Room{ + id: "sample-room-2", + } + entries2 := []BackendPingEntry{ + { + UserId: "bar", + SessionId: "456", + }, + } + if err := ping.SendPings(ctx, room2, u, entries2); err != nil { + t.Error(err) + } + if requests := getPingRequests(t); len(requests) != 1 { + t.Errorf("expected one ping request, got %+v", requests) + } else if len(requests[0].Ping.Entries) != 1 { + t.Errorf("expected one entry, got %+v", requests[0].Ping.Entries) + } + clearPingRequests(t) + + ping.publishActiveSessions() + if requests := getPingRequests(t); len(requests) != 0 { + t.Errorf("expected no ping requests, got %+v", requests) + } +} + +func TestMultiRoomPing(t *testing.T) { + u, ping := NewRoomPingForTest(t) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + room1 := &Room{ + id: "sample-room-1", + } + entries1 := []BackendPingEntry{ + { + UserId: "foo", + SessionId: "123", + }, + } + if err := ping.SendPings(ctx, room1, u, entries1); err != nil { + t.Error(err) + } + if requests := getPingRequests(t); len(requests) != 0 { + t.Errorf("expected no ping requests, got %+v", requests) + } + + room2 := &Room{ + id: "sample-room-2", + } + entries2 := []BackendPingEntry{ + { + UserId: "bar", + SessionId: "456", + }, + } + if err := ping.SendPings(ctx, room2, u, entries2); err != nil { + t.Error(err) + } + if requests := getPingRequests(t); len(requests) != 0 { + t.Errorf("expected no ping requests, got %+v", requests) + } + + ping.publishActiveSessions() + if requests := getPingRequests(t); len(requests) != 1 { + t.Errorf("expected one ping request, got %+v", requests) + } else if len(requests[0].Ping.Entries) != 2 { + t.Errorf("expected two entries, got %+v", requests[0].Ping.Entries) + } +} + +func TestMultiRoomPing_Separate(t *testing.T) { + u, ping := NewRoomPingForTest(t) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + room1 := &Room{ + id: "sample-room-1", + } + entries1 := []BackendPingEntry{ + { + UserId: "foo", + SessionId: "123", + }, + } + if err := ping.SendPings(ctx, room1, u, entries1); err != nil { + t.Error(err) + } + if requests := getPingRequests(t); len(requests) != 0 { + t.Errorf("expected no ping requests, got %+v", requests) + } + entries2 := []BackendPingEntry{ + { + UserId: "bar", + SessionId: "456", + }, + } + if err := ping.SendPings(ctx, room1, u, entries2); err != nil { + t.Error(err) + } + if requests := getPingRequests(t); len(requests) != 0 { + t.Errorf("expected no ping requests, got %+v", requests) + } + + ping.publishActiveSessions() + if requests := getPingRequests(t); len(requests) != 1 { + t.Errorf("expected one ping request, got %+v", requests) + } else if len(requests[0].Ping.Entries) != 2 { + t.Errorf("expected two entries, got %+v", requests[0].Ping.Entries) + } +} + +func TestMultiRoomPing_DeleteRoom(t *testing.T) { + u, ping := NewRoomPingForTest(t) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + room1 := &Room{ + id: "sample-room-1", + } + entries1 := []BackendPingEntry{ + { + UserId: "foo", + SessionId: "123", + }, + } + if err := ping.SendPings(ctx, room1, u, entries1); err != nil { + t.Error(err) + } + if requests := getPingRequests(t); len(requests) != 0 { + t.Errorf("expected no ping requests, got %+v", requests) + } + + room2 := &Room{ + id: "sample-room-2", + } + entries2 := []BackendPingEntry{ + { + UserId: "bar", + SessionId: "456", + }, + } + if err := ping.SendPings(ctx, room2, u, entries2); err != nil { + t.Error(err) + } + if requests := getPingRequests(t); len(requests) != 0 { + t.Errorf("expected no ping requests, got %+v", requests) + } + + ping.DeleteRoom(room2) + + ping.publishActiveSessions() + if requests := getPingRequests(t); len(requests) != 1 { + t.Errorf("expected one ping request, got %+v", requests) + } else if len(requests[0].Ping.Entries) != 1 { + t.Errorf("expected two entries, got %+v", requests[0].Ping.Entries) + } +}