From a92221bb50d54c952d8df05ffc28e9c05e620840 Mon Sep 17 00:00:00 2001 From: JC-comp <147694781+JC-comp@users.noreply.github.com> Date: Mon, 12 Feb 2024 14:12:20 +0800 Subject: [PATCH] Add support for batched local monitor processing (#2609) * Fix file upload fallback for all scenario * Add support for batched monitor * Add recursive match * Adjust logging output * Add error handling --- src/main.d | 11 +- src/monitor.d | 374 +++++++++++++++++++++++++++++++++----------------- src/sync.d | 374 ++++++++++++++++++++++++++------------------------ 3 files changed, 448 insertions(+), 311 deletions(-) diff --git a/src/main.d b/src/main.d index 32bed8c2..67f36a65 100644 --- a/src/main.d +++ b/src/main.d @@ -748,16 +748,11 @@ int main(string[] cliArgs) { }; // Delegated function for when inotify detects a local file has been changed - filesystemMonitor.onFileChanged = delegate(string path) { + filesystemMonitor.onFileChanged = delegate(string[] changedLocalFilesToUploadToOneDrive) { // Handle a potentially locally changed file // Logging for this event moved to handleLocalFileTrigger() due to threading and false triggers from scanLocalFilesystemPathForNewData() above - try { - syncEngineInstance.handleLocalFileTrigger(path); - } catch (CurlException e) { - addLogEntry("Offline, cannot upload changed item: " ~ path, ["verbose"]); - } catch(Exception e) { - addLogEntry("Cannot upload file changes/creation: " ~ e.msg, ["info", "notify"]); - } + addLogEntry("[M] Total number of local file changed: " ~ to!string(changedLocalFilesToUploadToOneDrive.length)); + syncEngineInstance.handleLocalFileTrigger(changedLocalFilesToUploadToOneDrive); }; // Delegated function for when inotify detects a delete event diff --git a/src/monitor.d b/src/monitor.d index d046829b..694771e6 100644 --- a/src/monitor.d +++ b/src/monitor.d @@ -8,6 +8,7 @@ import core.sys.linux.sys.inotify; import core.sys.posix.poll; import core.sys.posix.unistd; import core.sys.posix.sys.select; +import core.thread; import core.time; import std.algorithm; import std.concurrency; @@ -135,7 +136,6 @@ shared class MonitorBackgroundWorker { } } - void startMonitorJob(shared(MonitorBackgroundWorker) worker, Tid callerTid) { try { @@ -146,6 +146,96 @@ void startMonitorJob(shared(MonitorBackgroundWorker) worker, Tid callerTid) worker.shutdown(); } +enum ActionType { + moved, + deleted, + changed, + createDir +} + +struct Action { + ActionType type; + bool skipped; + string src; + string dst; +} + +struct ActionHolder { + Action[] actions; + ulong[string] srcMap; + + void append(ActionType type, string src, string dst=null) { + ulong[] pendingTargets; + switch (type) { + case ActionType.changed: + if (src in srcMap && actions[srcMap[src]].type == ActionType.changed) { + // skip duplicate operations + return; + } + break; + case ActionType.createDir: + break; + case ActionType.deleted: + if (src in srcMap) { + ulong pendingTarget = srcMap[src]; + // Skip operations require reading local file that is gone + switch (actions[pendingTarget].type) { + case ActionType.changed: + case ActionType.createDir: + actions[srcMap[src]].skipped = true; + srcMap.remove(src); + break; + default: + break; + } + } + break; + case ActionType.moved: + for(int i = 0; i < actions.length; i++) { + // Only match for latest operation + if (actions[i].src in srcMap) { + switch (actions[i].type) { + case ActionType.changed: + case ActionType.createDir: + // check if the source is the prefix of the target + string prefix = src ~ "/"; + string target = actions[i].src; + if (prefix[0] != '.') + prefix = "./" ~ prefix; + if (target[0] != '.') + target = "./" ~ target; + string comm = commonPrefix(prefix, target); + if (src == actions[i].src || comm.length == prefix.length) { + // Hold operations require reading local file that is moved after the target is moved online + pendingTargets ~= i; + actions[i].skipped = true; + srcMap.remove(actions[i].src); + if (comm.length == target.length) + actions[i].src = dst; + else + actions[i].src = dst ~ target[comm.length - 1 .. target.length]; + } + break; + default: + break; + } + } + } + break; + default: + break; + } + actions ~= Action(type, false, src, dst); + srcMap[src] = actions.length - 1; + + foreach (pendingTarget; pendingTargets) { + actions ~= actions[pendingTarget]; + actions[$-1].skipped = false; + srcMap[actions[$-1].src] = actions.length - 1; + } + } +} + final class Monitor { // Class variables ApplicationConfig appConfig; @@ -171,12 +261,14 @@ final class Monitor { // Configure function delegates void delegate(string path) onDirCreated; - void delegate(string path) onFileChanged; + void delegate(string[] path) onFileChanged; void delegate(string path) onDelete; void delegate(string from, string to) onMove; // List of paths that were moved, not deleted bool[string] movedNotDeleted; + + ActionHolder actionHolder; // Configure the class varaible to consume the application configuration including selective sync this(ApplicationConfig appConfig, ClientSideFiltering selectiveSync) { @@ -371,138 +463,145 @@ final class Monitor { }; while (true) { - int ret = poll(&fds, 1, 0); - if (ret == -1) throw new MonitorException("poll failed"); - else if (ret == 0) break; // no events available + bool hasNotification = false; + while (true) { + int ret = poll(&fds, 1, 0); + if (ret == -1) throw new MonitorException("poll failed"); + else if (ret == 0) break; // no events available + hasNotification = true; + size_t length = read(worker.fd, buffer.ptr, buffer.length); + if (length == -1) throw new MonitorException("read failed"); - size_t length = read(worker.fd, buffer.ptr, buffer.length); - if (length == -1) throw new MonitorException("read failed"); - - int i = 0; - while (i < length) { - inotify_event *event = cast(inotify_event*) &buffer[i]; - string path; - string evalPath; - - // inotify event debug - addLogEntry("inotify event wd: " ~ to!string(event.wd), ["debug"]); - addLogEntry("inotify event mask: " ~ to!string(event.mask), ["debug"]); - addLogEntry("inotify event cookie: " ~ to!string(event.cookie), ["debug"]); - addLogEntry("inotify event len: " ~ to!string(event.len), ["debug"]); - addLogEntry("inotify event name: " ~ to!string(event.name), ["debug"]); - - // inotify event handling - if (event.mask & IN_ACCESS) addLogEntry("inotify event flag: IN_ACCESS", ["debug"]); - if (event.mask & IN_MODIFY) addLogEntry("inotify event flag: IN_MODIFY", ["debug"]); - if (event.mask & IN_ATTRIB) addLogEntry("inotify event flag: IN_ATTRIB", ["debug"]); - if (event.mask & IN_CLOSE_WRITE) addLogEntry("inotify event flag: IN_CLOSE_WRITE", ["debug"]); - if (event.mask & IN_CLOSE_NOWRITE) addLogEntry("inotify event flag: IN_CLOSE_NOWRITE", ["debug"]); - if (event.mask & IN_MOVED_FROM) addLogEntry("inotify event flag: IN_MOVED_FROM", ["debug"]); - if (event.mask & IN_MOVED_TO) addLogEntry("inotify event flag: IN_MOVED_TO", ["debug"]); - if (event.mask & IN_CREATE) addLogEntry("inotify event flag: IN_CREATE", ["debug"]); - if (event.mask & IN_DELETE) addLogEntry("inotify event flag: IN_DELETE", ["debug"]); - if (event.mask & IN_DELETE_SELF) addLogEntry("inotify event flag: IN_DELETE_SELF", ["debug"]); - if (event.mask & IN_MOVE_SELF) addLogEntry("inotify event flag: IN_MOVE_SELF", ["debug"]); - if (event.mask & IN_UNMOUNT) addLogEntry("inotify event flag: IN_UNMOUNT", ["debug"]); - if (event.mask & IN_Q_OVERFLOW) addLogEntry("inotify event flag: IN_Q_OVERFLOW", ["debug"]); - if (event.mask & IN_IGNORED) addLogEntry("inotify event flag: IN_IGNORED", ["debug"]); - if (event.mask & IN_CLOSE) addLogEntry("inotify event flag: IN_CLOSE", ["debug"]); - if (event.mask & IN_MOVE) addLogEntry("inotify event flag: IN_MOVE", ["debug"]); - if (event.mask & IN_ONLYDIR) addLogEntry("inotify event flag: IN_ONLYDIR", ["debug"]); - if (event.mask & IN_DONT_FOLLOW) addLogEntry("inotify event flag: IN_DONT_FOLLOW", ["debug"]); - if (event.mask & IN_EXCL_UNLINK) addLogEntry("inotify event flag: IN_EXCL_UNLINK", ["debug"]); - if (event.mask & IN_MASK_ADD) addLogEntry("inotify event flag: IN_MASK_ADD", ["debug"]); - if (event.mask & IN_ISDIR) addLogEntry("inotify event flag: IN_ISDIR", ["debug"]); - if (event.mask & IN_ONESHOT) addLogEntry("inotify event flag: IN_ONESHOT", ["debug"]); - if (event.mask & IN_ALL_EVENTS) addLogEntry("inotify event flag: IN_ALL_EVENTS", ["debug"]); - - // skip events that need to be ignored - if (event.mask & IN_IGNORED) { - // forget the directory associated to the watch descriptor - wdToDirName.remove(event.wd); - goto skip; - } else if (event.mask & IN_Q_OVERFLOW) { - throw new MonitorException("inotify overflow, inotify events will be missing"); - } - - // if the event is not to be ignored, obtain path - path = getPath(event); - // configure the skip_dir & skip skip_file comparison item - evalPath = path.strip('.'); - - // Skip events that should be excluded based on application configuration - // We cant use isDir or isFile as this information is missing from the inotify event itself - // Thus this causes a segfault when attempting to query this - https://github.com/abraunegg/onedrive/issues/995 - - // Based on the 'type' of event & object type (directory or file) check that path against the 'right' user exclusions - // Directory events should only be compared against skip_dir and file events should only be compared against skip_file - if (event.mask & IN_ISDIR) { - // The event in question contains IN_ISDIR event mask, thus highly likely this is an event on a directory - // This due to if the user has specified in skip_dir an exclusive path: '/path' - that is what must be matched - if (selectiveSync.isDirNameExcluded(evalPath)) { - // The path to evaluate matches a path that the user has configured to skip + int i = 0; + while (i < length) { + inotify_event *event = cast(inotify_event*) &buffer[i]; + string path; + string evalPath; + + // inotify event debug + addLogEntry("inotify event wd: " ~ to!string(event.wd), ["debug"]); + addLogEntry("inotify event mask: " ~ to!string(event.mask), ["debug"]); + addLogEntry("inotify event cookie: " ~ to!string(event.cookie), ["debug"]); + addLogEntry("inotify event len: " ~ to!string(event.len), ["debug"]); + addLogEntry("inotify event name: " ~ to!string(event.name), ["debug"]); + + // inotify event handling + if (event.mask & IN_ACCESS) addLogEntry("inotify event flag: IN_ACCESS", ["debug"]); + if (event.mask & IN_MODIFY) addLogEntry("inotify event flag: IN_MODIFY", ["debug"]); + if (event.mask & IN_ATTRIB) addLogEntry("inotify event flag: IN_ATTRIB", ["debug"]); + if (event.mask & IN_CLOSE_WRITE) addLogEntry("inotify event flag: IN_CLOSE_WRITE", ["debug"]); + if (event.mask & IN_CLOSE_NOWRITE) addLogEntry("inotify event flag: IN_CLOSE_NOWRITE", ["debug"]); + if (event.mask & IN_MOVED_FROM) addLogEntry("inotify event flag: IN_MOVED_FROM", ["debug"]); + if (event.mask & IN_MOVED_TO) addLogEntry("inotify event flag: IN_MOVED_TO", ["debug"]); + if (event.mask & IN_CREATE) addLogEntry("inotify event flag: IN_CREATE", ["debug"]); + if (event.mask & IN_DELETE) addLogEntry("inotify event flag: IN_DELETE", ["debug"]); + if (event.mask & IN_DELETE_SELF) addLogEntry("inotify event flag: IN_DELETE_SELF", ["debug"]); + if (event.mask & IN_MOVE_SELF) addLogEntry("inotify event flag: IN_MOVE_SELF", ["debug"]); + if (event.mask & IN_UNMOUNT) addLogEntry("inotify event flag: IN_UNMOUNT", ["debug"]); + if (event.mask & IN_Q_OVERFLOW) addLogEntry("inotify event flag: IN_Q_OVERFLOW", ["debug"]); + if (event.mask & IN_IGNORED) addLogEntry("inotify event flag: IN_IGNORED", ["debug"]); + if (event.mask & IN_CLOSE) addLogEntry("inotify event flag: IN_CLOSE", ["debug"]); + if (event.mask & IN_MOVE) addLogEntry("inotify event flag: IN_MOVE", ["debug"]); + if (event.mask & IN_ONLYDIR) addLogEntry("inotify event flag: IN_ONLYDIR", ["debug"]); + if (event.mask & IN_DONT_FOLLOW) addLogEntry("inotify event flag: IN_DONT_FOLLOW", ["debug"]); + if (event.mask & IN_EXCL_UNLINK) addLogEntry("inotify event flag: IN_EXCL_UNLINK", ["debug"]); + if (event.mask & IN_MASK_ADD) addLogEntry("inotify event flag: IN_MASK_ADD", ["debug"]); + if (event.mask & IN_ISDIR) addLogEntry("inotify event flag: IN_ISDIR", ["debug"]); + if (event.mask & IN_ONESHOT) addLogEntry("inotify event flag: IN_ONESHOT", ["debug"]); + if (event.mask & IN_ALL_EVENTS) addLogEntry("inotify event flag: IN_ALL_EVENTS", ["debug"]); + + // skip events that need to be ignored + if (event.mask & IN_IGNORED) { + // forget the directory associated to the watch descriptor + wdToDirName.remove(event.wd); goto skip; + } else if (event.mask & IN_Q_OVERFLOW) { + throw new MonitorException("inotify overflow, inotify events will be missing"); } - } else { - // The event in question missing the IN_ISDIR event mask, thus highly likely this is an event on a file - // This due to if the user has specified in skip_file an exclusive path: '/path/file' - that is what must be matched - if (selectiveSync.isFileNameExcluded(evalPath)) { - // The path to evaluate matches a file that the user has configured to skip - goto skip; - } - } - - // is the path, excluded via sync_list - if (selectiveSync.isPathExcludedViaSyncList(path)) { - // The path to evaluate matches a directory or file that the user has configured not to include in the sync - goto skip; - } - - // handle the inotify events - if (event.mask & IN_MOVED_FROM) { - addLogEntry("event IN_MOVED_FROM: " ~ path, ["debug"]); - cookieToPath[event.cookie] = path; - movedNotDeleted[path] = true; // Mark as moved, not deleted - } else if (event.mask & IN_MOVED_TO) { - addLogEntry("event IN_MOVED_TO: " ~ path, ["debug"]); - if (event.mask & IN_ISDIR) addRecursive(path); - auto from = event.cookie in cookieToPath; - if (from) { - cookieToPath.remove(event.cookie); - if (useCallbacks) onMove(*from, path); - movedNotDeleted.remove(*from); // Clear moved status + + // if the event is not to be ignored, obtain path + path = getPath(event); + // configure the skip_dir & skip skip_file comparison item + evalPath = path.strip('.'); + + // Skip events that should be excluded based on application configuration + // We cant use isDir or isFile as this information is missing from the inotify event itself + // Thus this causes a segfault when attempting to query this - https://github.com/abraunegg/onedrive/issues/995 + + // Based on the 'type' of event & object type (directory or file) check that path against the 'right' user exclusions + // Directory events should only be compared against skip_dir and file events should only be compared against skip_file + if (event.mask & IN_ISDIR) { + // The event in question contains IN_ISDIR event mask, thus highly likely this is an event on a directory + // This due to if the user has specified in skip_dir an exclusive path: '/path' - that is what must be matched + if (selectiveSync.isDirNameExcluded(evalPath)) { + // The path to evaluate matches a path that the user has configured to skip + goto skip; + } } else { - // Handle file moved in from outside - if (event.mask & IN_ISDIR) { - if (useCallbacks) onDirCreated(path); - } else { - if (useCallbacks) onFileChanged(path); + // The event in question missing the IN_ISDIR event mask, thus highly likely this is an event on a file + // This due to if the user has specified in skip_file an exclusive path: '/path/file' - that is what must be matched + if (selectiveSync.isFileNameExcluded(evalPath)) { + // The path to evaluate matches a file that the user has configured to skip + goto skip; } } - } else if (event.mask & IN_CREATE) { - addLogEntry("event IN_CREATE: " ~ path, ["debug"]); - if (event.mask & IN_ISDIR) { - addRecursive(path); - if (useCallbacks) onDirCreated(path); + + // is the path, excluded via sync_list + if (selectiveSync.isPathExcludedViaSyncList(path)) { + // The path to evaluate matches a directory or file that the user has configured not to include in the sync + goto skip; } - } else if (event.mask & IN_DELETE) { - if (path in movedNotDeleted) { - movedNotDeleted.remove(path); // Ignore delete for moved files + + // handle the inotify events + if (event.mask & IN_MOVED_FROM) { + addLogEntry("event IN_MOVED_FROM: " ~ path, ["debug"]); + cookieToPath[event.cookie] = path; + movedNotDeleted[path] = true; // Mark as moved, not deleted + } else if (event.mask & IN_MOVED_TO) { + addLogEntry("event IN_MOVED_TO: " ~ path, ["debug"]); + if (event.mask & IN_ISDIR) addRecursive(path); + auto from = event.cookie in cookieToPath; + if (from) { + cookieToPath.remove(event.cookie); + if (useCallbacks) actionHolder.append(ActionType.moved, *from, path); + movedNotDeleted.remove(*from); // Clear moved status + } else { + // Handle file moved in from outside + if (event.mask & IN_ISDIR) { + if (useCallbacks) actionHolder.append(ActionType.createDir, path); + } else { + if (useCallbacks) actionHolder.append(ActionType.changed, path); + } + } + } else if (event.mask & IN_CREATE) { + addLogEntry("event IN_CREATE: " ~ path, ["debug"]); + if (event.mask & IN_ISDIR) { + addRecursive(path); + if (useCallbacks) actionHolder.append(ActionType.createDir, path); + } + } else if (event.mask & IN_DELETE) { + if (path in movedNotDeleted) { + movedNotDeleted.remove(path); // Ignore delete for moved files + } else { + addLogEntry("event IN_DELETE: " ~ path, ["debug"]); + if (useCallbacks) actionHolder.append(ActionType.deleted, path); + } + } else if ((event.mask & IN_CLOSE_WRITE) && !(event.mask & IN_ISDIR)) { + addLogEntry("event IN_CLOSE_WRITE and not IN_ISDIR: " ~ path, ["debug"]); + if (useCallbacks) actionHolder.append(ActionType.changed, path); } else { - addLogEntry("event IN_DELETE: " ~ path, ["debug"]); - if (useCallbacks) onDelete(path); + addLogEntry("event unhandled: " ~ path, ["debug"]); + assert(0); } - } else if ((event.mask & IN_CLOSE_WRITE) && !(event.mask & IN_ISDIR)) { - addLogEntry("event IN_CLOSE_WRITE and not IN_ISDIR: " ~ path, ["debug"]); - if (useCallbacks) onFileChanged(path); - } else { - addLogEntry("event unhandled: " ~ path, ["debug"]); - assert(0); - } - skip: - i += inotify_event.sizeof + event.len; + skip: + i += inotify_event.sizeof + event.len; + } + Thread.sleep(dur!"seconds"(1)); } + if (!hasNotification) break; + processChanges(); + // Assume that the items moved outside the watched directory have been deleted foreach (cookie, path; cookieToPath) { addLogEntry("Deleting cookie|watch (post loop): " ~ path, ["debug"]); @@ -515,6 +614,35 @@ final class Monitor { } } + private void processChanges() { + string[] changes; + + foreach(action; actionHolder.actions) { + if (action.skipped) + continue; + switch (action.type) { + case ActionType.changed: + changes ~= action.src; + break; + case ActionType.deleted: + onDelete(action.src); + break; + case ActionType.createDir: + onDirCreated(action.src); + break; + case ActionType.moved: + onMove(action.src, action.dst); + break; + default: + break; + } + } + if (!changes.empty) + onFileChanged(changes); + + object.destroy(actionHolder); + } + Tid watch() { initialised = true; return spawn(&startMonitorJob, worker, thisTid); diff --git a/src/sync.d b/src/sync.d index b0d8490f..7bdf296e 100644 --- a/src/sync.d +++ b/src/sync.d @@ -3486,154 +3486,157 @@ class SyncEngine { // For each batch of files to upload, upload the changed data to OneDrive foreach (chunk; databaseItemsWhereContentHasChanged.chunks(batchSize)) { - uploadChangedLocalFileToOneDrive(chunk); + processChangedLocalItemsToUploadInParallel(chunk); + } + } + + // Upload the changed file batches in parallel + void processChangedLocalItemsToUploadInParallel(string[3][] array) { + foreach (i, localItemDetails; taskPool.parallel(array)) { + addLogEntry("Upload Thread " ~ to!string(i) ~ " Starting: " ~ to!string(Clock.currTime()), ["debug"]); + uploadChangedLocalFileToOneDrive(localItemDetails); + addLogEntry("Upload Thread " ~ to!string(i) ~ " Finished: " ~ to!string(Clock.currTime()), ["debug"]); } } // Upload changed local files to OneDrive in parallel - void uploadChangedLocalFileToOneDrive(string[3][] array) { - - foreach (i, localItemDetails; taskPool.parallel(array)) { + void uploadChangedLocalFileToOneDrive(string[3] localItemDetails) { - addLogEntry("Thread " ~ to!string(i) ~ " Starting: " ~ to!string(Clock.currTime()), ["debug"]); + // These are the details of the item we need to upload + string changedItemParentId = localItemDetails[0]; + string changedItemId = localItemDetails[1]; + string localFilePath = localItemDetails[2]; - // These are the details of the item we need to upload - string changedItemParentId = localItemDetails[0]; - string changedItemId = localItemDetails[1]; - string localFilePath = localItemDetails[2]; - - // How much space is remaining on OneDrive - ulong remainingFreeSpace; - // Did the upload fail? - bool uploadFailed = false; - // Did we skip due to exceeding maximum allowed size? - bool skippedMaxSize = false; - // Did we skip to an exception error? - bool skippedExceptionError = false; - - // Unfortunatly, we cant store an array of Item's ... so we have to re-query the DB again - unavoidable extra processing here - // This is because the Item[] has no other functions to allow is to parallel process those elements, so we have to use a string array as input to this function - Item dbItem; - itemDB.selectById(changedItemParentId, changedItemId, dbItem); + addLogEntry("uploadChangedLocalFileToOneDrive: " ~ localFilePath, ["debug"]); - // Fetch the details from cachedOnlineDriveData - // - cachedOnlineDriveData.quotaRestricted; - // - cachedOnlineDriveData.quotaAvailable; - // - cachedOnlineDriveData.quotaRemaining; - driveDetailsCache cachedOnlineDriveData; - cachedOnlineDriveData = getDriveDetails(dbItem.driveId); - remainingFreeSpace = cachedOnlineDriveData.quotaRemaining; - - // Get the file size from the actual file - ulong thisFileSizeLocal = getSize(localFilePath); - // Get the file size from the DB data - ulong thisFileSizeFromDB; - if (!dbItem.size.empty) { - thisFileSizeFromDB = to!ulong(dbItem.size); - } else { - thisFileSizeFromDB = 0; - } - - // 'remainingFreeSpace' online includes the current file online - // We need to remove the online file (add back the existing file size) then take away the new local file size to get a new approximate value - ulong calculatedSpaceOnlinePostUpload = (remainingFreeSpace + thisFileSizeFromDB) - thisFileSizeLocal; - - // Based on what we know, for this thread - can we safely upload this modified local file? - addLogEntry("This Thread Estimated Free Space Online: " ~ to!string(remainingFreeSpace), ["debug"]); - addLogEntry("This Thread Calculated Free Space Online Post Upload: " ~ to!string(calculatedSpaceOnlinePostUpload), ["debug"]); - JSONValue uploadResponse; - - bool spaceAvailableOnline = false; - // If 'personal' accounts, if driveId == defaultDriveId, then we will have quota data - cachedOnlineDriveData.quotaRemaining will be updated so it can be reused - // If 'personal' accounts, if driveId != defaultDriveId, then we will not have quota data - cachedOnlineDriveData.quotaRestricted will be set as true - // If 'business' accounts, if driveId == defaultDriveId, then we will potentially have quota data - cachedOnlineDriveData.quotaRemaining will be updated so it can be reused - // If 'business' accounts, if driveId != defaultDriveId, then we will potentially have quota data, but it most likely will be a 0 value - cachedOnlineDriveData.quotaRestricted will be set as true - - // Is there quota available for the given drive where we are uploading to? - if (cachedOnlineDriveData.quotaAvailable) { - // Our query told us we have free space online .. if we upload this file, will we exceed space online - thus upload will fail during upload? - if (calculatedSpaceOnlinePostUpload > 0) { - // Based on this thread action, we beleive that there is space available online to upload - proceed - spaceAvailableOnline = true; - } - } - - // Is quota being restricted? - if (cachedOnlineDriveData.quotaRestricted) { - // Space available online is being restricted - so we have no way to really know if there is space available online + // How much space is remaining on OneDrive + ulong remainingFreeSpace; + // Did the upload fail? + bool uploadFailed = false; + // Did we skip due to exceeding maximum allowed size? + bool skippedMaxSize = false; + // Did we skip to an exception error? + bool skippedExceptionError = false; + + // Unfortunatly, we cant store an array of Item's ... so we have to re-query the DB again - unavoidable extra processing here + // This is because the Item[] has no other functions to allow is to parallel process those elements, so we have to use a string array as input to this function + Item dbItem; + itemDB.selectById(changedItemParentId, changedItemId, dbItem); + + // Fetch the details from cachedOnlineDriveData + // - cachedOnlineDriveData.quotaRestricted; + // - cachedOnlineDriveData.quotaAvailable; + // - cachedOnlineDriveData.quotaRemaining; + driveDetailsCache cachedOnlineDriveData; + cachedOnlineDriveData = getDriveDetails(dbItem.driveId); + remainingFreeSpace = cachedOnlineDriveData.quotaRemaining; + + // Get the file size from the actual file + ulong thisFileSizeLocal = getSize(localFilePath); + // Get the file size from the DB data + ulong thisFileSizeFromDB; + if (!dbItem.size.empty) { + thisFileSizeFromDB = to!ulong(dbItem.size); + } else { + thisFileSizeFromDB = 0; + } + + // 'remainingFreeSpace' online includes the current file online + // We need to remove the online file (add back the existing file size) then take away the new local file size to get a new approximate value + ulong calculatedSpaceOnlinePostUpload = (remainingFreeSpace + thisFileSizeFromDB) - thisFileSizeLocal; + + // Based on what we know, for this thread - can we safely upload this modified local file? + addLogEntry("This Thread Estimated Free Space Online: " ~ to!string(remainingFreeSpace), ["debug"]); + addLogEntry("This Thread Calculated Free Space Online Post Upload: " ~ to!string(calculatedSpaceOnlinePostUpload), ["debug"]); + JSONValue uploadResponse; + + bool spaceAvailableOnline = false; + // If 'personal' accounts, if driveId == defaultDriveId, then we will have quota data - cachedOnlineDriveData.quotaRemaining will be updated so it can be reused + // If 'personal' accounts, if driveId != defaultDriveId, then we will not have quota data - cachedOnlineDriveData.quotaRestricted will be set as true + // If 'business' accounts, if driveId == defaultDriveId, then we will potentially have quota data - cachedOnlineDriveData.quotaRemaining will be updated so it can be reused + // If 'business' accounts, if driveId != defaultDriveId, then we will potentially have quota data, but it most likely will be a 0 value - cachedOnlineDriveData.quotaRestricted will be set as true + + // Is there quota available for the given drive where we are uploading to? + if (cachedOnlineDriveData.quotaAvailable) { + // Our query told us we have free space online .. if we upload this file, will we exceed space online - thus upload will fail during upload? + if (calculatedSpaceOnlinePostUpload > 0) { + // Based on this thread action, we beleive that there is space available online to upload - proceed spaceAvailableOnline = true; } - - // Do we have space available or is space available being restricted (so we make the blind assumption that there is space available) - if (spaceAvailableOnline) { - // Does this file exceed the maximum file size to upload to OneDrive? - if (thisFileSizeLocal <= maxUploadFileSize) { - // Attempt to upload the modified file - // Error handling is in performModifiedFileUpload(), and the JSON that is responded with - will either be null or a valid JSON object containing the upload result - uploadResponse = performModifiedFileUpload(dbItem, localFilePath, thisFileSizeLocal); - - // Evaluate the returned JSON uploadResponse - // If there was an error uploading the file, uploadResponse should be empty and invalid - if (uploadResponse.type() != JSONType.object) { - uploadFailed = true; - skippedExceptionError = true; - } - - } else { - // Skip file - too large - uploadFailed = true; - skippedMaxSize = true; - } - } else { - // Cant upload this file - no space available - uploadFailed = true; - } + } + + // Is quota being restricted? + if (cachedOnlineDriveData.quotaRestricted) { + // Space available online is being restricted - so we have no way to really know if there is space available online + spaceAvailableOnline = true; + } - // Did the upload fail? - if (uploadFailed) { - // Upload failed .. why? - // No space available online - if (!spaceAvailableOnline) { - addLogEntry("Skipping uploading modified file " ~ localFilePath ~ " due to insufficient free space available on Microsoft OneDrive", ["info", "notify"]); - } - // File exceeds max allowed size - if (skippedMaxSize) { - addLogEntry("Skipping uploading this modified file as it exceeds the maximum size allowed by OneDrive: " ~ localFilePath, ["info", "notify"]); - } - // Generic message - if (skippedExceptionError) { - // normal failure message if API or exception error generated - addLogEntry("Uploading modified file " ~ localFilePath ~ " ... failed!", ["info", "notify"]); + // Do we have space available or is space available being restricted (so we make the blind assumption that there is space available) + if (spaceAvailableOnline) { + // Does this file exceed the maximum file size to upload to OneDrive? + if (thisFileSizeLocal <= maxUploadFileSize) { + // Attempt to upload the modified file + // Error handling is in performModifiedFileUpload(), and the JSON that is responded with - will either be null or a valid JSON object containing the upload result + uploadResponse = performModifiedFileUpload(dbItem, localFilePath, thisFileSizeLocal); + + // Evaluate the returned JSON uploadResponse + // If there was an error uploading the file, uploadResponse should be empty and invalid + if (uploadResponse.type() != JSONType.object) { + uploadFailed = true; + skippedExceptionError = true; } + } else { - // Upload was successful - addLogEntry("Uploading modified file " ~ localFilePath ~ " ... done.", ["info", "notify"]); - - // Save JSON item in database - saveItem(uploadResponse); - - // Update the 'cachedOnlineDriveData' record for this 'dbItem.driveId' so that this is tracked as accuratly as possible for other threads - updateDriveDetailsCache(dbItem.driveId, cachedOnlineDriveData.quotaRestricted, cachedOnlineDriveData.quotaAvailable, thisFileSizeLocal); - - // Check the integrity of the uploaded modified file if not in a --dry-run scenario - if (!dryRun) { - // Perform the integrity of the uploaded modified file - performUploadIntegrityValidationChecks(uploadResponse, localFilePath, thisFileSizeLocal); - - // Update the date / time of the file online to match the local item - // Get the local file last modified time - SysTime localModifiedTime = timeLastModified(localFilePath).toUTC(); - localModifiedTime.fracSecs = Duration.zero; - // Get the latest eTag, and use that - string etagFromUploadResponse = uploadResponse["eTag"].str; - // Attempt to update the online date time stamp based on our local data - uploadLastModifiedTime(dbItem.driveId, dbItem.id, localModifiedTime, etagFromUploadResponse); - } + // Skip file - too large + uploadFailed = true; + skippedMaxSize = true; } - - addLogEntry("Thread " ~ to!string(i) ~ " Finished: " ~ to!string(Clock.currTime()), ["debug"]); - - } // end of 'foreach (i, localItemDetails; array.enumerate)' + } else { + // Cant upload this file - no space available + uploadFailed = true; + } + + // Did the upload fail? + if (uploadFailed) { + // Upload failed .. why? + // No space available online + if (!spaceAvailableOnline) { + addLogEntry("Skipping uploading modified file " ~ localFilePath ~ " due to insufficient free space available on Microsoft OneDrive", ["info", "notify"]); + } + // File exceeds max allowed size + if (skippedMaxSize) { + addLogEntry("Skipping uploading this modified file as it exceeds the maximum size allowed by OneDrive: " ~ localFilePath, ["info", "notify"]); + } + // Generic message + if (skippedExceptionError) { + // normal failure message if API or exception error generated + addLogEntry("Uploading modified file " ~ localFilePath ~ " ... failed!", ["info", "notify"]); + } + } else { + // Upload was successful + addLogEntry("Uploading modified file " ~ localFilePath ~ " ... done.", ["info", "notify"]); + + // Save JSON item in database + saveItem(uploadResponse); + + // Update the 'cachedOnlineDriveData' record for this 'dbItem.driveId' so that this is tracked as accuratly as possible for other threads + updateDriveDetailsCache(dbItem.driveId, cachedOnlineDriveData.quotaRestricted, cachedOnlineDriveData.quotaAvailable, thisFileSizeLocal); + + // Check the integrity of the uploaded modified file if not in a --dry-run scenario + if (!dryRun) { + // Perform the integrity of the uploaded modified file + performUploadIntegrityValidationChecks(uploadResponse, localFilePath, thisFileSizeLocal); + + // Update the date / time of the file online to match the local item + // Get the local file last modified time + SysTime localModifiedTime = timeLastModified(localFilePath).toUTC(); + localModifiedTime.fracSecs = Duration.zero; + // Get the latest eTag, and use that + string etagFromUploadResponse = uploadResponse["eTag"].str; + // Attempt to update the online date time stamp based on our local data + uploadLastModifiedTime(dbItem.driveId, dbItem.id, localModifiedTime, etagFromUploadResponse); + } + } } // Perform the upload of a locally modified file to OneDrive @@ -3943,10 +3946,17 @@ class SyncEngine { // Perform a filesystem walk to uncover new data to upload to OneDrive void scanLocalFilesystemPathForNewData(string path) { - // Cleanup array memory before we start adding files newLocalFilesToUploadToOneDrive = []; + // Perform a filesystem walk to uncover new data + scanLocalFilesystemPathForNewDataToUpload(path); + + // Upload new data that has been identified + processNewLocalItemsToUpload(); + } + + void scanLocalFilesystemPathForNewDataToUpload(string path) { // To improve logging output for this function, what is the 'logical path' we are scanning for file & folder differences? string logPath; if (path == ".") { @@ -3977,9 +3987,11 @@ class SyncEngine { // Perform the filesystem walk of this path, building an array of new items to upload scanPathForNewData(path); - if (!appConfig.surpressLoggingOutput) { - if (appConfig.verbosityCount == 0) - addLogEntry("\n", ["consoleOnlyNoNewLine"]); + if (isDir(path)) { + if (!appConfig.surpressLoggingOutput) { + if (appConfig.verbosityCount == 0) + addLogEntry("\n", ["consoleOnlyNoNewLine"]); + } } // To finish off the processing items, this is needed to reflect this in the log @@ -3990,7 +4002,10 @@ class SyncEngine { auto elapsedTime = finishTime - startTime; addLogEntry("Elapsed Time Filesystem Walk: " ~ to!string(elapsedTime), ["debug"]); - + } + + // Perform a filesystem walk to uncover new data to upload to OneDrive + void processNewLocalItemsToUpload() { // Upload new data that has been identified // Are there any items to download post fetching the /delta data? if (!newLocalFilesToUploadToOneDrive.empty) { @@ -4036,22 +4051,16 @@ class SyncEngine { // Cleanup array memory after uploading all files newLocalFilesToUploadToOneDrive = []; } - - if (!databaseItemsWhereContentHasChanged.empty) { - // There are changed local files that were in the DB to upload - addLogEntry("Changed local items to upload to OneDrive: " ~ to!string(databaseItemsWhereContentHasChanged.length)); - processChangedLocalItemsToUpload(); - // Cleanup array memory - databaseItemsWhereContentHasChanged = []; - } } // Scan this path for new data void scanPathForNewData(string path) { // Add a processing '.' - if (!appConfig.surpressLoggingOutput) { - if (appConfig.verbosityCount == 0) - addProcessingDotEntry(); + if (isDir(path)) { + if (!appConfig.surpressLoggingOutput) { + if (appConfig.verbosityCount == 0) + addProcessingDotEntry(); + } } ulong maxPathLength; @@ -4271,37 +4280,42 @@ class SyncEngine { } // Handle a single file inotify trigger when using --monitor - void handleLocalFileTrigger(string localFilePath) { + void handleLocalFileTrigger(string[] changedLocalFilesToUploadToOneDrive) { // Is this path a new file or an existing one? // Normally we would use pathFoundInDatabase() to calculate, but we need 'databaseItem' as well if the item is in the database - Item databaseItem; - bool fileFoundInDB = false; - string[3][] modifiedItemToUpload; - - foreach (driveId; onlineDriveDetails.keys) { - if (itemDB.selectByPath(localFilePath, driveId, databaseItem)) { - fileFoundInDB = true; - break; - } - } - - // Was the file found in the database? - if (!fileFoundInDB) { - // This is a new file as it is not in the database - // Log that the file has been added locally - addLogEntry("[M] New local file added: " ~ localFilePath, ["verbose"]); - scanLocalFilesystemPathForNewData(localFilePath); - } else { - // This is a potentially modified file, needs to be handled as such. Is the item truly modified? - if (!testFileHash(localFilePath, databaseItem)) { - // The local file failed the hash comparison test - there is a data difference - // Log that the file has changed locally - addLogEntry("[M] Local file changed: " ~ localFilePath, ["verbose"]); - // Add the modified item to the array to upload - modifiedItemToUpload ~= [databaseItem.driveId, databaseItem.id, localFilePath]; - uploadChangedLocalFileToOneDrive(modifiedItemToUpload); + foreach (localFilePath; changedLocalFilesToUploadToOneDrive) { + try { + Item databaseItem; + bool fileFoundInDB = false; + + foreach (driveId; onlineDriveDetails.keys) { + if (itemDB.selectByPath(localFilePath, driveId, databaseItem)) { + fileFoundInDB = true; + break; + } + } + + // Was the file found in the database? + if (!fileFoundInDB) { + // This is a new file as it is not in the database + // Log that the file has been added locally + addLogEntry("[M] New local file added: " ~ localFilePath, ["verbose"]); + scanLocalFilesystemPathForNewDataToUpload(localFilePath); + } else { + // This is a potentially modified file, needs to be handled as such. Is the item truly modified? + if (!testFileHash(localFilePath, databaseItem)) { + // The local file failed the hash comparison test - there is a data difference + // Log that the file has changed locally + addLogEntry("[M] Local file changed: " ~ localFilePath, ["verbose"]); + // Add the modified item to the array to upload + uploadChangedLocalFileToOneDrive([databaseItem.driveId, databaseItem.id, localFilePath]); + } + } + } catch(Exception e) { + addLogEntry("Cannot upload file changes/creation: " ~ e.msg, ["info", "notify"]); } } + processNewLocalItemsToUpload(); } // Query the database to determine if this path is within the existing database @@ -4959,10 +4973,10 @@ class SyncEngine { string changedItemParentId = fileDetailsFromOneDrive["parentReference"]["driveId"].str; string changedItemId = fileDetailsFromOneDrive["id"].str; addLogEntry("Skipping uploading this file as moving it to upload as a modified file (online item already exists): " ~ fileToUpload); - databaseItemsWhereContentHasChanged ~= [changedItemParentId, changedItemId, fileToUpload]; // In order for the processing of the local item as a 'changed' item, unfortunatly we need to save the online data to the local DB saveItem(fileDetailsFromOneDrive); + uploadChangedLocalFileToOneDrive([changedItemParentId, changedItemId, fileToUpload]); } } catch (OneDriveException exception) { // If we get a 404 .. the file is not online .. this is what we want .. file does not exist online @@ -6811,7 +6825,7 @@ class SyncEngine { if (!itemDB.selectByPath(oldPath, appConfig.defaultDriveId, oldItem)) { // The old path|item is not synced with the database, upload as a new file addLogEntry("Moved local item was not in-sync with local databse - uploading as new item"); - uploadNewFile(newPath); + scanLocalFilesystemPathForNewData(newPath); return; }