vfs: fix S3 range off-by-one and part timeouts

wait for all the goroutine before retruning from multipart uploads

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2026-02-06 10:01:43 +01:00
commit fbbea4a1fa
No known key found for this signature in database
GPG key ID: 935D2952DEC4EECF
2 changed files with 30 additions and 9 deletions

View file

@ -1025,6 +1025,13 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
poolCtx, poolCancel := context.WithCancel(ctx)
defer poolCancel()
finalizeFailedUpload := func(err error) {
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err)
hasError.Store(true)
poolError = fmt.Errorf("multipart upload error: %w", err)
poolCancel()
}
for part := 0; !finished; part++ {
buf := pool.getBuffer()
@ -1038,7 +1045,10 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
finished = true
} else if err != nil {
pool.releaseBuffer(buf)
return err
errOnce.Do(func() {
finalizeFailedUpload(err)
})
break
}
// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
@ -1046,7 +1056,10 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
generatedUUID, err := uuid.NewRandom()
if err != nil {
pool.releaseBuffer(buf)
return fmt.Errorf("unable to generate block ID: %w", err)
errOnce.Do(func() {
finalizeFailedUpload(err)
})
break
}
blockID := base64.StdEncoding.EncodeToString([]byte(generatedUUID.String()))
blocks = append(blocks, blockID)
@ -1076,9 +1089,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
if err != nil {
errOnce.Do(func() {
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err)
hasError.Store(true)
poolError = fmt.Errorf("multipart upload error: %w", err)
poolCancel()
finalizeFailedUpload(err)
})
}
}(blockID, buf, n)

View file

@ -793,7 +793,10 @@ func (fs *S3Fs) hasContents(name string) (bool, error) {
}
func (fs *S3Fs) downloadPart(ctx context.Context, name string, buf []byte, w io.WriterAt, start, count, writeOffset int64) error {
rangeHeader := fmt.Sprintf("bytes=%d-%d", start, start+count)
if count == 0 {
return nil
}
rangeHeader := fmt.Sprintf("bytes=%d-%d", start, start+count-1)
resp, err := fs.svc.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(fs.config.Bucket),
@ -832,7 +835,7 @@ func (fs *S3Fs) handleDownload(ctx context.Context, name string, offset int64, w
guard := make(chan struct{}, fs.config.DownloadConcurrency)
var blockCtxTimeout time.Duration
if fs.config.DownloadPartMaxTime > 0 {
blockCtxTimeout = time.Duration(fs.config.DownloadPartSize) * time.Second
blockCtxTimeout = time.Duration(fs.config.DownloadPartMaxTime) * time.Second
} else {
blockCtxTimeout = time.Duration(fs.config.DownloadPartSize/(1024*1024)) * time.Minute
}
@ -921,7 +924,7 @@ func (fs *S3Fs) initiateMultipartUpload(ctx context.Context, name, contentType s
func (fs *S3Fs) uploadPart(ctx context.Context, name, uploadID string, partNumber int32, data []byte) (*string, error) {
timeout := time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute
if fs.config.UploadPartMaxTime > 0 {
timeout = time.Duration(fs.config.UploadPartMaxTime)
timeout = time.Duration(fs.config.UploadPartMaxTime) * time.Second
}
ctx, cancelFn := context.WithDeadline(ctx, time.Now().Add(timeout))
defer cancelFn()
@ -970,6 +973,13 @@ func (fs *S3Fs) abortMultipartUpload(name, uploadID string) error {
}
func (fs *S3Fs) singlePartUpload(ctx context.Context, name, contentType string, data []byte) error {
timeout := time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute
if fs.config.UploadPartMaxTime > 0 {
timeout = time.Duration(fs.config.UploadPartMaxTime) * time.Second
}
ctx, cancelFn := context.WithDeadline(ctx, time.Now().Add(timeout))
defer cancelFn()
contentLength := int64(len(data))
_, err := fs.svc.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(fs.config.Bucket),
@ -1069,7 +1079,7 @@ func (fs *S3Fs) handleUpload(ctx context.Context, reader io.Reader, name, conten
errOnce.Do(func() {
finalizeFailedUpload(err)
})
return err
break
}
guard <- struct{}{}
if hasError.Load() {