From 9b80d99ad487ba51cafa5e2dfc20fe42f1bfe0fe Mon Sep 17 00:00:00 2001 From: skilion Date: Sun, 27 Sep 2015 18:47:41 +0200 Subject: [PATCH] support for files bigger than 100 MB and resumable uploads --- Makefile | 1 + src/main.d | 25 ++++++------- src/onedrive.d | 45 ++++++++++++++++++++++- src/sync.d | 45 +++++++++++++++++------ src/upload.d | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 189 insertions(+), 24 deletions(-) create mode 100644 src/upload.d diff --git a/Makefile b/Makefile index 87bbf472..d0a4bfa4 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,7 @@ SOURCES = \ src/onedrive.d \ src/sqlite.d \ src/sync.d \ + src/upload.d \ src/util.d onedrive: $(SOURCES) diff --git a/src/main.d b/src/main.d index 1b781043..cfd8b226 100644 --- a/src/main.d +++ b/src/main.d @@ -62,21 +62,22 @@ void main(string[] args) if (verbose) writeln("Opening the item database ..."); auto itemdb = new ItemDatabase(databaseFilePath); - if (verbose) writeln("Initializing the Synchronization Engine ..."); - auto sync = new SyncEngine(cfg, onedrive, itemdb, verbose); - sync.onStatusToken = (string statusToken) { - std.file.write(statusTokenFilePath, statusToken); - }; - try { - string statusToken = readText(statusTokenFilePath); - sync.setStatusToken(statusToken); - } catch (FileException e) { - // swallow exception - } - string syncDir = expandTilde(cfg.get("sync_dir")); if (verbose) writeln("All operations will be performed in: ", syncDir); chdir(syncDir); + + if (verbose) writeln("Initializing the Synchronization Engine ..."); + auto sync = new SyncEngine(cfg, onedrive, itemdb, configDirName, verbose); + sync.onStatusToken = (string statusToken) { + std.file.write(statusTokenFilePath, statusToken); + }; + string statusToken; + try { + statusToken = readText(statusTokenFilePath); + } catch (FileException e) { + // swallow exception + } + sync.init(statusToken); performSync(sync); if (monitor) { diff --git a/src/onedrive.d b/src/onedrive.d index 3df952f9..73f0a5e4 100644 --- a/src/onedrive.d +++ b/src/onedrive.d @@ -1,5 +1,5 @@ import std.datetime, std.exception, std.json, std.net.curl, std.path; -import std.string, std.uni, std.uri; +import std.stdio, std.string, std.uni, std.uri; import config; private immutable { @@ -181,6 +181,49 @@ final class OneDriveApi return result; } + // https://dev.onedrive.com/items/upload_large_files.htm + JSONValue createUploadSession(const(char)[] path, const(char)[] eTag = null) + { + checkAccessTokenExpired(); + string url = itemByPathUrl ~ encodeComponent(path) ~ ":/upload.createSession"; + if (eTag) http.addRequestHeader("If-Match", eTag); + auto result = post(url, null); + // remove the if-match header + if (eTag) setAccessToken(accessToken); + return result; + } + + // https://dev.onedrive.com/items/upload_large_files.htm + JSONValue uploadFragment(const(char)[] uploadUrl, string filepath, long offset, long offsetSize, long fileSize) + { + checkAccessTokenExpired(); + http.method = HTTP.Method.put; + http.url = uploadUrl; + ubyte[] content; + http.onReceive = (ubyte[] data) { + content ~= data; + return data.length; + }; + auto file = File(filepath, "rb"); + file.seek(offset); + http.onSend = data => file.rawRead(data).length; + http.contentLength = offsetSize; + import std.conv; + string contentRange = "bytes " ~ to!string(offset) ~ "-" ~ to!string(offset + offsetSize - 1) ~ "/" ~ to!string(fileSize); + http.addRequestHeader("Content-Range", contentRange); + http.perform(); + checkHttpCode(); // TODO: retry on 5xx errors + // remove the content-range header + scope(exit) setAccessToken(accessToken); + return parseJSON(content); + } + + // https://dev.onedrive.com/items/upload_large_files.htm + JSONValue requestUploadStatus(const(char)[] uploadUrl) + { + return get(uploadUrl); + } + private void redeemToken(const(char)[] authCode) { string postData = "client_id=" ~ clientId ~ "&redirect_url=" ~ redirectUrl ~ "&client_secret=" ~ clientSecret; diff --git a/src/sync.d b/src/sync.d index c516f31c..e32ea112 100644 --- a/src/sync.d +++ b/src/sync.d @@ -1,7 +1,11 @@ import std.exception: ErrnoException; import std.algorithm, std.datetime, std.file, std.json, std.path, std.regex; import std.stdio, std.string; -import config, itemdb, onedrive, util; +import config, itemdb, onedrive, upload, util; + +private string uploadStateFileName = "resume_upload"; +// threshold after which files will be uploaded using an upload session +private long thresholdFileSize = 10 * 2^^20; // 10 Mib private bool isItemFolder(const ref JSONValue item) { @@ -52,6 +56,7 @@ final class SyncEngine private ItemDatabase itemdb; private bool verbose; private Regex!char skipDir, skipFile; + private UploadSession session; // token representing the last status correctly synced private string statusToken; // list of items to skip while applying the changes downloaded @@ -61,20 +66,28 @@ final class SyncEngine void delegate(string) onStatusToken; - this(Config cfg, OneDriveApi onedrive, ItemDatabase itemdb, bool verbose) + this(Config cfg, OneDriveApi onedrive, ItemDatabase itemdb, string configDirName, bool verbose) { assert(onedrive && itemdb); this.cfg = cfg; this.onedrive = onedrive; this.itemdb = itemdb; + //this.configDirName = configDirName; this.verbose = verbose; skipDir = wild2regex(cfg.get("skip_dir")); skipFile = wild2regex(cfg.get("skip_file")); + session = UploadSession(onedrive, configDirName ~ "/" ~ uploadStateFileName, verbose); } - void setStatusToken(string statusToken) + void init(string statusToken = null) { this.statusToken = statusToken; + // check if there is an interrupted upload session + if (session.restore()) { + writeln("Continuing the upload session ..."); + auto item = session.upload(); + saveItem(item); + } } void applyDifferences() @@ -395,13 +408,18 @@ final class SyncEngine if (!testCrc32(path, item.crc32)) { if (verbose) writeln("The file content has changed"); writeln("Uploading: ", path); - auto res = onedrive.simpleUpload(path, path, item.eTag); - saveItem(res); - id = res["id"].str; + JSONValue response; + if (getSize(path) <= thresholdFileSize) { + response = onedrive.simpleUpload(path, path); + } else { + response = session.upload(path, path); + } + saveItem(response); + id = response["id"].str; /* use the cTag instead of the eTag because Onedrive changes the * metadata of some type of files (ex. images) AFTER they have been * uploaded */ - eTag = res["cTag"].str; + eTag = response["cTag"].str; } uploadLastModifiedTime(id, eTag, localModifiedTime.toUTC()); } else { @@ -453,10 +471,15 @@ final class SyncEngine private void uploadNewFile(string path) { writeln("Uploading: ", path); - JSONValue res = onedrive.simpleUpload(path, path); - saveItem(res); - string id = res["id"].str; - string cTag = res["cTag"].str; + JSONValue response; + if (getSize(path) <= thresholdFileSize) { + response = onedrive.simpleUpload(path, path); + } else { + response = session.upload(path, path); + } + saveItem(response); + string id = response["id"].str; + string cTag = response["cTag"].str; SysTime mtime = timeLastModified(path).toUTC(); /* use the cTag instead of the eTag because Onedrive changes the * metadata of some type of files (ex. images) AFTER they have been diff --git a/src/upload.d b/src/upload.d new file mode 100644 index 00000000..c6d11884 --- /dev/null +++ b/src/upload.d @@ -0,0 +1,97 @@ +import std.algorithm; +import std.conv; +import std.datetime; +import std.file; +import std.json; +import std.stdio; +import onedrive; + +private long fragmentSize = 10 * 2^^20; // 10 Mib + +struct UploadSession +{ + private OneDriveApi onedrive; + private bool verbose; + // https://dev.onedrive.com/resources/uploadSession.htm + private JSONValue session; + // path where to save the session + private string sessionFilePath; + + this(OneDriveApi onedrive, string sessionFilePath, bool verbose) + { + assert(onedrive); + this.onedrive = onedrive; + this.sessionFilePath = sessionFilePath; + this.verbose = verbose; + } + + JSONValue upload(string localPath, string remotePath, const(char)[] eTag = null) + { + session = onedrive.createUploadSession(remotePath, eTag); + session["localPath"] = localPath; + save(); + return upload(); + } + + /* Restore the previous upload session. + * Returns true if the session is valid. Call upload() to resume it. + * Returns false if there is no session or the session is expired. */ + bool restore() + { + if (exists(sessionFilePath)) { + if (verbose) writeln("Trying to restore the upload session ..."); + session = readText(sessionFilePath).parseJSON(); + auto expiration = SysTime.fromISOExtString(session["expirationDateTime"].str); + if (expiration < Clock.currTime()) { + if (verbose) writeln("The upload session is expired"); + return false; + } + if (!exists(session["localPath"].str)) { + if (verbose) writeln("The file do not exist anymore"); + return false; + } + // request the session status + auto response = onedrive.requestUploadStatus(session["uploadUrl"].str); + session["expirationDateTime"] = response["expirationDateTime"]; + session["nextExpectedRanges"] = response["nextExpectedRanges"]; + if (session["nextExpectedRanges"].array.length == 0) { + if (verbose) writeln("The upload session is completed"); + return false; + } + return true; + } + return false; + } + + JSONValue upload() + { + long offset = session["nextExpectedRanges"][0].str.splitter('-').front.to!long; + long fileSize = getSize(session["localPath"].str); + JSONValue response; + while (true) { + long fragSize = fragmentSize < fileSize - offset ? fragmentSize : fileSize - offset; + if (verbose) writeln("Uploading fragment: ", offset, "-", offset + fragSize, "/", fileSize); + response = onedrive.uploadFragment( + session["uploadUrl"].str, + session["localPath"].str, + offset, + fragSize, + fileSize + ); + offset += fragmentSize; + if (offset >= fileSize) break; + // update the session + session["expirationDateTime"] = response["expirationDateTime"]; + session["nextExpectedRanges"] = response["nextExpectedRanges"]; + save(); + } + // upload complete + remove(sessionFilePath); + return response; + } + + private void save() + { + std.file.write(sessionFilePath, session.toString()); + } +}