Move Goroutines helpers to test package.

This commit is contained in:
Joachim Bauch 2025-12-10 14:47:31 +01:00
commit 98a8465e12
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
8 changed files with 185 additions and 86 deletions

View file

@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/server/v3/embed"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/test"
)
func (s *backendStorageEtcd) getWakeupChannelForTesting() <-chan struct{} {
@ -56,7 +57,7 @@ func (tl *testListener) EtcdClientCreated(client *EtcdClient) {
func Test_BackendStorageEtcdNoLeak(t *testing.T) { // nolint:paralleltest
logger := log.NewLoggerForTest(t)
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
etcd, client := NewEtcdClientForTest(t)
tl := &testListener{
etcd: etcd,

View file

@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/test"
)
var (
@ -50,7 +51,7 @@ func TestFileWatcher_NotExist(t *testing.T) {
}
func TestFileWatcher_File(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
tmpdir := t.TempDir()
@ -92,7 +93,7 @@ func TestFileWatcher_File(t *testing.T) { // nolint:paralleltest
}
func TestFileWatcher_CurrentDir(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
tmpdir := t.TempDir()

View file

@ -38,6 +38,7 @@ import (
"go.etcd.io/etcd/server/v3/embed"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/test"
)
func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} {
@ -115,7 +116,7 @@ func waitForEvent(ctx context.Context, t *testing.T, ch <-chan struct{}) {
func Test_GrpcClients_EtcdInitial(t *testing.T) { // nolint:paralleltest
logger := log.NewLoggerForTest(t)
ctx := log.NewLoggerContext(t.Context(), logger)
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
_, addr1 := NewGrpcServerForTest(t)
_, addr2 := NewGrpcServerForTest(t)
@ -224,7 +225,7 @@ func Test_GrpcClients_EtcdIgnoreSelf(t *testing.T) {
func Test_GrpcClients_DnsDiscovery(t *testing.T) { // nolint:paralleltest
logger := log.NewLoggerForTest(t)
ctx := log.NewLoggerContext(t.Context(), logger)
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
lookup := newMockDnsLookupForTest(t)
@ -307,7 +308,7 @@ func Test_GrpcClients_DnsDiscoveryInitialFailed(t *testing.T) {
}
func Test_GrpcClients_Encryption(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
require := require.New(t)
serverKey, err := rsa.GenerateKey(rand.Reader, 1024)
require.NoError(err)

View file

@ -55,6 +55,7 @@ import (
"github.com/strukturag/nextcloud-spreed-signaling/api"
"github.com/strukturag/nextcloud-spreed-signaling/internal"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/test"
)
const (
@ -320,7 +321,7 @@ func WaitForHub(ctx context.Context, t *testing.T, h *Hub) {
case <-ctx.Done():
h.mu.Lock()
h.ru.Lock()
dumpGoroutines("", os.Stderr)
test.DumpGoroutines("", os.Stderr)
assert.Fail(t, "Error waiting for hub to terminate", "clients %+v / rooms %+v / sessions %+v / remoteSessions %v / federatedSessions %v / federationClients %v / %d read / %d write: %s",
h.clients,
h.rooms,
@ -1752,7 +1753,7 @@ func runGrpcProxyTest(t *testing.T, f func(hub1, hub2 *Hub, server1, server2 *ht
}
func TestClientHelloResumeProxy(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
runGrpcProxyTest(t, func(hub1, hub2 *Hub, server1, server2 *httptest.Server) {
require := require.New(t)
assert := assert.New(t)
@ -1802,7 +1803,7 @@ func TestClientHelloResumeProxy(t *testing.T) { // nolint:paralleltest
}
func TestClientHelloResumeProxy_Takeover(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
runGrpcProxyTest(t, func(hub1, hub2 *Hub, server1, server2 *httptest.Server) {
require := require.New(t)
assert := assert.New(t)
@ -1856,7 +1857,7 @@ func TestClientHelloResumeProxy_Takeover(t *testing.T) { // nolint:paralleltest
}
func TestClientHelloResumeProxy_Disconnect(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
runGrpcProxyTest(t, func(hub1, hub2 *Hub, server1, server2 *httptest.Server) {
require := require.New(t)
assert := assert.New(t)

View file

@ -35,6 +35,7 @@ import (
natsserver "github.com/nats-io/nats-server/v2/test"
"github.com/strukturag/nextcloud-spreed-signaling/log"
"github.com/strukturag/nextcloud-spreed-signaling/test"
)
func startLocalNatsServer(t *testing.T) (*server.Server, int) {
@ -113,7 +114,7 @@ func testNatsClient_Subscribe(t *testing.T, client NatsClient) {
}
func TestNatsClient_Subscribe(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
_, _, client := CreateLocalNatsClientForTest(t)
testNatsClient_Subscribe(t, client)
@ -127,7 +128,7 @@ func testNatsClient_PublishAfterClose(t *testing.T, client NatsClient) {
}
func TestNatsClient_PublishAfterClose(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
_, _, client := CreateLocalNatsClientForTest(t)
testNatsClient_PublishAfterClose(t, client)
@ -143,7 +144,7 @@ func testNatsClient_SubscribeAfterClose(t *testing.T, client NatsClient) {
}
func TestNatsClient_SubscribeAfterClose(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
_, _, client := CreateLocalNatsClientForTest(t)
testNatsClient_SubscribeAfterClose(t, client)
@ -165,7 +166,7 @@ func testNatsClient_BadSubjects(t *testing.T, client NatsClient) {
}
func TestNatsClient_BadSubjects(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
_, _, client := CreateLocalNatsClientForTest(t)
testNatsClient_BadSubjects(t, client)
@ -173,7 +174,7 @@ func TestNatsClient_BadSubjects(t *testing.T) { // nolint:paralleltest
}
func TestNatsClient_MaxReconnects(t *testing.T) { // nolint:paralleltest
ensureNoGoroutinesLeak(t, func(t *testing.T) {
test.EnsureNoGoroutinesLeak(t, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
reconnectWait := time.Millisecond

105
test/goroutines.go Normal file
View file

@ -0,0 +1,105 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package test
import (
"bytes"
"io"
"os"
"os/signal"
"runtime/pprof"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
var listenSignalOnce sync.Once
func EnsureNoGoroutinesLeak(t *testing.T, f func(t *testing.T)) {
t.Helper()
ensureNoGoroutinesLeak(t, f, false)
}
func ensureNoGoroutinesLeak(t *testing.T, f func(t *testing.T), fromTest bool) (int, int) {
t.Helper()
// Make sure test is not executed with "t.Parallel()"
t.Setenv("PARALLEL_CHECK", "1")
// The signal package will start a goroutine the first time "signal.Notify"
// is called. Do so outside the function under test so the signal goroutine
// will not be shown as "leaking".
listenSignalOnce.Do(func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
go func() {
for {
<-ch
}
}()
})
profile := pprof.Lookup("goroutine")
// Give time for things to settle before capturing the number of
// go routines
var before int
timeout := time.Now().Add(time.Second)
for time.Now().Before(timeout) {
before = profile.Count()
time.Sleep(10 * time.Millisecond)
if profile.Count() == before {
break
}
}
var prev bytes.Buffer
DumpGoroutines("Before:", &prev)
t.Run("leakcheck", f)
var after int
// Give time for things to settle before capturing the number of
// go routines
timeout = time.Now().Add(time.Second)
for time.Now().Before(timeout) {
after = profile.Count()
if after == before {
break
}
}
if after != before && !fromTest {
io.Copy(os.Stderr, &prev) // nolint
DumpGoroutines("After:", os.Stderr)
require.Equal(t, before, after, "Number of Go routines has changed")
}
return before, after
}
func DumpGoroutines(prefix string, w io.Writer) {
if prefix != "" {
io.WriteString(w, prefix+"\n") // nolint
}
profile := pprof.Lookup("goroutine")
profile.WriteTo(w, 2) // nolint
}

60
test/goroutines_test.go Normal file
View file

@ -0,0 +1,60 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2025 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package test
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestNoGoroutineLeak(t *testing.T) { // nolint:paralleltest
EnsureNoGoroutinesLeak(t, func(t *testing.T) {
stop := make(chan struct{})
stopped := make(chan struct{})
go func() {
defer close(stopped)
<-stop
}()
close(stop)
<-stopped
})
}
func TestLeakGoroutine(t *testing.T) { // nolint:paralleltest
stop := make(chan struct{})
stopped := make(chan struct{})
before, after := ensureNoGoroutinesLeak(t, func(t *testing.T) {
go func() {
defer close(stopped)
<-stop
}()
}, true)
close(stop)
<-stopped
assert.Equal(t, 1, after-before, "wrong number of leaked goroutines")
}

View file

@ -22,84 +22,13 @@
package signaling
import (
"bytes"
"context"
"encoding/json"
"io"
"os"
"os/signal"
"runtime/pprof"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var listenSignalOnce sync.Once
func ensureNoGoroutinesLeak(t *testing.T, f func(t *testing.T)) {
t.Helper()
// Make sure test is not executed with "t.Parallel()"
t.Setenv("PARALLEL_CHECK", "1")
// The signal package will start a goroutine the first time "signal.Notify"
// is called. Do so outside the function under test so the signal goroutine
// will not be shown as "leaking".
listenSignalOnce.Do(func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
go func() {
for {
<-ch
}
}()
})
profile := pprof.Lookup("goroutine")
// Give time for things to settle before capturing the number of
// go routines
var before int
timeout := time.Now().Add(time.Second)
for time.Now().Before(timeout) {
before = profile.Count()
time.Sleep(10 * time.Millisecond)
if profile.Count() == before {
break
}
}
var prev bytes.Buffer
dumpGoroutines("Before:", &prev)
t.Run("leakcheck", f)
var after int
// Give time for things to settle before capturing the number of
// go routines
timeout = time.Now().Add(time.Second)
for time.Now().Before(timeout) {
after = profile.Count()
if after == before {
break
}
}
if after != before {
io.Copy(os.Stderr, &prev) // nolint
dumpGoroutines("After:", os.Stderr)
require.Equal(t, before, after, "Number of Go routines has changed")
}
}
func dumpGoroutines(prefix string, w io.Writer) {
if prefix != "" {
io.WriteString(w, prefix+"\n") // nolint
}
profile := pprof.Lookup("goroutine")
profile.WriteTo(w, 2) // nolint
}
func WaitForUsersJoined(ctx context.Context, t *testing.T, client1 *TestClient, hello1 *ServerMessage, client2 *TestClient, hello2 *ServerMessage) {
t.Helper()
// We will receive "joined" events for all clients. The ordering is not