vfs: log progress after each page iteration

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2024-02-18 10:12:53 +01:00
parent db0a467d33
commit d413775060
No known key found for this signature in database
GPG key ID: 935D2952DEC4EECF
3 changed files with 46 additions and 31 deletions

View file

@ -543,11 +543,9 @@ func (fs *AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
}
numFiles++
size += blobSize
if numFiles%1000 == 0 {
fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
}
}
}
fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
}
metric.AZListObjectsCompleted(nil)
@ -616,6 +614,9 @@ func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
isDir = checkDirectoryMarkers(contentType, blobItem.Metadata)
blobSize = util.GetIntFromPointer(blobItem.Properties.ContentLength)
lastModified = util.GetTimeFromPointer(blobItem.Properties.LastModified)
if val := getAzureLastModified(blobItem.Metadata); val > 0 {
lastModified = util.GetTimeFromMsecSinceEpoch(val)
}
}
err := walkFn(name, NewFileInfo(name, isDir, blobSize, lastModified, false), nil)
if err != nil {

View file

@ -476,21 +476,20 @@ func (fs *GCSFs) GetDirSize(dirname string) (int, int64, error) {
if err != nil {
return numFiles, size, err
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
defer cancelFn()
bkt := fs.svc.Bucket(fs.config.Bucket)
it := bkt.Objects(ctx, query)
pager := iterator.NewPager(it, defaultGCSPageSize, "")
iteratePage := func(nextPageToken string) (string, error) {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
bkt := fs.svc.Bucket(fs.config.Bucket)
it := bkt.Objects(ctx, query)
pager := iterator.NewPager(it, defaultGCSPageSize, nextPageToken)
for {
var objects []*storage.ObjectAttrs
pageToken, err := pager.NextPage(&objects)
if err != nil {
metric.GCSListObjectsCompleted(err)
return numFiles, size, err
return pageToken, err
}
for _, attrs := range objects {
if !attrs.Deleted.IsZero() {
continue
@ -501,12 +500,18 @@ func (fs *GCSFs) GetDirSize(dirname string) (int, int64, error) {
}
numFiles++
size += attrs.Size
if numFiles%1000 == 0 {
fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
}
}
return pageToken, nil
}
objects = nil
pageToken := ""
for {
pageToken, err = iteratePage(pageToken)
if err != nil {
metric.GCSListObjectsCompleted(err)
return numFiles, size, err
}
fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
if pageToken == "" {
break
}
@ -556,22 +561,20 @@ func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
return err
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
defer cancelFn()
iteratePage := func(nextPageToken string) (string, error) {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
bkt := fs.svc.Bucket(fs.config.Bucket)
it := bkt.Objects(ctx, query)
pager := iterator.NewPager(it, defaultGCSPageSize, "")
bkt := fs.svc.Bucket(fs.config.Bucket)
it := bkt.Objects(ctx, query)
pager := iterator.NewPager(it, defaultGCSPageSize, nextPageToken)
for {
var objects []*storage.ObjectAttrs
pageToken, err := pager.NextPage(&objects)
if err != nil {
walkFn(root, nil, err) //nolint:errcheck
metric.GCSListObjectsCompleted(err)
return err
return pageToken, err
}
for _, attrs := range objects {
if !attrs.Deleted.IsZero() {
continue
@ -580,13 +583,26 @@ func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error {
if name == "" {
continue
}
err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false), nil)
objectModTime := attrs.Updated
if val := getLastModified(attrs.Metadata); val > 0 {
objectModTime = util.GetTimeFromMsecSinceEpoch(val)
}
err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, objectModTime, false), nil)
if err != nil {
return err
return pageToken, err
}
}
objects = nil
return pageToken, nil
}
pageToken := ""
for {
pageToken, err = iteratePage(pageToken)
if err != nil {
metric.GCSListObjectsCompleted(err)
return err
}
if pageToken == "" {
break
}

View file

@ -528,10 +528,8 @@ func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
}
numFiles++
size += objectSize
if numFiles%1000 == 0 {
fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
}
}
fsLog(fs, logger.LevelDebug, "scan in progress for %q, files: %d, size: %d", dirname, numFiles, size)
}
metric.S3ListObjectsCompleted(nil)