From 4d230a394c092dcbe14fe1e0c449ea63baebd4ff Mon Sep 17 00:00:00 2001 From: skilion Date: Thu, 17 Sep 2015 00:16:23 +0200 Subject: [PATCH] finalizing --- Makefile | 1 - src/itemdb.d | 19 ++++-- src/main.d | 31 +++++++--- src/monitor.d | 161 +++++++++++++++++++++++-------------------------- src/onedrive.d | 14 +++-- src/sync.d | 78 ++++++++++++------------ 6 files changed, 164 insertions(+), 140 deletions(-) diff --git a/Makefile b/Makefile index 4c1f72bf..b77a2eca 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,6 @@ DC = dmd DFLAGS = -unittest -debug -g -gs -od./bin -of./bin/$@ -L-lcurl -L-lsqlite3 -L-ldl SOURCES = \ - /usr/include/dlang/dmd/core/sys/posix/poll.d \ /usr/include/dlang/dmd/etc/c/curl.d \ /usr/include/dlang/dmd/std/net/curl.d \ src/config.d \ diff --git a/src/itemdb.d b/src/itemdb.d index b45753e1..de38f667 100644 --- a/src/itemdb.d +++ b/src/itemdb.d @@ -37,11 +37,22 @@ final class ItemDatabase eTag TEXT NOT NULL, cTag TEXT NOT NULL, mtime TEXT NOT NULL, - parentId TEXT NOT NULL, - crc32 TEXT + parentId TEXT, + crc32 TEXT, + FOREIGN KEY (parentId) REFERENCES item (id) ON DELETE CASCADE )"); db.exec("CREATE INDEX IF NOT EXISTS name_idx ON item (name)"); - insertItemStmt = db.prepare("INSERT OR REPLACE INTO item (id, name, type, eTag, cTag, mtime, parentId, crc32) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"); + db.exec("PRAGMA foreign_keys = ON"); + db.exec("PRAGMA recursive_triggers = ON"); + //insertItemStmt = db.prepare("INSERT OR REPLACE INTO item (id, name, type, eTag, cTag, mtime, parentId, crc32) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"); + insertItemStmt = db.prepare(" + INSERT OR IGNORE + INTO item (id, name, type, eTag, cTag, mtime, parentId, crc32) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8); + UPDATE item + SET name = ?2, type = ?3, eTag = ?4, cTag = ?5, mtime = ?6, parentId = ?7, crc32 = ?8 + WHERE id = ?1 + "); selectItemByIdStmt = db.prepare("SELECT id, name, type, eTag, cTag, mtime, parentId, crc32 FROM item WHERE id = ?"); selectItemByParentIdStmt = db.prepare("SELECT id FROM item WHERE parentId = ?"); } @@ -117,7 +128,7 @@ final class ItemDatabase } } - auto s = db.prepare("SELECT a.id FROM item AS a LEFT JOIN item AS b ON a.parentId = b.id WHERE b.id IS NULL"); + auto s = db.prepare("SELECT id FROM item WHERE parentId IS NULL"); auto r = s.exec(); assert(!r.empty()); return ItemRange(this, r.front[0].dup); diff --git a/src/main.d b/src/main.d index c61b4b3b..a1032638 100644 --- a/src/main.d +++ b/src/main.d @@ -1,3 +1,4 @@ +import core.time, core.thread; import std.getopt, std.file, std.process, std.stdio; import config, itemdb, monitor, onedrive, sync; @@ -73,19 +74,19 @@ void main(string[] args) chdir(syncDir); sync.applyDifferences(); sync.uploadDifferences(); - return; if (monitor) { - if (verbose) writeln("Monitoring for changes ..."); + if (verbose) writeln("Initializing monitor ..."); Monitor m; m.onDirCreated = delegate(string path) { if (verbose) writeln("[M] Directory created: ", path); sync.uploadCreateDir(path); + // the directory could be the result of a move operation sync.uploadDifferences(path); }; m.onFileChanged = delegate(string path) { if (verbose) writeln("[M] File changed: ", path); - sync.uploadDifference2(path); + sync.uploadDifference(path); }; m.onDelete = delegate(string path) { if (verbose) writeln("[M] Item deleted: ", path); @@ -93,13 +94,23 @@ void main(string[] args) }; m.onMove = delegate(string from, string to) { if (verbose) writeln("[M] Item moved: ", from, " -> ", to); - sync.moveItem(from, to); + sync.uploadMoveItem(from, to); }; - m.init(); - m.addRecursive("test"); - while (true) m.update(); - // TODO download changes + m.init(verbose); + // monitor loop + immutable auto checkInterval = dur!"seconds"(45); + auto lastCheckTime = MonoTime.currTime(); + while (true) { + m.update(); + auto currTime = MonoTime.currTime(); + if (currTime - lastCheckTime > checkInterval) { + lastCheckTime = currTime; + m.shutdown(); + sync.applyDifferences(); + sync.uploadDifferences(); + m.init(verbose); + } + Thread.sleep(dur!"msecs"(100)); + } } - - destroy(sync); } diff --git a/src/monitor.d b/src/monitor.d index 1224055a..3d5f3a9d 100644 --- a/src/monitor.d +++ b/src/monitor.d @@ -1,38 +1,27 @@ -import core.stdc.errno: errno; -import core.stdc.string: strerror; import core.sys.linux.sys.inotify; import core.sys.posix.poll; import core.sys.posix.unistd; -import std.file, std.stdio, std.string; +import std.exception, std.file, std.stdio, std.string; // relevant inotify events private immutable uint32_t mask = IN_ATTRIB | IN_CLOSE_WRITE | IN_CREATE | - IN_DELETE | IN_MOVE_SELF | IN_MOVE | IN_IGNORED | IN_Q_OVERFLOW; + IN_DELETE | IN_MOVE | IN_IGNORED | IN_Q_OVERFLOW; -class MonitorException: Exception +class MonitorException: ErrnoException { - this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) + @safe this(string msg, string file = __FILE__, size_t line = __LINE__) { - super(makeErrorMsg(msg), file, line, next); + super(msg, file, line); } - - this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__) - { - super(makeErrorMsg(msg), file, line, next); - } - - private string makeErrorMsg(string msg) - { - return msg ~ " :" ~ fromStringz(strerror(errno())).idup; - } } struct Monitor { + bool verbose; // inotify file descriptor private int fd; - // map every watch descriptor to their dir - private string[int] dirs; + // map every inotify watch descriptor to its directory + private string[int] wdToDirName; // map the inotify cookies of move_from events to their path private string[int] cookieToPath; // buffer to receive the inotify events @@ -45,117 +34,121 @@ struct Monitor @disable this(this); - void init() + void init(bool verbose) { - assert(onDirCreated); - assert(onFileChanged); - assert(onDelete); - assert(onMove); + this.verbose = verbose; fd = inotify_init(); if (fd == -1) throw new MonitorException("inotify_init failed"); - buffer = new void[10000]; + if (!buffer) buffer = new void[4096]; + addRecursive("."); } void shutdown() { if (fd > 0) close(fd); + wdToDirName = null; } - void add(string path) + private void addRecursive(string dirname) { - int wd = inotify_add_watch(fd, toStringz(path), mask); - if (wd == -1) throw new MonitorException("inotify_add_watch failed"); - dirs[wd] = path ~ "/"; - writeln("Monitor directory: ", path); - } - - void addRecursive(string path) - { - add(path); - foreach(DirEntry entry; dirEntries(path, SpanMode.breadth, false)) { + add(dirname); + foreach(DirEntry entry; dirEntries(dirname, SpanMode.breadth, false)) { if (entry.isDir) add(entry.name); } } + private void add(string dirname) + { + int wd = inotify_add_watch(fd, toStringz(dirname), mask); + if (wd == -1) throw new MonitorException("inotify_add_watch failed"); + wdToDirName[wd] = chompPrefix(dirname ~ "/", "./"); + if (verbose) writeln("Monitor directory: ", dirname); + } + // remove a watch descriptor private void remove(int wd) { - assert(wd in dirs); + assert(wd in wdToDirName); int ret = inotify_rm_watch(fd, wd); if (ret == -1) throw new MonitorException("inotify_rm_watch failed"); - writeln("Monitored directory removed: ", dirs[wd]); - dirs.remove(wd); + if (verbose) writeln("Monitored directory removed: ", wdToDirName[wd]); + wdToDirName.remove(wd); } // return the file path from an inotify event private string getPath(const(inotify_event)* event) { - string path = dirs[event.wd]; + string path = wdToDirName[event.wd]; if (event.len > 0) path ~= fromStringz(event.name.ptr); return path; } void update() { - pollfd[1] fds; + assert(onDirCreated && onFileChanged && onDelete && onMove); + pollfd[1] fds = void; fds[0].fd = fd; fds[0].events = POLLIN; - int ret = poll(fds.ptr, 1, 15); - if (ret == -1) throw new MonitorException("poll failed"); - else if (ret == 0) return; // no events available - assert(fds[0].revents & POLLIN); - size_t length = read(fds[0].fd, buffer.ptr, buffer.length); - if (length == -1) throw new MonitorException("read failed"); + while (true) { + int ret = poll(fds.ptr, 1, 0); + if (ret == -1) throw new MonitorException("poll failed"); + else if (ret == 0) break; // no events available - int i = 0; - while (i < length) { - inotify_event *event = cast(inotify_event*) &buffer[i]; - if (event.mask & IN_IGNORED) { - // forget the path associated to the watch descriptor - dirs.remove(event.wd); - } else if (event.mask & IN_Q_OVERFLOW) { - writeln("Inotify overflow, events missing"); - assert(0); - } else if (event.mask & IN_MOVED_FROM) { - string path = getPath(event); - cookieToPath[event.cookie] = path; - writeln("moved from ", path); - } else if (event.mask & IN_MOVED_TO) { - string path = getPath(event); - if (event.mask & IN_ISDIR) addRecursive(path); - auto from = event.cookie in cookieToPath; - if (from) { - cookieToPath.remove(event.cookie); - onMove(*from, path); - } else { - if (event.mask & IN_ISDIR) { - onDirCreated(path); + assert(fds[0].revents & POLLIN); + size_t length = read(fds[0].fd, buffer.ptr, buffer.length); + if (length == -1) throw new MonitorException("read failed"); + + int i = 0; + while (i < length) { + inotify_event *event = cast(inotify_event*) &buffer[i]; + if (event.mask & IN_IGNORED) { + // forget the directory associated to the watch descriptor + wdToDirName.remove(event.wd); + } else if (event.mask & IN_Q_OVERFLOW) { + throw new MonitorException("Inotify overflow, events missing"); + } else if (event.mask & IN_MOVED_FROM) { + string path = getPath(event); + cookieToPath[event.cookie] = path; + } else if (event.mask & IN_MOVED_TO) { + string path = getPath(event); + if (event.mask & IN_ISDIR) addRecursive(path); + auto from = event.cookie in cookieToPath; + if (from) { + cookieToPath.remove(event.cookie); + onMove(*from, path); } else { - onFileChanged(path); + // item moved from the outside + if (event.mask & IN_ISDIR) { + onDirCreated(path); + } else { + onFileChanged(path); + } } - } - } else { - if (event.mask & IN_ISDIR) { - if (event.mask & IN_CREATE) { + } else if (event.mask & IN_CREATE) { + if (event.mask & IN_ISDIR) { string path = getPath(event); addRecursive(path); onDirCreated(path); - } else if (event.mask & IN_DELETE) { - string path = getPath(event); - onDelete(path); } - } else { - if (event.mask & IN_ATTRIB || event.mask & IN_CLOSE_WRITE) { + } else if (event.mask & IN_DELETE) { + string path = getPath(event); + onDelete(path); + } else if (event.mask & IN_ATTRIB || event.mask & IN_CLOSE_WRITE) { + if (!(event.mask & IN_ISDIR)) { string path = getPath(event); onFileChanged(path); - } else if (event.mask & IN_DELETE) { - string path = getPath(event); - onDelete(path); } + } else { + writeln("Unknow inotify event: ", format("%#x", event.mask)); } + i += inotify_event.sizeof + event.len; + } + // assume that the items moved outside the watched directory has been deleted + foreach (cookie, path; cookieToPath) { + onDelete(path); + cookieToPath.remove(cookie); } - i += inotify_event.sizeof + event.len; } } } diff --git a/src/onedrive.d b/src/onedrive.d index 1d77d394..104a9378 100644 --- a/src/onedrive.d +++ b/src/onedrive.d @@ -1,4 +1,5 @@ -import std.datetime, std.json, std.net.curl, std.path, std.string, std.uni, std.uri; +import std.datetime, std.exception, std.json, std.net.curl, std.path; +import std.string, std.uni, std.uri; import config; private immutable { @@ -138,9 +139,14 @@ final class OneDriveApi }; if (eTag) http.addRequestHeader("If-Match", eTag); http.addRequestHeader("Content-Type", "application/octet-stream"); - upload(localPath, url, http); - // remove the headers - setAccessToken(accessToken); + try { + upload(localPath, url, http); + } catch (ErrnoException e) { + throw new OneDriveException(e.msg, e); + } finally { + // remove the headers + setAccessToken(accessToken); + } checkHttpCode(); return parseJSON(content); } diff --git a/src/sync.d b/src/sync.d index cb683d27..2eaa3ae5 100644 --- a/src/sync.d +++ b/src/sync.d @@ -83,7 +83,7 @@ final class SyncEngine onStatusToken(statusToken); } while (changes["@changes.hasMoreChanges"].type == JSON_TYPE.TRUE); // delete items in itemsToDelete - deleteItems(); + if (itemsToDelete.length > 0) deleteItems(); // empty the skipped items skippedItems.length = 0; assumeSafeAppend(skippedItems); @@ -101,8 +101,8 @@ final class SyncEngine bool cached = itemdb.selectById(id, cachedItem); if (cached && !isItemSynced(cachedItem)) { - if (verbose) writeln("The local item is out of sync, renaming ..."); - safeRename(cachedItem.path); + if (verbose) writeln("The local item is out of sync, renaming: ", cachedItem.path); + if (exists(cachedItem.path)) safeRename(cachedItem.path); cached = false; } @@ -112,18 +112,20 @@ final class SyncEngine if (cached) applyDeleteItem(cachedItem); return; } else if (isItemFile(item)) { - if (verbose) writeln("The item is a file"); type = ItemType.file; } else if (isItemFolder(item)) { - if (verbose) writeln("The item is a directory"); type = ItemType.dir; } else { - if (verbose) writeln("The item is neither a file nor a directory"); + if (verbose) writeln("The item is neither a file nor a directory, skipping"); skippedItems ~= id; return; } string parentId = item["parentReference"].object["id"].str; + if (name == "root" && parentId[$ - 1] == '0' && parentId[$ - 2] == '!') { + // HACK: recognize the root directory + parentId = null; + } if (skippedItems.find(parentId).length != 0) { if (verbose) writeln("The item is a children of a skipped item"); skippedItems ~= id; @@ -148,7 +150,7 @@ final class SyncEngine itemdb.insert(id, name, type, eTag, cTag, mtime, parentId, crc32); itemdb.selectById(id, newItem); - // TODO add item in the db anly if correctly downloaded + // TODO add item in the db only if correctly downloaded try { if (!cached) { applyNewItem(newItem); @@ -283,13 +285,15 @@ final class SyncEngine assumeSafeAppend(itemsToDelete); } - // scan the directory for unsynced files and upload them + // scan the root directory for unsynced files and upload them public void uploadDifferences() { if (verbose) writeln("Uploading differences ..."); + // check for changed files or deleted items foreach (Item item; itemdb.selectAll()) { uploadDifference(item); } + if (verbose) writeln("Uploading new items ..."); // check for new files or directories foreach (DirEntry entry; dirEntries(".", SpanMode.breadth, false)) { string path = entry.name[2 .. $]; // HACK: skip "./" @@ -304,16 +308,12 @@ final class SyncEngine } } - public void uploadDifferences(string path) + /* scan the specified directory for unsynced files and uplaod them + NOTE: this function does not check for deleted files. */ + public void uploadDifferences(string dirname) { - assert(isDir(path)); - Item item; - foreach (DirEntry entry; dirEntries(path, SpanMode.breadth, false)) { - if (itemdb.selectByPath(entry.name, item)) { - uploadDifference(item); - } else { - uploadNewFile(entry.name); - } + foreach (DirEntry entry; dirEntries(dirname, SpanMode.breadth, false)) { + uploadDifference(entry.name); } } @@ -347,6 +347,17 @@ final class SyncEngine } } + // NOTE: this function works only for files + void uploadDifference(string filename) + { + Item item; + if (itemdb.selectByPath(filename, item)) { + uploadDifference(item); + } else { + uploadNewFile(filename); + } + } + // check if the item is changed and upload the differences private void uploadItemDifferences(Item item) { @@ -383,7 +394,13 @@ final class SyncEngine private void uploadNewFile(string path) { writeln("Uploading: ", path); - auto res = onedrive.simpleUpload(path, path); + JSONValue res; + try { + res = onedrive.simpleUpload(path, path); + } catch (OneDriveException e) { + writeln(e.msg); + return; + } saveItem(res); string id = res["id"].str; string eTag = res["eTag"].str; @@ -392,7 +409,7 @@ final class SyncEngine private void uploadDeleteItem(Item item) { - writeln("Deleting remote: ", item.path); + writeln("Deleting remote item: ", item.path); onedrive.deleteById(item.id, item.eTag); itemdb.deleteById(item.id); } @@ -437,40 +454,27 @@ final class SyncEngine itemdb.insert(id, name, type, eTag, cTag, mtime, parentId, crc32); } - // HACK - void uploadDifference2(string path) + void uploadMoveItem(const(char)[] from, string to) { - assert(isFile(path)); - Item item; - if (itemdb.selectByPath(path, item)) { - uploadDifference(item); - } else { - uploadNewFile(path); - } - } - - void moveItem(const(char)[] from, string to) - { - writeln("Moving ", from, " to ", to, " ..."); + writeln("Moving remote item: ", from, " -> ", to); Item item; if (!itemdb.selectByPath(from, item)) { - throw new SyncException("Can't move a non synced item"); + writeln("Can't move an unsynced item"); + return; } JSONValue diff = ["name": baseName(to)]; diff["parentReference"] = JSONValue([ "path": "/drive/root:/" ~ dirName(to) ]); - writeln(diff.toPrettyString()); auto res = onedrive.updateById(item.id, diff, item.eTag); saveItem(res); } void deleteByPath(const(char)[] path) { - writeln("Deleting: ", path); Item item; if (!itemdb.selectByPath(path, item)) { - throw new SyncException("Can't delete a non synced item"); + throw new SyncException("Can't delete an unsynced item"); } uploadDeleteItem(item); }