From fbbea4a1fa72f3e83e944004b52469d3fc326770 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Fri, 6 Feb 2026 10:01:43 +0100 Subject: [PATCH] vfs: fix S3 range off-by-one and part timeouts wait for all the goroutines before returning from multipart uploads Signed-off-by: Nicola Murino --- internal/vfs/azblobfs.go | 21 ++++++++++++++++----- internal/vfs/s3fs.go | 18 ++++++++++++++---- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/internal/vfs/azblobfs.go b/internal/vfs/azblobfs.go index 72661c90..73d0ae37 100644 --- a/internal/vfs/azblobfs.go +++ b/internal/vfs/azblobfs.go @@ -1025,6 +1025,13 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read poolCtx, poolCancel := context.WithCancel(ctx) defer poolCancel() + finalizeFailedUpload := func(err error) { + fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err) + hasError.Store(true) + poolError = fmt.Errorf("multipart upload error: %w", err) + poolCancel() + } + for part := 0; !finished; part++ { buf := pool.getBuffer() @@ -1038,7 +1045,10 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read finished = true } else if err != nil { pool.releaseBuffer(buf) - return err + errOnce.Do(func() { + finalizeFailedUpload(err) + }) + break } // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks @@ -1046,7 +1056,10 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read generatedUUID, err := uuid.NewRandom() if err != nil { pool.releaseBuffer(buf) - return fmt.Errorf("unable to generate block ID: %w", err) + errOnce.Do(func() { + finalizeFailedUpload(err) + }) + break } blockID := base64.StdEncoding.EncodeToString([]byte(generatedUUID.String())) blocks = append(blocks, blockID) @@ -1076,9 +1089,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read if err != nil { errOnce.Do(func() { fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err) - 
hasError.Store(true) - poolError = fmt.Errorf("multipart upload error: %w", err) - poolCancel() + finalizeFailedUpload(err) }) } }(blockID, buf, n) diff --git a/internal/vfs/s3fs.go b/internal/vfs/s3fs.go index 0d0eb760..3efa8586 100644 --- a/internal/vfs/s3fs.go +++ b/internal/vfs/s3fs.go @@ -793,7 +793,10 @@ func (fs *S3Fs) hasContents(name string) (bool, error) { } func (fs *S3Fs) downloadPart(ctx context.Context, name string, buf []byte, w io.WriterAt, start, count, writeOffset int64) error { - rangeHeader := fmt.Sprintf("bytes=%d-%d", start, start+count) + if count == 0 { + return nil + } + rangeHeader := fmt.Sprintf("bytes=%d-%d", start, start+count-1) resp, err := fs.svc.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(fs.config.Bucket), @@ -832,7 +835,7 @@ func (fs *S3Fs) handleDownload(ctx context.Context, name string, offset int64, w guard := make(chan struct{}, fs.config.DownloadConcurrency) var blockCtxTimeout time.Duration if fs.config.DownloadPartMaxTime > 0 { - blockCtxTimeout = time.Duration(fs.config.DownloadPartSize) * time.Second + blockCtxTimeout = time.Duration(fs.config.DownloadPartMaxTime) * time.Second } else { blockCtxTimeout = time.Duration(fs.config.DownloadPartSize/(1024*1024)) * time.Minute } @@ -921,7 +924,7 @@ func (fs *S3Fs) initiateMultipartUpload(ctx context.Context, name, contentType s func (fs *S3Fs) uploadPart(ctx context.Context, name, uploadID string, partNumber int32, data []byte) (*string, error) { timeout := time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute if fs.config.UploadPartMaxTime > 0 { - timeout = time.Duration(fs.config.UploadPartMaxTime) + timeout = time.Duration(fs.config.UploadPartMaxTime) * time.Second } ctx, cancelFn := context.WithDeadline(ctx, time.Now().Add(timeout)) defer cancelFn() @@ -970,6 +973,13 @@ func (fs *S3Fs) abortMultipartUpload(name, uploadID string) error { } func (fs *S3Fs) singlePartUpload(ctx context.Context, name, contentType string, data []byte) error { + timeout := 
time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute + if fs.config.UploadPartMaxTime > 0 { + timeout = time.Duration(fs.config.UploadPartMaxTime) * time.Second + } + ctx, cancelFn := context.WithDeadline(ctx, time.Now().Add(timeout)) + defer cancelFn() + contentLength := int64(len(data)) _, err := fs.svc.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(fs.config.Bucket), @@ -1069,7 +1079,7 @@ func (fs *S3Fs) handleUpload(ctx context.Context, reader io.Reader, name, conten errOnce.Do(func() { finalizeFailedUpload(err) }) - return err + break } guard <- struct{}{} if hasError.Load() {