Allow running file watcher tests to run in parallel.

This commit is contained in:
Joachim Bauch 2025-12-05 12:15:26 +01:00
commit b86d05de08
No known key found for this signature in database
GPG key ID: 77C1D22D53E15F02
5 changed files with 41 additions and 51 deletions

View file

@ -27,6 +27,7 @@ import (
"fmt"
"os"
"sync/atomic"
"testing"
)
type CertificateReloader struct {
@ -49,17 +50,22 @@ func NewCertificateReloader(logger Logger, certFile string, keyFile string) (*Ce
return nil, fmt.Errorf("could not load certificate / key: %w", err)
}
deduplicate := defaultDeduplicateWatchEvents
if testing.Testing() {
deduplicate = 0
}
reloader := &CertificateReloader{
logger: logger,
certFile: certFile,
keyFile: keyFile,
}
reloader.certificate.Store(&pair)
reloader.certWatcher, err = NewFileWatcher(reloader.logger, certFile, reloader.reload)
reloader.certWatcher, err = NewFileWatcher(reloader.logger, certFile, reloader.reload, deduplicate)
if err != nil {
return nil, err
}
reloader.keyWatcher, err = NewFileWatcher(reloader.logger, keyFile, reloader.reload)
reloader.keyWatcher, err = NewFileWatcher(reloader.logger, keyFile, reloader.reload, deduplicate)
if err != nil {
reloader.certWatcher.Close() // nolint
return nil, err
@ -132,12 +138,17 @@ func NewCertPoolReloader(logger Logger, certFile string) (*CertPoolReloader, err
return nil, err
}
deduplicate := defaultDeduplicateWatchEvents
if testing.Testing() {
deduplicate = 0
}
reloader := &CertPoolReloader{
logger: logger,
certFile: certFile,
}
reloader.pool.Store(pool)
reloader.certWatcher, err = NewFileWatcher(reloader.logger, certFile, reloader.reload)
reloader.certWatcher, err = NewFileWatcher(reloader.logger, certFile, reloader.reload, deduplicate)
if err != nil {
return nil, err
}

View file

@ -23,22 +23,9 @@ package signaling
import (
"context"
"testing"
"time"
)
func UpdateCertificateCheckIntervalForTest(t *testing.T, interval time.Duration) {
t.Helper()
// Make sure test is not executed with "t.Parallel()"
t.Setenv("PARALLEL_CHECK", "1")
old := deduplicateWatchEvents.Load()
t.Cleanup(func() {
deduplicateWatchEvents.Store(old)
})
deduplicateWatchEvents.Store(int64(interval))
}
func (r *CertificateReloader) WaitForReload(ctx context.Context, counter uint64) error {
for counter == r.GetReloadCounter() {
if err := ctx.Err(); err != nil {

View file

@ -29,7 +29,6 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/fsnotify/fsnotify"
@ -39,28 +38,21 @@ const (
defaultDeduplicateWatchEvents = 100 * time.Millisecond
)
var (
deduplicateWatchEvents atomic.Int64
)
func init() {
deduplicateWatchEvents.Store(int64(defaultDeduplicateWatchEvents))
}
type FileWatcherCallback func(filename string)
type FileWatcher struct {
logger Logger
filename string
target string
callback FileWatcherCallback
logger Logger
filename string
target string
callback FileWatcherCallback
deduplicate time.Duration
watcher *fsnotify.Watcher
closeCtx context.Context
closeFunc context.CancelFunc
}
func NewFileWatcher(logger Logger, filename string, callback FileWatcherCallback) (*FileWatcher, error) {
func NewFileWatcher(logger Logger, filename string, callback FileWatcherCallback, deduplicate time.Duration) (*FileWatcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
@ -74,10 +66,11 @@ func NewFileWatcher(logger Logger, filename string, callback FileWatcherCallback
closeCtx, closeFunc := context.WithCancel(context.Background())
w := &FileWatcher{
logger: logger,
filename: filename,
callback: callback,
watcher: watcher,
logger: logger,
filename: filename,
callback: callback,
deduplicate: deduplicate,
watcher: watcher,
closeCtx: closeCtx,
closeFunc: closeFunc,
@ -115,8 +108,7 @@ func (f *FileWatcher) run() {
timers := make(map[string]*time.Timer)
triggerEvent := func(event fsnotify.Event) {
deduplicate := time.Duration(deduplicateWatchEvents.Load())
if deduplicate <= 0 {
if f.deduplicate <= 0 {
f.callback(f.filename)
return
}
@ -128,7 +120,7 @@ func (f *FileWatcher) run() {
t, found := timers[filename]
mu.Unlock()
if !found {
t = time.AfterFunc(deduplicate, func() {
t = time.AfterFunc(f.deduplicate, func() {
f.callback(f.filename)
mu.Lock()
@ -139,7 +131,7 @@ func (f *FileWatcher) run() {
timers[filename] = t
mu.Unlock()
} else {
t.Reset(deduplicate)
t.Reset(f.deduplicate)
}
}

View file

@ -40,7 +40,7 @@ func TestFileWatcher_NotExist(t *testing.T) {
assert := assert.New(t)
tmpdir := t.TempDir()
logger := NewLoggerForTest(t)
if w, err := NewFileWatcher(logger, path.Join(tmpdir, "test.txt"), func(filename string) {}); !assert.ErrorIs(err, os.ErrNotExist) {
if w, err := NewFileWatcher(logger, path.Join(tmpdir, "test.txt"), func(filename string) {}, defaultDeduplicateWatchEvents); !assert.ErrorIs(err, os.ErrNotExist) {
if w != nil {
assert.NoError(w.Close())
}
@ -59,7 +59,7 @@ func TestFileWatcher_File(t *testing.T) { // nolint:paralleltest
modified := make(chan struct{})
w, err := NewFileWatcher(logger, filename, func(filename string) {
modified <- struct{}{}
})
}, defaultDeduplicateWatchEvents)
require.NoError(err)
defer w.Close()
@ -102,7 +102,7 @@ func TestFileWatcher_CurrentDir(t *testing.T) { // nolint:paralleltest
modified := make(chan struct{})
w, err := NewFileWatcher(logger, "./"+path.Base(filename), func(filename string) {
modified <- struct{}{}
})
}, defaultDeduplicateWatchEvents)
require.NoError(err)
defer w.Close()
@ -144,7 +144,7 @@ func TestFileWatcher_Rename(t *testing.T) {
modified := make(chan struct{})
w, err := NewFileWatcher(logger, filename, func(filename string) {
modified <- struct{}{}
})
}, defaultDeduplicateWatchEvents)
require.NoError(err)
defer w.Close()
@ -188,7 +188,7 @@ func TestFileWatcher_Symlink(t *testing.T) {
modified := make(chan struct{})
w, err := NewFileWatcher(logger, filename, func(filename string) {
modified <- struct{}{}
})
}, defaultDeduplicateWatchEvents)
require.NoError(err)
defer w.Close()
@ -223,7 +223,7 @@ func TestFileWatcher_ChangeSymlinkTarget(t *testing.T) {
modified := make(chan struct{})
w, err := NewFileWatcher(logger, filename, func(filename string) {
modified <- struct{}{}
})
}, defaultDeduplicateWatchEvents)
require.NoError(err)
defer w.Close()
@ -260,7 +260,7 @@ func TestFileWatcher_OtherSymlink(t *testing.T) {
modified := make(chan struct{})
w, err := NewFileWatcher(logger, filename, func(filename string) {
modified <- struct{}{}
})
}, defaultDeduplicateWatchEvents)
require.NoError(err)
defer w.Close()
@ -291,7 +291,7 @@ func TestFileWatcher_RenameSymlinkTarget(t *testing.T) {
modified := make(chan struct{})
w, err := NewFileWatcher(logger, filename, func(filename string) {
modified <- struct{}{}
})
}, defaultDeduplicateWatchEvents)
require.NoError(err)
defer w.Close()
@ -345,7 +345,7 @@ func TestFileWatcher_UpdateSymlinkFolder(t *testing.T) {
modified := make(chan struct{})
w, err := NewFileWatcher(logger, filename, func(filename string) {
modified <- struct{}{}
})
}, defaultDeduplicateWatchEvents)
require.NoError(err)
defer w.Close()

View file

@ -97,7 +97,8 @@ func NewGrpcServerForTest(t *testing.T) (server *GrpcServer, addr string) {
return NewGrpcServerForTestWithConfig(t, config)
}
func Test_GrpcServer_ReloadCerts(t *testing.T) { // nolint:paralleltest
func Test_GrpcServer_ReloadCerts(t *testing.T) {
t.Parallel()
require := require.New(t)
assert := assert.New(t)
key, err := rsa.GenerateKey(rand.Reader, 1024)
@ -118,7 +119,6 @@ func Test_GrpcServer_ReloadCerts(t *testing.T) { // nolint:paralleltest
config.AddOption("grpc", "servercertificate", certFile)
config.AddOption("grpc", "serverkey", privkeyFile)
UpdateCertificateCheckIntervalForTest(t, 0)
server, addr := NewGrpcServerForTestWithConfig(t, config)
cp1 := x509.NewCertPool()
@ -167,7 +167,8 @@ func Test_GrpcServer_ReloadCerts(t *testing.T) { // nolint:paralleltest
}
}
func Test_GrpcServer_ReloadCA(t *testing.T) { // nolint:paralleltest
func Test_GrpcServer_ReloadCA(t *testing.T) {
t.Parallel()
logger := NewLoggerForTest(t)
require := require.New(t)
serverKey, err := rsa.GenerateKey(rand.Reader, 1024)
@ -194,7 +195,6 @@ func Test_GrpcServer_ReloadCA(t *testing.T) { // nolint:paralleltest
config.AddOption("grpc", "serverkey", privkeyFile)
config.AddOption("grpc", "clientca", caFile)
UpdateCertificateCheckIntervalForTest(t, 0)
server, addr := NewGrpcServerForTestWithConfig(t, config)
pool := x509.NewCertPool()