vfs: fix S3 range off-by-one and part timeouts

wait for all the goroutine before retruning from multipart uploads

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2026-02-07 20:40:09 +01:00
commit 817f6f6cb1
No known key found for this signature in database
GPG key ID: 935D2952DEC4EECF
2 changed files with 25 additions and 7 deletions

View file

@ -1026,6 +1026,13 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
poolCtx, poolCancel := context.WithCancel(ctx)
defer poolCancel()
finalizeFailedUpload := func(err error) {
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err)
hasError.Store(true)
poolError = fmt.Errorf("multipart upload error: %w", err)
poolCancel()
}
for part := 0; !finished; part++ {
buf := pool.getBuffer()
@ -1039,7 +1046,10 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
finished = true
} else if err != nil {
pool.releaseBuffer(buf)
return err
errOnce.Do(func() {
finalizeFailedUpload(err)
})
break
}
// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
@ -1047,7 +1057,10 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
generatedUUID, err := uuid.NewRandom()
if err != nil {
pool.releaseBuffer(buf)
return fmt.Errorf("unable to generate block ID: %w", err)
errOnce.Do(func() {
finalizeFailedUpload(err)
})
break
}
blockID := base64.StdEncoding.EncodeToString([]byte(generatedUUID.String()))
blocks = append(blocks, blockID)
@ -1077,9 +1090,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
if err != nil {
errOnce.Do(func() {
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err)
hasError.Store(true)
poolError = fmt.Errorf("multipart upload error: %w", err)
poolCancel()
finalizeFailedUpload(err)
})
}
}(blockID, buf, n)

View file

@ -841,7 +841,7 @@ func (fs *S3Fs) initiateMultipartUpload(ctx context.Context, name, contentType s
func (fs *S3Fs) uploadPart(ctx context.Context, name, uploadID string, partNumber int32, data []byte) (*string, error) {
timeout := time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute
if fs.config.UploadPartMaxTime > 0 {
timeout = time.Duration(fs.config.UploadPartMaxTime)
timeout = time.Duration(fs.config.UploadPartMaxTime) * time.Second
}
ctx, cancelFn := context.WithDeadline(ctx, time.Now().Add(timeout))
defer cancelFn()
@ -890,6 +890,13 @@ func (fs *S3Fs) abortMultipartUpload(name, uploadID string) error {
}
func (fs *S3Fs) singlePartUpload(ctx context.Context, name, contentType string, data []byte) error {
timeout := time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute
if fs.config.UploadPartMaxTime > 0 {
timeout = time.Duration(fs.config.UploadPartMaxTime) * time.Second
}
ctx, cancelFn := context.WithDeadline(ctx, time.Now().Add(timeout))
defer cancelFn()
contentLength := int64(len(data))
_, err := fs.svc.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(fs.config.Bucket),
@ -989,7 +996,7 @@ func (fs *S3Fs) handleUpload(ctx context.Context, reader io.Reader, name, conten
errOnce.Do(func() {
finalizeFailedUpload(err)
})
return err
break
}
guard <- struct{}{}
if hasError.Load() {