From 6c5eb78cc2e26ccfd17b99f554f179fc3e86500a Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 21 Nov 2024 14:58:18 +0100 Subject: [PATCH 1/5] Use a single random string per concurrency level. --- concurrentmap_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/concurrentmap_test.go b/concurrentmap_test.go index cca1d29..276b038 100644 --- a/concurrentmap_test.go +++ b/concurrentmap_test.go @@ -82,8 +82,9 @@ func TestConcurrentStringStringMap(t *testing.T) { defer wg.Done() key := "key-" + strconv.Itoa(x) + rnd := newRandomString(32) for y := 0; y < count; y = y + 1 { - value := newRandomString(32) + value := rnd + "-" + strconv.Itoa(y) m.Set(key, value) if v, found := m.Get(key); !assert.True(found, "Expected entry for key %s", key) || !assert.Equal(value, v, "Unexpected value for key %s", key) { From 53ff3d39e794c07785ab2b4d4bafd86a54ccdf2a Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 21 Nov 2024 19:49:28 +0100 Subject: [PATCH 2/5] Add buffer pool helper class. --- buffer_pool.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 buffer_pool.go diff --git a/buffer_pool.go b/buffer_pool.go new file mode 100644 index 0000000..5e13c0b --- /dev/null +++ b/buffer_pool.go @@ -0,0 +1,79 @@ +/** + * Standalone signaling server for the Nextcloud Spreed app. + * Copyright (C) 2024 struktur AG + * + * @author Joachim Bauch + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package signaling + +import ( + "bytes" + "encoding/json" + "io" + "sync" +) + +type BufferPool struct { + buffers sync.Pool + copyBuffers sync.Pool +} + +func (p *BufferPool) Get() *bytes.Buffer { + b := p.buffers.Get() + if b == nil { + return bytes.NewBuffer(nil) + } + + return b.(*bytes.Buffer) +} + +func (p *BufferPool) Put(b *bytes.Buffer) { + if b == nil { + return + } + + b.Reset() + p.buffers.Put(b) +} + +func (p *BufferPool) ReadAll(r io.Reader) (*bytes.Buffer, error) { + buf := p.copyBuffers.Get() + if buf == nil { + buf = make([]byte, 1024) + } + defer p.copyBuffers.Put(buf) + + b := p.Get() + if _, err := io.CopyBuffer(b, r, buf.([]byte)); err != nil { + p.Put(b) + return nil, err + } + + return b, nil +} + +func (p *BufferPool) MarshalAsJSON(v any) (*bytes.Buffer, error) { + b := p.Get() + encoder := json.NewEncoder(b) + if err := encoder.Encode(v); err != nil { + p.Put(b) + return nil, err + } + + return b, nil +} From bfc4d7facf6282f0838196001d2c1f5f3321b29f Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 21 Nov 2024 19:51:14 +0100 Subject: [PATCH 3/5] Set WriteBufferPool for websocket connections. --- federation.go | 8 ++++++-- hub.go | 3 +++ hub_test.go | 2 +- janus_client.go | 5 +++-- testclient_test.go | 6 +++++- 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/federation.go b/federation.go index 9b609c8..d1ac58b 100644 --- a/federation.go +++ b/federation.go @@ -46,6 +46,8 @@ const ( var ( ErrFederationNotSupported = NewError("federation_unsupported", "The target server does not support federation.") + + federationWriteBufferPool = &sync.Pool{} ) func isClosedError(err error) bool { @@ -102,7 +104,9 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession, return nil, fmt.Errorf("expected federation room message, got %+v", message) } - var dialer websocket.Dialer + dialer := &websocket.Dialer{ + WriteBufferPool: federationWriteBufferPool, + } if hub.skipFederationVerify { dialer.TLSClientConfig = &tls.Config{ InsecureSkipVerify: true, @@ -130,7 +134,7 @@ func NewFederationClient(ctx context.Context, hub *Hub, session *ClientSession, reconnectDelay: initialFederationReconnectInterval, - dialer: &dialer, + dialer: dialer, url: url, closer: NewCloser(), } diff --git a/hub.go b/hub.go index e7143c6..4b00a38 100644 --- a/hub.go +++ b/hub.go @@ -105,6 +105,8 @@ var ( websocketReadBufferSize = 4096 websocketWriteBufferSize = 4096 + websocketWriteBufferPool = &sync.Pool{} + // Delay after which a screen publisher should be cleaned up. cleanupScreenPublisherDelay = time.Second @@ -322,6 +324,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer upgrader: websocket.Upgrader{ ReadBufferSize: websocketReadBufferSize, WriteBufferSize: websocketWriteBufferSize, + WriteBufferPool: websocketWriteBufferPool, }, cookie: NewSessionIdCodec([]byte(hashKey), blockBytes), info: NewWelcomeServerMessage(version, DefaultFeatures...), diff --git a/hub_test.go b/hub_test.go index 4213655..b364623 100644 --- a/hub_test.go +++ b/hub_test.go @@ -767,7 +767,7 @@ func TestWebsocketFeatures(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() - conn, response, err := websocket.DefaultDialer.DialContext(ctx, getWebsocketUrl(server.URL), nil) + conn, response, err := testClientDialer.DialContext(ctx, getWebsocketUrl(server.URL), nil) require.NoError(err) defer conn.Close() // nolint diff --git a/janus_client.go b/janus_client.go index 5716bef..33a7ec2 100644 --- a/janus_client.go +++ b/janus_client.go @@ -119,8 +119,9 @@ const ( var ( janusDialer = websocket.Dialer{ - Subprotocols: []string{"janus-protocol"}, - Proxy: http.ProxyFromEnvironment, + Subprotocols: []string{"janus-protocol"}, + Proxy: http.ProxyFromEnvironment, + WriteBufferPool: &sync.Pool{}, } ) diff --git a/testclient_test.go b/testclient_test.go index 9078bcc..836fb6a 100644 --- a/testclient_test.go +++ b/testclient_test.go @@ -49,6 +49,10 @@ var ( testInternalSecret = []byte("internal-secret") ErrNoMessageReceived = fmt.Errorf("no message was received by the server") + + testClientDialer = websocket.Dialer{ + WriteBufferPool: &sync.Pool{}, + } ) type TestBackendClientAuthParams struct { @@ -226,7 +230,7 @@ type TestClient struct { func NewTestClientContext(ctx context.Context, t *testing.T, server *httptest.Server, hub *Hub) *TestClient { // Reference "hub" to prevent compiler error. - conn, _, err := websocket.DefaultDialer.DialContext(ctx, getWebsocketUrl(server.URL), nil) + conn, _, err := testClientDialer.DialContext(ctx, getWebsocketUrl(server.URL), nil) require.NoError(t, err) messageChan := make(chan []byte) From 411cf34437482954108f13ed800a9afdde588748 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 21 Nov 2024 19:54:50 +0100 Subject: [PATCH 4/5] Use buffer pool for reading data. --- backend_client.go | 18 ++++++++++-------- backend_server.go | 7 +++++-- capabilities.go | 13 ++++++++----- client.go | 12 +++--------- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/backend_client.go b/backend_client.go index f39fb15..e3eecef 100644 --- a/backend_client.go +++ b/backend_client.go @@ -27,7 +27,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "log" "net/http" "net/url" @@ -51,6 +50,7 @@ type BackendClient struct { pool *HttpClientPool capabilities *Capabilities + buffers BufferPool } func NewBackendClient(config *goconf.ConfigFile, maxConcurrentRequestsPerHost int, version string, etcdClient *EtcdClient) (*BackendClient, error) { @@ -175,12 +175,14 @@ func (b *BackendClient) PerformJSONRequest(ctx context.Context, u *url.URL, requ return ErrUnsupportedContentType } - body, err := io.ReadAll(resp.Body) + body, err := b.buffers.ReadAll(resp.Body) if err != nil { log.Printf("Could not read response body from %s: %s", req.URL, err) return err } + defer b.buffers.Put(body) + if isOcsRequest(u) || req.Header.Get("OCS-APIRequest") != "" { // OCS response are wrapped in an OCS container that needs to be parsed // to get the actual contents: @@ -191,17 +193,17 @@ func (b *BackendClient) PerformJSONRequest(ctx context.Context, u *url.URL, requ // } // } var ocs OcsResponse - if err := json.Unmarshal(body, &ocs); err != nil { - log.Printf("Could not decode OCS response %s from %s: %s", string(body), req.URL, err) + if err := json.Unmarshal(body.Bytes(), &ocs); err != nil { + log.Printf("Could not decode OCS response %s from %s: %s", body.String(), req.URL, err) return err } else if ocs.Ocs == nil || len(ocs.Ocs.Data) == 0 { - log.Printf("Incomplete OCS response %s from %s", string(body), req.URL) + log.Printf("Incomplete OCS response %s from %s", body.String(), req.URL) return ErrIncompleteResponse } switch ocs.Ocs.Meta.StatusCode { case http.StatusTooManyRequests: - log.Printf("Throttled OCS response %s from %s", string(body), req.URL) + log.Printf("Throttled OCS response %s from %s", body.String(), req.URL) return ErrThrottledResponse } @@ -209,8 +211,8 @@ func (b *BackendClient) PerformJSONRequest(ctx context.Context, u *url.URL, requ log.Printf("Could not decode OCS response body %s from %s: %s", string(ocs.Ocs.Data), req.URL, err) return err } - } else if err := json.Unmarshal(body, response); err != nil { - log.Printf("Could not decode response body %s from %s: %s", string(body), req.URL, err) + } else if err := json.Unmarshal(body.Bytes(), response); err != nil { + log.Printf("Could not decode response body %s from %s: %s", body.String(), req.URL, err) return err } return nil diff --git a/backend_server.go b/backend_server.go index e7aaebd..fa66074 100644 --- a/backend_server.go +++ b/backend_server.go @@ -70,6 +70,8 @@ type BackendServer struct { statsAllowedIps atomic.Pointer[AllowedIps] invalidSecret []byte + + buffers BufferPool } func NewBackendServer(config *goconf.ConfigFile, hub *Hub, version string) (*BackendServer, error) { @@ -284,14 +286,15 @@ func (b *BackendServer) parseRequestBody(f func(http.ResponseWriter, *http.Reque return } - body, err := io.ReadAll(r.Body) + body, err := b.buffers.ReadAll(r.Body) if err != nil { log.Println("Error reading body: ", err) http.Error(w, "Could not read body", http.StatusBadRequest) return } + defer b.buffers.Put(body) - f(w, r, body) + f(w, r, body.Bytes()) } } diff --git a/capabilities.go b/capabilities.go index d77731c..993e60c 100644 --- a/capabilities.go +++ b/capabilities.go @@ -25,7 +25,6 @@ import ( "context" "encoding/json" "errors" - "io" "log" "net/http" "net/url" @@ -182,18 +181,20 @@ func (e *capabilitiesEntry) update(ctx context.Context, u *url.URL, now time.Tim return e.errorIfMustRevalidate(ErrUnsupportedContentType) } - body, err := io.ReadAll(response.Body) + body, err := e.c.buffers.ReadAll(response.Body) if err != nil { log.Printf("Could not read response body from %s: %s", url, err) return e.errorIfMustRevalidate(err) } + defer e.c.buffers.Put(body) + var ocs OcsResponse - if err := json.Unmarshal(body, &ocs); err != nil { - log.Printf("Could not decode OCS response %s from %s: %s", string(body), url, err) + if err := json.Unmarshal(body.Bytes(), &ocs); err != nil { + log.Printf("Could not decode OCS response %s from %s: %s", body.String(), url, err) return e.errorIfMustRevalidate(err) } else if ocs.Ocs == nil || len(ocs.Ocs.Data) == 0 { - log.Printf("Incomplete OCS response %s from %s", string(body), url) + log.Printf("Incomplete OCS response %s from %s", body.String(), url) return e.errorIfMustRevalidate(ErrIncompleteResponse) } @@ -239,6 +240,8 @@ type Capabilities struct { pool *HttpClientPool entries map[string]*capabilitiesEntry nextInvalidate map[string]time.Time + + buffers BufferPool } func NewCapabilities(version string, pool *HttpClientPool) (*Capabilities, error) { diff --git a/client.go b/client.go index 6c534d4..3fd6ce3 100644 --- a/client.go +++ b/client.go @@ -82,11 +82,7 @@ func IsValidCountry(country string) bool { var ( InvalidFormat = NewError("invalid_format", "Invalid data format.") - bufferPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, - } + bufferPool BufferPool ) type WritableClientMessage interface { @@ -391,10 +387,8 @@ func (c *Client) ReadPump() { continue } - decodeBuffer := bufferPool.Get().(*bytes.Buffer) - decodeBuffer.Reset() - if _, err := decodeBuffer.ReadFrom(reader); err != nil { - bufferPool.Put(decodeBuffer) + decodeBuffer, err := bufferPool.ReadAll(reader) + if err != nil { if sessionId := c.GetSessionId(); sessionId != "" { log.Printf("Error reading message from client %s: %v", sessionId, err) } else { From 36f2f5026ffa7a922b89536888227fdbea93506e Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Thu, 21 Nov 2024 20:04:49 +0100 Subject: [PATCH 5/5] Use buffer pool to marshal request body. --- backend_client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/backend_client.go b/backend_client.go index e3eecef..b027673 100644 --- a/backend_client.go +++ b/backend_client.go @@ -22,7 +22,6 @@ package signaling import ( - "bytes" "context" "encoding/json" "errors" @@ -140,13 +139,14 @@ func (b *BackendClient) PerformJSONRequest(ctx context.Context, u *url.URL, requ } defer pool.Put(c) - data, err := json.Marshal(request) + data, err := b.buffers.MarshalAsJSON(request) if err != nil { log.Printf("Could not marshal request %+v: %s", request, err) return err } - req, err := http.NewRequestWithContext(ctx, "POST", requestUrl.String(), bytes.NewReader(data)) + defer b.buffers.Put(data) + req, err := http.NewRequestWithContext(ctx, "POST", requestUrl.String(), data) if err != nil { log.Printf("Could not create request to %s: %s", requestUrl, err) return err @@ -160,11 +160,11 @@ func (b *BackendClient) PerformJSONRequest(ctx context.Context, u *url.URL, requ } // Add checksum so the backend can validate the request. - AddBackendChecksum(req, data, secret) + AddBackendChecksum(req, data.Bytes(), secret) resp, err := c.Do(req) if err != nil { - log.Printf("Could not send request %s to %s: %s", string(data), req.URL, err) + log.Printf("Could not send request %s to %s: %s", data.String(), req.URL, err) return err } defer resp.Body.Close()