From bcc77b79eb52fd05f8d3170f46e5b36b5379a457 Mon Sep 17 00:00:00 2001 From: abraunegg Date: Thu, 9 Nov 2023 12:20:11 +1100 Subject: [PATCH] Add resumable session uploads * Check the system for any session files, that indicate that a session upload did not complete * Change from using a CRC32 of the file, to a random 16 character alphanumeric string to use as the session filename extension as it is not computationally valid to do a CRC32 on large files --- src/clientSideFiltering.d | 9 ++ src/config.d | 2 +- src/main.d | 11 +- src/onedrive.d | 6 + src/sync.d | 233 +++++++++++++++++++++++++++++++++++--- src/util.d | 32 +++--- 6 files changed, 260 insertions(+), 33 deletions(-) diff --git a/src/clientSideFiltering.d b/src/clientSideFiltering.d index ed67c629..5a97a9f3 100644 --- a/src/clientSideFiltering.d +++ b/src/clientSideFiltering.d @@ -84,6 +84,15 @@ class ClientSideFiltering { return true; } + // Shutdown components + void shutdown() { + object.destroy(appConfig); + object.destroy(paths); + object.destroy(businessSharedItemsList); + object.destroy(fileMask); + object.destroy(directoryMask); + } + // Load sync_list file if it exists void loadSyncList(string filepath) { // open file as read only diff --git a/src/config.d b/src/config.d index ba3aa1bc..8c94682c 100644 --- a/src/config.d +++ b/src/config.d @@ -127,7 +127,7 @@ class ApplicationConfig { // - What is the home path of the actual 'user' that is running the application string defaultHomePath = ""; // - What is the config path for the application. By default, this is ~/.config/onedrive but can be overridden by using --confdir - private string configDirName = defaultConfigDirName; + string configDirName = defaultConfigDirName; // - In case we have to use a system config directory such as '/etc/onedrive' or similar, store that path in this variable private string systemConfigDirName = ""; // - Store the configured converted octal value for directory permissions diff --git a/src/main.d b/src/main.d index 4a01e3b5..817fa4a7 100644 --- a/src/main.d +++ b/src/main.d @@ -285,7 +285,7 @@ int main(string[] cliArgs) { // Force a synchronization of a specific folder, only when using --synchronize --single-directory and ignoring all non-default skip_dir and skip_file rules if (appConfig.getValueBool("force_sync")) { // appConfig.checkForBasicOptionConflicts() has already checked for the basic requirements for --force-sync - log.log("\nWARNING: Overriding application configuration to use application defaults for skip_dir and skip_file due to --synch --single-directory --force-sync being used"); + log.log("\nWARNING: Overriding application configuration to use application defaults for skip_dir and skip_file due to --sync --single-directory --force-sync being used"); bool forceSyncRiskAcceptance = appConfig.displayForceSyncRiskForAcceptance(); log.vdebug("Returned --force-sync risk acceptance: ", forceSyncRiskAcceptance); // Action based on user response @@ -516,6 +516,14 @@ int main(string[] cliArgs) { string localPath = "."; string remotePath = "/"; + // Check if there are interrupted upload session(s) + if (syncEngineInstance.checkForInterruptedSessionUploads) { + // Need to re-process the session upload files to resume the failed session uploads + log.log("There are interrupted session uploads that need to be resumed ..."); + // Process the session upload files + syncEngineInstance.processForInterruptedSessionUploads(); + } + // Are we doing a single directory operation (--single-directory) ? if (!appConfig.getValueString("single_directory").empty) { // Set singleDirectoryPath @@ -915,6 +923,7 @@ void performStandardExitProcess(string scopeCaller) { // Shutdown the client side filtering objects if (selectiveSync !is null) { log.vdebug("Shutdown Client Side Filtering instance"); + selectiveSync.shutdown(); object.destroy(selectiveSync); } diff --git a/src/onedrive.d b/src/onedrive.d index 828c5454..bf8105c3 100644 --- a/src/onedrive.d +++ b/src/onedrive.d @@ -796,6 +796,12 @@ class OneDriveApi { return response; } + // https://dev.onedrive.com/items/upload_large_files.htm + JSONValue requestUploadStatus(string uploadUrl) { + checkAccessTokenExpired(); + return get(uploadUrl, true); + } + // https://docs.microsoft.com/en-us/onedrive/developer/rest-api/api/site_search?view=odsp-graph-online JSONValue o365SiteSearch(string nextLink) { checkAccessTokenExpired(); diff --git a/src/sync.d b/src/sync.d index 3a49a0a6..74771b9d 100644 --- a/src/sync.d +++ b/src/sync.d @@ -91,7 +91,11 @@ class SyncEngine { // List of local paths, that, when using the OneDrive Business Shared Folders feature, then diabling it, folder still exists locally and online // This list of local paths need to be skipped string[] businessSharedFoldersOnlineToSkip; - + // List of interrupted uploads session files that need to be resumed + string[] interruptedUploadsSessionFiles; + // List of validated interrupted uploads session JSON items to resume + JSONValue[] jsonItemsToResumeUpload; + // Flag that there were upload or download failures listed bool syncFailures = false; // Is sync_list configured @@ -303,8 +307,11 @@ class SyncEngine { object.destroy(oneDriveApiInstance); exit(-1); } + + // API was initialised log.log("Sync Engine Initialised with new Onedrive API instance"); - // Shutdown API instance + + // Shutdown this API instance, as we will create API instances as required, when required oneDriveApiInstance.shutdown(); // Free object and memory object.destroy(oneDriveApiInstance); @@ -1669,7 +1676,7 @@ class SyncEngine { // The user has configured to ignore data safety checks and overwrite local data rather than preserve & rename log.vlog("WARNING: Local Data Protection has been disabled. You may experience data loss on this file: ", newItemPath); } else { - // local data protection is configured, rename the local file + // local data protection is configured, rename the local file, passing in if we are performing a --dry-run or not safeBackup(newItemPath, dryRun); } } @@ -1684,7 +1691,7 @@ class SyncEngine { // The user has configured to ignore data safety checks and overwrite local data rather than preserve & rename log.vlog("WARNING: Local Data Protection has been disabled. You may experience data loss on this file: ", newItemPath); } else { - // local data protection is configured, rename the local file + // local data protection is configured, rename the local file, passing in if we are performing a --dry-run or not safeBackup(newItemPath, dryRun); } } @@ -1966,11 +1973,8 @@ class SyncEngine { // local file is different to what we know to be true log.log("The local file to replace (", newItemPath,") has been modified locally since the last download. Renaming it to avoid potential local data loss."); - // do the rename if we are not in a --dry-run scenario - if (!dryRun) { - // Perform the local rename of the existing local file - safeBackup(newItemPath, dryRun); - } + // Perform the local rename of the existing local file, passing in if we are performing a --dry-run or not + safeBackup(newItemPath, dryRun); } } } @@ -3396,7 +3400,7 @@ class SyncEngine { // Evaluate the returned JSON uploadResponse // If there was an error uploading the file, uploadResponse should be empty and invalid - if (uploadResponse.type() != JSONType.object){ + if (uploadResponse.type() != JSONType.object) { uploadFailed = true; skippedExceptionError = true; } @@ -3525,8 +3529,8 @@ class SyncEngine { string currentETag; // As this is a unique thread, the sessionFilePath for where we save the data needs to be unique - // The best way to do this is calculate the CRC32 of the file, and use this as the suffix of the session file we save - string threadUploadSessionFilePath = appConfig.uploadSessionFilePath ~ "." ~ computeCRC32(localFilePath); + // The best way to do this is generate a 10 digit alphanumeric string, and use this as the file extention + string threadUploadSessionFilePath = appConfig.uploadSessionFilePath ~ "." ~ generateAlphanumericString(); // Get the absolute latest object details from online try { @@ -4740,8 +4744,8 @@ class SyncEngine { // - All Business | Office365 | SharePoint files > 0 bytes JSONValue uploadSessionData; // As this is a unique thread, the sessionFilePath for where we save the data needs to be unique - // The best way to do this is calculate the CRC32 of the file, and use this as the suffix of the session file we save - string threadUploadSessionFilePath = appConfig.uploadSessionFilePath ~ "." ~ computeCRC32(fileToUpload); + // The best way to do this is generate a 10 digit alphanumeric string, and use this as the file extention + string threadUploadSessionFilePath = appConfig.uploadSessionFilePath ~ "." ~ generateAlphanumericString(); // Attempt to upload the > 4MB file using an upload session for all account types try { @@ -4856,7 +4860,6 @@ class SyncEngine { log.log("Uploading new file ", fileToUpload, " ... failed."); displayOneDriveErrorMessage(exception.msg, thisFunctionName); } - } } else { // No Upload URL or nextExpectedRanges or localPath .. not a valid JSON we can use @@ -6999,9 +7002,6 @@ class SyncEngine { // Query OneDrive for the quota details void queryOneDriveForQuotaDetails() { // This function is similar to getRemainingFreeSpace() but is different in data being analysed and output method - - - JSONValue currentDriveQuota; string driveId; @@ -7076,4 +7076,201 @@ class SyncEngine { } } + + // Query the system for session_upload.* files + bool checkForInterruptedSessionUploads() { + + bool interruptedUploads = false; + ulong interruptedUploadsCount; + + // Scan the filesystem for the files we are interested in, build up interruptedUploadsSessionFiles array + foreach (sessionFile; dirEntries(appConfig.configDirName, "session_upload.*", SpanMode.shallow)) { + // calculate the full path + string tempPath = buildNormalizedPath(buildPath(appConfig.configDirName, sessionFile)); + // add to array + interruptedUploadsSessionFiles ~= [tempPath]; + } + + // Count all 'session_upload' files in appConfig.configDirName + //interruptedUploadsCount = count(dirEntries(appConfig.configDirName, "session_upload.*", SpanMode.shallow)); + interruptedUploadsCount = count(interruptedUploadsSessionFiles); + if (interruptedUploadsCount != 0) { + interruptedUploads = true; + } + + // return if there are interrupted uploads to process + return interruptedUploads; + } + + // Process interrupted 'session_upload' files + void processForInterruptedSessionUploads() { + // For each upload_session file that has been found, process the data to ensure it is still valid + foreach (sessionFilePath; interruptedUploadsSessionFiles) { + if (!validateUploadSessionFileData(sessionFilePath)) { + // Remove upload_session file as it is invalid + // upload_session file file contains an error - cant resume this session + log.vlog("Restore file upload session failed - cleaning up resumable session data file: ", sessionFilePath); + if (exists(sessionFilePath)) { + if (!dryRun) { + remove(sessionFilePath); + } + } + } + } + + // At this point we should have an array of JSON items to resume uploading + if (count(jsonItemsToResumeUpload) > 0) { + // there are valid items to resume upload + + // Lets deal with all the JSON items that need to be reumed for upload in a batch process + ulong batchSize = appConfig.concurrentThreads; + ulong batchCount = (jsonItemsToResumeUpload.length + batchSize - 1) / batchSize; + ulong batchesProcessed = 0; + + foreach (chunk; jsonItemsToResumeUpload.chunks(batchSize)) { + // send an array containing 'appConfig.concurrentThreads' (16) JSON items to resume upload + resumeSessionUploadsInParallel(chunk); + } + } + } + + bool validateUploadSessionFileData(string sessionFilePath) { + + JSONValue sessionFileData; + + // Try and read the text from the session file as a JSON array + try { + sessionFileData = readText(sessionFilePath).parseJSON(); + } catch (JSONException e) { + log.vdebug("SESSION-RESUME: Invalid JSON data in: ", sessionFilePath); + return false; + } + + // Does the file we wish to resume uploading exist locally still? + if ("localPath" in sessionFileData) { + string sessionLocalFilePath = sessionFileData["localPath"].str; + log.vdebug("SESSION-RESUME: sessionLocalFilePath: ", sessionLocalFilePath); + + // Does the file exist? + if (!exists(sessionLocalFilePath)) { + log.vlog("The local file to upload does not exist locally anymore"); + return false; + } + + // Can we read the file? + if (!readLocalFile(sessionLocalFilePath)) { + // filesystem error already returned if unable to read + return false; + } + + } else { + log.vdebug("SESSION-RESUME: No localPath data in: ", sessionFilePath); + return false; + } + + // Check the session data for expirationDateTime + if ("expirationDateTime" in sessionFileData) { + auto expiration = SysTime.fromISOExtString(sessionFileData["expirationDateTime"].str); + if (expiration < Clock.currTime()) { + log.vlog("The upload session has expired for: ", sessionFilePath); + return false; + } + } else { + log.vdebug("SESSION-RESUME: No expirationDateTime data in: ", sessionFilePath); + return false; + } + + // Check the online upload status, using the uloadURL in sessionFileData + if ("uploadUrl" in sessionFileData) { + JSONValue response; + + // Create a new OneDrive API instance + OneDriveApi validateUploadSessionFileDataApiInstance; + validateUploadSessionFileDataApiInstance = new OneDriveApi(appConfig); + validateUploadSessionFileDataApiInstance.initialise(); + + try { + response = validateUploadSessionFileDataApiInstance.requestUploadStatus(sessionFileData["uploadUrl"].str); + } catch (OneDriveException e) { + // handle any onedrive error response as invalid + log.vdebug("SESSION-RESUME: Invalid response when using uploadUrl in: ", sessionFilePath); + return false; + } + + // Shutdown API instance + validateUploadSessionFileDataApiInstance.shutdown(); + // Free object and memory + object.destroy(validateUploadSessionFileDataApiInstance); + + // Do we have a valid response from OneDrive? + if (response.type() == JSONType.object) { + // Valid JSON object was returned + if (("expirationDateTime" in response) && ("nextExpectedRanges" in response)) { + // The 'uploadUrl' is valid, and the response contains elements we need + sessionFileData["expirationDateTime"] = response["expirationDateTime"]; + sessionFileData["nextExpectedRanges"] = response["nextExpectedRanges"]; + + if (sessionFileData["nextExpectedRanges"].array.length == 0) { + log.vlog("The upload session was already completed"); + return false; + } + } else { + log.vdebug("SESSION-RESUME: No expirationDateTime & nextExpectedRanges data in Microsoft OneDrive API response: ", response); + return false; + } + } else { + // not a JSON object + log.vlog("Restore file upload session failed - invalid response from Microsoft OneDrive"); + return false; + } + } else { + log.vdebug("SESSION-RESUME: No uploadUrl data in: ", sessionFilePath); + return false; + } + + // Add 'sessionFilePath' to 'sessionFileData' so that it can be used when we re-use the JSON data to resume the upload + sessionFileData["sessionFilePath"] = sessionFilePath; + + // Add sessionFileData to jsonItemsToResumeUpload as it is now valid + jsonItemsToResumeUpload ~= sessionFileData; + return true; + } + + void resumeSessionUploadsInParallel(JSONValue[] array) { + // This function recieved an array of 16 JSON items to resume upload + foreach (i, jsonItemToResume; taskPool.parallel(array)) { + // Take each JSON item and resume upload using the JSON data + + JSONValue uploadResponse; + OneDriveApi uploadFileOneDriveApiInstance; + uploadFileOneDriveApiInstance = new OneDriveApi(appConfig); + uploadFileOneDriveApiInstance.initialise(); + + // Pull out data from this JSON element + string threadUploadSessionFilePath = jsonItemToResume["sessionFilePath"].str; + ulong thisFileSizeLocal = getSize(jsonItemToResume["localPath"].str); + + // Try to resume the session upload using the provided data + try { + uploadResponse = performSessionFileUpload(uploadFileOneDriveApiInstance, thisFileSizeLocal, jsonItemToResume, threadUploadSessionFilePath); + } catch (OneDriveException exception) { + writeln("CODING TO DO: Handle an exception when performing a resume session upload"); + } + + // Was the response from the OneDrive API a valid JSON item? + if (uploadResponse.type() == JSONType.object) { + // A valid JSON object was returned - session resumption upload sucessful + // Save JSON item in database + saveItem(uploadResponse); + } else { + // No valid response was returned + writeln("CODING TO DO: what to do when session upload resumption JSON data is not valid ... nothing ? error message ?"); + } + + // Shutdown API instance + uploadFileOneDriveApiInstance.shutdown(); + // Free object and memory + object.destroy(uploadFileOneDriveApiInstance); + } + } } \ No newline at end of file diff --git a/src/util.d b/src/util.d index fad87847..3cb0f7ce 100644 --- a/src/util.d +++ b/src/util.d @@ -23,6 +23,10 @@ import core.stdc.stdlib; import core.thread; import std.math; import std.format; +import std.random; +import std.array; +import std.ascii; +import std.range; // What other modules that we have created do we need to import? import log; @@ -52,11 +56,13 @@ void safeBackup(const(char)[] path, bool dryRun) { } newPath ~= ext; - // Perform the backup + // Log that we are perform the backup by renaming the file log.log("The local item is out-of-sync with OneDrive, renaming to preserve existing file and prevent local data loss: ", path, " -> ", newPath); // Are we in a --dry-run scenario? if (!dryRun) { + // Not a --dry-run scenario - do the file rename + // // There are 2 options to rename a file // rename() - https://dlang.org/library/std/file/rename.html // std.file.copy() - https://dlang.org/library/std/file/copy.html @@ -67,7 +73,7 @@ void safeBackup(const(char)[] path, bool dryRun) { // std.file.copy // Copy file from to file to. File timestamps are preserved. File attributes are preserved, if preserve equals Yes.preserveAttributes // - // Use rename() as it Linux is POSIX compliant, we have an atomic operation where at no point in time the 'to' is missing. + // Use rename() as Linux is POSIX compliant, we have an atomic operation where at no point in time the 'to' is missing. rename(path, newPath); } else { log.vdebug("DRY-RUN: Skipping renaming local file to preserve existing file and prevent data loss: ", path, " -> ", newPath); @@ -79,16 +85,6 @@ void safeRemove(const(char)[] path) { if (exists(path)) remove(path); } -// returns the CRC32 hex string of a file -string computeCRC32(string path) { - CRC32 crc; - auto file = File(path, "rb"); - foreach (ubyte[] data; chunks(file, 4096)) { - crc.put(data); - } - return crc.finish().toHexString().dup; -} - // returns the SHA1 hash hex string of a file string computeSha1Hash(string path) { SHA1 sha; @@ -808,4 +804,14 @@ bool entrypointExists() { string entrypointPath = buildNormalizedPath(buildPath("/", "entrypoint.sh")); // return if path exists return exists(entrypointPath); -} \ No newline at end of file +} + +// Generate a random alphanumeric string +string generateAlphanumericString() { + auto asciiLetters = to!(dchar[])(letters); + auto asciiDigits = to!(dchar[])(digits); + dchar[16] randomString; + fill(randomString[], randomCover(chain(asciiLetters, asciiDigits), rndGen)); + return to!string(randomString); +} +