Add resumable session uploads

* Check the system for any session files, that indicate that a session upload did not complete
* Change from using a CRC32 of the file, to a random 16 character alphanumeric string to use as the session filename extension as it is not computationally valid to do a CRC32 on large files
This commit is contained in:
abraunegg 2023-11-09 12:20:11 +11:00
parent 423bf07b77
commit bcc77b79eb
6 changed files with 260 additions and 33 deletions

View file

@ -84,6 +84,15 @@ class ClientSideFiltering {
return true;
}
// Shutdown components
void shutdown() {
object.destroy(appConfig);
object.destroy(paths);
object.destroy(businessSharedItemsList);
object.destroy(fileMask);
object.destroy(directoryMask);
}
// Load sync_list file if it exists
void loadSyncList(string filepath) {
// open file as read only

View file

@ -127,7 +127,7 @@ class ApplicationConfig {
// - What is the home path of the actual 'user' that is running the application
string defaultHomePath = "";
// - What is the config path for the application. By default, this is ~/.config/onedrive but can be overridden by using --confdir
private string configDirName = defaultConfigDirName;
string configDirName = defaultConfigDirName;
// - In case we have to use a system config directory such as '/etc/onedrive' or similar, store that path in this variable
private string systemConfigDirName = "";
// - Store the configured converted octal value for directory permissions

View file

@ -285,7 +285,7 @@ int main(string[] cliArgs) {
// Force a synchronization of a specific folder, only when using --synchronize --single-directory and ignoring all non-default skip_dir and skip_file rules
if (appConfig.getValueBool("force_sync")) {
// appConfig.checkForBasicOptionConflicts() has already checked for the basic requirements for --force-sync
log.log("\nWARNING: Overriding application configuration to use application defaults for skip_dir and skip_file due to --synch --single-directory --force-sync being used");
log.log("\nWARNING: Overriding application configuration to use application defaults for skip_dir and skip_file due to --sync --single-directory --force-sync being used");
bool forceSyncRiskAcceptance = appConfig.displayForceSyncRiskForAcceptance();
log.vdebug("Returned --force-sync risk acceptance: ", forceSyncRiskAcceptance);
// Action based on user response
@ -516,6 +516,14 @@ int main(string[] cliArgs) {
string localPath = ".";
string remotePath = "/";
// Check if there are interrupted upload session(s)
if (syncEngineInstance.checkForInterruptedSessionUploads) {
// Need to re-process the session upload files to resume the failed session uploads
log.log("There are interrupted session uploads that need to be resumed ...");
// Process the session upload files
syncEngineInstance.processForInterruptedSessionUploads();
}
// Are we doing a single directory operation (--single-directory) ?
if (!appConfig.getValueString("single_directory").empty) {
// Set singleDirectoryPath
@ -915,6 +923,7 @@ void performStandardExitProcess(string scopeCaller) {
// Shutdown the client side filtering objects
if (selectiveSync !is null) {
log.vdebug("Shutdown Client Side Filtering instance");
selectiveSync.shutdown();
object.destroy(selectiveSync);
}

View file

@ -796,6 +796,12 @@ class OneDriveApi {
return response;
}
// https://dev.onedrive.com/items/upload_large_files.htm
JSONValue requestUploadStatus(string uploadUrl) {
checkAccessTokenExpired();
return get(uploadUrl, true);
}
// https://docs.microsoft.com/en-us/onedrive/developer/rest-api/api/site_search?view=odsp-graph-online
JSONValue o365SiteSearch(string nextLink) {
checkAccessTokenExpired();

View file

@ -91,7 +91,11 @@ class SyncEngine {
// List of local paths, that, when using the OneDrive Business Shared Folders feature, then diabling it, folder still exists locally and online
// This list of local paths need to be skipped
string[] businessSharedFoldersOnlineToSkip;
// List of interrupted uploads session files that need to be resumed
string[] interruptedUploadsSessionFiles;
// List of validated interrupted uploads session JSON items to resume
JSONValue[] jsonItemsToResumeUpload;
// Flag that there were upload or download failures listed
bool syncFailures = false;
// Is sync_list configured
@ -303,8 +307,11 @@ class SyncEngine {
object.destroy(oneDriveApiInstance);
exit(-1);
}
// API was initialised
log.log("Sync Engine Initialised with new Onedrive API instance");
// Shutdown API instance
// Shutdown this API instance, as we will create API instances as required, when required
oneDriveApiInstance.shutdown();
// Free object and memory
object.destroy(oneDriveApiInstance);
@ -1669,7 +1676,7 @@ class SyncEngine {
// The user has configured to ignore data safety checks and overwrite local data rather than preserve & rename
log.vlog("WARNING: Local Data Protection has been disabled. You may experience data loss on this file: ", newItemPath);
} else {
// local data protection is configured, rename the local file
// local data protection is configured, rename the local file, passing in if we are performing a --dry-run or not
safeBackup(newItemPath, dryRun);
}
}
@ -1684,7 +1691,7 @@ class SyncEngine {
// The user has configured to ignore data safety checks and overwrite local data rather than preserve & rename
log.vlog("WARNING: Local Data Protection has been disabled. You may experience data loss on this file: ", newItemPath);
} else {
// local data protection is configured, rename the local file
// local data protection is configured, rename the local file, passing in if we are performing a --dry-run or not
safeBackup(newItemPath, dryRun);
}
}
@ -1966,11 +1973,8 @@ class SyncEngine {
// local file is different to what we know to be true
log.log("The local file to replace (", newItemPath,") has been modified locally since the last download. Renaming it to avoid potential local data loss.");
// do the rename if we are not in a --dry-run scenario
if (!dryRun) {
// Perform the local rename of the existing local file
safeBackup(newItemPath, dryRun);
}
// Perform the local rename of the existing local file, passing in if we are performing a --dry-run or not
safeBackup(newItemPath, dryRun);
}
}
}
@ -3396,7 +3400,7 @@ class SyncEngine {
// Evaluate the returned JSON uploadResponse
// If there was an error uploading the file, uploadResponse should be empty and invalid
if (uploadResponse.type() != JSONType.object){
if (uploadResponse.type() != JSONType.object) {
uploadFailed = true;
skippedExceptionError = true;
}
@ -3525,8 +3529,8 @@ class SyncEngine {
string currentETag;
// As this is a unique thread, the sessionFilePath for where we save the data needs to be unique
// The best way to do this is calculate the CRC32 of the file, and use this as the suffix of the session file we save
string threadUploadSessionFilePath = appConfig.uploadSessionFilePath ~ "." ~ computeCRC32(localFilePath);
// The best way to do this is generate a 10 digit alphanumeric string, and use this as the file extention
string threadUploadSessionFilePath = appConfig.uploadSessionFilePath ~ "." ~ generateAlphanumericString();
// Get the absolute latest object details from online
try {
@ -4740,8 +4744,8 @@ class SyncEngine {
// - All Business | Office365 | SharePoint files > 0 bytes
JSONValue uploadSessionData;
// As this is a unique thread, the sessionFilePath for where we save the data needs to be unique
// The best way to do this is calculate the CRC32 of the file, and use this as the suffix of the session file we save
string threadUploadSessionFilePath = appConfig.uploadSessionFilePath ~ "." ~ computeCRC32(fileToUpload);
// The best way to do this is generate a 10 digit alphanumeric string, and use this as the file extention
string threadUploadSessionFilePath = appConfig.uploadSessionFilePath ~ "." ~ generateAlphanumericString();
// Attempt to upload the > 4MB file using an upload session for all account types
try {
@ -4856,7 +4860,6 @@ class SyncEngine {
log.log("Uploading new file ", fileToUpload, " ... failed.");
displayOneDriveErrorMessage(exception.msg, thisFunctionName);
}
}
} else {
// No Upload URL or nextExpectedRanges or localPath .. not a valid JSON we can use
@ -6999,9 +7002,6 @@ class SyncEngine {
// Query OneDrive for the quota details
void queryOneDriveForQuotaDetails() {
// This function is similar to getRemainingFreeSpace() but is different in data being analysed and output method
JSONValue currentDriveQuota;
string driveId;
@ -7076,4 +7076,201 @@ class SyncEngine {
}
}
// Query the system for session_upload.* files
bool checkForInterruptedSessionUploads() {
bool interruptedUploads = false;
ulong interruptedUploadsCount;
// Scan the filesystem for the files we are interested in, build up interruptedUploadsSessionFiles array
foreach (sessionFile; dirEntries(appConfig.configDirName, "session_upload.*", SpanMode.shallow)) {
// calculate the full path
string tempPath = buildNormalizedPath(buildPath(appConfig.configDirName, sessionFile));
// add to array
interruptedUploadsSessionFiles ~= [tempPath];
}
// Count all 'session_upload' files in appConfig.configDirName
//interruptedUploadsCount = count(dirEntries(appConfig.configDirName, "session_upload.*", SpanMode.shallow));
interruptedUploadsCount = count(interruptedUploadsSessionFiles);
if (interruptedUploadsCount != 0) {
interruptedUploads = true;
}
// return if there are interrupted uploads to process
return interruptedUploads;
}
// Process interrupted 'session_upload' files
void processForInterruptedSessionUploads() {
// For each upload_session file that has been found, process the data to ensure it is still valid
foreach (sessionFilePath; interruptedUploadsSessionFiles) {
if (!validateUploadSessionFileData(sessionFilePath)) {
// Remove upload_session file as it is invalid
// upload_session file file contains an error - cant resume this session
log.vlog("Restore file upload session failed - cleaning up resumable session data file: ", sessionFilePath);
if (exists(sessionFilePath)) {
if (!dryRun) {
remove(sessionFilePath);
}
}
}
}
// At this point we should have an array of JSON items to resume uploading
if (count(jsonItemsToResumeUpload) > 0) {
// there are valid items to resume upload
// Lets deal with all the JSON items that need to be reumed for upload in a batch process
ulong batchSize = appConfig.concurrentThreads;
ulong batchCount = (jsonItemsToResumeUpload.length + batchSize - 1) / batchSize;
ulong batchesProcessed = 0;
foreach (chunk; jsonItemsToResumeUpload.chunks(batchSize)) {
// send an array containing 'appConfig.concurrentThreads' (16) JSON items to resume upload
resumeSessionUploadsInParallel(chunk);
}
}
}
bool validateUploadSessionFileData(string sessionFilePath) {
JSONValue sessionFileData;
// Try and read the text from the session file as a JSON array
try {
sessionFileData = readText(sessionFilePath).parseJSON();
} catch (JSONException e) {
log.vdebug("SESSION-RESUME: Invalid JSON data in: ", sessionFilePath);
return false;
}
// Does the file we wish to resume uploading exist locally still?
if ("localPath" in sessionFileData) {
string sessionLocalFilePath = sessionFileData["localPath"].str;
log.vdebug("SESSION-RESUME: sessionLocalFilePath: ", sessionLocalFilePath);
// Does the file exist?
if (!exists(sessionLocalFilePath)) {
log.vlog("The local file to upload does not exist locally anymore");
return false;
}
// Can we read the file?
if (!readLocalFile(sessionLocalFilePath)) {
// filesystem error already returned if unable to read
return false;
}
} else {
log.vdebug("SESSION-RESUME: No localPath data in: ", sessionFilePath);
return false;
}
// Check the session data for expirationDateTime
if ("expirationDateTime" in sessionFileData) {
auto expiration = SysTime.fromISOExtString(sessionFileData["expirationDateTime"].str);
if (expiration < Clock.currTime()) {
log.vlog("The upload session has expired for: ", sessionFilePath);
return false;
}
} else {
log.vdebug("SESSION-RESUME: No expirationDateTime data in: ", sessionFilePath);
return false;
}
// Check the online upload status, using the uloadURL in sessionFileData
if ("uploadUrl" in sessionFileData) {
JSONValue response;
// Create a new OneDrive API instance
OneDriveApi validateUploadSessionFileDataApiInstance;
validateUploadSessionFileDataApiInstance = new OneDriveApi(appConfig);
validateUploadSessionFileDataApiInstance.initialise();
try {
response = validateUploadSessionFileDataApiInstance.requestUploadStatus(sessionFileData["uploadUrl"].str);
} catch (OneDriveException e) {
// handle any onedrive error response as invalid
log.vdebug("SESSION-RESUME: Invalid response when using uploadUrl in: ", sessionFilePath);
return false;
}
// Shutdown API instance
validateUploadSessionFileDataApiInstance.shutdown();
// Free object and memory
object.destroy(validateUploadSessionFileDataApiInstance);
// Do we have a valid response from OneDrive?
if (response.type() == JSONType.object) {
// Valid JSON object was returned
if (("expirationDateTime" in response) && ("nextExpectedRanges" in response)) {
// The 'uploadUrl' is valid, and the response contains elements we need
sessionFileData["expirationDateTime"] = response["expirationDateTime"];
sessionFileData["nextExpectedRanges"] = response["nextExpectedRanges"];
if (sessionFileData["nextExpectedRanges"].array.length == 0) {
log.vlog("The upload session was already completed");
return false;
}
} else {
log.vdebug("SESSION-RESUME: No expirationDateTime & nextExpectedRanges data in Microsoft OneDrive API response: ", response);
return false;
}
} else {
// not a JSON object
log.vlog("Restore file upload session failed - invalid response from Microsoft OneDrive");
return false;
}
} else {
log.vdebug("SESSION-RESUME: No uploadUrl data in: ", sessionFilePath);
return false;
}
// Add 'sessionFilePath' to 'sessionFileData' so that it can be used when we re-use the JSON data to resume the upload
sessionFileData["sessionFilePath"] = sessionFilePath;
// Add sessionFileData to jsonItemsToResumeUpload as it is now valid
jsonItemsToResumeUpload ~= sessionFileData;
return true;
}
void resumeSessionUploadsInParallel(JSONValue[] array) {
// This function recieved an array of 16 JSON items to resume upload
foreach (i, jsonItemToResume; taskPool.parallel(array)) {
// Take each JSON item and resume upload using the JSON data
JSONValue uploadResponse;
OneDriveApi uploadFileOneDriveApiInstance;
uploadFileOneDriveApiInstance = new OneDriveApi(appConfig);
uploadFileOneDriveApiInstance.initialise();
// Pull out data from this JSON element
string threadUploadSessionFilePath = jsonItemToResume["sessionFilePath"].str;
ulong thisFileSizeLocal = getSize(jsonItemToResume["localPath"].str);
// Try to resume the session upload using the provided data
try {
uploadResponse = performSessionFileUpload(uploadFileOneDriveApiInstance, thisFileSizeLocal, jsonItemToResume, threadUploadSessionFilePath);
} catch (OneDriveException exception) {
writeln("CODING TO DO: Handle an exception when performing a resume session upload");
}
// Was the response from the OneDrive API a valid JSON item?
if (uploadResponse.type() == JSONType.object) {
// A valid JSON object was returned - session resumption upload sucessful
// Save JSON item in database
saveItem(uploadResponse);
} else {
// No valid response was returned
writeln("CODING TO DO: what to do when session upload resumption JSON data is not valid ... nothing ? error message ?");
}
// Shutdown API instance
uploadFileOneDriveApiInstance.shutdown();
// Free object and memory
object.destroy(uploadFileOneDriveApiInstance);
}
}
}

View file

@ -23,6 +23,10 @@ import core.stdc.stdlib;
import core.thread;
import std.math;
import std.format;
import std.random;
import std.array;
import std.ascii;
import std.range;
// What other modules that we have created do we need to import?
import log;
@ -52,11 +56,13 @@ void safeBackup(const(char)[] path, bool dryRun) {
}
newPath ~= ext;
// Perform the backup
// Log that we are perform the backup by renaming the file
log.log("The local item is out-of-sync with OneDrive, renaming to preserve existing file and prevent local data loss: ", path, " -> ", newPath);
// Are we in a --dry-run scenario?
if (!dryRun) {
// Not a --dry-run scenario - do the file rename
//
// There are 2 options to rename a file
// rename() - https://dlang.org/library/std/file/rename.html
// std.file.copy() - https://dlang.org/library/std/file/copy.html
@ -67,7 +73,7 @@ void safeBackup(const(char)[] path, bool dryRun) {
// std.file.copy
// Copy file from to file to. File timestamps are preserved. File attributes are preserved, if preserve equals Yes.preserveAttributes
//
// Use rename() as it Linux is POSIX compliant, we have an atomic operation where at no point in time the 'to' is missing.
// Use rename() as Linux is POSIX compliant, we have an atomic operation where at no point in time the 'to' is missing.
rename(path, newPath);
} else {
log.vdebug("DRY-RUN: Skipping renaming local file to preserve existing file and prevent data loss: ", path, " -> ", newPath);
@ -79,16 +85,6 @@ void safeRemove(const(char)[] path) {
if (exists(path)) remove(path);
}
// returns the CRC32 hex string of a file
string computeCRC32(string path) {
CRC32 crc;
auto file = File(path, "rb");
foreach (ubyte[] data; chunks(file, 4096)) {
crc.put(data);
}
return crc.finish().toHexString().dup;
}
// returns the SHA1 hash hex string of a file
string computeSha1Hash(string path) {
SHA1 sha;
@ -808,4 +804,14 @@ bool entrypointExists() {
string entrypointPath = buildNormalizedPath(buildPath("/", "entrypoint.sh"));
// return if path exists
return exists(entrypointPath);
}
}
// Generate a random alphanumeric string
string generateAlphanumericString() {
auto asciiLetters = to!(dchar[])(letters);
auto asciiDigits = to!(dchar[])(digits);
dchar[16] randomString;
fill(randomString[], randomCover(chain(asciiLetters, asciiDigits), rndGen));
return to!string(randomString);
}