add object metadata to notification events

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2023-08-12 18:51:47 +02:00
parent 8ab4a9aa70
commit f03fdd1155
No known key found for this signature in database
GPG key ID: 935D2952DEC4EECF
32 changed files with 276 additions and 105 deletions

View file

@ -51,6 +51,7 @@ If the `hook` defines a path to an external program, then this program can read
- `SFTPGO_ACTION_OPEN_FLAGS`, integer. File open flags, can be non-zero for `pre-upload` action. If `SFTPGO_ACTION_FILE_SIZE` is greater than zero and `SFTPGO_ACTION_OPEN_FLAGS&512 == 0` the target file will not be truncated
- `SFTPGO_ACTION_ROLE`, string. Role of the user who executed the action
- `SFTPGO_ACTION_TIMESTAMP`, int64. Event timestamp as nanoseconds since epoch
- `SFTPGO_ACTION_METADATA`, string. Object metadata serialized as JSON. Omitted if there is no metadata
Global environment variables are cleared, for security reasons, when the script is called. You can set additional environment variables in the "command" configuration section.
The program must finish within 30 seconds.
@ -76,6 +77,7 @@ If the `hook` defines an HTTP URL then this URL will be invoked as HTTP POST. Th
- `open_flags`, integer. File open flags, can be non-zero for `pre-upload` action. If `file_size` is greater than zero and `file_size&512 == 0` the target file will not be truncated
- `role`, string. Included if the user who executed the action has a role
- `timestamp`, int64. Event timestamp as nanoseconds since epoch
- `metadata`, struct. Object metadata. Both the keys and the values are string. Omitted if there is no metadata
The HTTP hook will use the global configuration for HTTP clients and will respect the retry, TLS and headers configurations. See the HTTP Clients (`http`) section of the [config reference](./full-configuration.md).
@ -116,7 +118,7 @@ The program must finish within 15 seconds.
If the `hook` defines an HTTP URL then this URL will be invoked as HTTP POST. The action, username, ip, object_type and object_name and timestamp and role are added to the query string, for example `<hook>?action=update&username=admin&ip=127.0.0.1&object_type=user&object_name=user1&timestamp=1633860803249`, and the full object is sent serialized as JSON inside the POST body with sensitive fields removed. The role is added only if not empty.
The HTTP hook will use the global configuration for HTTP clients and will respect the retry, TLS and headers configurations. See the HTTP Clients (`http`) section of the [config reference](./full-configuration.md).
The HTTP hook will use the global configuration for HTTP clients and will respect the retry, TLS and headers configurations. See the HTTP Clients (`http`) section of the [config reference](./full-configuration.md).
The structure for SFTPGo objects can be found within the [OpenAPI schema](../openapi/openapi.yaml).

View file

@ -48,8 +48,11 @@ The following placeholders are supported:
- `{{Timestamp}}`. Event timestamp as nanoseconds since epoch.
- `{{Email}}`. For filesystem events, this is the email associated with the user performing the action. For the provider events, this is the email associated with the affected user or admin. Blank in all other cases.
- `{{ObjectData}}`. Provider object data serialized as JSON with sensitive fields removed.
- `{{ObjectDataString}}`. Provider object data as JSON escaped string with sensitive fields removed.
- `{{RetentionReports}}`. Data retention reports as zip compressed CSV files. Supported as email attachment, file path for multipart HTTP request and as single parameter for HTTP requests body. Data retention reports contain details on the number of files deleted and the total size deleted for each folder.
- `{{IDPField<fieldname>}}`. Identity Provider custom fields containing a string.
- `{{Metadata}}`. Cloud storage metadata for the downloaded file serialized as JSON.
- `{{MetadataString}}`. Cloud storage metadata for the downloaded file as JSON escaped string.
Event rules are based on the premise that an event occours. To each rule you can associate one or more actions.
The following trigger events are supported:

View file

@ -87,6 +87,8 @@ The configuration file contains the following sections:
- `allowlist_status`, integer. Set to `1` to enable the allow list. The allow list can be populated using the WebAdmin or the REST API. If enabled, only the listed IPs/networks can access the configured services, all other client connections will be dropped before they even try to authenticate. Ensure to populate your allow list before enabling this setting. In multi-nodes setups, the list entries propagation between nodes may take some minutes. Default: `0`.
- `allow_self_connections`, integer. Allow users on this instance to use other users/virtual folders on this instance as storage backend. Enable this setting if you know what you are doing. Set to `1` to enable. Default: `0`.
- `umask`, string. Set the file mode creation mask, for example `002`. Leave blank to use the system umask. Supported on *NIX platforms. Default: blank.
- `metadata`, struct containing the configuration for managing the Cloud Storage backends metadata.
- `read`, integer. Set to `1` to read metadata before downloading files from Cloud Storage backends and making them available in notification events. Default: `0`.
- `defender`, struct containing the defender configuration. See [Defender](./defender.md) for more details.
- `enabled`, boolean. Default `false`.
- `driver`, string. Supported drivers are `memory` and `provider`. The `provider` driver will use the configured data provider to store defender events and it is supported for `MySQL`, `PostgreSQL` and `CockroachDB` data providers. Using the `provider` driver you can share the defender events among multiple SFTPGO instances. For a single instance the `memory` driver will be much faster. Default: `memory`.

10
go.mod
View file

@ -16,14 +16,14 @@ require (
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.77
github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.15.2
github.com/aws/aws-sdk-go-v2/service/s3 v1.38.2
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.20.2
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.0
github.com/aws/aws-sdk-go-v2/service/sts v1.21.2
github.com/bmatcuk/doublestar/v4 v4.6.0
github.com/cockroachdb/cockroach-go/v2 v2.3.5
github.com/coreos/go-oidc/v3 v3.6.0
github.com/drakkan/webdav v0.0.0-20230227175313-32996838bcd8
github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001
github.com/fclairamb/ftpserverlib v0.21.1-0.20230719102702-76e3b6785cda
github.com/fclairamb/ftpserverlib v0.22.0
github.com/fclairamb/go-log v0.4.1
github.com/go-acme/lego/v4 v4.13.3
github.com/go-chi/chi/v5 v5.0.10
@ -46,14 +46,14 @@ require (
github.com/minio/sio v0.3.1
github.com/otiai10/copy v1.12.0
github.com/pires/go-proxyproto v0.7.0
github.com/pkg/sftp v1.13.6-0.20230213180117-971c283182b6
github.com/pkg/sftp v1.13.6
github.com/pquerna/otp v1.4.0
github.com/prometheus/client_golang v1.16.0
github.com/robfig/cron/v3 v3.0.1
github.com/rs/cors v1.9.0
github.com/rs/xid v1.5.0
github.com/rs/zerolog v1.30.0
github.com/sftpgo/sdk v0.1.6-0.20230807170339-3178878ce745
github.com/sftpgo/sdk v0.1.6-0.20230812162553-b7d33eb36639
github.com/shirou/gopsutil/v3 v3.23.7
github.com/spf13/afero v1.9.5
github.com/spf13/cobra v1.7.0
@ -113,7 +113,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/s2a-go v0.1.5 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect

20
go.sum
View file

@ -104,8 +104,8 @@ github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.15.2 h1:Sn0OY6ZvpkzD
github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.15.2/go.mod h1:Q+KOs5c1mTtEvycj41l1xy9v7QxojZ/c0NhABlYJthY=
github.com/aws/aws-sdk-go-v2/service/s3 v1.38.2 h1:v346f1h8sUBKXnEbrv43L37MTBlFHyKXQPIZHNAaghA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.38.2/go.mod h1:cwCATiyNrXK9P2FsWdZ89g9mpsYv2rhk0UA/KByl5fY=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.20.2 h1:vlkGQk8JiUo1KmZF4wsZP3qclbyQHSUvLMf8aPOS79g=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.20.2/go.mod h1:Z6Oq1mXqvgwmUxvMrV/jMkQhwm06A9XO015dzGnS8TM=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.0 h1:z9faFYBvadv9HdY+oFBgxqCnew9TK+jp9ccxktB5fl4=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.0/go.mod h1:Z6Oq1mXqvgwmUxvMrV/jMkQhwm06A9XO015dzGnS8TM=
github.com/aws/aws-sdk-go-v2/service/sso v1.13.2 h1:A2RlEMo4SJSwbNoUUgkxTAEMduAy/8wG3eB2b2lP4gY=
github.com/aws/aws-sdk-go-v2/service/sso v1.13.2/go.mod h1:ju+nNXUunfIFamXUIZQiICjnO/TPlOmWcYhZcSy7xaE=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.2 h1:OJELEgyaT2kmaBGZ+myyZbTTLobfe3ox3FSh5eYK9Qs=
@ -174,8 +174,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/fclairamb/ftpserverlib v0.21.1-0.20230719102702-76e3b6785cda h1:yoDuXaChUZkcC+lBTbQcFJq78j9lt+7WyAtuop2UVE8=
github.com/fclairamb/ftpserverlib v0.21.1-0.20230719102702-76e3b6785cda/go.mod h1:y3T8eZqo8jAYQ0/wnDVRrcGeTTHV8S7ex6mAf8vQ8A0=
github.com/fclairamb/ftpserverlib v0.22.0 h1:PqzyD6YxS5sdb4fAdXUFSODTo8DelsVAOh3LgeR4VXs=
github.com/fclairamb/ftpserverlib v0.22.0/go.mod h1:dI9/yw/KfJ0g4wmRK8ZukUfqakLr6ZTf9VDydKoLy90=
github.com/fclairamb/go-log v0.4.1 h1:rLtdSG9x2pK41AIAnE8WYpl05xBJfw1ZyYxZaXFcBsM=
github.com/fclairamb/go-log v0.4.1/go.mod h1:sw1KvnkZ4wKCYkvy4SL3qVZcJSWFP8Ure4pM3z+KNn4=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
@ -278,8 +278,8 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc=
github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
github.com/google/s2a-go v0.1.5 h1:8IYp3w9nysqv3JH+NJgXJzGbDHzLOTj43BmSkp+O7qg=
github.com/google/s2a-go v0.1.5/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@ -407,8 +407,8 @@ github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pkg/sftp v1.13.6-0.20230213180117-971c283182b6 h1:5TvW1dv00Y13njmQ1AWkxSWtPkwE7ZEF6yDuv9q+Als=
github.com/pkg/sftp v1.13.6-0.20230213180117-971c283182b6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk=
github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo=
github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
@ -443,8 +443,8 @@ github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4 h1:PT+ElG/UUFMfqy5HrxJ
github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4/go.mod h1:MnkX001NG75g3p8bhFycnyIjeQoOjGL6CEIsdE/nKSY=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/sftpgo/sdk v0.1.6-0.20230807170339-3178878ce745 h1:IRKfXQ0/P0ON9UzltTmgLKU0HWYSkuafARw3Pv3hDRU=
github.com/sftpgo/sdk v0.1.6-0.20230807170339-3178878ce745/go.mod h1:TjeoMWS0JEXt9RukJveTnaiHj4+MVLtUiDC+mY++Odk=
github.com/sftpgo/sdk v0.1.6-0.20230812162553-b7d33eb36639 h1:KIFQY//0+OslF42WM7Jw24dnkKoHQV0QsW2CWa7Ac2A=
github.com/sftpgo/sdk v0.1.6-0.20230812162553-b7d33eb36639/go.mod h1:TjeoMWS0JEXt9RukJveTnaiHj4+MVLtUiDC+mY++Odk=
github.com/shirou/gopsutil/v3 v3.23.7 h1:C+fHO8hfIppoJ1WdsVm1RoI0RwXoNdfTK7yWXV0wVj4=
github.com/shirou/gopsutil/v3 v3.23.7/go.mod h1:c4gnmoRC0hQuaLqvxnx1//VXQ0Ms/X9UnJF8pddY5z4=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=

View file

@ -92,7 +92,7 @@ func ExecutePreAction(conn *BaseConnection, operation, filePath, virtualPath str
return 0, nil
}
event = newActionNotification(&conn.User, operation, filePath, virtualPath, "", "", "",
conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, openFlags, conn.getNotificationStatus(nil), 0)
conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, openFlags, conn.getNotificationStatus(nil), 0, nil)
if hasNotifiersPlugin {
plugin.Handler.NotifyFsEvent(event)
}
@ -128,7 +128,7 @@ func ExecutePreAction(conn *BaseConnection, operation, filePath, virtualPath str
// ExecuteActionNotification executes the defined hook, if any, for the specified action
func ExecuteActionNotification(conn *BaseConnection, operation, filePath, virtualPath, target, virtualTarget, sshCmd string,
fileSize int64, err error, elapsed int64,
fileSize int64, err error, elapsed int64, metadata map[string]string,
) error {
hasNotifiersPlugin := plugin.Handler.HasNotifiers()
hasHook := util.Contains(Config.Actions.ExecuteOn, operation)
@ -137,7 +137,7 @@ func ExecuteActionNotification(conn *BaseConnection, operation, filePath, virtua
return nil
}
notification := newActionNotification(&conn.User, operation, filePath, virtualPath, target, virtualTarget, sshCmd,
conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, 0, conn.getNotificationStatus(err), elapsed)
conn.protocol, conn.GetRemoteIP(), conn.ID, fileSize, 0, conn.getNotificationStatus(err), elapsed, metadata)
if hasNotifiersPlugin {
plugin.Handler.NotifyFsEvent(notification)
}
@ -160,6 +160,7 @@ func ExecuteActionNotification(conn *BaseConnection, operation, filePath, virtua
Timestamp: notification.Timestamp,
Email: conn.User.Email,
Object: nil,
Metadata: metadata,
}
if err != nil {
params.AddError(fmt.Errorf("%q failed: %w", params.Event, err))
@ -194,6 +195,7 @@ func newActionNotification(
operation, filePath, virtualPath, target, virtualTarget, sshCmd, protocol, ip, sessionID string,
fileSize int64,
openFlags, status int, elapsed int64,
metadata map[string]string,
) *notifier.FsEvent {
var bucket, endpoint string
@ -236,6 +238,7 @@ func newActionNotification(
Role: user.Role,
Timestamp: time.Now().UnixNano(),
Elapsed: elapsed,
Metadata: metadata,
}
}
@ -316,7 +319,7 @@ func (h *defaultActionHandler) handleCommand(event *notifier.FsEvent) error {
}
func notificationAsEnvVars(event *notifier.FsEvent) []string {
return []string{
result := []string{
fmt.Sprintf("SFTPGO_ACTION=%s", event.Action),
fmt.Sprintf("SFTPGO_ACTION_USERNAME=%s", event.Username),
fmt.Sprintf("SFTPGO_ACTION_PATH=%s", event.Path),
@ -337,4 +340,11 @@ func notificationAsEnvVars(event *notifier.FsEvent) []string {
fmt.Sprintf("SFTPGO_ACTION_TIMESTAMP=%d", event.Timestamp),
fmt.Sprintf("SFTPGO_ACTION_ROLE=%s", event.Role),
}
if len(event.Metadata) > 0 {
data, err := json.Marshal(event.Metadata)
if err == nil {
result = append(result, fmt.Sprintf("SFTPGO_ACTION_METADATA=%s", string(data)))
}
}
return result
}

View file

@ -71,7 +71,7 @@ func TestNewActionNotification(t *testing.T) {
c := NewBaseConnection("id", ProtocolSSH, "", "", user)
sessionID := xid.New().String()
a := newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSFTP, "", sessionID,
123, 0, c.getNotificationStatus(errors.New("fake error")), 0)
123, 0, c.getNotificationStatus(errors.New("fake error")), 0, nil)
assert.Equal(t, user.Username, a.Username)
assert.Equal(t, 0, len(a.Bucket))
assert.Equal(t, 0, len(a.Endpoint))
@ -79,38 +79,38 @@ func TestNewActionNotification(t *testing.T) {
user.FsConfig.Provider = sdk.S3FilesystemProvider
a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSSH, "", sessionID,
123, 0, c.getNotificationStatus(nil), 0)
123, 0, c.getNotificationStatus(nil), 0, nil)
assert.Equal(t, "s3bucket", a.Bucket)
assert.Equal(t, "endpoint", a.Endpoint)
assert.Equal(t, 1, a.Status)
user.FsConfig.Provider = sdk.GCSFilesystemProvider
a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSCP, "", sessionID,
123, 0, c.getNotificationStatus(ErrQuotaExceeded), 0)
123, 0, c.getNotificationStatus(ErrQuotaExceeded), 0, nil)
assert.Equal(t, "gcsbucket", a.Bucket)
assert.Equal(t, 0, len(a.Endpoint))
assert.Equal(t, 3, a.Status)
a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSCP, "", sessionID,
123, 0, c.getNotificationStatus(fmt.Errorf("wrapper quota error: %w", ErrQuotaExceeded)), 0)
123, 0, c.getNotificationStatus(fmt.Errorf("wrapper quota error: %w", ErrQuotaExceeded)), 0, nil)
assert.Equal(t, "gcsbucket", a.Bucket)
assert.Equal(t, 0, len(a.Endpoint))
assert.Equal(t, 3, a.Status)
user.FsConfig.Provider = sdk.HTTPFilesystemProvider
a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSSH, "", sessionID,
123, 0, c.getNotificationStatus(nil), 0)
123, 0, c.getNotificationStatus(nil), 0, nil)
assert.Equal(t, "httpendpoint", a.Endpoint)
assert.Equal(t, 1, a.Status)
user.FsConfig.Provider = sdk.AzureBlobFilesystemProvider
a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSCP, "", sessionID,
123, 0, c.getNotificationStatus(nil), 0)
123, 0, c.getNotificationStatus(nil), 0, nil)
assert.Equal(t, "azcontainer", a.Bucket)
assert.Equal(t, "azendpoint", a.Endpoint)
assert.Equal(t, 1, a.Status)
a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSCP, "", sessionID,
123, os.O_APPEND, c.getNotificationStatus(nil), 0)
123, os.O_APPEND, c.getNotificationStatus(nil), 0, nil)
assert.Equal(t, "azcontainer", a.Bucket)
assert.Equal(t, "azendpoint", a.Endpoint)
assert.Equal(t, 1, a.Status)
@ -118,7 +118,7 @@ func TestNewActionNotification(t *testing.T) {
user.FsConfig.Provider = sdk.SFTPFilesystemProvider
a = newActionNotification(&user, operationDownload, "path", "vpath", "target", "", "", ProtocolSFTP, "", sessionID,
123, 0, c.getNotificationStatus(nil), 0)
123, 0, c.getNotificationStatus(nil), 0, nil)
assert.Equal(t, "sftpendpoint", a.Endpoint)
}
@ -135,7 +135,7 @@ func TestActionHTTP(t *testing.T) {
},
}
a := newActionNotification(user, operationDownload, "path", "vpath", "target", "", "", ProtocolSFTP, "",
xid.New().String(), 123, 0, 1, 0)
xid.New().String(), 123, 0, 1, 0, nil)
status, err := actionHandler.Handle(a)
assert.NoError(t, err)
assert.Equal(t, 1, status)
@ -175,16 +175,16 @@ func TestActionCMD(t *testing.T) {
}
sessionID := shortuuid.New()
a := newActionNotification(user, operationDownload, "path", "vpath", "target", "", "", ProtocolSFTP, "", sessionID,
123, 0, 1, 0)
123, 0, 1, 0, map[string]string{"key": "value"})
status, err := actionHandler.Handle(a)
assert.NoError(t, err)
assert.Equal(t, 1, status)
c := NewBaseConnection("id", ProtocolSFTP, "", "", *user)
err = ExecuteActionNotification(c, OperationSSHCmd, "path", "vpath", "target", "vtarget", "sha1sum", 0, nil, 0)
err = ExecuteActionNotification(c, OperationSSHCmd, "path", "vpath", "target", "vtarget", "sha1sum", 0, nil, 0, nil)
assert.NoError(t, err)
err = ExecuteActionNotification(c, operationDownload, "path", "vpath", "", "", "", 0, nil, 0)
err = ExecuteActionNotification(c, operationDownload, "path", "vpath", "", "", "", 0, nil, 0, nil)
assert.NoError(t, err)
Config.Actions = actionsCopy
@ -208,7 +208,7 @@ func TestWrongActions(t *testing.T) {
}
a := newActionNotification(user, operationUpload, "", "", "", "", "", ProtocolSFTP, "", xid.New().String(),
123, 0, 1, 0)
123, 0, 1, 0, nil)
status, err := actionHandler.Handle(a)
assert.Error(t, err, "action with bad command must fail")
assert.Equal(t, 1, status)
@ -307,7 +307,7 @@ func TestUnconfiguredHook(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, status, 0)
err = ExecuteActionNotification(c, operationDownload, "", "", "", "", "", 0, nil, 0)
err = ExecuteActionNotification(c, operationDownload, "", "", "", "", "", 0, nil, 0, nil)
assert.NoError(t, err)
err = plugin.Initialize(nil, "debug")

View file

@ -229,6 +229,7 @@ func Initialize(c Configuration, isShared int) error {
dataprovider.SetTempPath(c.TempPath)
vfs.SetAllowSelfConnections(c.AllowSelfConnections)
vfs.SetRenameMode(c.RenameMode)
vfs.SetReadMetadataMode(c.Metadata.Read)
dataprovider.SetAllowSelfConnections(c.AllowSelfConnections)
transfersChecker = getTransfersChecker(isShared)
return nil
@ -487,6 +488,13 @@ func (t *ConnectionTransfer) getConnectionTransferAsString() string {
return result
}
// MetadataConfig defines how to handle metadata for cloud storage backends
type MetadataConfig struct {
// If not zero the metadata will be read before downloads and will be
// available in notifications
Read int `json:"read" mapstructure:"read"`
}
// Configuration defines configuration parameters common to all supported protocols
type Configuration struct {
// Maximum idle timeout as minutes. If a client is idle for a time that exceeds this setting it will be disconnected.
@ -572,7 +580,9 @@ type Configuration struct {
// Rate limiter configurations
RateLimitersConfig []RateLimiterConfig `json:"rate_limiters" mapstructure:"rate_limiters"`
// Umask for new uploads. Leave blank to use the system default.
Umask string `json:"umask" mapstructure:"umask"`
Umask string `json:"umask" mapstructure:"umask"`
// Metadata configuration
Metadata MetadataConfig `json:"metadata" mapstructure:"metadata"`
idleTimeoutAsDuration time.Duration
idleLoginTimeout time.Duration
defender Defender

View file

@ -381,7 +381,7 @@ func (c *BaseConnection) CreateDir(virtualPath string, checkFilePatterns bool) e
logger.CommandLog(mkdirLogSender, fsPath, "", c.User.Username, "", c.ID, c.protocol, -1, -1, "", "", "", -1,
c.localAddr, c.remoteAddr, elapsed)
ExecuteActionNotification(c, operationMkdir, fsPath, virtualPath, "", "", "", 0, nil, elapsed) //nolint:errcheck
ExecuteActionNotification(c, operationMkdir, fsPath, virtualPath, "", "", "", 0, nil, elapsed, nil) //nolint:errcheck
return nil
}
@ -436,7 +436,7 @@ func (c *BaseConnection) RemoveFile(fs vfs.Fs, fsPath, virtualPath string, info
dataprovider.UpdateUserQuota(&c.User, -1, -size, false) //nolint:errcheck
}
}
ExecuteActionNotification(c, operationDelete, fsPath, virtualPath, "", "", "", size, nil, elapsed) //nolint:errcheck
ExecuteActionNotification(c, operationDelete, fsPath, virtualPath, "", "", "", size, nil, elapsed, nil) //nolint:errcheck
return nil
}
@ -502,7 +502,7 @@ func (c *BaseConnection) RemoveDir(virtualPath string) error {
logger.CommandLog(rmdirLogSender, fsPath, "", c.User.Username, "", c.ID, c.protocol, -1, -1, "", "", "", -1,
c.localAddr, c.remoteAddr, elapsed)
ExecuteActionNotification(c, operationRmdir, fsPath, virtualPath, "", "", "", 0, nil, elapsed) //nolint:errcheck
ExecuteActionNotification(c, operationRmdir, fsPath, virtualPath, "", "", "", 0, nil, elapsed, nil) //nolint:errcheck
return nil
}
@ -785,7 +785,7 @@ func (c *BaseConnection) renameInternal(virtualSourcePath, virtualTargetPath str
logger.CommandLog(renameLogSender, fsSourcePath, fsTargetPath, c.User.Username, "", c.ID, c.protocol, -1, -1,
"", "", "", -1, c.localAddr, c.remoteAddr, elapsed)
ExecuteActionNotification(c, operationRename, fsSourcePath, virtualSourcePath, fsTargetPath, //nolint:errcheck
virtualTargetPath, "", 0, nil, elapsed)
virtualTargetPath, "", 0, nil, elapsed, nil)
return nil
}

View file

@ -53,9 +53,10 @@ import (
)
const (
ipBlockedEventName = "IP Blocked"
maxAttachmentsSize = int64(10 * 1024 * 1024)
objDataPlaceholder = "{{ObjectData}}"
ipBlockedEventName = "IP Blocked"
maxAttachmentsSize = int64(10 * 1024 * 1024)
objDataPlaceholder = "{{ObjectData}}"
objDataPlaceholderString = "{{ObjectDataString}}"
)
// Supported IDP login events
@ -554,6 +555,7 @@ type EventParams struct {
Timestamp int64
IDPCustomFields *map[string]string
Object plugin.Renderer
Metadata map[string]string
sender string
updateStatusFromError bool
errors []string
@ -587,7 +589,7 @@ func (p *EventParams) getACopy() *EventParams {
}
func (p *EventParams) addIDPCustomFields(customFields *map[string]any) {
if customFields == nil {
if customFields == nil || len(*customFields) == 0 {
return
}
@ -785,11 +787,14 @@ func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []s
} else {
replacements = append(replacements, "{{ErrorString}}", "")
}
replacements = append(replacements, objDataPlaceholder, "")
replacements = append(replacements, objDataPlaceholder, "{}")
replacements = append(replacements, objDataPlaceholderString, "")
if addObjectData {
data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
if err == nil {
replacements[len(replacements)-1] = p.getStringReplacement(string(data), jsonEscaped)
dataString := string(data)
replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
}
}
if p.IDPCustomFields != nil {
@ -797,6 +802,16 @@ func (p *EventParams) getStringReplacements(addObjectData, jsonEscaped bool) []s
replacements = append(replacements, fmt.Sprintf("{{IDPField%s}}", k), p.getStringReplacement(v, jsonEscaped))
}
}
replacements = append(replacements, "{{Metadata}}", "{}")
replacements = append(replacements, "{{MetadataString}}", "")
if len(p.Metadata) > 0 {
data, err := json.Marshal(p.Metadata)
if err == nil {
dataString := string(data)
replacements[len(replacements)-3] = p.getStringReplacement(dataString, false)
replacements[len(replacements)-1] = p.getStringReplacement(dataString, true)
}
}
return replacements
}
@ -857,7 +872,7 @@ func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualSo
logger.CommandLog(copyLogSender, fsSrcPath, fsDstPath, conn.User.Username, "", conn.ID, conn.protocol, -1, -1,
"", "", "", info.Size(), conn.localAddr, conn.remoteAddr, elapsed)
}
ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed) //nolint:errcheck
ExecuteActionNotification(conn, operation, fsSrcPath, virtualSourcePath, fsDstPath, virtualTargetPath, "", info.Size(), errTransfer, elapsed, nil) //nolint:errcheck
}
} else {
eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", targetPath, err)
@ -1227,7 +1242,7 @@ func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.
}
if part.Body != "" {
cType := h.Get("Content-Type")
if part.Body != objDataPlaceholder && strings.Contains(strings.ToLower(cType), "application/json") {
if strings.Contains(strings.ToLower(cType), "application/json") {
replacements := params.getStringReplacements(addObjectData, true)
jsonReplacer := strings.NewReplacer(replacements...)
_, err = partWriter.Write([]byte(replaceWithReplacer(part.Body, jsonReplacer)))
@ -1260,10 +1275,6 @@ func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.
return nil
}
func jsonEscapeRuleActionBody(c *dataprovider.EventActionHTTPConfig) bool {
return c.Body != objDataPlaceholder && c.HasJSONBody()
}
func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
cancel context.CancelFunc, user dataprovider.User, params *EventParams, addObjectData bool,
) (io.Reader, string, error) {
@ -1279,7 +1290,7 @@ func getHTTPRuleActionBody(c *dataprovider.EventActionHTTPConfig, replacer *stri
}
return bytes.NewBuffer(data), "", nil
}
if jsonEscapeRuleActionBody(c) {
if c.HasJSONBody() {
replacements := params.getStringReplacements(addObjectData, true)
jsonReplacer := strings.NewReplacer(replacements...)
return bytes.NewBufferString(replaceWithReplacer(c.Body, jsonReplacer)), "", nil
@ -1425,7 +1436,7 @@ func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *E
addObjectData := false
if params.Object != nil {
for _, k := range c.EnvVars {
if strings.Contains(k.Value, objDataPlaceholder) {
if strings.Contains(k.Value, objDataPlaceholder) || strings.Contains(k.Value, objDataPlaceholderString) {
addObjectData = true
break
}
@ -1474,7 +1485,7 @@ func getEmailAddressesWithReplacer(addrs []string, replacer *strings.Replacer) [
func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
addObjectData := false
if params.Object != nil {
if strings.Contains(c.Body, objDataPlaceholder) {
if strings.Contains(c.Body, objDataPlaceholder) || strings.Contains(c.Body, objDataPlaceholderString) {
addObjectData = true
}
}

View file

@ -817,7 +817,7 @@ func TestEventRuleActions(t *testing.T) {
HTTPConfig: dataprovider.EventActionHTTPConfig{
Endpoint: "http://foo\x7f.com/", // invalid URL
SkipTLSVerify: true,
Body: `"data": "{{ObjectData}}"`,
Body: `"data": "{{ObjectDataString}}"`,
Method: http.MethodPost,
QueryParameters: []dataprovider.KeyValue{
{
@ -2265,3 +2265,18 @@ func TestHTTPEndpointWithPlaceholders(t *testing.T) {
expected = c.Endpoint + "?p=" + url.QueryEscape(vPath) + "&u=" + url.QueryEscape(name)
assert.Equal(t, expected, u)
}
func TestMetadataReplacement(t *testing.T) {
params := &EventParams{
Metadata: map[string]string{
"key": "value",
},
}
replacements := params.getStringReplacements(false, false)
replacer := strings.NewReplacer(replacements...)
reader, _, err := getHTTPRuleActionBody(&dataprovider.EventActionHTTPConfig{Body: "{{Metadata}} {{MetadataString}}"}, replacer, nil, dataprovider.User{}, params, false)
require.NoError(t, err)
data, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, `{"key":"value"} {\"key\":\"value\"}`, string(data))
}

View file

@ -56,6 +56,7 @@ type BaseTransfer struct { //nolint:maligned
aTime time.Time
mTime time.Time
transferQuota dataprovider.TransferQuota
metadata map[string]string
sync.Mutex
errAbort error
ErrTransfer error
@ -206,6 +207,11 @@ func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
return ""
}
// SetMetadata sets the metadata for the file
func (t *BaseTransfer) SetMetadata(val map[string]string) {
t.metadata = val
}
// SetCancelFn sets the cancel function for the transfer
func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
t.cancelFn = cancelFn
@ -405,7 +411,7 @@ func (t *BaseTransfer) Close() error {
logger.TransferLog(downloadLogSender, t.fsPath, elapsed, t.BytesSent.Load(), t.Connection.User.Username,
t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "", //nolint:errcheck
t.BytesSent.Load(), t.ErrTransfer, elapsed)
t.BytesSent.Load(), t.ErrTransfer, elapsed, t.metadata)
} else {
statSize, deletedFiles, errStat := t.getUploadFileSize()
if errStat == nil {
@ -449,7 +455,7 @@ func (t *BaseTransfer) updateTransferTimestamps(uploadFileSize, elapsed int64) {
if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, true); err == nil {
t.Connection.uploadDone.Store(true)
ExecuteActionNotification(t.Connection, operationFirstUpload, t.fsPath, t.requestPath, "", //nolint:errcheck
"", "", uploadFileSize, t.ErrTransfer, elapsed)
"", "", uploadFileSize, t.ErrTransfer, elapsed, t.metadata)
}
}
return
@ -458,14 +464,14 @@ func (t *BaseTransfer) updateTransferTimestamps(uploadFileSize, elapsed int64) {
if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, false); err == nil {
t.Connection.downloadDone.Store(true)
ExecuteActionNotification(t.Connection, operationFirstDownload, t.fsPath, t.requestPath, "", //nolint:errcheck
"", "", t.BytesSent.Load(), t.ErrTransfer, elapsed)
"", "", t.BytesSent.Load(), t.ErrTransfer, elapsed, t.metadata)
}
}
}
func (t *BaseTransfer) executeUploadHook(numFiles int, fileSize, elapsed int64) (int, int64) {
err := ExecuteActionNotification(t.Connection, operationUpload, t.fsPath, t.requestPath, "", "", "",
fileSize, t.ErrTransfer, elapsed)
fileSize, t.ErrTransfer, elapsed, t.metadata)
if err != nil {
if t.ErrTransfer == nil {
t.ErrTransfer = err

View file

@ -232,6 +232,7 @@ func TestTransferErrors(t *testing.T) {
assert.ErrorIs(t, err, sftp.ErrSSHFxPermissionDenied)
assert.Nil(t, transfer.cancelFn)
assert.Equal(t, testFile, transfer.GetFsPath())
transfer.SetMetadata(map[string]string{"key": "val"})
transfer.SetCancelFn(cancelFn)
errFake := errors.New("err fake")
transfer.BytesReceived.Store(9)

View file

@ -229,6 +229,10 @@ func Init() {
EntriesHardLimit: 150,
},
RateLimitersConfig: []common.RateLimiterConfig{defaultRateLimiter},
Umask: "",
Metadata: common.MetadataConfig{
Read: 0,
},
},
ACME: acme.Configuration{
Email: "",
@ -2006,6 +2010,8 @@ func setViperDefaults() {
viper.SetDefault("common.defender.observation_time", globalConf.Common.DefenderConfig.ObservationTime)
viper.SetDefault("common.defender.entries_soft_limit", globalConf.Common.DefenderConfig.EntriesSoftLimit)
viper.SetDefault("common.defender.entries_hard_limit", globalConf.Common.DefenderConfig.EntriesHardLimit)
viper.SetDefault("common.umask", globalConf.Common.Umask)
viper.SetDefault("common.metadata.read", globalConf.Common.Metadata.Read)
viper.SetDefault("acme.email", globalConf.ACME.Email)
viper.SetDefault("acme.key_type", globalConf.ACME.KeyType)
viper.SetDefault("acme.certs_path", globalConf.ACME.CertsPath)

View file

@ -853,7 +853,7 @@ func TestTransferErrors(t *testing.T) {
assert.NoError(t, err)
baseTransfer = common.NewBaseTransfer(nil, connection.BaseConnection, nil, testfile, testfile, testfile,
common.TransferUpload, 0, 0, 0, 0, false, fs, dataprovider.TransferQuota{})
tr = newTransfer(baseTransfer, nil, r, 10)
tr = newTransfer(baseTransfer, nil, vfs.NewPipeReader(r), 10)
pos, err := tr.Seek(10, 0)
assert.NoError(t, err)
assert.Equal(t, pos, tr.expectedOffset)

View file

@ -18,8 +18,6 @@ import (
"errors"
"io"
"github.com/eikenb/pipeat"
"github.com/drakkan/sftpgo/v2/internal/common"
"github.com/drakkan/sftpgo/v2/internal/vfs"
)
@ -34,7 +32,7 @@ type transfer struct {
expectedOffset int64
}
func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader,
expectedOffset int64) *transfer {
var writer io.WriteCloser
var reader io.ReadCloser
@ -137,6 +135,9 @@ func (t *transfer) closeIO() error {
t.Unlock()
} else if t.reader != nil {
err = t.reader.Close()
if metadater, ok := t.reader.(vfs.Metadater); ok {
t.BaseTransfer.SetMetadata(metadater.Metadata())
}
}
return err
}

View file

@ -17,8 +17,6 @@ package httpd
import (
"io"
"github.com/eikenb/pipeat"
"github.com/drakkan/sftpgo/v2/internal/common"
"github.com/drakkan/sftpgo/v2/internal/vfs"
)
@ -30,7 +28,7 @@ type httpdFile struct {
isFinished bool
}
func newHTTPDFile(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt) *httpdFile {
func newHTTPDFile(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader) *httpdFile {
var writer io.WriteCloser
var reader io.ReadCloser
if baseTransfer.File != nil {
@ -127,6 +125,9 @@ func (f *httpdFile) closeIO() error {
f.Unlock()
} else if f.reader != nil {
err = f.reader.Close()
if metadater, ok := f.reader.(vfs.Metadater); ok {
f.BaseTransfer.SetMetadata(metadater.Metadata())
}
}
return err
}

View file

@ -233,7 +233,7 @@ func TestReadWriteErrors(t *testing.T) {
assert.NoError(t, err)
baseTransfer = common.NewBaseTransfer(nil, conn, nil, file.Name(), file.Name(), testfile, common.TransferDownload,
0, 0, 0, 0, false, fs, dataprovider.TransferQuota{})
transfer = newTransfer(baseTransfer, nil, r, nil)
transfer = newTransfer(baseTransfer, nil, vfs.NewPipeReader(r), nil)
err = transfer.Close()
assert.NoError(t, err)
_, err = transfer.ReadAt(buf, 0)
@ -1807,7 +1807,7 @@ func TestTransferFailingReader(t *testing.T) {
baseTransfer := common.NewBaseTransfer(nil, connection.BaseConnection, nil, fsPath, fsPath, filepath.Base(fsPath),
common.TransferUpload, 0, 0, 0, 0, false, fs, dataprovider.TransferQuota{})
errRead := errors.New("read is not allowed")
tr := newTransfer(baseTransfer, nil, r, errRead)
tr := newTransfer(baseTransfer, nil, vfs.NewPipeReader(r), errRead)
_, err = tr.ReadAt(buf, 0)
assert.EqualError(t, err, errRead.Error())

View file

@ -568,7 +568,7 @@ func (c *sshCommand) sendExitStatus(err error) {
}
}
common.ExecuteActionNotification(c.connection.BaseConnection, common.OperationSSHCmd, cmdPath, vCmdPath, //nolint:errcheck
targetPath, vTargetPath, c.command, 0, err, elapsed)
targetPath, vTargetPath, c.command, 0, err, elapsed, nil)
if err == nil {
logger.CommandLog(sshCommandLogSender, cmdPath, targetPath, c.connection.User.Username, "", c.connection.ID,
common.ProtocolSSH, -1, -1, "", "", c.connection.command, -1, c.connection.GetLocalAddress(),

View file

@ -18,8 +18,6 @@ import (
"fmt"
"io"
"github.com/eikenb/pipeat"
"github.com/drakkan/sftpgo/v2/internal/common"
"github.com/drakkan/sftpgo/v2/internal/metric"
"github.com/drakkan/sftpgo/v2/internal/vfs"
@ -60,7 +58,7 @@ type transfer struct {
isFinished bool
}
func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader,
errForRead error) *transfer {
var writer writerAtCloser
var reader readerAtCloser
@ -178,6 +176,9 @@ func (t *transfer) closeIO() error {
t.Unlock()
} else if t.readerAt != nil {
err = t.readerAt.Close()
if metadater, ok := t.readerAt.(vfs.Metadater); ok {
t.BaseTransfer.SetMetadata(metadater.Metadata())
}
}
return err
}

View file

@ -206,24 +206,25 @@ func (fs *AzureBlobFs) Lstat(name string) (os.FileInfo, error) {
}
// Open opens the named file for reading
func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
func (fs *AzureBlobFs) Open(name string, offset int64) (File, *PipeReader, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
return nil, nil, nil, err
}
p := NewPipeReader(r)
ctx, cancelFn := context.WithCancel(context.Background())
go func() {
defer cancelFn()
blockBlob := fs.containerClient.NewBlockBlobClient(name)
err := fs.handleMultipartDownload(ctx, blockBlob, offset, w)
err := fs.handleMultipartDownload(ctx, blockBlob, offset, w, p)
w.CloseWithError(err) //nolint:errcheck
fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, w.GetWrittenBytes(), err)
metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err)
}()
return nil, r, cancelFn, nil
return nil, p, cancelFn, nil
}
// Create creates or opens the named file for writing
@ -960,13 +961,17 @@ func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob *blockblob.Cl
}
func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *blockblob.Client,
offset int64, writer io.WriterAt,
offset int64, writer io.WriterAt, pipeReader *PipeReader,
) error {
props, err := blockBlob.GetProperties(ctx, &blob.GetPropertiesOptions{})
metric.AZHeadObjectCompleted(err)
if err != nil {
fsLog(fs, logger.LevelError, "unable to get blob properties, download aborted: %+v", err)
return err
}
if readMetadata > 0 {
pipeReader.setMetadataFromPointerVal(props.Metadata)
}
contentLength := util.GetIntFromPointer(props.ContentLength)
sizeToDownload := contentLength - offset
if sizeToDownload < 0 {

View file

@ -79,7 +79,7 @@ func (fs *CryptFs) Name() string {
}
// Open opens the named file for reading
func (fs *CryptFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
func (fs *CryptFs) Open(name string, offset int64) (File, *PipeReader, func(), error) {
f, key, err := fs.getFileAndEncryptionKey(name)
if err != nil {
return nil, nil, nil, err
@ -94,6 +94,7 @@ func (fs *CryptFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt,
f.Close()
return nil, nil, nil, err
}
p := NewPipeReader(r)
go func() {
if isZeroDownload {
@ -149,7 +150,7 @@ func (fs *CryptFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt,
fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %v", name, n, err)
}()
return nil, r, nil, nil
return nil, p, nil, nil
}
// Create creates or opens the named file for writing

View file

@ -129,17 +129,27 @@ func (fs *GCSFs) Lstat(name string) (os.FileInfo, error) {
}
// Open opens the named file for reading
func (fs *GCSFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
func (fs *GCSFs) Open(name string, offset int64) (File, *PipeReader, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
return nil, nil, nil, err
}
p := NewPipeReader(r)
if readMetadata > 0 {
attrs, err := fs.headObject(name)
if err != nil {
r.Close()
w.Close()
return nil, nil, nil, err
}
p.setMetadata(attrs.Metadata)
}
bkt := fs.svc.Bucket(fs.config.Bucket)
obj := bkt.Object(name)
ctx, cancelFn := context.WithCancel(context.Background())
objectReader, err := obj.NewRangeReader(ctx, offset, -1)
if err == nil && offset > 0 && objectReader.Attrs.ContentEncoding == "gzip" {
err = fmt.Errorf("range request is not possible for gzip content encoding, requested offset %v", offset)
err = fmt.Errorf("range request is not possible for gzip content encoding, requested offset %d", offset)
objectReader.Close()
}
if err != nil {
@ -157,7 +167,7 @@ func (fs *GCSFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fu
fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
metric.GCSTransferCompleted(n, 1, err)
}()
return nil, r, cancelFn, nil
return nil, p, cancelFn, nil
}
// Create creates or opens the named file for writing

View file

@ -297,11 +297,12 @@ func (fs *HTTPFs) Lstat(name string) (os.FileInfo, error) {
}
// Open opens the named file for reading
func (fs *HTTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
func (fs *HTTPFs) Open(name string, offset int64) (File, *PipeReader, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
return nil, nil, nil, err
}
p := NewPipeReader(r)
ctx, cancelFn := context.WithCancel(context.Background())
var queryString string
@ -326,7 +327,7 @@ func (fs *HTTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, f
metric.HTTPFsTransferCompleted(n, 1, err)
}()
return nil, r, cancelFn, nil
return nil, p, cancelFn, nil
}
// Create creates or opens the named file for writing

View file

@ -107,7 +107,7 @@ func (fs *OsFs) Lstat(name string) (os.FileInfo, error) {
}
// Open opens the named file for reading
func (fs *OsFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
func (fs *OsFs) Open(name string, offset int64) (File, *PipeReader, func(), error) {
f, err := os.Open(name)
if err != nil {
return nil, nil, nil, err
@ -127,6 +127,7 @@ func (fs *OsFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun
f.Close()
return nil, nil, nil, err
}
p := NewPipeReader(r)
go func() {
br := bufio.NewReaderSize(f, fs.readBufferSize)
n, err := doCopy(w, br, nil)
@ -135,7 +136,7 @@ func (fs *OsFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun
fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %v", name, n, err)
}()
return nil, r, nil, nil
return nil, p, nil, nil
}
// Create creates or opens the named file for writing

View file

@ -197,11 +197,22 @@ func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
}
// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
func (fs *S3Fs) Open(name string, offset int64) (File, *PipeReader, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
return nil, nil, nil, err
}
p := NewPipeReader(r)
if readMetadata > 0 {
attrs, err := fs.headObject(name)
if err != nil {
r.Close()
w.Close()
return nil, nil, nil, err
}
p.setMetadata(attrs.Metadata)
}
ctx, cancelFn := context.WithCancel(context.Background())
downloader := manager.NewDownloader(fs.svc, func(d *manager.Downloader) {
d.Concurrency = fs.config.DownloadConcurrency
@ -230,7 +241,7 @@ func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun
fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %+v", name, n, err)
metric.S3TransferCompleted(n, 1, err)
}()
return nil, r, cancelFn, nil
return nil, p, cancelFn, nil
}
// Create creates or opens the named file for writing

View file

@ -336,7 +336,7 @@ func (fs *SFTPFs) Lstat(name string) (os.FileInfo, error) {
}
// Open opens the named file for reading
func (fs *SFTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
func (fs *SFTPFs) Open(name string, offset int64) (File, *PipeReader, func(), error) {
client, err := fs.conn.getClient()
if err != nil {
return nil, nil, nil, err
@ -360,6 +360,8 @@ func (fs *SFTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, f
f.Close()
return nil, nil, nil, err
}
p := NewPipeReader(r)
go func() {
// if we enable buffering the client stalls
//br := bufio.NewReaderSize(f, int(fs.config.BufferSize)*1024*1024)
@ -370,7 +372,7 @@ func (fs *SFTPFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, f
fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %v", name, n, err)
}()
return nil, r, nil, nil
return nil, p, nil, nil
}
// Create creates or opens the named file for writing

View file

@ -25,6 +25,7 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/eikenb/pipeat"
@ -60,6 +61,7 @@ var (
sftpFingerprints []string
allowSelfConnections int
renameMode int
readMetadata int
)
// SetAllowSelfConnections sets the desired behaviour for self connections
@ -87,13 +89,18 @@ func SetRenameMode(val int) {
renameMode = val
}
// SetReadMetadataMode sets the read metadata mode
func SetReadMetadataMode(val int) {
readMetadata = val
}
// Fs defines the interface for filesystem backends
type Fs interface {
Name() string
ConnectionID() string
Stat(name string) (os.FileInfo, error)
Lstat(name string) (os.FileInfo, error)
Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error)
Open(name string, offset int64) (File, *PipeReader, func(), error)
Create(name string, flag, checks int) (File, *PipeWriter, func(), error)
Rename(source, target string) (int, int64, error)
Remove(name string, isDir bool) error
@ -157,6 +164,11 @@ type File interface {
Truncate(size int64) error
}
// Metadater defines an interface to implement to return metadata for a file
type Metadater interface {
Metadata() map[string]string
}
// QuotaCheckResult defines the result for a quota check
type QuotaCheckResult struct {
HasSpace bool
@ -687,23 +699,23 @@ func (c *CryptFsConfig) validate() error {
// PipeWriter defines a wrapper for pipeat.PipeWriterAt.
type PipeWriter struct {
writer *pipeat.PipeWriterAt
err error
done chan bool
*pipeat.PipeWriterAt
err error
done chan bool
}
// NewPipeWriter initializes a new PipeWriter
func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter {
return &PipeWriter{
writer: w,
err: nil,
done: make(chan bool),
PipeWriterAt: w,
err: nil,
done: make(chan bool),
}
}
// Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
func (p *PipeWriter) Close() error {
p.writer.Close() //nolint:errcheck // the returned error is always null
p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null
<-p.done
return p.err
}
@ -715,14 +727,58 @@ func (p *PipeWriter) Done(err error) {
p.done <- true
}
// WriteAt is a wrapper for pipeat WriteAt
func (p *PipeWriter) WriteAt(data []byte, off int64) (int, error) {
return p.writer.WriteAt(data, off)
// NewPipeReader initializes a new PipeReader
func NewPipeReader(r *pipeat.PipeReaderAt) *PipeReader {
return &PipeReader{
PipeReaderAt: r,
}
}
// Write is a wrapper for pipeat Write
func (p *PipeWriter) Write(data []byte) (int, error) {
return p.writer.Write(data)
// PipeReader defines a wrapper for pipeat.PipeReaderAt.
type PipeReader struct {
*pipeat.PipeReaderAt
mu sync.RWMutex
metadata map[string]string
}
func (p *PipeReader) setMetadata(value map[string]string) {
p.mu.Lock()
defer p.mu.Unlock()
p.metadata = value
}
func (p *PipeReader) setMetadataFromPointerVal(value map[string]*string) {
p.mu.Lock()
defer p.mu.Unlock()
if len(value) == 0 {
p.metadata = nil
return
}
p.metadata = map[string]string{}
for k, v := range value {
val := util.GetStringFromPointer(v)
if val != "" {
p.metadata[k] = val
}
}
}
// Metadata implements the Metadater interface
func (p *PipeReader) Metadata() map[string]string {
p.mu.RLock()
defer p.mu.RUnlock()
if len(p.metadata) == 0 {
return nil
}
result := make(map[string]string)
for k, v := range p.metadata {
result[k] = v
}
return result
}
func isEqualityCheckModeValid(mode int) bool {

View file

@ -400,6 +400,9 @@ func (f *webDavFile) closeIO() error {
f.Unlock()
} else if f.reader != nil {
err = f.reader.Close()
if metadater, ok := f.reader.(vfs.Metadater); ok {
f.BaseTransfer.SetMetadata(metadater.Metadata())
}
}
return err
}

View file

@ -286,9 +286,9 @@ func (fs *MockOsFs) Name() string {
}
// Open returns nil
func (fs *MockOsFs) Open(name string, offset int64) (vfs.File, *pipeat.PipeReaderAt, func(), error) {
func (fs *MockOsFs) Open(name string, offset int64) (vfs.File, *vfs.PipeReader, func(), error) {
if fs.reader != nil {
return nil, fs.reader, nil, nil
return nil, vfs.NewPipeReader(fs.reader), nil, nil
}
return fs.Fs.Open(name, offset)
}

View file

@ -22,6 +22,9 @@
"allowlist_status": 0,
"allow_self_connections": 0,
"umask": "",
"metadata": {
"read": 0
},
"defender": {
"enabled": false,
"driver": "memory",

View file

@ -834,12 +834,21 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
<p>
<span class="shortcut"><b>{{`{{ObjectData}}`}}</b></span> => Provider object data serialized as JSON with sensitive fields removed.
</p>
<p>
<span class="shortcut"><b>{{`{{ObjectDataString}}`}}</b></span> => Provider object data as JSON escaped string with sensitive fields removed.
</p>
<p>
<span class="shortcut"><b>{{`{{RetentionReports}}`}}</b></span> => Data retention reports as zip compressed CSV files. Supported as email attachment, file path for multipart HTTP request and as single parameter for HTTP requests body.
</p>
<p>
<span class="shortcut"><b>{{`{{IDPField<fieldname>}}`}}</b></span> => Identity Provider custom fields containing a string.
</p>
<p>
<span class="shortcut"><b>{{`{{Metadata}}`}}</b></span> => Cloud storage metadata for the downloaded file serialized as JSON.
</p>
<p>
<span class="shortcut"><b>{{`{{MetadataString}}`}}</b></span> => Cloud storage metadata for the downloaded file as JSON escaped string.
</p>
</div>
<div class="modal-footer">
<button class="btn btn-primary" type="button" data-dismiss="modal">OK</button>