finalizing

This commit is contained in:
skilion 2015-09-17 00:16:23 +02:00
parent f887b29061
commit 4d230a394c
6 changed files with 164 additions and 140 deletions

View file

@ -2,7 +2,6 @@ DC = dmd
DFLAGS = -unittest -debug -g -gs -od./bin -of./bin/$@ -L-lcurl -L-lsqlite3 -L-ldl DFLAGS = -unittest -debug -g -gs -od./bin -of./bin/$@ -L-lcurl -L-lsqlite3 -L-ldl
SOURCES = \ SOURCES = \
/usr/include/dlang/dmd/core/sys/posix/poll.d \
/usr/include/dlang/dmd/etc/c/curl.d \ /usr/include/dlang/dmd/etc/c/curl.d \
/usr/include/dlang/dmd/std/net/curl.d \ /usr/include/dlang/dmd/std/net/curl.d \
src/config.d \ src/config.d \

View file

@ -37,11 +37,22 @@ final class ItemDatabase
eTag TEXT NOT NULL, eTag TEXT NOT NULL,
cTag TEXT NOT NULL, cTag TEXT NOT NULL,
mtime TEXT NOT NULL, mtime TEXT NOT NULL,
parentId TEXT NOT NULL, parentId TEXT,
crc32 TEXT crc32 TEXT,
FOREIGN KEY (parentId) REFERENCES item (id) ON DELETE CASCADE
)"); )");
db.exec("CREATE INDEX IF NOT EXISTS name_idx ON item (name)"); db.exec("CREATE INDEX IF NOT EXISTS name_idx ON item (name)");
insertItemStmt = db.prepare("INSERT OR REPLACE INTO item (id, name, type, eTag, cTag, mtime, parentId, crc32) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"); db.exec("PRAGMA foreign_keys = ON");
db.exec("PRAGMA recursive_triggers = ON");
//insertItemStmt = db.prepare("INSERT OR REPLACE INTO item (id, name, type, eTag, cTag, mtime, parentId, crc32) VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
insertItemStmt = db.prepare("
INSERT OR IGNORE
INTO item (id, name, type, eTag, cTag, mtime, parentId, crc32)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8);
UPDATE item
SET name = ?2, type = ?3, eTag = ?4, cTag = ?5, mtime = ?6, parentId = ?7, crc32 = ?8
WHERE id = ?1
");
selectItemByIdStmt = db.prepare("SELECT id, name, type, eTag, cTag, mtime, parentId, crc32 FROM item WHERE id = ?"); selectItemByIdStmt = db.prepare("SELECT id, name, type, eTag, cTag, mtime, parentId, crc32 FROM item WHERE id = ?");
selectItemByParentIdStmt = db.prepare("SELECT id FROM item WHERE parentId = ?"); selectItemByParentIdStmt = db.prepare("SELECT id FROM item WHERE parentId = ?");
} }
@ -117,7 +128,7 @@ final class ItemDatabase
} }
} }
auto s = db.prepare("SELECT a.id FROM item AS a LEFT JOIN item AS b ON a.parentId = b.id WHERE b.id IS NULL"); auto s = db.prepare("SELECT id FROM item WHERE parentId IS NULL");
auto r = s.exec(); auto r = s.exec();
assert(!r.empty()); assert(!r.empty());
return ItemRange(this, r.front[0].dup); return ItemRange(this, r.front[0].dup);

View file

@ -1,3 +1,4 @@
import core.time, core.thread;
import std.getopt, std.file, std.process, std.stdio; import std.getopt, std.file, std.process, std.stdio;
import config, itemdb, monitor, onedrive, sync; import config, itemdb, monitor, onedrive, sync;
@ -73,19 +74,19 @@ void main(string[] args)
chdir(syncDir); chdir(syncDir);
sync.applyDifferences(); sync.applyDifferences();
sync.uploadDifferences(); sync.uploadDifferences();
return;
if (monitor) { if (monitor) {
if (verbose) writeln("Monitoring for changes ..."); if (verbose) writeln("Initializing monitor ...");
Monitor m; Monitor m;
m.onDirCreated = delegate(string path) { m.onDirCreated = delegate(string path) {
if (verbose) writeln("[M] Directory created: ", path); if (verbose) writeln("[M] Directory created: ", path);
sync.uploadCreateDir(path); sync.uploadCreateDir(path);
// the directory could be the result of a move operation
sync.uploadDifferences(path); sync.uploadDifferences(path);
}; };
m.onFileChanged = delegate(string path) { m.onFileChanged = delegate(string path) {
if (verbose) writeln("[M] File changed: ", path); if (verbose) writeln("[M] File changed: ", path);
sync.uploadDifference2(path); sync.uploadDifference(path);
}; };
m.onDelete = delegate(string path) { m.onDelete = delegate(string path) {
if (verbose) writeln("[M] Item deleted: ", path); if (verbose) writeln("[M] Item deleted: ", path);
@ -93,13 +94,23 @@ void main(string[] args)
}; };
m.onMove = delegate(string from, string to) { m.onMove = delegate(string from, string to) {
if (verbose) writeln("[M] Item moved: ", from, " -> ", to); if (verbose) writeln("[M] Item moved: ", from, " -> ", to);
sync.moveItem(from, to); sync.uploadMoveItem(from, to);
}; };
m.init(); m.init(verbose);
m.addRecursive("test"); // monitor loop
while (true) m.update(); immutable auto checkInterval = dur!"seconds"(45);
// TODO download changes auto lastCheckTime = MonoTime.currTime();
while (true) {
m.update();
auto currTime = MonoTime.currTime();
if (currTime - lastCheckTime > checkInterval) {
lastCheckTime = currTime;
m.shutdown();
sync.applyDifferences();
sync.uploadDifferences();
m.init(verbose);
}
Thread.sleep(dur!"msecs"(100));
}
} }
destroy(sync);
} }

View file

@ -1,38 +1,27 @@
import core.stdc.errno: errno;
import core.stdc.string: strerror;
import core.sys.linux.sys.inotify; import core.sys.linux.sys.inotify;
import core.sys.posix.poll; import core.sys.posix.poll;
import core.sys.posix.unistd; import core.sys.posix.unistd;
import std.file, std.stdio, std.string; import std.exception, std.file, std.stdio, std.string;
// relevant inotify events // relevant inotify events
private immutable uint32_t mask = IN_ATTRIB | IN_CLOSE_WRITE | IN_CREATE | private immutable uint32_t mask = IN_ATTRIB | IN_CLOSE_WRITE | IN_CREATE |
IN_DELETE | IN_MOVE_SELF | IN_MOVE | IN_IGNORED | IN_Q_OVERFLOW; IN_DELETE | IN_MOVE | IN_IGNORED | IN_Q_OVERFLOW;
class MonitorException: Exception class MonitorException: ErrnoException
{ {
this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe this(string msg, string file = __FILE__, size_t line = __LINE__)
{ {
super(makeErrorMsg(msg), file, line, next); super(msg, file, line);
} }
this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
{
super(makeErrorMsg(msg), file, line, next);
}
private string makeErrorMsg(string msg)
{
return msg ~ " :" ~ fromStringz(strerror(errno())).idup;
}
} }
struct Monitor struct Monitor
{ {
bool verbose;
// inotify file descriptor // inotify file descriptor
private int fd; private int fd;
// map every watch descriptor to their dir // map every inotify watch descriptor to its directory
private string[int] dirs; private string[int] wdToDirName;
// map the inotify cookies of move_from events to their path // map the inotify cookies of move_from events to their path
private string[int] cookieToPath; private string[int] cookieToPath;
// buffer to receive the inotify events // buffer to receive the inotify events
@ -45,117 +34,121 @@ struct Monitor
@disable this(this); @disable this(this);
void init() void init(bool verbose)
{ {
assert(onDirCreated); this.verbose = verbose;
assert(onFileChanged);
assert(onDelete);
assert(onMove);
fd = inotify_init(); fd = inotify_init();
if (fd == -1) throw new MonitorException("inotify_init failed"); if (fd == -1) throw new MonitorException("inotify_init failed");
buffer = new void[10000]; if (!buffer) buffer = new void[4096];
addRecursive(".");
} }
void shutdown() void shutdown()
{ {
if (fd > 0) close(fd); if (fd > 0) close(fd);
wdToDirName = null;
} }
void add(string path) private void addRecursive(string dirname)
{ {
int wd = inotify_add_watch(fd, toStringz(path), mask); add(dirname);
if (wd == -1) throw new MonitorException("inotify_add_watch failed"); foreach(DirEntry entry; dirEntries(dirname, SpanMode.breadth, false)) {
dirs[wd] = path ~ "/";
writeln("Monitor directory: ", path);
}
void addRecursive(string path)
{
add(path);
foreach(DirEntry entry; dirEntries(path, SpanMode.breadth, false)) {
if (entry.isDir) add(entry.name); if (entry.isDir) add(entry.name);
} }
} }
private void add(string dirname)
{
int wd = inotify_add_watch(fd, toStringz(dirname), mask);
if (wd == -1) throw new MonitorException("inotify_add_watch failed");
wdToDirName[wd] = chompPrefix(dirname ~ "/", "./");
if (verbose) writeln("Monitor directory: ", dirname);
}
// remove a watch descriptor // remove a watch descriptor
private void remove(int wd) private void remove(int wd)
{ {
assert(wd in dirs); assert(wd in wdToDirName);
int ret = inotify_rm_watch(fd, wd); int ret = inotify_rm_watch(fd, wd);
if (ret == -1) throw new MonitorException("inotify_rm_watch failed"); if (ret == -1) throw new MonitorException("inotify_rm_watch failed");
writeln("Monitored directory removed: ", dirs[wd]); if (verbose) writeln("Monitored directory removed: ", wdToDirName[wd]);
dirs.remove(wd); wdToDirName.remove(wd);
} }
// return the file path from an inotify event // return the file path from an inotify event
private string getPath(const(inotify_event)* event) private string getPath(const(inotify_event)* event)
{ {
string path = dirs[event.wd]; string path = wdToDirName[event.wd];
if (event.len > 0) path ~= fromStringz(event.name.ptr); if (event.len > 0) path ~= fromStringz(event.name.ptr);
return path; return path;
} }
void update() void update()
{ {
pollfd[1] fds; assert(onDirCreated && onFileChanged && onDelete && onMove);
pollfd[1] fds = void;
fds[0].fd = fd; fds[0].fd = fd;
fds[0].events = POLLIN; fds[0].events = POLLIN;
int ret = poll(fds.ptr, 1, 15);
if (ret == -1) throw new MonitorException("poll failed");
else if (ret == 0) return; // no events available
assert(fds[0].revents & POLLIN); while (true) {
size_t length = read(fds[0].fd, buffer.ptr, buffer.length); int ret = poll(fds.ptr, 1, 0);
if (length == -1) throw new MonitorException("read failed"); if (ret == -1) throw new MonitorException("poll failed");
else if (ret == 0) break; // no events available
int i = 0; assert(fds[0].revents & POLLIN);
while (i < length) { size_t length = read(fds[0].fd, buffer.ptr, buffer.length);
inotify_event *event = cast(inotify_event*) &buffer[i]; if (length == -1) throw new MonitorException("read failed");
if (event.mask & IN_IGNORED) {
// forget the path associated to the watch descriptor int i = 0;
dirs.remove(event.wd); while (i < length) {
} else if (event.mask & IN_Q_OVERFLOW) { inotify_event *event = cast(inotify_event*) &buffer[i];
writeln("Inotify overflow, events missing"); if (event.mask & IN_IGNORED) {
assert(0); // forget the directory associated to the watch descriptor
} else if (event.mask & IN_MOVED_FROM) { wdToDirName.remove(event.wd);
string path = getPath(event); } else if (event.mask & IN_Q_OVERFLOW) {
cookieToPath[event.cookie] = path; throw new MonitorException("Inotify overflow, events missing");
writeln("moved from ", path); } else if (event.mask & IN_MOVED_FROM) {
} else if (event.mask & IN_MOVED_TO) { string path = getPath(event);
string path = getPath(event); cookieToPath[event.cookie] = path;
if (event.mask & IN_ISDIR) addRecursive(path); } else if (event.mask & IN_MOVED_TO) {
auto from = event.cookie in cookieToPath; string path = getPath(event);
if (from) { if (event.mask & IN_ISDIR) addRecursive(path);
cookieToPath.remove(event.cookie); auto from = event.cookie in cookieToPath;
onMove(*from, path); if (from) {
} else { cookieToPath.remove(event.cookie);
if (event.mask & IN_ISDIR) { onMove(*from, path);
onDirCreated(path);
} else { } else {
onFileChanged(path); // item moved from the outside
if (event.mask & IN_ISDIR) {
onDirCreated(path);
} else {
onFileChanged(path);
}
} }
} } else if (event.mask & IN_CREATE) {
} else { if (event.mask & IN_ISDIR) {
if (event.mask & IN_ISDIR) {
if (event.mask & IN_CREATE) {
string path = getPath(event); string path = getPath(event);
addRecursive(path); addRecursive(path);
onDirCreated(path); onDirCreated(path);
} else if (event.mask & IN_DELETE) {
string path = getPath(event);
onDelete(path);
} }
} else { } else if (event.mask & IN_DELETE) {
if (event.mask & IN_ATTRIB || event.mask & IN_CLOSE_WRITE) { string path = getPath(event);
onDelete(path);
} else if (event.mask & IN_ATTRIB || event.mask & IN_CLOSE_WRITE) {
if (!(event.mask & IN_ISDIR)) {
string path = getPath(event); string path = getPath(event);
onFileChanged(path); onFileChanged(path);
} else if (event.mask & IN_DELETE) {
string path = getPath(event);
onDelete(path);
} }
} else {
writeln("Unknow inotify event: ", format("%#x", event.mask));
} }
i += inotify_event.sizeof + event.len;
}
// assume that the items moved outside the watched directory has been deleted
foreach (cookie, path; cookieToPath) {
onDelete(path);
cookieToPath.remove(cookie);
} }
i += inotify_event.sizeof + event.len;
} }
} }
} }

View file

@ -1,4 +1,5 @@
import std.datetime, std.json, std.net.curl, std.path, std.string, std.uni, std.uri; import std.datetime, std.exception, std.json, std.net.curl, std.path;
import std.string, std.uni, std.uri;
import config; import config;
private immutable { private immutable {
@ -138,9 +139,14 @@ final class OneDriveApi
}; };
if (eTag) http.addRequestHeader("If-Match", eTag); if (eTag) http.addRequestHeader("If-Match", eTag);
http.addRequestHeader("Content-Type", "application/octet-stream"); http.addRequestHeader("Content-Type", "application/octet-stream");
upload(localPath, url, http); try {
// remove the headers upload(localPath, url, http);
setAccessToken(accessToken); } catch (ErrnoException e) {
throw new OneDriveException(e.msg, e);
} finally {
// remove the headers
setAccessToken(accessToken);
}
checkHttpCode(); checkHttpCode();
return parseJSON(content); return parseJSON(content);
} }

View file

@ -83,7 +83,7 @@ final class SyncEngine
onStatusToken(statusToken); onStatusToken(statusToken);
} while (changes["@changes.hasMoreChanges"].type == JSON_TYPE.TRUE); } while (changes["@changes.hasMoreChanges"].type == JSON_TYPE.TRUE);
// delete items in itemsToDelete // delete items in itemsToDelete
deleteItems(); if (itemsToDelete.length > 0) deleteItems();
// empty the skipped items // empty the skipped items
skippedItems.length = 0; skippedItems.length = 0;
assumeSafeAppend(skippedItems); assumeSafeAppend(skippedItems);
@ -101,8 +101,8 @@ final class SyncEngine
bool cached = itemdb.selectById(id, cachedItem); bool cached = itemdb.selectById(id, cachedItem);
if (cached && !isItemSynced(cachedItem)) { if (cached && !isItemSynced(cachedItem)) {
if (verbose) writeln("The local item is out of sync, renaming ..."); if (verbose) writeln("The local item is out of sync, renaming: ", cachedItem.path);
safeRename(cachedItem.path); if (exists(cachedItem.path)) safeRename(cachedItem.path);
cached = false; cached = false;
} }
@ -112,18 +112,20 @@ final class SyncEngine
if (cached) applyDeleteItem(cachedItem); if (cached) applyDeleteItem(cachedItem);
return; return;
} else if (isItemFile(item)) { } else if (isItemFile(item)) {
if (verbose) writeln("The item is a file");
type = ItemType.file; type = ItemType.file;
} else if (isItemFolder(item)) { } else if (isItemFolder(item)) {
if (verbose) writeln("The item is a directory");
type = ItemType.dir; type = ItemType.dir;
} else { } else {
if (verbose) writeln("The item is neither a file nor a directory"); if (verbose) writeln("The item is neither a file nor a directory, skipping");
skippedItems ~= id; skippedItems ~= id;
return; return;
} }
string parentId = item["parentReference"].object["id"].str; string parentId = item["parentReference"].object["id"].str;
if (name == "root" && parentId[$ - 1] == '0' && parentId[$ - 2] == '!') {
// HACK: recognize the root directory
parentId = null;
}
if (skippedItems.find(parentId).length != 0) { if (skippedItems.find(parentId).length != 0) {
if (verbose) writeln("The item is a children of a skipped item"); if (verbose) writeln("The item is a children of a skipped item");
skippedItems ~= id; skippedItems ~= id;
@ -148,7 +150,7 @@ final class SyncEngine
itemdb.insert(id, name, type, eTag, cTag, mtime, parentId, crc32); itemdb.insert(id, name, type, eTag, cTag, mtime, parentId, crc32);
itemdb.selectById(id, newItem); itemdb.selectById(id, newItem);
// TODO add item in the db anly if correctly downloaded // TODO add item in the db only if correctly downloaded
try { try {
if (!cached) { if (!cached) {
applyNewItem(newItem); applyNewItem(newItem);
@ -283,13 +285,15 @@ final class SyncEngine
assumeSafeAppend(itemsToDelete); assumeSafeAppend(itemsToDelete);
} }
// scan the directory for unsynced files and upload them // scan the root directory for unsynced files and upload them
public void uploadDifferences() public void uploadDifferences()
{ {
if (verbose) writeln("Uploading differences ..."); if (verbose) writeln("Uploading differences ...");
// check for changed files or deleted items
foreach (Item item; itemdb.selectAll()) { foreach (Item item; itemdb.selectAll()) {
uploadDifference(item); uploadDifference(item);
} }
if (verbose) writeln("Uploading new items ...");
// check for new files or directories // check for new files or directories
foreach (DirEntry entry; dirEntries(".", SpanMode.breadth, false)) { foreach (DirEntry entry; dirEntries(".", SpanMode.breadth, false)) {
string path = entry.name[2 .. $]; // HACK: skip "./" string path = entry.name[2 .. $]; // HACK: skip "./"
@ -304,16 +308,12 @@ final class SyncEngine
} }
} }
public void uploadDifferences(string path) /* scan the specified directory for unsynced files and uplaod them
NOTE: this function does not check for deleted files. */
public void uploadDifferences(string dirname)
{ {
assert(isDir(path)); foreach (DirEntry entry; dirEntries(dirname, SpanMode.breadth, false)) {
Item item; uploadDifference(entry.name);
foreach (DirEntry entry; dirEntries(path, SpanMode.breadth, false)) {
if (itemdb.selectByPath(entry.name, item)) {
uploadDifference(item);
} else {
uploadNewFile(entry.name);
}
} }
} }
@ -347,6 +347,17 @@ final class SyncEngine
} }
} }
// NOTE: this function works only for files
void uploadDifference(string filename)
{
Item item;
if (itemdb.selectByPath(filename, item)) {
uploadDifference(item);
} else {
uploadNewFile(filename);
}
}
// check if the item is changed and upload the differences // check if the item is changed and upload the differences
private void uploadItemDifferences(Item item) private void uploadItemDifferences(Item item)
{ {
@ -383,7 +394,13 @@ final class SyncEngine
private void uploadNewFile(string path) private void uploadNewFile(string path)
{ {
writeln("Uploading: ", path); writeln("Uploading: ", path);
auto res = onedrive.simpleUpload(path, path); JSONValue res;
try {
res = onedrive.simpleUpload(path, path);
} catch (OneDriveException e) {
writeln(e.msg);
return;
}
saveItem(res); saveItem(res);
string id = res["id"].str; string id = res["id"].str;
string eTag = res["eTag"].str; string eTag = res["eTag"].str;
@ -392,7 +409,7 @@ final class SyncEngine
private void uploadDeleteItem(Item item) private void uploadDeleteItem(Item item)
{ {
writeln("Deleting remote: ", item.path); writeln("Deleting remote item: ", item.path);
onedrive.deleteById(item.id, item.eTag); onedrive.deleteById(item.id, item.eTag);
itemdb.deleteById(item.id); itemdb.deleteById(item.id);
} }
@ -437,40 +454,27 @@ final class SyncEngine
itemdb.insert(id, name, type, eTag, cTag, mtime, parentId, crc32); itemdb.insert(id, name, type, eTag, cTag, mtime, parentId, crc32);
} }
// HACK void uploadMoveItem(const(char)[] from, string to)
void uploadDifference2(string path)
{ {
assert(isFile(path)); writeln("Moving remote item: ", from, " -> ", to);
Item item;
if (itemdb.selectByPath(path, item)) {
uploadDifference(item);
} else {
uploadNewFile(path);
}
}
void moveItem(const(char)[] from, string to)
{
writeln("Moving ", from, " to ", to, " ...");
Item item; Item item;
if (!itemdb.selectByPath(from, item)) { if (!itemdb.selectByPath(from, item)) {
throw new SyncException("Can't move a non synced item"); writeln("Can't move an unsynced item");
return;
} }
JSONValue diff = ["name": baseName(to)]; JSONValue diff = ["name": baseName(to)];
diff["parentReference"] = JSONValue([ diff["parentReference"] = JSONValue([
"path": "/drive/root:/" ~ dirName(to) "path": "/drive/root:/" ~ dirName(to)
]); ]);
writeln(diff.toPrettyString());
auto res = onedrive.updateById(item.id, diff, item.eTag); auto res = onedrive.updateById(item.id, diff, item.eTag);
saveItem(res); saveItem(res);
} }
void deleteByPath(const(char)[] path) void deleteByPath(const(char)[] path)
{ {
writeln("Deleting: ", path);
Item item; Item item;
if (!itemdb.selectByPath(path, item)) { if (!itemdb.selectByPath(path, item)) {
throw new SyncException("Can't delete a non synced item"); throw new SyncException("Can't delete an unsynced item");
} }
uploadDeleteItem(item); uploadDeleteItem(item);
} }