Add support for batched local monitor processing (#2609)

* Fix file upload fallback for all scenario
* Add support for batched monitor
* Add recursive match
* Adjust logging output
* Add error handling
This commit is contained in:
JC-comp 2024-02-12 14:12:20 +08:00 committed by GitHub
parent 7621bbab65
commit a92221bb50
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 448 additions and 311 deletions

View file

@ -748,16 +748,11 @@ int main(string[] cliArgs) {
};
// Delegated function for when inotify detects a local file has been changed
filesystemMonitor.onFileChanged = delegate(string path) {
filesystemMonitor.onFileChanged = delegate(string[] changedLocalFilesToUploadToOneDrive) {
// Handle a potentially locally changed file
// Logging for this event moved to handleLocalFileTrigger() due to threading and false triggers from scanLocalFilesystemPathForNewData() above
try {
syncEngineInstance.handleLocalFileTrigger(path);
} catch (CurlException e) {
addLogEntry("Offline, cannot upload changed item: " ~ path, ["verbose"]);
} catch(Exception e) {
addLogEntry("Cannot upload file changes/creation: " ~ e.msg, ["info", "notify"]);
}
addLogEntry("[M] Total number of local file changed: " ~ to!string(changedLocalFilesToUploadToOneDrive.length));
syncEngineInstance.handleLocalFileTrigger(changedLocalFilesToUploadToOneDrive);
};
// Delegated function for when inotify detects a delete event

View file

@ -8,6 +8,7 @@ import core.sys.linux.sys.inotify;
import core.sys.posix.poll;
import core.sys.posix.unistd;
import core.sys.posix.sys.select;
import core.thread;
import core.time;
import std.algorithm;
import std.concurrency;
@ -135,7 +136,6 @@ shared class MonitorBackgroundWorker {
}
}
void startMonitorJob(shared(MonitorBackgroundWorker) worker, Tid callerTid)
{
try {
@ -146,6 +146,96 @@ void startMonitorJob(shared(MonitorBackgroundWorker) worker, Tid callerTid)
worker.shutdown();
}
enum ActionType {
moved,
deleted,
changed,
createDir
}
struct Action {
ActionType type;
bool skipped;
string src;
string dst;
}
struct ActionHolder {
Action[] actions;
ulong[string] srcMap;
void append(ActionType type, string src, string dst=null) {
ulong[] pendingTargets;
switch (type) {
case ActionType.changed:
if (src in srcMap && actions[srcMap[src]].type == ActionType.changed) {
// skip duplicate operations
return;
}
break;
case ActionType.createDir:
break;
case ActionType.deleted:
if (src in srcMap) {
ulong pendingTarget = srcMap[src];
// Skip operations require reading local file that is gone
switch (actions[pendingTarget].type) {
case ActionType.changed:
case ActionType.createDir:
actions[srcMap[src]].skipped = true;
srcMap.remove(src);
break;
default:
break;
}
}
break;
case ActionType.moved:
for(int i = 0; i < actions.length; i++) {
// Only match for latest operation
if (actions[i].src in srcMap) {
switch (actions[i].type) {
case ActionType.changed:
case ActionType.createDir:
// check if the source is the prefix of the target
string prefix = src ~ "/";
string target = actions[i].src;
if (prefix[0] != '.')
prefix = "./" ~ prefix;
if (target[0] != '.')
target = "./" ~ target;
string comm = commonPrefix(prefix, target);
if (src == actions[i].src || comm.length == prefix.length) {
// Hold operations require reading local file that is moved after the target is moved online
pendingTargets ~= i;
actions[i].skipped = true;
srcMap.remove(actions[i].src);
if (comm.length == target.length)
actions[i].src = dst;
else
actions[i].src = dst ~ target[comm.length - 1 .. target.length];
}
break;
default:
break;
}
}
}
break;
default:
break;
}
actions ~= Action(type, false, src, dst);
srcMap[src] = actions.length - 1;
foreach (pendingTarget; pendingTargets) {
actions ~= actions[pendingTarget];
actions[$-1].skipped = false;
srcMap[actions[$-1].src] = actions.length - 1;
}
}
}
final class Monitor {
// Class variables
ApplicationConfig appConfig;
@ -171,13 +261,15 @@ final class Monitor {
// Configure function delegates
void delegate(string path) onDirCreated;
void delegate(string path) onFileChanged;
void delegate(string[] path) onFileChanged;
void delegate(string path) onDelete;
void delegate(string from, string to) onMove;
// List of paths that were moved, not deleted
bool[string] movedNotDeleted;
ActionHolder actionHolder;
// Configure the class varaible to consume the application configuration including selective sync
this(ApplicationConfig appConfig, ClientSideFiltering selectiveSync) {
this.appConfig = appConfig;
@ -370,11 +462,13 @@ final class Monitor {
events: POLLIN
};
while (true) {
bool hasNotification = false;
while (true) {
int ret = poll(&fds, 1, 0);
if (ret == -1) throw new MonitorException("poll failed");
else if (ret == 0) break; // no events available
hasNotification = true;
size_t length = read(worker.fd, buffer.ptr, buffer.length);
if (length == -1) throw new MonitorException("read failed");
@ -469,32 +563,32 @@ final class Monitor {
auto from = event.cookie in cookieToPath;
if (from) {
cookieToPath.remove(event.cookie);
if (useCallbacks) onMove(*from, path);
if (useCallbacks) actionHolder.append(ActionType.moved, *from, path);
movedNotDeleted.remove(*from); // Clear moved status
} else {
// Handle file moved in from outside
if (event.mask & IN_ISDIR) {
if (useCallbacks) onDirCreated(path);
if (useCallbacks) actionHolder.append(ActionType.createDir, path);
} else {
if (useCallbacks) onFileChanged(path);
if (useCallbacks) actionHolder.append(ActionType.changed, path);
}
}
} else if (event.mask & IN_CREATE) {
addLogEntry("event IN_CREATE: " ~ path, ["debug"]);
if (event.mask & IN_ISDIR) {
addRecursive(path);
if (useCallbacks) onDirCreated(path);
if (useCallbacks) actionHolder.append(ActionType.createDir, path);
}
} else if (event.mask & IN_DELETE) {
if (path in movedNotDeleted) {
movedNotDeleted.remove(path); // Ignore delete for moved files
} else {
addLogEntry("event IN_DELETE: " ~ path, ["debug"]);
if (useCallbacks) onDelete(path);
if (useCallbacks) actionHolder.append(ActionType.deleted, path);
}
} else if ((event.mask & IN_CLOSE_WRITE) && !(event.mask & IN_ISDIR)) {
addLogEntry("event IN_CLOSE_WRITE and not IN_ISDIR: " ~ path, ["debug"]);
if (useCallbacks) onFileChanged(path);
if (useCallbacks) actionHolder.append(ActionType.changed, path);
} else {
addLogEntry("event unhandled: " ~ path, ["debug"]);
assert(0);
@ -503,6 +597,11 @@ final class Monitor {
skip:
i += inotify_event.sizeof + event.len;
}
Thread.sleep(dur!"seconds"(1));
}
if (!hasNotification) break;
processChanges();
// Assume that the items moved outside the watched directory have been deleted
foreach (cookie, path; cookieToPath) {
addLogEntry("Deleting cookie|watch (post loop): " ~ path, ["debug"]);
@ -515,6 +614,35 @@ final class Monitor {
}
}
private void processChanges() {
string[] changes;
foreach(action; actionHolder.actions) {
if (action.skipped)
continue;
switch (action.type) {
case ActionType.changed:
changes ~= action.src;
break;
case ActionType.deleted:
onDelete(action.src);
break;
case ActionType.createDir:
onDirCreated(action.src);
break;
case ActionType.moved:
onMove(action.src, action.dst);
break;
default:
break;
}
}
if (!changes.empty)
onFileChanged(changes);
object.destroy(actionHolder);
}
Tid watch() {
initialised = true;
return spawn(&startMonitorJob, worker, thisTid);

View file

@ -3486,22 +3486,29 @@ class SyncEngine {
// For each batch of files to upload, upload the changed data to OneDrive
foreach (chunk; databaseItemsWhereContentHasChanged.chunks(batchSize)) {
uploadChangedLocalFileToOneDrive(chunk);
processChangedLocalItemsToUploadInParallel(chunk);
}
}
// Upload the changed file batches in parallel
void processChangedLocalItemsToUploadInParallel(string[3][] array) {
foreach (i, localItemDetails; taskPool.parallel(array)) {
addLogEntry("Upload Thread " ~ to!string(i) ~ " Starting: " ~ to!string(Clock.currTime()), ["debug"]);
uploadChangedLocalFileToOneDrive(localItemDetails);
addLogEntry("Upload Thread " ~ to!string(i) ~ " Finished: " ~ to!string(Clock.currTime()), ["debug"]);
}
}
// Upload changed local files to OneDrive in parallel
void uploadChangedLocalFileToOneDrive(string[3][] array) {
foreach (i, localItemDetails; taskPool.parallel(array)) {
addLogEntry("Thread " ~ to!string(i) ~ " Starting: " ~ to!string(Clock.currTime()), ["debug"]);
void uploadChangedLocalFileToOneDrive(string[3] localItemDetails) {
// These are the details of the item we need to upload
string changedItemParentId = localItemDetails[0];
string changedItemId = localItemDetails[1];
string localFilePath = localItemDetails[2];
addLogEntry("uploadChangedLocalFileToOneDrive: " ~ localFilePath, ["debug"]);
// How much space is remaining on OneDrive
ulong remainingFreeSpace;
// Did the upload fail?
@ -3630,10 +3637,6 @@ class SyncEngine {
uploadLastModifiedTime(dbItem.driveId, dbItem.id, localModifiedTime, etagFromUploadResponse);
}
}
addLogEntry("Thread " ~ to!string(i) ~ " Finished: " ~ to!string(Clock.currTime()), ["debug"]);
} // end of 'foreach (i, localItemDetails; array.enumerate)'
}
// Perform the upload of a locally modified file to OneDrive
@ -3943,10 +3946,17 @@ class SyncEngine {
// Perform a filesystem walk to uncover new data to upload to OneDrive
void scanLocalFilesystemPathForNewData(string path) {
// Cleanup array memory before we start adding files
newLocalFilesToUploadToOneDrive = [];
// Perform a filesystem walk to uncover new data
scanLocalFilesystemPathForNewDataToUpload(path);
// Upload new data that has been identified
processNewLocalItemsToUpload();
}
void scanLocalFilesystemPathForNewDataToUpload(string path) {
// To improve logging output for this function, what is the 'logical path' we are scanning for file & folder differences?
string logPath;
if (path == ".") {
@ -3977,10 +3987,12 @@ class SyncEngine {
// Perform the filesystem walk of this path, building an array of new items to upload
scanPathForNewData(path);
if (isDir(path)) {
if (!appConfig.surpressLoggingOutput) {
if (appConfig.verbosityCount == 0)
addLogEntry("\n", ["consoleOnlyNoNewLine"]);
}
}
// To finish off the processing items, this is needed to reflect this in the log
addLogEntry("------------------------------------------------------------------", ["debug"]);
@ -3990,7 +4002,10 @@ class SyncEngine {
auto elapsedTime = finishTime - startTime;
addLogEntry("Elapsed Time Filesystem Walk: " ~ to!string(elapsedTime), ["debug"]);
}
// Perform a filesystem walk to uncover new data to upload to OneDrive
void processNewLocalItemsToUpload() {
// Upload new data that has been identified
// Are there any items to download post fetching the /delta data?
if (!newLocalFilesToUploadToOneDrive.empty) {
@ -4036,23 +4051,17 @@ class SyncEngine {
// Cleanup array memory after uploading all files
newLocalFilesToUploadToOneDrive = [];
}
if (!databaseItemsWhereContentHasChanged.empty) {
// There are changed local files that were in the DB to upload
addLogEntry("Changed local items to upload to OneDrive: " ~ to!string(databaseItemsWhereContentHasChanged.length));
processChangedLocalItemsToUpload();
// Cleanup array memory
databaseItemsWhereContentHasChanged = [];
}
}
// Scan this path for new data
void scanPathForNewData(string path) {
// Add a processing '.'
if (isDir(path)) {
if (!appConfig.surpressLoggingOutput) {
if (appConfig.verbosityCount == 0)
addProcessingDotEntry();
}
}
ulong maxPathLength;
ulong pathWalkLength;
@ -4271,12 +4280,13 @@ class SyncEngine {
}
// Handle a single file inotify trigger when using --monitor
void handleLocalFileTrigger(string localFilePath) {
void handleLocalFileTrigger(string[] changedLocalFilesToUploadToOneDrive) {
// Is this path a new file or an existing one?
// Normally we would use pathFoundInDatabase() to calculate, but we need 'databaseItem' as well if the item is in the database
foreach (localFilePath; changedLocalFilesToUploadToOneDrive) {
try {
Item databaseItem;
bool fileFoundInDB = false;
string[3][] modifiedItemToUpload;
foreach (driveId; onlineDriveDetails.keys) {
if (itemDB.selectByPath(localFilePath, driveId, databaseItem)) {
@ -4290,7 +4300,7 @@ class SyncEngine {
// This is a new file as it is not in the database
// Log that the file has been added locally
addLogEntry("[M] New local file added: " ~ localFilePath, ["verbose"]);
scanLocalFilesystemPathForNewData(localFilePath);
scanLocalFilesystemPathForNewDataToUpload(localFilePath);
} else {
// This is a potentially modified file, needs to be handled as such. Is the item truly modified?
if (!testFileHash(localFilePath, databaseItem)) {
@ -4298,10 +4308,14 @@ class SyncEngine {
// Log that the file has changed locally
addLogEntry("[M] Local file changed: " ~ localFilePath, ["verbose"]);
// Add the modified item to the array to upload
modifiedItemToUpload ~= [databaseItem.driveId, databaseItem.id, localFilePath];
uploadChangedLocalFileToOneDrive(modifiedItemToUpload);
uploadChangedLocalFileToOneDrive([databaseItem.driveId, databaseItem.id, localFilePath]);
}
}
} catch(Exception e) {
addLogEntry("Cannot upload file changes/creation: " ~ e.msg, ["info", "notify"]);
}
}
processNewLocalItemsToUpload();
}
// Query the database to determine if this path is within the existing database
@ -4959,10 +4973,10 @@ class SyncEngine {
string changedItemParentId = fileDetailsFromOneDrive["parentReference"]["driveId"].str;
string changedItemId = fileDetailsFromOneDrive["id"].str;
addLogEntry("Skipping uploading this file as moving it to upload as a modified file (online item already exists): " ~ fileToUpload);
databaseItemsWhereContentHasChanged ~= [changedItemParentId, changedItemId, fileToUpload];
// In order for the processing of the local item as a 'changed' item, unfortunatly we need to save the online data to the local DB
saveItem(fileDetailsFromOneDrive);
uploadChangedLocalFileToOneDrive([changedItemParentId, changedItemId, fileToUpload]);
}
} catch (OneDriveException exception) {
// If we get a 404 .. the file is not online .. this is what we want .. file does not exist online
@ -6811,7 +6825,7 @@ class SyncEngine {
if (!itemDB.selectByPath(oldPath, appConfig.defaultDriveId, oldItem)) {
// The old path|item is not synced with the database, upload as a new file
addLogEntry("Moved local item was not in-sync with local databse - uploading as new item");
uploadNewFile(newPath);
scanLocalFilesystemPathForNewData(newPath);
return;
}