This commit is contained in:
JC-comp 2024-03-10 10:12:50 +08:00 committed by GitHub
commit c214fd74b1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 519 additions and 130 deletions

View file

@ -68,6 +68,7 @@ endif
SOURCES = \
src/main.d \
src/config.d \
src/progress.d \
src/log.d \
src/util.d \
src/qxor.d \

View file

@ -707,6 +707,19 @@ final class ItemDatabase {
return items;
}
// Select number of items associated with the provided driveId
ulong selectCountByDriveId(const(char)[] driveId) {
	// Count the item rows that belong to the given drive ID.
	// Used to pre-size progress totals before a per-drive scan.
	//
	// Params:
	//   driveId = the OneDrive drive identifier to count rows for (must be non-empty)
	// Returns: the number of matching rows, or 0 if the query yields no result
	assert(driveId);
	auto stmt = db.prepare("SELECT COUNT(*) FROM item WHERE driveId = ?1");
	stmt.bind(1, driveId);
	auto res = stmt.exec();
	if (!res.empty) {
		return parse!ulong(res.front[0]);
	}
	// COUNT(*) always yields one row; treat an empty result defensively as zero
	return 0;
}
// Select all items associated with the provided driveId
Item[] selectAllItemsByDriveId(const(char)[] driveId) {
assert(driveId);

View file

@ -12,6 +12,10 @@ import core.sync.mutex;
import core.thread;
import std.format;
import std.string;
import std.process;
import std.conv;
import progress;
version(Notifications) {
import dnotify;
@ -26,6 +30,7 @@ shared MonoTime lastInsertedTime;
class LogBuffer {
private:
string[3][] buffer;
bool progressUpdated;
Mutex bufferLock;
Condition condReady;
string logFilePath;
@ -37,10 +42,12 @@ class LogBuffer {
bool sendGUINotification;
public:
int terminalCols;
this(bool verboseLogging, bool debugLogging) {
// Initialise the mutex
bufferLock = new Mutex();
condReady = new Condition(bufferLock);
progressUpdated = false;
// Initialise other items
this.logFilePath = logFilePath;
this.writeToFile = writeToFile;
@ -48,6 +55,7 @@ class LogBuffer {
this.debugLogging = debugLogging;
this.isRunning = true;
this.sendGUINotification = true;
this.terminalCols = 120;
this.flushThread = new Thread(&flushBuffer);
flushThread.isDaemon(true);
flushThread.start();
@ -95,6 +103,13 @@ class LogBuffer {
(cast()condReady).notify();
}
}
// Wake the flush thread so pending progress lines are redrawn.
// Called from Progress.updateDisplay() when a progress bar needs a refresh.
void wakeUpFlushJob() {
	// Set the flag under the lock so the flush thread's buffer/flag check
	// (performed while holding bufferLock) cannot miss this update.
	synchronized (bufferLock) {
		progressUpdated = true;
	}
	// Notifying after releasing the lock is safe here: the waiter re-checks
	// progressUpdated under the lock before deciding to sleep.
	(cast()condReady).notify();
}
shared void notify(string message) {
// Use dnotify's functionality for GUI notifications, if GUI notifications is enabled
@ -109,6 +124,10 @@ class LogBuffer {
}
}
// Record the terminal width (in columns); used to clip rendered progress lines.
shared void setTerminalCols(int cols) {
	this.terminalCols = cols;
}
private void flushBuffer() {
while (isRunning) {
flush();
@ -119,11 +138,15 @@ class LogBuffer {
private void flush() {
string[3][] messages;
synchronized(bufferLock) {
while (buffer.empty && isRunning) {
condReady.wait();
if (buffer.empty && !progressUpdated && isRunning) {
if (progressManager.isEmpty())
condReady.wait();
else
condReady.wait(1.seconds);
}
messages = buffer;
buffer.length = 0;
progressUpdated = false;
}
foreach (msg; messages) {
@ -137,7 +160,8 @@ class LogBuffer {
write(msg[2]);
} else {
// write this to the console with a new line
writeln(msg[2]);
write(msg[2]);
progressManager.closeAndClearLine();
}
}
@ -150,12 +174,23 @@ class LogBuffer {
}
}
}
// Dump progress information
progressManager.dump();
}
}
// Function to initialize the logging system
void initialiseLogging(bool verboseLogging = false, bool debugLogging = false) {
	logBuffer = cast(shared) new LogBuffer(verboseLogging, debugLogging);
	// Best-effort probe of the terminal width via 'tput cols' so progress
	// lines can be clipped to fit; on any failure the LogBuffer default
	// width is kept.
	try {
		auto askCols = execute(["tput", "cols"]);
		if (askCols.status == 0)
			logBuffer.setTerminalCols(parse!int(askCols.output));
	} catch (Exception e) {
		// Deliberately swallowed: no TTY or a missing 'tput' binary simply
		// means we keep the default terminal width.
	}
	lastInsertedTime = MonoTime.currTime();
}

View file

@ -27,12 +27,14 @@ import log;
import curlEngine;
import util;
import onedrive;
import progress;
import syncEngine;
import itemdb;
import clientSideFiltering;
import monitor;
import webhook;
// What other constant variables do we require?
const int EXIT_RESYNC_REQUIRED = 126;
@ -137,10 +139,13 @@ int main(string[] cliArgs) {
if (verbosityCount == 1) { verboseLogging = true;}
if (verbosityCount >= 2) { debugLogging = true;}
// Initialize the application progress manager class
initialiseProgressManager(verboseLogging, debugLogging);
// Initialize the application logging class, as we know the application verbosity level
// If we need to enable logging to a file, we can only do this once we know the application configuration which is done slightly later on
initialiseLogging(verboseLogging, debugLogging);
/**
// most used
addLogEntry("Basic 'info' message", ["info"]); .... or just use addLogEntry("Basic 'info' message");
@ -731,6 +736,7 @@ int main(string[] cliArgs) {
// Detail the outcome of the sync process
displaySyncOutcome();
progressManager.clearAllJobs();
}
// Are we doing a --monitor operation?
@ -1013,6 +1019,8 @@ int main(string[] cliArgs) {
}
}
}
progressManager.clearAllJobs();
if (performMonitor) {
auto nextCheckTime = lastCheckTime + checkOnlineInterval;
@ -1086,6 +1094,8 @@ int main(string[] cliArgs) {
Thread.sleep(sleepTime);
}
}
progressManager.clearAllJobs();
}
}
} else {

372
src/progress.d Normal file
View file

@ -0,0 +1,372 @@
module progress;
import core.sync.mutex;
import core.atomic;
import std.algorithm;
import std.conv;
import std.container;
import std.datetime.stopwatch;
import std.stdio;
import std.string;
import log;
shared ProgressManager progressManager;
// Helper control code
enum CURSOR_UP = "\x1b[1A";
enum LINEFEED = "\r";
enum ERASE_IN_LINE = "\x1b[K";
// One unit of trackable work (a whole sync pass or a single file operation).
//
// A Progress instance records start/end times, completed/total counters and an
// optional free-form message, and renders itself as a single status line
// clipped to the terminal width. Instances may nest via createSubProgress();
// sub-progress lines are emitted before the parent's. Display refreshes are
// rate-limited and delivered by waking the LogBuffer flush thread
// (see updateDisplay()).
class Progress {
	// Terminal state reported once done() has been called.
	enum Status
	{
		running,
		success,
		failed,
		cancelled,
	}
	enum Type
	{
		sync, // syncing progress (i.e. syncOneDriveAccountToLocalDisk, uploadChangedLocalFileToOneDrive, ...)
		file, // operation on file (i.e. upload, download, ... for single file)
	}
	private:
		// Nested jobs created via createSubProgress(); rendered before this job's own line
		Progress[] sub_progresses;
		// What is the type of progress
		Type type;
		// Identifier
		string name;
		// Optional free-form text appended to the rendered status line
		string message;
		// Terminal width (columns) used to clip the rendered line
		int bar_length;
		// Progress statistics
		// Time elapsed
		MonoTime start_time;
		MonoTime end_time;
		// When is the last progress updated
		MonoTime last_update_time;
		// When is the last progress displayed
		MonoTime last_update_displayed;
		// What is the last progress displayed
		float last_progress;
		// Progress indicator
		size_t index;
		size_t total;
		Status status;
		// Is the job completed
		bool completed;
		// Is verbose is on
		bool verbose;
		// Should the job display completed status when the job is completed
		bool log_when_done;
		// Construct a running job; the terminal width is snapshotted from the
		// logging subsystem at creation time.
		this(Type type, string name) {
			this.type = type;
			this.name = name;
			this.bar_length = logBuffer.terminalCols;
			this.start_time = MonoTime.currTime;
			this.last_update_time = MonoTime.currTime;
			this.last_update_displayed = MonoTime.currTime;
			this.last_progress = 0;
			this.index = 0;
			this.total = 0;
			this.status = Status.running;
			this.completed = false;
			this.verbose = true;
			this.log_when_done = true;
		}
	public:
		// Re-arm this job for reuse: drop sub jobs and restart the clock and counters.
		void reset() {
			sub_progresses = null;
			this.start_time = MonoTime.currTime;
			this.last_progress = 0;
			index = 0;
			total = 0;
			this.status = Status.running;
			this.completed = false;
		}
		// Enable/disable rendering of this job (and of future sub jobs, which inherit it).
		void setVerbose(bool verbose) {
			this.verbose = verbose;
		}
		// Control whether done() also emits a final summary log entry.
		void setLogWhenDone(bool log_when_done) {
			this.log_when_done = log_when_done;
		}
		// Set the free-form message shown at the end of the status line.
		void setMessage(string message) {
			this.message = message;
		}
		// Add size to total number of progress
		void add(size_t size) {
			// NOTE(review): unlike next()/update() this is not synchronized;
			// callers appear to invoke it before worker threads start - confirm.
			total += size;
			updateDisplay();
		}
		// Add n to total number of completed progress
		void next(size_t n) {
			synchronized (this) {
				this.index += n;
				updateDisplay();
			}
		}
		// Update total number of progress and total number of completed progress
		void update(size_t index, size_t total) {
			synchronized (this) {
				this.index = index;
				this.total = total;
				updateDisplay();
			}
		}
		// Set completion information when the job is finished.
		void done(Status status=Status.success) {
			this.completed = true;
			this.status = status;
			this.end_time = MonoTime.currTime;
			// Print results
			if (verbose && log_when_done) {
				addLogEntry(getMessageLine());
			}
		}
		// Retrieve the progress string of this job recursively.
		// 'line' accumulates one terminal line per unfinished job (each
		// terminated by erase-to-end-of-line + newline) and 'counter' counts
		// those lines so the caller can rewind the cursor afterwards.
		void getMessageLine(ref string line, ref int counter) {
			// Only print unfinished progress
			if (completed || !verbose)
				return;
			// Accumulate number of lines
			counter += 1;
			// Print sub progress first
			foreach (child; sub_progresses)
				child.getMessageLine(line, counter);
			// Print this progress
			line ~= getMessageLine() ~ ERASE_IN_LINE ~ "\n";
		}
		// Retrieve the progress string for this job only.
		string getMessageLine() {
			string line;
			// Format of the progress string
			// line = name | percentage | progress | rate ... message
			string percentage;
			string rate;
			string progress;
			// Calculate percentage if number of total progress is available
			// percentage format: XX.X%
			if (!completed && total > 0) {
				// Round to XX.X
				float currentDLPercent = to!float(roundTo!int(10 * getProgress())) / 10;
				percentage = rightJustify(to!string(currentDLPercent) ~ "%", 5, ' ');
			}
			// progress format: OOOO (IN) XX:XX:XX
			string statusStr;
			Duration elapsed;
			if (completed) {
				// Job completed
				switch (status) {
					case Status.success:
						statusStr = "DONE";
						break;
					case Status.failed:
						statusStr = "FAILED";
						break;
					case Status.cancelled:
						statusStr = "CANCELLED";
						break;
					default:
						statusStr = "DONE";
						break;
				}
				statusStr ~= " in";
				elapsed = end_time - start_time;
			} else {
				// Calculate time elapsed
				statusStr = "Elapsed";
				elapsed = MonoTime.currTime - start_time;
			}
			int h, m, s;
			elapsed.split!("hours", "minutes", "seconds")(h, m, s);
			progress = format("%s %02d:%02d:%02d", statusStr, h, m, s);
			// Calculate ETA if total number of progress available
			// rate format: ETA XX:XX:XX
			// rate format: XXX items/sec
			if (!completed && index > 0) {
				Duration collectedTime = last_update_time - start_time;
				float msec_per_item = 1.0 * collectedTime.total!"msecs" / index;
				if (total > 0) {
					// Calculate ETA if total number of progress available
					long expected = to!long(msec_per_item * (total - index));
					long eta = expected + collectedTime.total!"msecs" - elapsed.total!"msecs";
					// Clamp a stale/negative estimate to one second
					if (eta < 0)
						eta = 1000;
					dur!"msecs"(eta).split!("hours", "minutes", "seconds")(h, m, s);
					rate = format("ETA %02d:%02d:%02d", h, m, s);
				} else {
					// Calculate processing rate if total number of progress not available
					float items_per_sec = 1.0 / msec_per_item * 1000;
					rate = format("%0.2f items/sec", items_per_sec);
				}
			}
			if (percentage.length > 0) percentage = " | " ~ percentage;
			if (progress.length > 0) progress = " | " ~ progress;
			if (rate.length > 0) rate = " | " ~ rate;
			string real_message = message;
			// Fallback to show processed items when message is empty
			if (message.empty) {
				if (!completed && total > 0)
					real_message = format("%d items left", total - index);
				else
					real_message = format("Processed %d items", index);
			}
			line = format("%s%s%s%s ... ",
				name,
				percentage,
				progress,
				rate);
			// Prevent messages from overflowing onto the next line
			if (completed) {
				line ~= real_message;
			} else {
				// Keep only the tail of the message that fits in the remaining width
				int length_left = to!int(this.bar_length - line.length);
				if (length_left > 0) {
					int start_pos = 0;
					if (real_message.length > length_left) {
						start_pos = to!int(real_message.length) - length_left;
					}
					line ~= real_message[start_pos .. $];
				}
			}
			return line;
		}
		// Get % of the current progress
		// NOTE(review): callers guard with 'total > 0' before using this;
		// calling it with total == 0 divides by zero.
		float getProgress() {
			return 100.0 * index / total;
		}
		// Add a progress to the progress list
		Progress createSubProgress(Type type, string name) {
			Progress progress = new Progress(type, name);
			// Child inherits this job's verbosity so it renders consistently
			progress.setVerbose(verbose);
			sub_progresses ~= progress;
			return progress;
		}
	private:
		// Record an update and, at most once every 200ms, wake the log flush
		// thread so refreshed progress lines get redrawn.
		void updateDisplay() {
			this.last_update_time = MonoTime.currTime;
			if (MonoTime.currTime - this.last_update_displayed > 200.msecs) {
				this.last_update_displayed = MonoTime.currTime;
				(cast()logBuffer).wakeUpFlushJob();
			}
		}
}
// Registry of top-level Progress jobs plus the terminal-rendering helpers.
// Declared 'synchronized' so every public method runs under the object lock.
synchronized class ProgressManager {
	// Named boolean aliases for call-site readability.
	enum Verbose
	{
		TRUE = true,
		FALSE = false
	}
	enum LogDone
	{
		TRUE = true,
		FALSE = false
	}
	private:
		// Top-level jobs currently registered for rendering
		Progress[] progressList;
		// When false, progress rendering is suppressed entirely
		bool verbose;
		this(bool verbose) {
			this.verbose = verbose;
		}
	public:
		// Register and return a new top-level progress job.
		Progress createProgress(Progress.Type type, string name) {
			auto job = new Progress(type, name);
			progressList ~= cast(shared) job;
			return job;
		}
		// Register a job under 'parentProgress', or as top-level when the parent is null.
		Progress createProgress(Progress parentProgress, Progress.Type type, string name) {
			if (parentProgress is null) {
				return createProgress(type, name);
			}
			return parentProgress.createSubProgress(type, name);
		}
		// Render every unfinished job (one line each), then rewind the cursor
		// so the next dump overwrites the same lines in place.
		void dump() {
			if (!verbose)
				return;
			string rendered;
			int pendingLines;
			// Collect the status lines (children are emitted before parents)
			foreach (job; progressList) {
				if (!job.completed) {
					(cast()job).getMessageLine(rendered, pendingLines);
				}
			}
			write(rendered);
			// Move up to first line
			foreach (i; 0 .. pendingLines) {
				write(CURSOR_UP ~ LINEFEED);
			}
		}
		// Drop every registered job.
		void clearAllJobs() {
			progressList = null;
		}
		// Erase from the cursor to the end of the current terminal line.
		void clearLine() {
			write(ERASE_IN_LINE);
		}
		// Erase the remainder of the line and advance to the next one.
		void closeAndClearLine() {
			clearLine();
			writeln();
		}
		// True when there is nothing to render (always true in verbose mode,
		// where progress output is suppressed).
		bool isEmpty() {
			return !verbose || progressList.empty;
		}
}
// Construct the global shared ProgressManager.
// Note the inversion: progress bars are only rendered in the default (quiet)
// logging mode, so rendering is enabled when NEITHER verbose nor debug
// logging was requested.
void initialiseProgressManager(bool verboseLogging = false, bool debugLogging = false) {
	progressManager = new shared(ProgressManager)(!(verboseLogging || debugLogging));
}

View file

@ -33,6 +33,7 @@ import util;
import onedrive;
import itemdb;
import clientSideFiltering;
import progress;
class jsonResponseException: Exception {
@safe pure this(string inputMessage) {
@ -789,15 +790,13 @@ class SyncEngine {
currentDeltaLink = null;
}
// Dynamic output for non-verbose and verbose run so that the user knows something is being retrieved from the OneDrive API
if (appConfig.verbosityCount == 0) {
if (!appConfig.surpressLoggingOutput) {
addProcessingLogHeaderEntry("Fetching items from the OneDrive API for Drive ID: " ~ driveIdToQuery, appConfig.verbosityCount);
}
} else {
addLogEntry("Fetching /delta response from the OneDrive API for Drive ID: " ~ driveIdToQuery, ["verbose"]);
if (appConfig.verbosityCount != 0 || !appConfig.surpressLoggingOutput) {
addLogEntry("Fetching /delta response from the OneDrive API for Drive ID: " ~ driveIdToQuery);
}
// Progress
Progress progress = progressManager.createProgress(Progress.Type.sync, "Fetching /delta response");
// Create a new API Instance for querying /delta and initialise it
// Reuse the socket to speed up
bool keepAlive = true;
@ -823,14 +822,8 @@ class SyncEngine {
ulong nrChanges = count(deltaChanges["value"].array);
int changeCount = 0;
if (appConfig.verbosityCount == 0) {
// Dynamic output for a non-verbose run so that the user knows something is happening
if (!appConfig.surpressLoggingOutput) {
addProcessingDotEntry();
}
} else {
addLogEntry("Processing API Response Bundle: " ~ to!string(responseBundleCount) ~ " - Quantity of 'changes|items' in this bundle to process: " ~ to!string(nrChanges), ["verbose"]);
}
progress.next(1);
addLogEntry("Processing API Response Bundle: " ~ to!string(responseBundleCount) ~ " - Quantity of 'changes|items' in this bundle to process: " ~ to!string(nrChanges), ["verbose"]);
jsonItemsReceived = jsonItemsReceived + nrChanges;
@ -876,14 +869,8 @@ class SyncEngine {
object.destroy(getDeltaQueryOneDriveApiInstance);
// Log that we have finished querying the /delta API
if (appConfig.verbosityCount == 0) {
if (!appConfig.surpressLoggingOutput) {
// Close out the '....' being printed to the console
addLogEntry("\n", ["consoleOnlyNoNewLine"]);
}
} else {
addLogEntry("Finished processing /delta JSON response from the OneDrive API", ["verbose"]);
}
addLogEntry("Finished processing /delta JSON response from the OneDrive API", ["verbose"]);
progress.done();
// If this was set, now unset it, as this will have been completed, so that for a true up, we dont do a double full scan
if (appConfig.fullScanTrueUpRequired) {
@ -962,9 +949,13 @@ class SyncEngine {
ulong batchCount = (jsonItemsToProcess.length + batchSize - 1) / batchSize;
ulong batchesProcessed = 0;
Progress progress = progressManager.createProgress(Progress.Type.sync, "Processing /delta changes");
progress.add(jsonItemsToProcess.length);
// Dynamic output for a non-verbose run so that the user knows something is happening
if (!appConfig.surpressLoggingOutput) {
addProcessingLogHeaderEntry("Processing " ~ to!string(jsonItemsToProcess.length) ~ " applicable changes and items received from Microsoft OneDrive", appConfig.verbosityCount);
// Logfile entry
addLogEntry("Processing " ~ to!string(jsonItemsToProcess.length) ~ " applicable changes and items received from Microsoft OneDrive");
}
// For each batch, process the JSON items that need to be now processed.
@ -973,29 +964,17 @@ class SyncEngine {
// Chunk the total items to process into 500 lot items
batchesProcessed++;
if (appConfig.verbosityCount == 0) {
// Dynamic output for a non-verbose run so that the user knows something is happening
if (!appConfig.surpressLoggingOutput) {
addProcessingDotEntry();
}
} else {
addLogEntry("Processing OneDrive JSON item batch [" ~ to!string(batchesProcessed) ~ "/" ~ to!string(batchCount) ~ "] to ensure consistent local state", ["verbose"]);
}
addLogEntry("Processing OneDrive JSON item batch [" ~ to!string(batchesProcessed) ~ "/" ~ to!string(batchCount) ~ "] to ensure consistent local state", ["verbose"]);
// Process the batch
processJSONItemsInBatch(batchOfJSONItems, batchesProcessed, batchCount);
processJSONItemsInBatch(batchOfJSONItems, batchesProcessed, batchCount, progress);
// To finish off the JSON processing items, this is needed to reflect this in the log
addLogEntry("------------------------------------------------------------------", ["debug"]);
}
if (appConfig.verbosityCount == 0) {
// close off '.' output
if (!appConfig.surpressLoggingOutput) {
addLogEntry("\n", ["consoleOnlyNoNewLine"]);
}
}
progress.done();
// Debug output - what was processed
addLogEntry("Number of JSON items to process is: " ~ to!string(jsonItemsToProcess.length), ["debug"]);
addLogEntry("Number of JSON items processed was: " ~ to!string(processedCount), ["debug"]);
@ -1191,13 +1170,14 @@ class SyncEngine {
}
// Process each of the elements contained in jsonItemsToProcess[]
void processJSONItemsInBatch(JSONValue[] array, ulong batchGroup, ulong batchCount) {
void processJSONItemsInBatch(JSONValue[] array, ulong batchGroup, ulong batchCount, Progress progress) {
ulong batchElementCount = array.length;
foreach (i, onedriveJSONItem; array.enumerate) {
// Use the JSON elements rather can computing a DB struct via makeItem()
ulong elementCount = i +1;
progress.next(1);
// To show this is the processing for this particular item, start off with this breaker line
addLogEntry("------------------------------------------------------------------", ["debug"]);
@ -2030,23 +2010,16 @@ class SyncEngine {
// Download new file items as identified
void downloadOneDriveItems() {
// Lets deal with all the JSON items that need to be downloaded in a batch process
ulong batchSize = appConfig.getValueLong("threads");
ulong batchCount = (fileJSONItemsToDownload.length + batchSize - 1) / batchSize;
ulong batchesProcessed = 0;
Progress progress = progressManager.createProgress(Progress.Type.sync, "Downloading drive items");
progress.add(fileJSONItemsToDownload.length);
foreach (chunk; fileJSONItemsToDownload.chunks(batchSize)) {
// send an array containing 'appConfig.getValueLong("threads")' JSON items to download
downloadOneDriveItemsInParallel(chunk);
}
}
// Download items in parallel
void downloadOneDriveItemsInParallel(JSONValue[] array) {
// This function received an array of 16 JSON items to download
foreach (i, onedriveJSONItem; taskPool.parallel(array)) {
foreach (i, onedriveJSONItem; taskPool.parallel(fileJSONItemsToDownload)) {
// Take each JSON item and
downloadFileItem(onedriveJSONItem);
progress.next(1);
}
progress.done();
}
// Perform the actual download of an object from OneDrive
@ -2800,7 +2773,7 @@ class SyncEngine {
// Log what we are doing
if (!appConfig.surpressLoggingOutput) {
addProcessingLogHeaderEntry("Performing a database consistency and integrity check on locally stored data", appConfig.verbosityCount);
addLogEntry("Performing a database consistency and integrity check on locally stored data");
}
// What driveIDsArray do we use? If we are doing a --single-directory we need to use just the drive id associated with that operation
@ -2818,6 +2791,10 @@ class SyncEngine {
foreach (driveId; consistencyCheckDriveIdsArray) {
// Make the logging more accurate - we cant update driveId as this then breaks the below queries
addLogEntry("Processing DB entries for this Drive ID: " ~ driveId, ["verbose"]);
// Progress
Progress progress = progressManager.createProgress(Progress.Type.sync, "DB consistency check");
progress.add(itemDB.selectCountByDriveId(driveId));
// Freshen the cached quota details for this driveID
addOrUpdateOneDriveOnlineDetails(driveId);
@ -2875,7 +2852,7 @@ class SyncEngine {
// Process each database item associated with the driveId
foreach(dbItem; driveItems) {
// Does it still exist on disk in the location the DB thinks it is
checkDatabaseItemForConsistency(dbItem);
checkDatabaseItemForConsistency(dbItem, progress);
}
} else {
// Check everything associated with each driveId we know about
@ -2888,15 +2865,10 @@ class SyncEngine {
// Process each database database item associated with the driveId
foreach(dbItem; driveItems) {
// Does it still exist on disk in the location the DB thinks it is
checkDatabaseItemForConsistency(dbItem);
checkDatabaseItemForConsistency(dbItem, progress);
}
}
}
// Close out the '....' being printed to the console
if (!appConfig.surpressLoggingOutput) {
if (appConfig.verbosityCount == 0)
addLogEntry("\n", ["consoleOnlyNoNewLine"]);
progress.done();
}
// Are we doing a --download-only sync?
@ -2913,7 +2885,7 @@ class SyncEngine {
}
// Check this Database Item for its consistency on disk
void checkDatabaseItemForConsistency(Item dbItem) {
void checkDatabaseItemForConsistency(Item dbItem, Progress progress) {
// What is the local path item
string localFilePath;
@ -2941,11 +2913,6 @@ class SyncEngine {
// Log what we are doing
addLogEntry("Processing: " ~ logOutputPath, ["verbose"]);
// Add a processing '.'
if (!appConfig.surpressLoggingOutput) {
if (appConfig.verbosityCount == 0)
addProcessingDotEntry();
}
// Determine which action to take
final switch (dbItem.type) {
@ -2955,7 +2922,7 @@ class SyncEngine {
break;
case ItemType.dir:
// Logging output result is handled by checkDirectoryDatabaseItemForConsistency
checkDirectoryDatabaseItemForConsistency(dbItem, localFilePath);
checkDirectoryDatabaseItemForConsistency(dbItem, localFilePath, progress);
break;
case ItemType.remote:
// DB items that match: dbItem.remoteType == ItemType.dir - these should have been skipped above
@ -2967,6 +2934,7 @@ class SyncEngine {
// Unknown type - we dont action these items
break;
}
progress.next(1);
}
// Perform the database consistency check on this file item
@ -3089,7 +3057,7 @@ class SyncEngine {
}
// Perform the database consistency check on this directory item
void checkDirectoryDatabaseItemForConsistency(Item dbItem, string localFilePath) {
void checkDirectoryDatabaseItemForConsistency(Item dbItem, string localFilePath, Progress progress) {
// What is the source of this item data?
string itemSource = "database";
@ -3110,7 +3078,7 @@ class SyncEngine {
if (!singleDirectoryScope) {
// loop through the children
foreach (Item child; itemDB.selectChildren(dbItem.driveId, dbItem.id)) {
checkDatabaseItemForConsistency(child);
checkDatabaseItemForConsistency(child, progress);
}
}
}
@ -3156,7 +3124,7 @@ class SyncEngine {
if (!singleDirectoryScope) {
// loop through the children
foreach (Item child; itemDB.selectChildren(dbItem.driveId, dbItem.id)) {
checkDatabaseItemForConsistency(child);
checkDatabaseItemForConsistency(child, progress);
}
}
}
@ -3617,25 +3585,17 @@ class SyncEngine {
// Process the list of local changes to upload to OneDrive
void processChangedLocalItemsToUpload() {
// Each element in this array 'databaseItemsWhereContentHasChanged' is an Database Item ID that has been modified locally
ulong batchSize = appConfig.getValueLong("threads");
ulong batchCount = (databaseItemsWhereContentHasChanged.length + batchSize - 1) / batchSize;
ulong batchesProcessed = 0;
// For each batch of files to upload, upload the changed data to OneDrive
foreach (chunk; databaseItemsWhereContentHasChanged.chunks(batchSize)) {
processChangedLocalItemsToUploadInParallel(chunk);
}
}
Progress progress = progressManager.createProgress(Progress.Type.sync, "Upload changed local file");
progress.add(databaseItemsWhereContentHasChanged.length);
// Upload the changed file batches in parallel
void processChangedLocalItemsToUploadInParallel(string[3][] array) {
foreach (i, localItemDetails; taskPool.parallel(array)) {
foreach (i, localItemDetails; taskPool.parallel(databaseItemsWhereContentHasChanged)) {
addLogEntry("Upload Thread " ~ to!string(i) ~ " Starting: " ~ to!string(Clock.currTime()), ["debug"]);
uploadChangedLocalFileToOneDrive(localItemDetails);
addLogEntry("Upload Thread " ~ to!string(i) ~ " Finished: " ~ to!string(Clock.currTime()), ["debug"]);
progress.next(1);
}
progress.done();
}
// Upload changed local files to OneDrive in parallel
@ -4217,24 +4177,23 @@ class SyncEngine {
if (isDir(path)) {
if (!appConfig.surpressLoggingOutput) {
if (!cleanupLocalFiles) {
addProcessingLogHeaderEntry("Scanning the local file system '" ~ logPath ~ "' for new data to upload", appConfig.verbosityCount);
addLogEntry("Scanning the local file system '" ~ logPath ~ "' for new data to upload");
} else {
addProcessingLogHeaderEntry("Scanning the local file system '" ~ logPath ~ "' for data to cleanup", appConfig.verbosityCount);
addLogEntry("Scanning the local file system '" ~ logPath ~ "' for data to cleanup");
}
}
}
// Progress
Progress progress = progressManager.createProgress(Progress.Type.sync, "Filesystem Walk");
progress.add(preview_localfile_count(path));
auto startTime = Clock.currTime();
addLogEntry("Starting Filesystem Walk: " ~ to!string(startTime), ["debug"]);
// Perform the filesystem walk of this path, building an array of new items to upload
scanPathForNewData(path);
if (isDir(path)) {
if (!appConfig.surpressLoggingOutput) {
if (appConfig.verbosityCount == 0)
addLogEntry("\n", ["consoleOnlyNoNewLine"]);
}
}
scanPathForNewData(path, progress);
progress.done();
// To finish off the processing items, this is needed to reflect this in the log
addLogEntry("------------------------------------------------------------------", ["debug"]);
@ -4252,7 +4211,7 @@ class SyncEngine {
// Are there any items to download post fetching the /delta data?
if (!newLocalFilesToUploadToOneDrive.empty) {
// There are elements to upload
addProcessingLogHeaderEntry("New items to upload to OneDrive: " ~ to!string(newLocalFilesToUploadToOneDrive.length), appConfig.verbosityCount);
addLogEntry("New items to upload to OneDrive: " ~ to!string(newLocalFilesToUploadToOneDrive.length));
// Reset totalDataToUpload
totalDataToUpload = 0;
@ -4296,15 +4255,9 @@ class SyncEngine {
}
// Scan this path for new data
void scanPathForNewData(string path) {
// Add a processing '.'
if (isDir(path)) {
if (!appConfig.surpressLoggingOutput) {
if (appConfig.verbosityCount == 0)
addProcessingDotEntry();
}
}
void scanPathForNewData(string path, Progress progress) {
progress.next(1);
ulong maxPathLength;
ulong pathWalkLength;
@ -4487,7 +4440,7 @@ class SyncEngine {
auto entries = dirEntries(path, SpanMode.shallow, false);
foreach (DirEntry entry; entries) {
string thisPath = entry.name;
scanPathForNewData(thisPath);
scanPathForNewData(thisPath, progress);
}
} catch (FileException e) {
// display the error message
@ -4537,6 +4490,9 @@ class SyncEngine {
void handleLocalFileTrigger(string[] changedLocalFilesToUploadToOneDrive) {
// Is this path a new file or an existing one?
// Normally we would use pathFoundInDatabase() to calculate, but we need 'databaseItem' as well if the item is in the database
Progress progress = progressManager.createProgress(Progress.Type.sync, "Upload changed local file");
progress.add(changedLocalFilesToUploadToOneDrive.length);
foreach (localFilePath; changedLocalFilesToUploadToOneDrive) {
try {
Item databaseItem;
@ -4568,7 +4524,9 @@ class SyncEngine {
} catch(Exception e) {
addLogEntry("Cannot upload file changes/creation: " ~ e.msg, ["info", "notify"]);
}
progress.next(1);
}
progress.done();
processNewLocalItemsToUpload();
}
@ -5011,28 +4969,16 @@ class SyncEngine {
// Upload new file items as identified
void uploadNewLocalFileItems() {
// Lets deal with the new local items in a batch process
ulong batchSize = appConfig.getValueLong("threads");
ulong batchCount = (newLocalFilesToUploadToOneDrive.length + batchSize - 1) / batchSize;
ulong batchesProcessed = 0;
foreach (chunk; newLocalFilesToUploadToOneDrive.chunks(batchSize)) {
uploadNewLocalFileItemsInParallel(chunk);
}
if (appConfig.verbosityCount == 0)
addLogEntry("\n", ["consoleOnlyNoNewLine"]);
}
// Upload the file batches in parallel
void uploadNewLocalFileItemsInParallel(string[] array) {
foreach (i, fileToUpload; taskPool.parallel(array)) {
// Add a processing '.'
if (appConfig.verbosityCount == 0)
addProcessingDotEntry();
Progress progress = progressManager.createProgress(Progress.Type.sync, "Upload new local file");
progress.add(newLocalFilesToUploadToOneDrive.length);
foreach (i, fileToUpload; taskPool.parallel(newLocalFilesToUploadToOneDrive)) {
addLogEntry("Upload Thread " ~ to!string(i) ~ " Starting: " ~ to!string(Clock.currTime()), ["debug"]);
uploadNewFile(fileToUpload);
addLogEntry("Upload Thread " ~ to!string(i) ~ " Finished: " ~ to!string(Clock.currTime()), ["debug"]);
progress.next(1);
}
progress.done();
}
// Upload a new file to OneDrive

View file

@ -11,6 +11,7 @@ import std.net.curl;
import std.datetime;
import std.file;
import std.path;
import std.process;
import std.regex;
import std.socket;
import std.stdio;
@ -1108,6 +1109,17 @@ int calc_eta(size_t counter, size_t iterations, ulong start_time) {
}
}
// Estimate the number of filesystem entries under 'path' (following symlinks)
// by delegating to the system 'find' utility. Best effort only: returns 0 on
// any failure, which callers use as "no progress total available".
//
// Params:
//   path = directory to enumerate
// Returns: the number of entries found, or 0 on error
ulong preview_localfile_count(string path) {
	import std.algorithm.searching : count;
	try {
		// Pass the path as a discrete argv element (no shell involved) so
		// metacharacters in 'path' - spaces, quotes, ';', '$(...)' - cannot
		// be interpreted by a shell. The previous executeShell() string
		// concatenation was an injection risk.
		auto askFSCount = execute(["find", "-L", path]);
		if (askFSCount.status == 0)
			// 'find' prints one entry per line; count the newlines instead
			// of piping through 'wc -l'
			return cast(ulong) count(askFSCount.output, '\n');
	} catch (Exception e) {
		// Deliberately swallowed: a missing 'find' binary or unreadable
		// path simply means no total is available.
	}
	return 0;
}
void forceExit() {
// Allow logging to flush and complete
Thread.sleep(dur!("msecs")(500));