Add webhook functionality back in

* Add webhook functionality back in
This commit is contained in:
abraunegg 2023-10-19 06:48:25 +11:00
parent 35edb4c1b0
commit 3a3c954756
3 changed files with 263 additions and 13 deletions

View file

@ -68,7 +68,7 @@ endif
SOURCES = \
src/main.d \
src/config.d \
src/log.d \
src/log.d \
src/util.d \
src/qxor.d \
src/curlEngine.d \
@ -78,7 +78,8 @@ SOURCES = \
src/sqlite.d \
src/clientSideFiltering.d \
src/progress.d \
src/monitor.d
src/monitor.d \
src/arsd/cgi.d
ifeq ($(NOTIFICATIONS),yes)
SOURCES += src/notifications/notify.d src/notifications/dnotify.d

View file

@ -496,7 +496,10 @@ int main(string[] cliArgs) {
return EXIT_FAILURE;
}
// We do not need this instance, as the API was initialised, and individual instances are used during sync process
oneDriveApiInstance.shutdown();
// However we need this instance to hang around if we are using --monitor for handling subscriptions
if (!appConfig.getValueBool("monitor")) {
oneDriveApiInstance.shutdown();
}
} else {
// API could not be initialised
log.error("The OneDrive API could not be initialised");
@ -717,22 +720,24 @@ int main(string[] cliArgs) {
}
}
// Filesystem monitor loop
bool performMonitor = true;
ulong monitorLoopFullCount = 0;
ulong fullScanFrequencyLoopCount = 0;
ulong monitorLogOutputLoopCount = 0;
// Filesystem monitor loop variables
// immutables
immutable auto checkOnlineInterval = dur!"seconds"(appConfig.getValueLong("monitor_interval"));
immutable auto githubCheckInterval = dur!"seconds"(86400);
immutable ulong fullScanFrequency = appConfig.getValueLong("monitor_fullscan_frequency");
immutable ulong logOutputSupressionInterval = appConfig.getValueLong("monitor_log_frequency");
immutable bool webhookEnabled = appConfig.getValueBool("webhook_enabled");
// changables
bool performMonitor = true;
ulong monitorLoopFullCount = 0;
ulong fullScanFrequencyLoopCount = 0;
ulong monitorLogOutputLoopCount = 0;
MonoTime lastCheckTime = MonoTime.currTime();
MonoTime lastGitHubCheckTime = MonoTime.currTime();
string loopStartOutputMessage = "################################################## NEW LOOP ##################################################";
string loopStopOutputMessage = "################################################ LOOP COMPLETE ###############################################";
while (performMonitor) {
// Do we need to validate the runtimeSyncDirectory to check for the presence of a '.nosync' file - the disk may have been ejected ..
checkForNoMountScenario();
@ -747,10 +752,35 @@ int main(string[] cliArgs) {
}
}
// Check for notifications pushed from Microsoft to the webhook
// Webhook Notification Handling
bool notificationReceived = false;
// Check here for a webhook notification
// Check for notifications pushed from Microsoft to the webhook
if (webhookEnabled) {
// Create a subscription on the first run, or renew the subscription
// on subsequent runs when it is about to expire.
oneDriveApiInstance.createOrRenewSubscription();
// Process incoming notifications if any.
// Empirical evidence shows that Microsoft often sends multiple
// notifications for one single change, so we need a loop to exhaust
// all signals that were queued up by the webhook. The notifications
// do not contain any actual changes, and we will always rely do the
// delta endpoint to sync to latest. Therefore, only one sync run is
// good enough to catch up for multiple notifications.
for (int signalCount = 0;; signalCount++) {
const auto signalExists = receiveTimeout(dur!"seconds"(-1), (ulong _) {});
if (signalExists) {
notificationReceived = true;
} else {
if (notificationReceived) {
log.log("Received ", signalCount," refresh signals from the webhook");
}
break;
}
}
}
// Get the current time this loop is starting
auto currentTime = MonoTime.currTime();

View file

@ -20,6 +20,12 @@ import std.conv;
import std.math;
import std.uri;
// Required for webhooks
import arsd.cgi;
import std.concurrency;
import core.atomic : atomicOp;
import std.uuid;
// What other modules that we have created do we need to import?
import config;
import log;
@ -27,6 +33,9 @@ import util;
import curlEngine;
import progress;
// Shared variables between classes
shared bool debugHTTPResponseOutput = false;
class OneDriveException: Exception {
// https://docs.microsoft.com/en-us/onedrive/developer/rest-api/concepts/errors
int httpStatusCode;
@ -47,10 +56,100 @@ class OneDriveException: Exception {
}
}
class OneDriveWebhook {
// We need OneDriveWebhook.serve to be a static function, otherwise we would hit the member function
// "requires a dual-context, which is deprecated" warning. The root cause is described here:
// - https://issues.dlang.org/show_bug.cgi?id=5710
// - https://forum.dlang.org/post/fkyppfxzegenniyzztos@forum.dlang.org
// The problem is deemed a bug and should be fixed in the compilers eventually. The singleton stuff
// could be undone when it is fixed.
//
// Following the singleton pattern described here: https://wiki.dlang.org/Low-Lock_Singleton_Pattern
// Cache instantiation flag in thread-local bool
// Thread local
private static bool instantiated_;
// Thread global
private __gshared OneDriveWebhook instance_;
private string host;
private ushort port;
private Tid parentTid;
private shared uint count;
static OneDriveWebhook getOrCreate(string host, ushort port, Tid parentTid) {
if (!instantiated_) {
synchronized(OneDriveWebhook.classinfo) {
if (!instance_) {
instance_ = new OneDriveWebhook(host, port, parentTid);
}
instantiated_ = true;
}
}
return instance_;
}
private this(string host, ushort port, Tid parentTid) {
this.host = host;
this.port = port;
this.parentTid = parentTid;
this.count = 0;
}
// The static serve() is necessary because spawn() does not like instance methods
static serve() {
// we won't create the singleton instance if it hasn't been created already
// such case is a bug which should crash the program and gets fixed
instance_.serveImpl();
}
// The static handle() is necessary to work around the dual-context warning mentioned above
private static void handle(Cgi cgi) {
// we won't create the singleton instance if it hasn't been created already
// such case is a bug which should crash the program and gets fixed
instance_.handleImpl(cgi);
}
private void serveImpl() {
auto server = new RequestServer(host, port);
server.serveEmbeddedHttp!handle();
}
private void handleImpl(Cgi cgi) {
if (debugHTTPResponseOutput) {
log.log("Webhook request: ", cgi.requestMethod, " ", cgi.requestUri);
if (!cgi.postBody.empty) {
log.log("Webhook post body: ", cgi.postBody);
}
}
cgi.setResponseContentType("text/plain");
if ("validationToken" in cgi.get) {
// For validation requests, respond with the validation token passed in the query string
// https://docs.microsoft.com/en-us/onedrive/developer/rest-api/concepts/webhook-receiver-validation-request
cgi.write(cgi.get["validationToken"]);
log.log("Webhook: handled validation request");
} else {
// Notifications don't include any information about the changes that triggered them.
// Put a refresh signal in the queue and let the main monitor loop process it.
// https://docs.microsoft.com/en-us/onedrive/developer/rest-api/concepts/using-webhooks
count.atomicOp!"+="(1);
send(parentTid, to!ulong(count));
cgi.write("OK");
log.log("Webhook: sent refresh signal #", count);
}
}
}
class OneDriveApi {
// Class variables
ApplicationConfig appConfig;
CurlEngine curlEngine;
OneDriveWebhook webhook;
string clientId = "";
string companyName = "";
string authUrl = "";
@ -63,7 +162,6 @@ class OneDriveApi {
string itemByPathUrl = "";
string siteSearchUrl = "";
string siteDriveUrl = "";
string subscriptionUrl = "";
string tenantId = "";
string authScope = "";
string refreshToken = "";
@ -71,6 +169,13 @@ class OneDriveApi {
bool debugResponse = false;
ulong retryAfterValue = 0;
// Webhook Subscriptions
string subscriptionUrl = "";
string subscriptionId = "";
SysTime subscriptionExpiration;
Duration subscriptionExpirationInterval, subscriptionRenewalInterval;
string notificationUrl = "";
this(ApplicationConfig appConfig) {
// Configure the class varaible to consume the application configuration
this.appConfig = appConfig;
@ -94,6 +199,10 @@ class OneDriveApi {
// Subscriptions
subscriptionUrl = appConfig.globalGraphEndpoint ~ "/v1.0/subscriptions";
subscriptionExpiration = Clock.currTime(UTC());
subscriptionExpirationInterval = dur!"seconds"(appConfig.getValueLong("webhook_expiration_interval"));
subscriptionRenewalInterval = dur!"seconds"(appConfig.getValueLong("webhook_renewal_interval"));
notificationUrl = appConfig.getValueString("webhook_public_url");
}
// Initialise the OneDrive API class
@ -110,6 +219,8 @@ class OneDriveApi {
// Did the user specify --debug-https
debugResponse = appConfig.getValueBool("debug_https");
// Flag this so if webhooks are being used, it can also be consumed
debugHTTPResponseOutput = appConfig.getValueBool("debug_https");
// Set clientId to use the configured 'application_id'
clientId = appConfig.getValueString("application_id");
@ -346,7 +457,7 @@ class OneDriveApi {
// Shutdown OneDrive API Curl Engine
void shutdown() {
// Delete subscription if there exists any
//deleteSubscription();
deleteSubscription();
// Reset any values to defaults, freeing any set objects
curlEngine.http.clearRequestHeaders();
@ -734,6 +845,114 @@ class OneDriveApi {
retryAfterValue = 0;
}
// Webhook functions
void createOrRenewSubscription() {
checkAccessTokenExpired();
// Kick off the webhook server first
if (webhook is null) {
webhook = OneDriveWebhook.getOrCreate(
appConfig.getValueString("webhook_listening_host"),
to!ushort(appConfig.getValueLong("webhook_listening_port")),
thisTid
);
spawn(&OneDriveWebhook.serve);
}
// Is there a valid subscription?
if (!hasValidSubscription()) {
createSubscription();
} else if (isSubscriptionUpForRenewal()) {
try {
renewSubscription();
} catch (OneDriveException e) {
if (e.httpStatusCode == 404) {
log.log("The subscription is not found on the server. Recreating subscription ...");
createSubscription();
}
}
}
}
// Private functions
private bool hasValidSubscription() {
return !subscriptionId.empty && subscriptionExpiration > Clock.currTime(UTC());
}
private bool isSubscriptionUpForRenewal() {
return subscriptionExpiration < Clock.currTime(UTC()) + subscriptionRenewalInterval;
}
private void createSubscription() {
log.log("Initializing subscription for updates ...");
auto expirationDateTime = Clock.currTime(UTC()) + subscriptionExpirationInterval;
string driveId = appConfig.getValueString("drive_id");
string url = subscriptionUrl;
// Create a resource item based on if we have a driveId
string resourceItem;
if (driveId.length) {
resourceItem = "/drives/" ~ driveId ~ "/root";
} else {
resourceItem = "/me/drive/root";
}
// create JSON request to create webhook subscription
const JSONValue request = [
"changeType": "updated",
"notificationUrl": notificationUrl,
"resource": resourceItem,
"expirationDateTime": expirationDateTime.toISOExtString(),
"clientState": randomUUID().toString()
];
curlEngine.http.addRequestHeader("Content-Type", "application/json");
JSONValue response;
try {
response = post(url, request.toString());
} catch (OneDriveException e) {
displayOneDriveErrorMessage(e.msg, getFunctionName!({}));
// We need to exit here, user needs to fix issue
log.error("ERROR: Unable to initialize subscriptions for updates. Please fix this issue.");
shutdown();
exit(-1);
}
// Save important subscription metadata including id and expiration
subscriptionId = response["id"].str;
subscriptionExpiration = SysTime.fromISOExtString(response["expirationDateTime"].str);
}
private void renewSubscription() {
log.log("Renewing subscription for updates ...");
auto expirationDateTime = Clock.currTime(UTC()) + subscriptionExpirationInterval;
string url;
url = subscriptionUrl ~ "/" ~ subscriptionId;
const JSONValue request = [
"expirationDateTime": expirationDateTime.toISOExtString()
];
curlEngine.http.addRequestHeader("Content-Type", "application/json");
JSONValue response = patch(url, request.toString());
// Update subscription expiration from the response
subscriptionExpiration = SysTime.fromISOExtString(response["expirationDateTime"].str);
}
private void deleteSubscription() {
if (!hasValidSubscription()) {
return;
}
string url;
url = subscriptionUrl ~ "/" ~ subscriptionId;
del(url);
log.log("Deleted subscription");
}
private void addAccessTokenHeader() {
curlEngine.http.addRequestHeader("Authorization", appConfig.accessToken);
}