add support for conditional resuming of uploads

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2023-10-22 16:09:30 +02:00
parent f1e52d99ba
commit e3c4ee0833
No known key found for this signature in database
GPG key ID: 935D2952DEC4EECF
23 changed files with 260 additions and 53 deletions

View file

@ -62,13 +62,14 @@ The configuration file contains the following sections:
- **"common"**, configuration parameters shared among all the supported protocols
- `idle_timeout`, integer. Time in minutes after which an idle client will be disconnected. 0 means disabled. Default: 15
- `upload_mode` integer. 0 means standard: the files are uploaded directly to the requested path. 1 means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode, if there is an upload error, the temporary file is deleted and so the requested upload path will not contain a partial file. 2 means atomic with resume support: same as atomic but if there is an upload error, the temporary file is renamed to the requested path and not deleted. This way, a client can reconnect and resume the upload. Ignored for cloud-based storage backends (uploads are always atomic and resume is not supported for these backends) and for SFTP backend if buffering is enabled. Default: 0
- `upload_mode` integer. 0 means standard: the files are uploaded directly to the requested path. 1 means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode, if there is an upload error, the temporary file is deleted and so the requested upload path will not contain a partial file. 2 means atomic with resume support: same as atomic but if there is an upload error, the temporary file is renamed to the requested path and not deleted. This way, a client can reconnect and resume the upload. Ignored for cloud-based storage backends (uploads are always atomic and upload resume is not supported, by default, for these backends) and for SFTP backend if buffering is enabled. Default: `0`
- `actions`, struct. It contains the command to execute and/or the HTTP URL to notify and the trigger conditions. See [Custom Actions](./custom-actions.md) for more details
- `execute_on`, list of strings. Valid values are `pre-download`, `download`, `first-download`, `pre-upload`, `upload`, `first-upload`, `pre-delete`, `delete`, `rename`, `mkdir`, `rmdir`, `ssh_cmd`, `copy`. Leave empty to disable actions.
- `execute_sync`, list of strings. Actions, defined in the `execute_on` list above, to be performed synchronously. The `pre-*` actions are always executed synchronously while the other ones are asynchronous. Executing an action synchronously means that SFTPGo will not return a result code to the client (which is waiting for it) until your hook have completed its execution. Leave empty to execute only the defined `pre-*` hook synchronously
- `hook`, string. Absolute path to the command to execute or HTTP URL to notify.
- `setstat_mode`, integer. 0 means "normal mode": requests for changing permissions, owner/group and access/modification times are executed. 1 means "ignore mode": requests for changing permissions, owner/group and access/modification times are silently ignored. 2 means "ignore mode if not supported": requests for changing permissions and owner/group are silently ignored for cloud filesystems and executed for local/SFTP filesystem. Requests for changing modification times are always executed for local/SFTP filesystems and are executed for cloud based filesystems if the target is a file and there is a metadata plugin available. A metadata plugin can be found [here](https://github.com/sftpgo/sftpgo-plugin-metadata).
- `rename_mode`, integer. By default (`0`), renaming of non-empty directories is not allowed for cloud storage providers (S3, GCS, Azure Blob). Set to `1` to enable recursive renames for these providers, they may be slow, there is no atomic rename API like for local filesystem, so SFTPGo will recursively list the directory contents and do a rename for each entry (partial renaming and incorrect disk quota updates are possible in error cases). Default `0`.
- `resume_max_size`, integer. defines the maximum size allowed, in bytes, to resume uploads on storage backends with immutable objects. By default, resuming uploads is not allowed for cloud storage providers (S3, GCS, Azure Blob) because SFTPGo must rewrite the entire file. Set to a value greater than 0 to allow resuming uploads of files smaller than or equal to the defined size. Please note that uploads for these backends are still atomic, the client must intentionally upload a portion of the target file and then resume uploading.. Default `0`.
- `temp_path`, string. Defines the path for temporary files such as those used for atomic uploads or file pipes. If you set this option you must make sure that the defined path exists, is accessible for writing by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise the renaming for atomic uploads will become a copy and therefore may take a long time. The temporary files are not namespaced. The default is generally fine. Leave empty for the default.
- `proxy_protocol`, integer. Support for [HAProxy PROXY protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt). If you are running SFTPGo behind a proxy server such as HAProxy, AWS ELB or NGINX, you can enable the proxy protocol. It provides a convenient way to safely transport connection information such as a client's address across multiple layers of NAT or TCP proxies to get the real client IP address instead of the proxy IP. Both protocol versions 1 and 2 are supported. If the proxy protocol is enabled in SFTPGo then you have to enable the protocol in your proxy configuration too. For example, for HAProxy, add `send-proxy` or `send-proxy-v2` to each server configuration line. The PROXY protocol is supported for SSH/SFTP and FTP/S. The following modes are supported:
- 0, disabled

6
go.mod
View file

@ -7,7 +7,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.8.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0
github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5
github.com/alexedwards/argon2id v0.0.0-20231016161201-27bf9713919b
github.com/alexedwards/argon2id v1.0.0
github.com/amoghe/go-crypt v0.0.0-20220222110647-20eada5f5964
github.com/aws/aws-sdk-go-v2 v1.21.2
github.com/aws/aws-sdk-go-v2/config v1.19.0
@ -18,7 +18,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.40.2
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.6
github.com/aws/aws-sdk-go-v2/service/sts v1.23.2
github.com/bmatcuk/doublestar/v4 v4.6.0
github.com/bmatcuk/doublestar/v4 v4.6.1
github.com/cockroachdb/cockroach-go/v2 v2.3.5
github.com/coreos/go-oidc/v3 v3.7.0
github.com/drakkan/webdav v0.0.0-20230227175313-32996838bcd8
@ -106,7 +106,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect

13
go.sum
View file

@ -67,8 +67,8 @@ github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5 h1:IEjq88XO4PuBDcv
github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5/go.mod h1:exZ0C/1emQJAw5tHOaUDyY1ycttqBAPcxuzf7QbY6ec=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/alexedwards/argon2id v0.0.0-20231016161201-27bf9713919b h1:11BUm+H/afKXfPQ7byMNdYxHlXmBP9g1Smo72gz3c4o=
github.com/alexedwards/argon2id v0.0.0-20231016161201-27bf9713919b/go.mod h1:mTeFRcTdnpzOlRjMoFYC/80HwVUreupyAiqPkCZQOXc=
github.com/alexedwards/argon2id v1.0.0 h1:wJzDx66hqWX7siL/SRUmgz3F8YMrd/nfX/xHHcQQP0w=
github.com/alexedwards/argon2id v1.0.0/go.mod h1:tYKkqIjzXvZdzPvADMWOEZ+l6+BD6CtBXMj5fnJppiw=
github.com/amoghe/go-crypt v0.0.0-20220222110647-20eada5f5964 h1:I9YN9WMo3SUh7p/4wKeNvD/IQla3U3SUa61U7ul+xM4=
github.com/amoghe/go-crypt v0.0.0-20220222110647-20eada5f5964/go.mod h1:eFiR01PwTcpbzXtdMces7zxg6utvFM5puiWHpWB8D/k=
github.com/aws/aws-sdk-go-v2 v1.21.2 h1:+LXZ0sgo8quN9UOKXXzAWRT3FWd4NxeXWOZom9pE7GA=
@ -115,8 +115,8 @@ github.com/aws/smithy-go v1.15.0 h1:PS/durmlzvAFpQHDs4wi4sNNP9ExsqZh6IlfdHXgKK8=
github.com/aws/smithy-go v1.15.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmatcuk/doublestar/v4 v4.6.0 h1:HTuxyug8GyFbRkrffIpzNCSK4luc0TY3wzXvzIZhEXc=
github.com/bmatcuk/doublestar/v4 v4.6.0/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/bmatcuk/doublestar/v4 v4.6.1 h1:FH9SifrbvJhnlQpztAx++wlkk70QBf0iBWDwNy7PA4I=
github.com/bmatcuk/doublestar/v4 v4.6.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/boombuler/barcode v1.0.1 h1:NDBbPmhS+EqABEs5Kg3n/5ZNjy73Pz7SIV+KCeqyXcs=
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
@ -177,8 +177,8 @@ github.com/fclairamb/go-log v0.4.1 h1:rLtdSG9x2pK41AIAnE8WYpl05xBJfw1ZyYxZaXFcBs
github.com/fclairamb/go-log v0.4.1/go.mod h1:sw1KvnkZ4wKCYkvy4SL3qVZcJSWFP8Ure4pM3z+KNn4=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-acme/lego/v4 v4.14.2 h1:/D/jqRgLi8Cbk33sLGtu2pX2jEg3bGJWHyV8kFuUHGM=
github.com/go-acme/lego/v4 v4.14.2/go.mod h1:kBXxbeTg0x9AgaOYjPSwIeJy3Y33zTz+tMD16O4MO6c=
github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk=
@ -670,7 +670,6 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View file

@ -230,6 +230,7 @@ func Initialize(c Configuration, isShared int) error {
vfs.SetAllowSelfConnections(c.AllowSelfConnections)
vfs.SetRenameMode(c.RenameMode)
vfs.SetReadMetadataMode(c.Metadata.Read)
vfs.SetResumeMaxSize(c.ResumeMaxSize)
dataprovider.SetAllowSelfConnections(c.AllowSelfConnections)
transfersChecker = getTransfersChecker(isShared)
return nil
@ -523,6 +524,12 @@ type Configuration struct {
// renames for these providers, they may be slow, there is no atomic rename API like for local
// filesystem, so SFTPGo will recursively list the directory contents and do a rename for each entry
RenameMode int `json:"rename_mode" mapstructure:"rename_mode"`
// ResumeMaxSize defines the maximum size allowed, in bytes, to resume uploads on storage backends
// with immutable objects. By default, resuming uploads is not allowed for cloud storage providers
// (S3, GCS, Azure Blob) because SFTPGo must rewrite the entire file.
// Set to a value greater than 0 to allow resuming uploads of files smaller than or equal to the
// defined size.
ResumeMaxSize int64 `json:"resume_max_size" mapstructure:"resume_max_size"`
// TempPath defines the path for temporary files such as those used for atomic uploads or file pipes.
// If you set this option you must make sure that the defined path exists, is accessible for writing
// by the user running SFTPGo, and is on the same filesystem as the users home directories otherwise

View file

@ -343,14 +343,19 @@ func (c *BaseConnection) CheckParentDirs(virtualPath string) error {
}
// GetCreateChecks returns the checks for creating new files
func (c *BaseConnection) GetCreateChecks(virtualPath string, isNewFile bool) int {
func (c *BaseConnection) GetCreateChecks(virtualPath string, isNewFile bool, isResume bool) int {
result := 0
if !isNewFile {
return 0
if isResume {
result += vfs.CheckResume
}
return result
}
if !c.User.HasPerm(dataprovider.PermCreateDirs, path.Dir(virtualPath)) {
return vfs.CheckParentDir
result += vfs.CheckParentDir
return result
}
return 0
return result
}
// CreateDir creates a new directory at the specified fsPath

View file

@ -955,7 +955,7 @@ func getFileWriter(conn *BaseConnection, virtualPath string, expectedSize int64)
if err := checkWriterPermsAndQuota(conn, virtualPath, numFiles, expectedSize, truncatedSize); err != nil {
return nil, numFiles, truncatedSize, nil, err
}
f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1))
f, w, cancelFn, err := fs.Create(fsPath, 0, conn.GetCreateChecks(virtualPath, numFiles == 1, false))
if err != nil {
return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
}

View file

@ -203,6 +203,7 @@ func Init() {
},
SetstatMode: 0,
RenameMode: 0,
ResumeMaxSize: 0,
TempPath: "",
ProxyProtocol: 0,
ProxyAllowed: []string{},
@ -1993,6 +1994,7 @@ func setViperDefaults() {
viper.SetDefault("common.actions.hook", globalConf.Common.Actions.Hook)
viper.SetDefault("common.setstat_mode", globalConf.Common.SetstatMode)
viper.SetDefault("common.rename_mode", globalConf.Common.RenameMode)
viper.SetDefault("common.resume_max_size", globalConf.Common.ResumeMaxSize)
viper.SetDefault("common.temp_path", globalConf.Common.TempPath)
viper.SetDefault("common.proxy_protocol", globalConf.Common.ProxyProtocol)
viper.SetDefault("common.proxy_allowed", globalConf.Common.ProxyAllowed)

View file

@ -408,7 +408,7 @@ func (c *Connection) handleFTPUploadToNewFile(fs vfs.Fs, flags int, resolvedPath
c.Log(logger.LevelDebug, "upload for file %q denied by pre action: %v", requestPath, err)
return nil, ftpserver.ErrFileNameNotAllowed
}
file, w, cancelFn, err := fs.Create(filePath, flags, c.GetCreateChecks(requestPath, true))
file, w, cancelFn, err := fs.Create(filePath, flags, c.GetCreateChecks(requestPath, true, false))
if err != nil {
c.Log(logger.LevelError, "error creating file %q, flags %v: %+v", resolvedPath, flags, err)
return nil, c.GetFsError(fs, err)
@ -444,7 +444,7 @@ func (c *Connection) handleFTPUploadToExistingFile(fs vfs.Fs, flags int, resolve
isResume := flags&os.O_TRUNC == 0
// if there is a size limit remaining size cannot be 0 here, since quotaResult.HasSpace
// will return false in this case and we deny the upload before
maxWriteSize, err := c.GetMaxWriteSize(diskQuota, isResume, fileSize, fs.IsUploadResumeSupported())
maxWriteSize, err := c.GetMaxWriteSize(diskQuota, isResume, fileSize, vfs.IsUploadResumeSupported(fs, fileSize))
if err != nil {
c.Log(logger.LevelDebug, "unable to get max write size: %v", err)
return nil, err
@ -463,7 +463,7 @@ func (c *Connection) handleFTPUploadToExistingFile(fs vfs.Fs, flags int, resolve
}
}
file, w, cancelFn, err := fs.Create(filePath, flags, c.GetCreateChecks(requestPath, false))
file, w, cancelFn, err := fs.Create(filePath, flags, c.GetCreateChecks(requestPath, false, isResume))
if err != nil {
c.Log(logger.LevelError, "error opening existing file, flags: %v, source: %q, err: %+v", flags, filePath, err)
return nil, c.GetFsError(fs, err)

View file

@ -367,6 +367,12 @@ func (MockOsFs) IsUploadResumeSupported() bool {
return false
}
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (MockOsFs) IsConditionalUploadResumeSupported(_ int64) bool {
return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported
func (fs MockOsFs) IsAtomicUploadSupported() bool {
return fs.isAtomicUploadSupported

View file

@ -201,7 +201,7 @@ func (c *Connection) handleUploadFile(fs vfs.Fs, resolvedPath, filePath, request
maxWriteSize, _ := c.GetMaxWriteSize(diskQuota, false, fileSize, fs.IsUploadResumeSupported())
file, w, cancelFn, err := fs.Create(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.GetCreateChecks(requestPath, isNewFile))
file, w, cancelFn, err := fs.Create(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.GetCreateChecks(requestPath, isNewFile, false))
if err != nil {
c.Log(logger.LevelError, "error opening existing file, source: %q, err: %+v", filePath, err)
return nil, c.GetFsError(fs, err)

View file

@ -407,7 +407,7 @@ func (c *Connection) handleSFTPUploadToNewFile(fs vfs.Fs, pflags sftp.FileOpenFl
}
osFlags := getOSOpenFlags(pflags)
file, w, cancelFn, err := fs.Create(filePath, osFlags, c.GetCreateChecks(requestPath, true))
file, w, cancelFn, err := fs.Create(filePath, osFlags, c.GetCreateChecks(requestPath, true, false))
if err != nil {
c.Log(logger.LevelError, "error creating file %q, os flags %d, pflags %+v: %+v", resolvedPath, osFlags, pflags, err)
return nil, c.GetFsError(fs, err)
@ -443,7 +443,7 @@ func (c *Connection) handleSFTPUploadToExistingFile(fs vfs.Fs, pflags sftp.FileO
// if there is a size limit the remaining size cannot be 0 here, since quotaResult.HasSpace
// will return false in this case and we deny the upload before.
// For Cloud FS GetMaxWriteSize will return unsupported operation
maxWriteSize, err := c.GetMaxWriteSize(diskQuota, isResume, fileSize, fs.IsUploadResumeSupported())
maxWriteSize, err := c.GetMaxWriteSize(diskQuota, isResume, fileSize, vfs.IsUploadResumeSupported(fs, fileSize))
if err != nil {
c.Log(logger.LevelDebug, "unable to get max write size for file %q is resume? %t: %v",
requestPath, isResume, err)
@ -464,7 +464,7 @@ func (c *Connection) handleSFTPUploadToExistingFile(fs vfs.Fs, pflags sftp.FileO
}
}
file, w, cancelFn, err := fs.Create(filePath, osFlags, c.GetCreateChecks(requestPath, false))
file, w, cancelFn, err := fs.Create(filePath, osFlags, c.GetCreateChecks(requestPath, false, isResume))
if err != nil {
c.Log(logger.LevelError, "error opening existing file, os flags %v, pflags: %+v, source: %q, err: %+v",
osFlags, pflags, filePath, err)
@ -476,22 +476,15 @@ func (c *Connection) handleSFTPUploadToExistingFile(fs vfs.Fs, pflags sftp.FileO
if isResume {
c.Log(logger.LevelDebug, "resuming upload requested, file path %q initial size: %d, has append flag %t",
filePath, fileSize, pflags.Append)
// enforce min write offset only if the client passed the APPEND flag
if pflags.Append {
// enforce min write offset only if the client passed the APPEND flag or the filesystem
// supports emulated resume
if pflags.Append || !fs.IsUploadResumeSupported() {
minWriteOffset = fileSize
}
initialSize = fileSize
} else {
if isTruncate && vfs.HasTruncateSupport(fs) {
vfolder, err := c.User.GetVirtualFolderForPath(path.Dir(requestPath))
if err == nil {
dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, 0, -fileSize, false) //nolint:errcheck
if vfolder.IsIncludedInUserQuota() {
dataprovider.UpdateUserQuota(&c.User, 0, -fileSize, false) //nolint:errcheck
}
} else {
dataprovider.UpdateUserQuota(&c.User, 0, -fileSize, false) //nolint:errcheck
}
c.updateQuotaAfterTruncate(requestPath, fileSize)
} else {
initialSize = fileSize
truncatedSize = fileSize
@ -562,6 +555,18 @@ func (c *Connection) getStatVFSFromQuotaResult(fs vfs.Fs, name string, quotaResu
}, nil
}
func (c *Connection) updateQuotaAfterTruncate(requestPath string, fileSize int64) {
vfolder, err := c.User.GetVirtualFolderForPath(path.Dir(requestPath))
if err == nil {
dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, 0, -fileSize, false) //nolint:errcheck
if vfolder.IsIncludedInUserQuota() {
dataprovider.UpdateUserQuota(&c.User, 0, -fileSize, false) //nolint:errcheck
}
} else {
dataprovider.UpdateUserQuota(&c.User, 0, -fileSize, false) //nolint:errcheck
}
}
func getOSOpenFlags(requestFlags sftp.FileOpenFlags) (flags int) {
var osFlags int
if requestFlags.Read && requestFlags.Write {

View file

@ -109,6 +109,12 @@ func (MockOsFs) IsUploadResumeSupported() bool {
return false
}
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (MockOsFs) IsConditionalUploadResumeSupported(_ int64) bool {
return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported
func (fs MockOsFs) IsAtomicUploadSupported() bool {
return fs.isAtomicUploadSupported

View file

@ -245,7 +245,7 @@ func (c *scpCommand) handleUploadFile(fs vfs.Fs, resolvedPath, filePath string,
maxWriteSize, _ := c.connection.GetMaxWriteSize(diskQuota, false, fileSize, fs.IsUploadResumeSupported())
file, w, cancelFn, err := fs.Create(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.connection.GetCreateChecks(requestPath, isNewFile))
file, w, cancelFn, err := fs.Create(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.connection.GetCreateChecks(requestPath, isNewFile, false))
if err != nil {
c.connection.Log(logger.LevelError, "error creating file %q: %v", resolvedPath, err)
c.sendErrorMessage(fs, err)

View file

@ -264,6 +264,23 @@ func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, *PipeWriter,
metric.AZTransferCompleted(r.GetReadedBytes(), 0, err)
}()
if checks&CheckResume != 0 {
readCh := make(chan error, 1)
go func() {
err = fs.downloadToWriter(name, p)
readCh <- err
}()
err = <-readCh
if err != nil {
cancelFn()
p.Close()
fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
return nil, nil, nil, err
}
}
return nil, p, cancelFn, nil
}
@ -458,6 +475,12 @@ func (*AzureBlobFs) IsUploadResumeSupported() bool {
return false
}
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*AzureBlobFs) IsConditionalUploadResumeSupported(size int64) bool {
return size <= resumeMaxSize
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// Azure Blob uploads are already atomic, we don't need to upload to a temporary
// file
@ -965,7 +988,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *b
fsLog(fs, logger.LevelError, "unable to get blob properties, download aborted: %+v", err)
return err
}
if readMetadata > 0 {
if readMetadata > 0 && pipeReader != nil {
pipeReader.setMetadataFromPointerVal(props.Metadata)
}
contentLength := util.GetIntFromPointer(props.ContentLength)
@ -1172,6 +1195,19 @@ func (fs *AzureBlobFs) getCopyOptions() *blob.StartCopyFromURLOptions {
return copyOptions
}
func (fs *AzureBlobFs) downloadToWriter(name string, w *PipeWriter) error {
fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
defer cancelFn()
blockBlob := fs.containerClient.NewBlockBlobClient(name)
err := fs.handleMultipartDownload(ctx, blockBlob, 0, w, nil)
fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
name, w.GetWrittenBytes(), err)
metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err)
return err
}
func (fs *AzureBlobFs) getStorageID() string {
if fs.config.Endpoint != "" {
if !strings.HasSuffix(fs.config.Endpoint, "/") {

View file

@ -243,6 +243,12 @@ func (*CryptFs) IsUploadResumeSupported() bool {
return false
}
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*CryptFs) IsConditionalUploadResumeSupported(_ int64) bool {
return false
}
// GetMimeType returns the content type
func (fs *CryptFs) GetMimeType(name string) (string, error) {
f, key, err := fs.getFileAndEncryptionKey(name)

View file

@ -178,7 +178,6 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, *PipeWriter, func(
if err != nil {
return nil, nil, nil, err
}
p := NewPipeWriter(w)
bkt := fs.svc.Bucket(fs.config.Bucket)
obj := bkt.Object(name)
if flag == -1 {
@ -193,6 +192,7 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, *PipeWriter, func(
fsLog(fs, logger.LevelWarn, "unable to set precondition for %q, stat err: %v", name, statErr)
}
}
p := NewPipeWriter(w)
ctx, cancelFn := context.WithCancel(context.Background())
objectWriter := obj.NewWriter(ctx)
@ -208,15 +208,8 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, *PipeWriter, func(
} else {
contentType = mime.TypeByExtension(path.Ext(name))
}
if contentType != "" {
objectWriter.ObjectAttrs.ContentType = contentType
}
if fs.config.StorageClass != "" {
objectWriter.ObjectAttrs.StorageClass = fs.config.StorageClass
}
if fs.config.ACL != "" {
objectWriter.PredefinedACL = fs.config.ACL
}
fs.setWriterAttrs(objectWriter, contentType)
go func() {
defer cancelFn()
@ -231,6 +224,24 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, *PipeWriter, func(
name, fs.config.ACL, n, err)
metric.GCSTransferCompleted(n, 0, err)
}()
if checks&CheckResume != 0 {
readCh := make(chan error, 1)
go func() {
err = fs.downloadToWriter(name, p)
readCh <- err
}()
err = <-readCh
if err != nil {
cancelFn()
p.Close()
fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
return nil, nil, nil, err
}
}
return nil, p, cancelFn, nil
}
@ -429,6 +440,12 @@ func (*GCSFs) IsUploadResumeSupported() bool {
return false
}
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*GCSFs) IsConditionalUploadResumeSupported(size int64) bool {
return size <= resumeMaxSize
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
@ -748,6 +765,37 @@ func (fs *GCSFs) getObjectStat(name string) (os.FileInfo, error) {
return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, attrs.Size, attrs.Updated, false))
}
func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, contentType string) {
if contentType != "" {
objectWriter.ObjectAttrs.ContentType = contentType
}
if fs.config.StorageClass != "" {
objectWriter.ObjectAttrs.StorageClass = fs.config.StorageClass
}
if fs.config.ACL != "" {
objectWriter.PredefinedACL = fs.config.ACL
}
}
func (fs *GCSFs) downloadToWriter(name string, w *PipeWriter) error {
fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
defer cancelFn()
bkt := fs.svc.Bucket(fs.config.Bucket)
obj := bkt.Object(name)
objectReader, err := obj.NewRangeReader(ctx, 0, -1)
if err != nil {
fsLog(fs, logger.LevelDebug, "unable to start download before resuming upload, path %q, err: %v", name, err)
return err
}
n, err := io.Copy(w, objectReader)
fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
name, n, err)
metric.GCSTransferCompleted(n, 1, err)
return err
}
func (fs *GCSFs) copyFileInternal(source, target string) error {
src := fs.svc.Bucket(fs.config.Bucket).Object(source)
dst := fs.svc.Bucket(fs.config.Bucket).Object(target)

View file

@ -501,6 +501,12 @@ func (*HTTPFs) IsUploadResumeSupported() bool {
return false
}
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*HTTPFs) IsConditionalUploadResumeSupported(_ int64) bool {
return false
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
func (*HTTPFs) IsAtomicUploadSupported() bool {
return false

View file

@ -279,6 +279,12 @@ func (*OsFs) IsUploadResumeSupported() bool {
return true
}
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*OsFs) IsConditionalUploadResumeSupported(_ int64) bool {
return true
}
// IsAtomicUploadSupported returns true if atomic upload is supported
func (*OsFs) IsAtomicUploadSupported() bool {
return true

View file

@ -283,10 +283,28 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, *PipeWriter, func()
})
r.CloseWithError(err) //nolint:errcheck
p.Done(err)
fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %v, err: %+v",
fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %d, err: %+v",
name, fs.config.ACL, r.GetReadedBytes(), err)
metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
}()
if checks&CheckResume != 0 {
readCh := make(chan error, 1)
go func() {
err = fs.downloadToWriter(name, p)
readCh <- err
}()
err = <-readCh
if err != nil {
cancelFn()
p.Close()
fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
return nil, nil, nil, err
}
}
return nil, p, cancelFn, nil
}
@ -460,6 +478,12 @@ func (*S3Fs) IsUploadResumeSupported() bool {
return false
}
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (*S3Fs) IsConditionalUploadResumeSupported(size int64) bool {
return size <= resumeMaxSize
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
// S3 uploads are already atomic, we don't need to upload to a temporary
// file
@ -1026,6 +1050,31 @@ func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
return nil, ErrStorageSizeUnavailable
}
func (fs *S3Fs) downloadToWriter(name string, w *PipeWriter) error {
fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
defer cancelFn()
downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
d.Concurrency = fs.config.DownloadConcurrency
d.PartSize = fs.config.DownloadPartSize
if fs.config.DownloadPartMaxTime > 0 {
d.ClientOptions = append(d.ClientOptions, func(o *s3.Options) {
o.HTTPClient = getAWSHTTPClient(fs.config.DownloadPartMaxTime, 100*time.Millisecond)
})
}
})
n, err := downloader.Download(ctx, w, &s3.GetObjectInput{
Bucket: aws.String(fs.config.Bucket),
Key: aws.String(name),
})
fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
name, n, err)
metric.S3TransferCompleted(n, 1, err)
return err
}
func (fs *S3Fs) getStorageID() string {
if fs.config.Endpoint != "" {
if !strings.HasSuffix(fs.config.Endpoint, "/") {

View file

@ -540,6 +540,12 @@ func (fs *SFTPFs) IsUploadResumeSupported() bool {
return fs.config.BufferSize == 0
}
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
// for the specified size
func (fs *SFTPFs) IsConditionalUploadResumeSupported(_ int64) bool {
return fs.IsUploadResumeSupported()
}
// IsAtomicUploadSupported returns true if atomic upload is supported.
func (fs *SFTPFs) IsAtomicUploadSupported() bool {
return fs.config.BufferSize == 0

View file

@ -40,15 +40,17 @@ import (
)
const (
dirMimeType = "inode/directory"
s3fsName = "S3Fs"
gcsfsName = "GCSFs"
azBlobFsName = "AzureBlobFs"
dirMimeType = "inode/directory"
s3fsName = "S3Fs"
gcsfsName = "GCSFs"
azBlobFsName = "AzureBlobFs"
preResumeTimeout = 90 * time.Second
)
// Additional checks for files
const (
CheckParentDir = 1
CheckResume = 2
)
var (
@ -62,6 +64,7 @@ var (
allowSelfConnections int
renameMode int
readMetadata int
resumeMaxSize int64
)
// SetAllowSelfConnections sets the desired behaviour for self connections
@ -94,6 +97,12 @@ func SetReadMetadataMode(val int) {
readMetadata = val
}
// SetResumeMaxSize sets the max size allowed for resuming uploads for backends
// with immutable objects
func SetResumeMaxSize(val int64) {
resumeMaxSize = val
}
// Fs defines the interface for filesystem backends
type Fs interface {
Name() string
@ -113,6 +122,7 @@ type Fs interface {
ReadDir(dirname string) ([]os.FileInfo, error)
Readlink(name string) (string, error)
IsUploadResumeSupported() bool
IsConditionalUploadResumeSupported(size int64) bool
IsAtomicUploadSupported() bool
CheckRootPath(username string, uid int, gid int) bool
ResolvePath(virtualPath string) (string, error)
@ -893,6 +903,14 @@ func SetPathPermissions(fs Fs, path string, uid int, gid int) {
}
}
// IsUploadResumeSupported returns true if resuming uploads is supported
func IsUploadResumeSupported(fs Fs, size int64) bool {
if fs.IsUploadResumeSupported() {
return true
}
return fs.IsConditionalUploadResumeSupported(size)
}
func updateFileInfoModTime(storageID, objectPath string, info *FileInfo) (*FileInfo, error) {
if !plugin.Handler.HasMetadater() {
return info, nil

View file

@ -215,7 +215,7 @@ func (c *Connection) handleUploadToNewFile(fs vfs.Fs, resolvedPath, filePath, re
c.Log(logger.LevelDebug, "upload for file %q denied by pre action: %v", requestPath, err)
return nil, c.GetPermissionDeniedError()
}
file, w, cancelFn, err := fs.Create(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.GetCreateChecks(requestPath, true))
file, w, cancelFn, err := fs.Create(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.GetCreateChecks(requestPath, true, false))
if err != nil {
c.Log(logger.LevelError, "error creating file %q: %+v", resolvedPath, err)
return nil, c.GetFsError(fs, err)
@ -262,7 +262,7 @@ func (c *Connection) handleUploadToExistingFile(fs vfs.Fs, resolvedPath, filePat
}
}
file, w, cancelFn, err := fs.Create(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.GetCreateChecks(requestPath, false))
file, w, cancelFn, err := fs.Create(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.GetCreateChecks(requestPath, false, false))
if err != nil {
c.Log(logger.LevelError, "error creating file %q: %+v", resolvedPath, err)
return nil, c.GetFsError(fs, err)

View file

@ -9,6 +9,7 @@
},
"setstat_mode": 0,
"rename_mode": 0,
"resume_max_size": 0,
"temp_path": "",
"proxy_protocol": 0,
"proxy_allowed": [],