support for files bigger than 100 MB and resumable uploads

This commit is contained in:
skilion 2015-09-27 18:47:41 +02:00
parent 88b11433a7
commit 9b80d99ad4
5 changed files with 189 additions and 24 deletions

View file

@ -13,6 +13,7 @@ SOURCES = \
src/onedrive.d \
src/sqlite.d \
src/sync.d \
src/upload.d \
src/util.d
onedrive: $(SOURCES)

View file

@ -62,21 +62,22 @@ void main(string[] args)
if (verbose) writeln("Opening the item database ...");
auto itemdb = new ItemDatabase(databaseFilePath);
if (verbose) writeln("Initializing the Synchronization Engine ...");
auto sync = new SyncEngine(cfg, onedrive, itemdb, verbose);
sync.onStatusToken = (string statusToken) {
std.file.write(statusTokenFilePath, statusToken);
};
try {
string statusToken = readText(statusTokenFilePath);
sync.setStatusToken(statusToken);
} catch (FileException e) {
// swallow exception
}
string syncDir = expandTilde(cfg.get("sync_dir"));
if (verbose) writeln("All operations will be performed in: ", syncDir);
chdir(syncDir);
if (verbose) writeln("Initializing the Synchronization Engine ...");
auto sync = new SyncEngine(cfg, onedrive, itemdb, configDirName, verbose);
sync.onStatusToken = (string statusToken) {
std.file.write(statusTokenFilePath, statusToken);
};
string statusToken;
try {
statusToken = readText(statusTokenFilePath);
} catch (FileException e) {
// swallow exception
}
sync.init(statusToken);
performSync(sync);
if (monitor) {

View file

@ -1,5 +1,5 @@
import std.datetime, std.exception, std.json, std.net.curl, std.path;
import std.string, std.uni, std.uri;
import std.stdio, std.string, std.uni, std.uri;
import config;
private immutable {
@ -181,6 +181,49 @@ final class OneDriveApi
return result;
}
// https://dev.onedrive.com/items/upload_large_files.htm
JSONValue createUploadSession(const(char)[] path, const(char)[] eTag = null)
{
checkAccessTokenExpired();
string url = itemByPathUrl ~ encodeComponent(path) ~ ":/upload.createSession";
if (eTag) http.addRequestHeader("If-Match", eTag);
auto result = post(url, null);
// remove the if-match header
if (eTag) setAccessToken(accessToken);
return result;
}
// https://dev.onedrive.com/items/upload_large_files.htm
JSONValue uploadFragment(const(char)[] uploadUrl, string filepath, long offset, long offsetSize, long fileSize)
{
checkAccessTokenExpired();
http.method = HTTP.Method.put;
http.url = uploadUrl;
ubyte[] content;
http.onReceive = (ubyte[] data) {
content ~= data;
return data.length;
};
auto file = File(filepath, "rb");
file.seek(offset);
http.onSend = data => file.rawRead(data).length;
http.contentLength = offsetSize;
import std.conv;
string contentRange = "bytes " ~ to!string(offset) ~ "-" ~ to!string(offset + offsetSize - 1) ~ "/" ~ to!string(fileSize);
http.addRequestHeader("Content-Range", contentRange);
http.perform();
checkHttpCode(); // TODO: retry on 5xx errors
// remove the content-range header
scope(exit) setAccessToken(accessToken);
return parseJSON(content);
}
// https://dev.onedrive.com/items/upload_large_files.htm
JSONValue requestUploadStatus(const(char)[] uploadUrl)
{
return get(uploadUrl);
}
private void redeemToken(const(char)[] authCode)
{
string postData = "client_id=" ~ clientId ~ "&redirect_url=" ~ redirectUrl ~ "&client_secret=" ~ clientSecret;

View file

@ -1,7 +1,11 @@
import std.exception: ErrnoException;
import std.algorithm, std.datetime, std.file, std.json, std.path, std.regex;
import std.stdio, std.string;
import config, itemdb, onedrive, util;
import config, itemdb, onedrive, upload, util;
private string uploadStateFileName = "resume_upload";
// threshold after which files will be uploaded using an upload session
private long thresholdFileSize = 10 * 2^^20; // 10 Mib
private bool isItemFolder(const ref JSONValue item)
{
@ -52,6 +56,7 @@ final class SyncEngine
private ItemDatabase itemdb;
private bool verbose;
private Regex!char skipDir, skipFile;
private UploadSession session;
// token representing the last status correctly synced
private string statusToken;
// list of items to skip while applying the changes downloaded
@ -61,20 +66,28 @@ final class SyncEngine
void delegate(string) onStatusToken;
this(Config cfg, OneDriveApi onedrive, ItemDatabase itemdb, bool verbose)
this(Config cfg, OneDriveApi onedrive, ItemDatabase itemdb, string configDirName, bool verbose)
{
assert(onedrive && itemdb);
this.cfg = cfg;
this.onedrive = onedrive;
this.itemdb = itemdb;
//this.configDirName = configDirName;
this.verbose = verbose;
skipDir = wild2regex(cfg.get("skip_dir"));
skipFile = wild2regex(cfg.get("skip_file"));
session = UploadSession(onedrive, configDirName ~ "/" ~ uploadStateFileName, verbose);
}
void setStatusToken(string statusToken)
void init(string statusToken = null)
{
this.statusToken = statusToken;
// check if there is an interrupted upload session
if (session.restore()) {
writeln("Continuing the upload session ...");
auto item = session.upload();
saveItem(item);
}
}
void applyDifferences()
@ -395,13 +408,18 @@ final class SyncEngine
if (!testCrc32(path, item.crc32)) {
if (verbose) writeln("The file content has changed");
writeln("Uploading: ", path);
auto res = onedrive.simpleUpload(path, path, item.eTag);
saveItem(res);
id = res["id"].str;
JSONValue response;
if (getSize(path) <= thresholdFileSize) {
response = onedrive.simpleUpload(path, path);
} else {
response = session.upload(path, path);
}
saveItem(response);
id = response["id"].str;
/* use the cTag instead of the eTag because Onedrive changes the
* metadata of some type of files (ex. images) AFTER they have been
* uploaded */
eTag = res["cTag"].str;
eTag = response["cTag"].str;
}
uploadLastModifiedTime(id, eTag, localModifiedTime.toUTC());
} else {
@ -453,10 +471,15 @@ final class SyncEngine
private void uploadNewFile(string path)
{
writeln("Uploading: ", path);
JSONValue res = onedrive.simpleUpload(path, path);
saveItem(res);
string id = res["id"].str;
string cTag = res["cTag"].str;
JSONValue response;
if (getSize(path) <= thresholdFileSize) {
response = onedrive.simpleUpload(path, path);
} else {
response = session.upload(path, path);
}
saveItem(response);
string id = response["id"].str;
string cTag = response["cTag"].str;
SysTime mtime = timeLastModified(path).toUTC();
/* use the cTag instead of the eTag because Onedrive changes the
* metadata of some type of files (ex. images) AFTER they have been

97
src/upload.d Normal file
View file

@ -0,0 +1,97 @@
import std.algorithm;
import std.conv;
import std.datetime;
import std.file;
import std.json;
import std.stdio;
import onedrive;
private long fragmentSize = 10 * 2^^20; // 10 Mib
struct UploadSession
{
private OneDriveApi onedrive;
private bool verbose;
// https://dev.onedrive.com/resources/uploadSession.htm
private JSONValue session;
// path where to save the session
private string sessionFilePath;
this(OneDriveApi onedrive, string sessionFilePath, bool verbose)
{
assert(onedrive);
this.onedrive = onedrive;
this.sessionFilePath = sessionFilePath;
this.verbose = verbose;
}
JSONValue upload(string localPath, string remotePath, const(char)[] eTag = null)
{
session = onedrive.createUploadSession(remotePath, eTag);
session["localPath"] = localPath;
save();
return upload();
}
/* Restore the previous upload session.
* Returns true if the session is valid. Call upload() to resume it.
* Returns false if there is no session or the session is expired. */
bool restore()
{
if (exists(sessionFilePath)) {
if (verbose) writeln("Trying to restore the upload session ...");
session = readText(sessionFilePath).parseJSON();
auto expiration = SysTime.fromISOExtString(session["expirationDateTime"].str);
if (expiration < Clock.currTime()) {
if (verbose) writeln("The upload session is expired");
return false;
}
if (!exists(session["localPath"].str)) {
if (verbose) writeln("The file do not exist anymore");
return false;
}
// request the session status
auto response = onedrive.requestUploadStatus(session["uploadUrl"].str);
session["expirationDateTime"] = response["expirationDateTime"];
session["nextExpectedRanges"] = response["nextExpectedRanges"];
if (session["nextExpectedRanges"].array.length == 0) {
if (verbose) writeln("The upload session is completed");
return false;
}
return true;
}
return false;
}
JSONValue upload()
{
long offset = session["nextExpectedRanges"][0].str.splitter('-').front.to!long;
long fileSize = getSize(session["localPath"].str);
JSONValue response;
while (true) {
long fragSize = fragmentSize < fileSize - offset ? fragmentSize : fileSize - offset;
if (verbose) writeln("Uploading fragment: ", offset, "-", offset + fragSize, "/", fileSize);
response = onedrive.uploadFragment(
session["uploadUrl"].str,
session["localPath"].str,
offset,
fragSize,
fileSize
);
offset += fragmentSize;
if (offset >= fileSize) break;
// update the session
session["expirationDateTime"] = response["expirationDateTime"];
session["nextExpectedRanges"] = response["nextExpectedRanges"];
save();
}
// upload complete
remove(sessionFilePath);
return response;
}
private void save()
{
std.file.write(sessionFilePath, session.toString());
}
}