Add Real-Time syncing of remote updates via webhooks (#1718)

* Add Real-Time syncing of remote updates via webhooks

Co-authored-by: abraunegg <alex.braunegg@gmail.com>
Yuan Liu 2021-11-23 11:54:28 -08:00 committed by GitHub
parent 2f47beab60
commit 1e2827ad1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 11089 additions and 289 deletions


@ -78,7 +78,8 @@ SOURCES = \
src/sync.d \
src/upload.d \
src/util.d \
src/progress.d
src/progress.d \
src/arsd/cgi.d
ifeq ($(NOTIFICATIONS),yes)
SOURCES += src/notifications/notify.d src/notifications/dnotify.d


@ -13,7 +13,8 @@ This client is a 'fork' of the [skilion](https://github.com/skilion/onedrive) cl
## Features
* State caching
* Real-Time file monitoring with Inotify
* Real-Time local file monitoring with inotify
* Real-Time syncing of remote updates via webhooks
* File upload / download validation to ensure data integrity
* Resumable uploads
* Support OneDrive for Business (part of Office 365)
@ -27,7 +28,6 @@ This client is a 'fork' of the [skilion](https://github.com/skilion/onedrive) cl
* Supports rate limiting of traffic
## What's missing
* While local changes are uploaded right away, remote changes are delayed until next automated sync cycle when using --monitor
* Ability to encrypt/decrypt files on-the-fly when uploading/downloading files from OneDrive
* Support for Windows 'On-Demand' functionality so file is only downloaded when accessed locally
* A GUI for configuration management

config

@ -42,3 +42,9 @@
# sync_dir_permissions = "700"
# sync_file_permissions = "600"
# rate_limit = "131072"
# webhook_enabled = "false"
# webhook_public_url = ""
# webhook_listening_host = ""
# webhook_listening_port = "8888"
# webhook_expiration_interval = "86400"
# webhook_renewal_interval = "43200"


@ -36,6 +36,10 @@
* [Shared folders (OneDrive Business or Office 365)](#shared-folders-onedrive-business-or-office-365)
* [SharePoint / Office 365 Shared Libraries](#sharepoint--office-365-shared-libraries)
- [Running 'onedrive' in 'monitor' mode](#running-onedrive-in-monitor-mode)
* [Use webhook to subscribe to remote updates in 'monitor' mode](#use-webhook-to-subscribe-to-remote-updates-in-monitor-mode)
* [More webhook configuration options](#more-webhook-configuration-options)
+ [webhook_listening_host and webhook_listening_port](#webhook_listening_host-and-webhook_listening_port)
+ [webhook_expiration_interval and webhook_renewal_interval](#webhook_expiration_interval-and-webhook_renewal_interval)
- [Running 'onedrive' as a system service](#running-onedrive-as-a-system-service)
* [OneDrive service running as root user via init.d](#onedrive-service-running-as-root-user-via-initd)
* [OneDrive service running as root user via systemd (Arch, Ubuntu, Debian, OpenSuSE, Fedora)](#onedrive-service-running-as-root-user-via-systemd-arch-ubuntu-debian-opensuse-fedora)
@ -353,6 +357,12 @@ See the [config](https://raw.githubusercontent.com/abraunegg/onedrive/master/con
# sync_file_permissions = "600"
# rate_limit = "131072"
# operation_timeout = "3600"
# webhook_enabled = "false"
# webhook_public_url = ""
# webhook_listening_host = ""
# webhook_listening_port = "8888"
# webhook_expiration_interval = "86400"
# webhook_renewal_interval = "43200"
```
@ -682,6 +692,59 @@ sudo sysctl fs.inotify.max_user_watches=<new_value>
To make these changes permanent, refer to your OS reference documentation.
### Use webhook to subscribe to remote updates in 'monitor' mode
A webhook can optionally be enabled in 'monitor' mode so that the onedrive process subscribes to remote updates. Remote changes are then synced to your local file system as soon as a notification arrives, without waiting for the next sync cycle.
To enable this feature, you need to configure the following options in the config file:
```
webhook_enabled = "true"
webhook_public_url = "<public-facing url to reach your webhook>"
```
Setting `webhook_enabled` to `true` enables the webhook in 'monitor' mode. The onedrive process will listen for incoming updates at a configurable endpoint, which defaults to `0.0.0.0:8888`. The `webhook_public_url` must be set to a public-facing URL so that Microsoft can send updates to your webhook. If your host is directly exposed to the Internet, `webhook_public_url` can be set to `http://<your_host>:8888/` to match the default endpoint. However, the recommended approach is to configure a reverse proxy such as nginx.
For example, below is an nginx config snippet that proxies traffic to the webhook:
```
http {
server {
listen 80;
location /webhooks/onedrive {
proxy_pass http://127.0.0.1:8888;
}
}
}
```
With nginx running, you can set `webhook_public_url` to `http://<your_host>/webhooks/onedrive`.
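As a sketch of how these pieces fit together, the config fragment below pairs with the nginx snippet above. Binding `webhook_listening_host` to `127.0.0.1` is an assumption made here so that only the local reverse proxy can reach the webhook; the feature does not require it. `<your_host>` remains a placeholder for your own hostname.
```
webhook_enabled = "true"
webhook_public_url = "http://<your_host>/webhooks/onedrive"
webhook_listening_host = "127.0.0.1"
webhook_listening_port = "8888"
```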
### More webhook configuration options
The options below are optional. The defaults are usually good enough.
#### webhook_listening_host and webhook_listening_port
Set `webhook_listening_host` and `webhook_listening_port` to change the webhook listening endpoint. If `webhook_listening_host` is left empty, which is the default, the webhook will bind to `0.0.0.0`. The default `webhook_listening_port` is `8888`.
```
webhook_listening_host = ""
webhook_listening_port = "8888"
```
#### webhook_expiration_interval and webhook_renewal_interval
Set `webhook_expiration_interval` and `webhook_renewal_interval` to change the frequency of subscription renewal. Both values are in seconds. By default, the webhook asks Microsoft to keep subscriptions alive for 24 hours, and it renews a subscription once less than 12 hours remain before it expires.
```
# Default expiration interval is 24 hours
webhook_expiration_interval = "86400"
# Default renewal interval is 12 hours
webhook_renewal_interval = "43200"
```
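The renewal rule described above can be sketched in D as follows. This is a minimal illustration rather than the client's actual code: `isUpForRenewal` and the variable names are hypothetical, and the two intervals are simply the default values listed above.
```d
import core.time : Duration, dur;
import std.datetime.systime : Clock, SysTime;
import std.datetime.timezone : UTC;
import std.stdio : writeln;

// Hypothetical helper: a subscription is due for renewal once the time
// remaining before it expires drops below the renewal interval.
bool isUpForRenewal(SysTime expiration, SysTime now, Duration renewalInterval)
{
    return expiration < now + renewalInterval;
}

void main()
{
    // Defaults from the config file, in seconds.
    immutable expirationInterval = dur!"seconds"(86_400); // webhook_expiration_interval
    immutable renewalInterval = dur!"seconds"(43_200);    // webhook_renewal_interval

    auto created = Clock.currTime(UTC());
    auto expiration = created + expirationInterval;

    // One hour after creation, 23 hours remain: no renewal yet.
    writeln(isUpForRenewal(expiration, created + dur!"hours"(1), renewalInterval));  // prints false
    // Thirteen hours after creation, 11 hours remain: renewal is due.
    writeln(isUpForRenewal(expiration, created + dur!"hours"(13), renewalInterval)); // prints true
}
```
With the defaults, a subscription is therefore renewed roughly every 12 hours. Choosing a `webhook_renewal_interval` well below `webhook_expiration_interval` leaves room for a failed renewal to be retried before the subscription lapses.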
## Running 'onedrive' as a system service
There are a few ways to use onedrive as a service
* via init.d

src/arsd/README.md Normal file

@ -0,0 +1,8 @@
The files in this directory have been obtained from the following places:
cgi.d
https://github.com/adamdruppe/arsd/blob/a870179988b8881b04126856105f0fad2cc0018d/cgi.d
License: Boost Software License - Version 1.0
Copyright 2008-2021, Adam D. Ruppe
see https://github.com/adamdruppe/arsd/blob/a870179988b8881b04126856105f0fad2cc0018d/LICENSE

src/arsd/cgi.d Normal file

File diff suppressed because it is too large


@ -123,6 +123,14 @@ final class Config
// This includes dns resolution, connecting, data transfer, etc.
longValues["operation_timeout"] = 3600;
// Webhook options
boolValues["webhook_enabled"] = false;
stringValues["webhook_public_url"] = "";
stringValues["webhook_listening_host"] = "";
longValues["webhook_listening_port"] = 8888;
longValues["webhook_expiration_interval"] = 3600 * 24;
longValues["webhook_renewal_interval"] = 3600 * 12;
// DEVELOPER OPTIONS
// display_memory = true | false
// - It may be desirable to display the memory usage of the application to assist with diagnosing memory issues with the application


@ -5,11 +5,13 @@ import config, itemdb, monitor, onedrive, selective, sync, util;
import std.net.curl: CurlException;
import core.stdc.signal;
import std.traits;
import std.concurrency: receiveTimeout;
static import log;
OneDriveApi oneDrive;
ItemDatabase itemDb;
bool onedriveInitialised = false;
const int EXIT_UNAUTHORIZED = 3;
enum MONITOR_LOG_SILENT = 2;
@ -48,7 +50,6 @@ int main(string[] args)
bool skipDirDifferent = false;
bool online = false;
bool performSyncOK = false;
bool onedriveInitialised = false;
bool displayMemoryUsage = false;
bool displaySyncOptions = false;
@ -1106,6 +1107,7 @@ int main(string[] args)
// sync list is configured
syncListConfiguredFullScanOverride = true;
}
immutable bool webhookEnabled = cfg.getValueBool("webhook_enabled");
while (performMonitor) {
if (!cfg.getValueBool("download_only")) {
@ -1117,6 +1119,34 @@ int main(string[] args)
}
}
// Check for notifications pushed from Microsoft to the webhook
bool notificationReceived = false;
if (webhookEnabled) {
// Create a subscription on the first run, or renew the subscription
// on subsequent runs when it is about to expire.
oneDrive.createOrRenewSubscription();
// Process incoming notifications if any.
// Empirical evidence shows that Microsoft often sends multiple
// notifications for one single change, so we need a loop to exhaust
// all signals that were queued up by the webhook. The notifications
// do not contain any actual changes, and we always rely on the
// delta endpoint to sync to the latest state. Therefore, a single sync run is
// enough to catch up on multiple notifications.
for (int signalCount = 0;; signalCount++) {
const auto signalExists = receiveTimeout(dur!"seconds"(-1), (ulong _) {});
if (signalExists) {
notificationReceived = true;
} else {
if (notificationReceived) {
log.log("Received ", signalCount," refresh signals from the webhook");
}
break;
}
}
}
auto currTime = MonoTime.currTime();
// has monitor_interval elapsed or are we at application startup / monitor startup?
// in a --resync scenario, if we have not 're-populated' the database, valid changes will get skipped:
@ -1126,7 +1156,7 @@ int main(string[] args)
// Moving random_files/2eVPInOMTFNXzRXeNMEoJch5OR9XpGby to target/2eVPInOMTFNXzRXeNMEoJch5OR9XpGby
// Skipping uploading this new file as parent path is not in the database: target/2eVPInOMTFNXzRXeNMEoJch5OR9XpGby
// 'target' should be in the DB, it should also exist online, but because of --resync, it does not exist in the database thus parent check fails
if ((currTime - lastCheckTime > checkInterval) || (monitorLoopFullCount == 0)) {
if (notificationReceived || (currTime - lastCheckTime > checkInterval) || (monitorLoopFullCount == 0)) {
// monitor sync loop
logOutputMessage = "################################################## NEW LOOP ##################################################";
if (displaySyncOptions) {
@ -1535,14 +1565,19 @@ auto assumeNoGC(T) (T t) if (isFunctionPointer!T || isDelegate!T)
extern(C) nothrow @nogc @system void exitHandler(int value) {
try {
assumeNoGC ( () {
log.log("Got termination signal, shutting down db connection");
log.log("Got termination signal, performing clean up");
// if initialised, shut down the HTTP instance
if (onedriveInitialised) {
log.log("Shutting down the HTTP instance");
oneDrive.shutdown();
}
// was itemDb initialised?
if (itemDb !is null) {
// Make sure the .wal file is incorporated into the main db before we exit
log.log("Shutting down db connection");
itemDb.performVacuum();
destroy(itemDb);
}
// Use exit scopes to shutdown OneDrive API
})();
} catch(Exception e) {}
exit(0);


@ -1,14 +1,17 @@
import std.net.curl;
import etc.c.curl: CurlOption;
import std.datetime, std.exception, std.file, std.json, std.path;
import std.stdio, std.string, std.uni, std.uri, std.file;
import std.datetime, std.datetime.systime, std.exception, std.file, std.json, std.path;
import std.stdio, std.string, std.uni, std.uri, std.file, std.uuid;
import std.array: split;
import core.atomic : atomicOp;
import core.stdc.stdlib;
import core.thread, std.conv, std.math;
import std.algorithm.searching;
import std.concurrency;
import progress;
import config;
import util;
import arsd.cgi;
static import log;
shared bool debugResponse = false;
private bool dryRun = false;
@ -74,6 +77,9 @@ private {
// Office 365 / SharePoint Queries
string siteSearchUrl = globalGraphEndpoint ~ "/v1.0/sites?search";
string siteDriveUrl = globalGraphEndpoint ~ "/v1.0/sites/";
// Subscriptions
string subscriptionUrl = globalGraphEndpoint ~ "/v1.0/subscriptions";
}
class OneDriveException: Exception
@ -99,12 +105,104 @@ class OneDriveException: Exception
}
}
class OneDriveWebhook {
// We need OneDriveWebhook.serve to be a static function, otherwise we would hit the member function
// "requires a dual-context, which is deprecated" warning. The root cause is described here:
// - https://issues.dlang.org/show_bug.cgi?id=5710
// - https://forum.dlang.org/post/fkyppfxzegenniyzztos@forum.dlang.org
// The problem is deemed a bug and should be fixed in the compilers eventually. The singleton stuff
// could be undone when it is fixed.
//
// Following the singleton pattern described here: https://wiki.dlang.org/Low-Lock_Singleton_Pattern
// Cache instantiation flag in thread-local bool
// Thread local
private static bool instantiated_;
// Thread global
private __gshared OneDriveWebhook instance_;
private string host;
private ushort port;
private Tid parentTid;
private shared uint count;
static OneDriveWebhook getOrCreate(string host, ushort port, Tid parentTid) {
if (!instantiated_) {
synchronized(OneDriveWebhook.classinfo) {
if (!instance_) {
instance_ = new OneDriveWebhook(host, port, parentTid);
}
instantiated_ = true;
}
}
return instance_;
}
private this(string host, ushort port, Tid parentTid) {
this.host = host;
this.port = port;
this.parentTid = parentTid;
this.count = 0;
}
// The static serve() is necessary because spawn() does not like instance methods
static serve() {
// We do not create the singleton instance here if it has not been created already.
// That case is a bug which should crash the program and get fixed.
instance_.serveImpl();
}
// The static handle() is necessary to work around the dual-context warning mentioned above
private static void handle(Cgi cgi) {
// We do not create the singleton instance here if it has not been created already.
// That case is a bug which should crash the program and get fixed.
instance_.handleImpl(cgi);
}
private void serveImpl() {
auto server = new RequestServer(host, port);
server.serveEmbeddedHttp!handle();
}
private void handleImpl(Cgi cgi) {
if (.debugResponse) {
log.log("Webhook request: ", cgi.requestMethod, " ", cgi.requestUri);
if (!cgi.postBody.empty) {
log.log("Webhook post body: ", cgi.postBody);
}
}
cgi.setResponseContentType("text/plain");
if ("validationToken" in cgi.get) {
// For validation requests, respond with the validation token passed in the query string
// https://docs.microsoft.com/en-us/onedrive/developer/rest-api/concepts/webhook-receiver-validation-request
cgi.write(cgi.get["validationToken"]);
log.log("Webhook: handled validation request");
} else {
// Notifications don't include any information about the changes that triggered them.
// Put a refresh signal in the queue and let the main monitor loop process it.
// https://docs.microsoft.com/en-us/onedrive/developer/rest-api/concepts/using-webhooks
count.atomicOp!"+="(1);
send(parentTid, to!ulong(count));
cgi.write("OK");
log.log("Webhook: sent refresh signal #", count);
}
}
}
final class OneDriveApi
{
private Config cfg;
private string refreshToken, accessToken;
private string refreshToken, accessToken, subscriptionId;
private SysTime accessTokenExpiration;
private HTTP http;
private OneDriveWebhook webhook;
private SysTime subscriptionExpiration;
private Duration subscriptionExpirationInterval, subscriptionRenewalInterval;
private string notificationUrl;
// if true, every new access token is printed
bool printAccessToken;
@ -197,6 +295,8 @@ final class OneDriveApi
siteDriveUrl = usl4GraphEndpoint ~ "/v1.0/sites/";
// Shared With Me
sharedWithMeUrl = usl4GraphEndpoint ~ "/v1.0/me/drive/sharedWithMe";
// Subscriptions
subscriptionUrl = usl4GraphEndpoint ~ "/v1.0/subscriptions";
break;
case "USL5":
log.log("Configuring Azure AD for US Government Endpoints (DOD)");
@ -223,6 +323,8 @@ final class OneDriveApi
siteDriveUrl = usl5GraphEndpoint ~ "/v1.0/sites/";
// Shared With Me
sharedWithMeUrl = usl5GraphEndpoint ~ "/v1.0/me/drive/sharedWithMe";
// Subscriptions
subscriptionUrl = usl5GraphEndpoint ~ "/v1.0/subscriptions";
break;
case "DE":
log.log("Configuring Azure AD Germany");
@ -249,6 +351,8 @@ final class OneDriveApi
siteDriveUrl = deGraphEndpoint ~ "/v1.0/sites/";
// Shared With Me
sharedWithMeUrl = deGraphEndpoint ~ "/v1.0/me/drive/sharedWithMe";
// Subscriptions
subscriptionUrl = deGraphEndpoint ~ "/v1.0/subscriptions";
break;
case "CN":
log.log("Configuring AD China operated by 21Vianet");
@ -275,6 +379,8 @@ final class OneDriveApi
siteDriveUrl = cnGraphEndpoint ~ "/v1.0/sites/";
// Shared With Me
sharedWithMeUrl = cnGraphEndpoint ~ "/v1.0/me/drive/sharedWithMe";
// Subscriptions
subscriptionUrl = cnGraphEndpoint ~ "/v1.0/subscriptions";
break;
// Default - all other entries
default:
@ -357,11 +463,19 @@ final class OneDriveApi
.simulateNoRefreshTokenFile = true;
}
}
subscriptionExpiration = Clock.currTime(UTC());
subscriptionExpirationInterval = dur!"seconds"(cfg.getValueLong("webhook_expiration_interval"));
subscriptionRenewalInterval = dur!"seconds"(cfg.getValueLong("webhook_renewal_interval"));
notificationUrl = cfg.getValueString("webhook_public_url");
}
// Shutdown OneDrive HTTP construct
void shutdown()
{
// delete the subscription if one exists
deleteSubscription();
// reset any values to defaults, freeing any set objects
http.clearRequestHeaders();
http.onSend = null;
@ -830,6 +944,90 @@ final class OneDriveApi
return get(url);
}
// Create a new subscription or renew the existing subscription
void createOrRenewSubscription() {
checkAccessTokenExpired();
// Kick off the webhook server first
if (webhook is null) {
webhook = OneDriveWebhook.getOrCreate(
cfg.getValueString("webhook_listening_host"),
to!ushort(cfg.getValueLong("webhook_listening_port")),
thisTid
);
spawn(&OneDriveWebhook.serve);
}
if (!hasValidSubscription()) {
createSubscription();
} else if (isSubscriptionUpForRenewal()) {
try {
renewSubscription();
} catch (OneDriveException e) {
if (e.httpStatusCode == 404) {
log.log("The subscription is not found on the server. Recreating subscription ...");
createSubscription();
}
}
}
}
private bool hasValidSubscription() {
return !subscriptionId.empty && subscriptionExpiration > Clock.currTime(UTC());
}
private bool isSubscriptionUpForRenewal() {
return subscriptionExpiration < Clock.currTime(UTC()) + subscriptionRenewalInterval;
}
private void createSubscription() {
log.log("Initializing subscription for updates ...");
auto expirationDateTime = Clock.currTime(UTC()) + subscriptionExpirationInterval;
const(char)[] url;
url = subscriptionUrl;
const JSONValue request = [
"changeType": "updated",
"notificationUrl": notificationUrl,
"resource": "/me/drive/root",
"expirationDateTime": expirationDateTime.toISOExtString(),
"clientState": randomUUID().toString()
];
http.addRequestHeader("Content-Type", "application/json");
JSONValue response = post(url, request.toString());
// Save important subscription metadata including id and expiration
subscriptionId = response["id"].str;
subscriptionExpiration = SysTime.fromISOExtString(response["expirationDateTime"].str);
}
private void renewSubscription() {
log.log("Renewing subscription for updates ...");
auto expirationDateTime = Clock.currTime(UTC()) + subscriptionExpirationInterval;
const(char)[] url;
url = subscriptionUrl ~ "/" ~ subscriptionId;
const JSONValue request = [
"expirationDateTime": expirationDateTime.toISOExtString()
];
http.addRequestHeader("Content-Type", "application/json");
JSONValue response = patch(url, request.toString());
// Update subscription expiration from the response
subscriptionExpiration = SysTime.fromISOExtString(response["expirationDateTime"].str);
}
private void deleteSubscription() {
if (!hasValidSubscription()) {
return;
}
const(char)[] url;
url = subscriptionUrl ~ "/" ~ subscriptionId;
del(url);
log.log("Deleted subscription");
}
private void redeemToken(const(char)[] authCode)
{
const(char)[] postData =