From 784ec83696d9ecedc10ede022a035e671dd21607 Mon Sep 17 00:00:00 2001 From: Omar Rizwan Date: Mon, 25 Feb 2019 13:02:25 -0800 Subject: [PATCH] Rewrite and refactor C half. No more shared memory! It's fast! Three C modules: - tabfs (main thread; talks to FUSE) - common (tabfs<->ws communication helpers) - ws (side thread; talks to browser over WebSocket) It's single-threaded, but I don't think that matters anyway. --- .gitignore | 4 + extension/background.js | 30 ++- fs/Makefile | 15 +- fs/common.c | 62 ++++++ fs/common.h | 18 ++ fs/hello.c | 459 ---------------------------------------- fs/tabfs.c | 210 ++++++++++++++++++ fs/ws.c | 239 +++++++++++++++++++++ fs/ws.h | 6 + 9 files changed, 567 insertions(+), 476 deletions(-) create mode 100644 .gitignore create mode 100644 fs/common.c create mode 100644 fs/common.h delete mode 100644 fs/hello.c create mode 100644 fs/tabfs.c create mode 100644 fs/ws.c create mode 100644 fs/ws.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0a47e76 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.DS_Store +scratch +fs/tabfs +fs/tabfs.dSYM \ No newline at end of file diff --git a/extension/background.js b/extension/background.js index 8a1d808..a0e0294 100644 --- a/extension/background.js +++ b/extension/background.js @@ -69,6 +69,8 @@ function pathComponent(path, i) { return components[i >= 0 ? i : components.length + i]; } +const debugged = {}; + const router = { "tabs": { /* "last-focused": { @@ -98,18 +100,24 @@ const router = { } }, "tree": { - async open(path) { - const debuggee = {tabId: parseInt(pathComponent(path, -2))}; - await new Promise(resolve => chrome.debugger.attach(debuggee, "1.2", resolve)); - chrome.debugger.sendCommand(debuggee, "Page.getResourceTree", {}, function(result) { - console.log(result); - }); + async opendir(path) { + const tabId = parseInt(pathComponent(path, -2)); + if (!debugged[tabId]) { + await new Promise(resolve => chrome.debugger.attach({tabId}, "1.2", resolve)); + debugged[tabId] = 0; + } + debugged[tabId] += 1; + return 0; }, - async read(path) { - + async readdir(path) { + const tabId = parseInt(pathComponent(path, -2)); + if (!debugged[tabId]) throw new UnixError(unix.EIO); + const result = await new Promise(resolve => chrome.debugger.sendCommand({tabId}, "Page.getResourceTree", {}, resolve)); + const frameTree = result.frameTree; + return frameTree.resources.map(r => String(r.contentSize)); }, - async close(path) { - + async releasedir(path) { + return 0; } } } @@ -182,6 +190,7 @@ async function releasedir(path) { let ws; async function onmessage(event) { const req = JSON.parse(event.data); + console.log('req', req); let response = { op: req.op, error: unix.EIO }; /* console.time(req.op + ':' + req.path);*/ @@ -238,6 +247,7 @@ async function onmessage(event) { /* console.timeEnd(req.op + ':' + req.path);*/ response.id = req.id; + console.log('resp', response); ws.send(JSON.stringify(response)); }; diff --git a/fs/Makefile b/fs/Makefile index 0a85261..1c35ef3 100644 --- a/fs/Makefile +++ b/fs/Makefile @@ -1,4 +1,4 @@ -TARGETS = hello +TARGETS = tabfs # Root for OSXFUSE includes and libraries OSXFUSE_ROOT = /usr/local @@ -18,12 +18,10 @@ CFLAGS_EXTRA = -Wall -g $(CFLAGS) LIBS = -losxfuse -.c: - $(CC) $(CFLAGS_OSXFUSE) $(CFLAGS_EXTRA) -o $@ $< $(LIBS) - all: $(TARGETS) -hello: hello.c +tabfs: common.c ws.c tabfs.c + $(CC) $(CFLAGS_OSXFUSE) $(CFLAGS_EXTRA) -o $@ $^ $(LIBS) clean: rm -f $(TARGETS) *.o @@ -33,5 +31,8 @@ unmount: killall -9 hello || true diskutil unmount force mnt || true -mount: hello - ./hello -odirect_io -f mnt +mount: tabfs + ./tabfs -odirect_io -s -f mnt + +debugmount: tabfs + lldb -- ./tabfs -odirect_io -s -f mnt diff --git a/fs/common.c b/fs/common.c new file mode 100644 index 0000000..f651009 --- /dev/null +++ b/fs/common.c @@ -0,0 +1,62 @@ +#include +#include +#include + +#include +#include +#include + +#include "common.h" + +static int tabfs_to_ws[2]; +static int ws_to_tabfs[2]; + +void common_init() { + if (pipe(tabfs_to_ws)) exit(1); + if (pipe(ws_to_tabfs)) exit(1); +} + +void common_send_tabfs_to_ws(char *request_data) { + write(tabfs_to_ws[1], &request_data, sizeof(request_data)); +} + +char *common_receive_tabfs_to_ws(fd_set_filler_fn_t filler) { + fd_set read_fds, write_fds, except_fds; + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + FD_ZERO(&except_fds); + + int max_fd = filler(&read_fds, &write_fds, &except_fds); + + FD_SET(tabfs_to_ws[0], &read_fds); + if (tabfs_to_ws[0] > max_fd) { max_fd = tabfs_to_ws[0]; } + + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 200000; + + select(max_fd + 1, &read_fds, &write_fds, &except_fds, &timeout); + + if (!FD_ISSET(tabfs_to_ws[0], &read_fds)) { + // We can't read from tabfs_to_ws right now. Could be that it + // timed out, could be that we got a websocket event instead, + // whatever. + + return NULL; + } + + char *request_data; + read(tabfs_to_ws[0], &request_data, sizeof(request_data)); + + return request_data; +} + +void common_send_ws_to_tabfs(char *response_data) { + write(ws_to_tabfs[1], &response_data, sizeof(response_data)); +} +char *common_receive_ws_to_tabfs() { + char *response_data; + read(ws_to_tabfs[0], &response_data, sizeof(response_data)); + + return response_data; +} diff --git a/fs/common.h b/fs/common.h new file mode 100644 index 0000000..79fd9f3 --- /dev/null +++ b/fs/common.h @@ -0,0 +1,18 @@ +#ifndef COMMON_H +#define COMMON_H + +#include + +#define DEBUG(...) + +void common_init(); + +typedef int (*fd_set_filler_fn_t)(fd_set*, fd_set*, fd_set*); + +void common_send_tabfs_to_ws(char *request_data); +char *common_receive_tabfs_to_ws(fd_set_filler_fn_t filler); + +void common_send_ws_to_tabfs(char *response_data); +char *common_receive_ws_to_tabfs(); + +#endif diff --git a/fs/hello.c b/fs/hello.c deleted file mode 100644 index bb6a7b1..0000000 --- a/fs/hello.c +++ /dev/null @@ -1,459 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#define WBY_STATIC -#define WBY_IMPLEMENTATION -#define WBY_USE_FIXED_TYPES -#define WBY_USE_ASSERT -#include "mmx/web.h" - -#include "cJSON/cJSON.h" -#include "cJSON/cJSON.c" - -#define DEBUG(...) - -struct wby_server server; -struct wby_con *con = NULL; - -pthread_cond_t queue_cv = PTHREAD_COND_INITIALIZER; -pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; - -enum request_response_state { - EMPTY = 0, - SEND_REQUEST, - RECEIVE_RESPONSE, - HANDLE_RESPONSE -}; - -struct request_response { - enum request_response_state state; - - char *request; - cJSON *response; - - clock_t start; -}; - -#define REQUEST_RESPONSE_QUEUE_SIZE 128 -typedef int request_id; -struct request_response queue[REQUEST_RESPONSE_QUEUE_SIZE]; - -static request_id enqueue_request(cJSON *req) { - pthread_mutex_lock(&queue_mutex); - - // Look for the first free slot. - request_id id; - for (id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) { - if (queue[id].state == EMPTY) break; - } - if (id >= REQUEST_RESPONSE_QUEUE_SIZE) { - printf("Request-response queue is full!\n"); - exit(1); - } - cJSON_AddNumberToObject(req, "id", id); - - queue[id].state = SEND_REQUEST; - queue[id].request = cJSON_Print(req); - queue[id].response = NULL; - queue[id].start = clock(); - - /* printf("%s\n", queue[id].request); */ - - pthread_cond_signal(&queue_cv); - pthread_mutex_unlock(&queue_mutex); - - return id; -} - -void send_any_enqueued_requests() { - pthread_mutex_lock(&queue_mutex); - - if (con == NULL) goto done; - - for (request_id id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) { - if (queue[id].state == SEND_REQUEST) { - char *request = queue[id].request; - - wby_frame_begin(con, WBY_WSOP_TEXT_FRAME); - wby_write(con, request, strlen(request)); - wby_frame_end(con); - - queue[id].state = RECEIVE_RESPONSE; - free(request); - queue[id].request = NULL; - } - } - - done: - pthread_mutex_unlock(&queue_mutex); -} - -static cJSON *await_response(request_id id) { - pthread_mutex_lock(&queue_mutex); - - while (queue[id].state != HANDLE_RESPONSE) { - pthread_cond_wait(&queue_cv, &queue_mutex); - } - - cJSON *resp = queue[id].response; - queue[id].state = EMPTY; - queue[id].response = NULL; - - /* printf("Elapsed: %f seconds\n", (double)(clock() - queue[id].start) / CLOCKS_PER_SEC); */ - - pthread_mutex_unlock(&queue_mutex); - - return resp; -} - -#define MAKE_REQ(op, req_body, resp_handler) \ - do { \ - int ret = -1; \ - cJSON *req = NULL; \ - cJSON *resp = NULL; \ - \ - pthread_mutex_lock(&queue_mutex); \ - int disconnected = (con == NULL); \ - pthread_mutex_unlock(&queue_mutex); \ - if (disconnected) { ret = -EIO; goto done; } \ - \ - req = cJSON_CreateObject(); \ - cJSON_AddStringToObject(req, "op", op); \ - req_body \ - \ - request_id id = enqueue_request(req); \ - resp = await_response(id); \ - \ - cJSON *error_item = cJSON_GetObjectItemCaseSensitive(resp, "error"); \ - if (error_item) { \ - ret = -error_item->valueint; \ - if (ret != 0) goto done; \ - } \ - \ - ret = -1; \ - resp_handler \ - \ -done: \ - if (req != NULL) cJSON_Delete(req); \ - if (resp != NULL) cJSON_Delete(resp); \ - return ret; \ - } while (0) - -#define JSON_GET_PROP_INT(lvalue, key) \ - do { \ - lvalue = cJSON_GetObjectItemCaseSensitive(resp, key)->valueint; \ - } while (0) - -static int -hello_getattr(const char *path, struct stat *stbuf) -{ - memset(stbuf, 0, sizeof(struct stat)); - - MAKE_REQ("getattr", { - cJSON_AddStringToObject(req, "path", path); - }, { - JSON_GET_PROP_INT(stbuf->st_mode, "st_mode"); - JSON_GET_PROP_INT(stbuf->st_nlink, "st_nlink"); - JSON_GET_PROP_INT(stbuf->st_size, "st_size"); - - ret = 0; - }); -} - -static int -hello_readlink(const char *path, char *buf, size_t size) -{ - MAKE_REQ("readlink", { - cJSON_AddStringToObject(req, "path", path); - }, { - cJSON *resp_buf_item = cJSON_GetObjectItemCaseSensitive(resp, "buf"); - // FIXME: fix - char *resp_buf = cJSON_GetStringValue(resp_buf_item); - size_t resp_buf_len = strlen(resp_buf); - size = resp_buf_len < size ? resp_buf_len : size; - - memcpy(buf, resp_buf, size); - - ret = size; - }); -} - -static int -hello_open(const char *path, struct fuse_file_info *fi) -{ - MAKE_REQ("open", { - cJSON_AddStringToObject(req, "path", path); - cJSON_AddNumberToObject(req, "flags", fi->flags); - }, { - cJSON *fh_item = cJSON_GetObjectItemCaseSensitive(resp, "fh"); - if (fh_item) fi->fh = fh_item->valueint; - - ret = 0; - }); -} - -static int -hello_read(const char *path, char *buf, size_t size, off_t offset, - struct fuse_file_info *fi) -{ - MAKE_REQ("read", { - cJSON_AddStringToObject(req, "path", path); - cJSON_AddNumberToObject(req, "size", size); - cJSON_AddNumberToObject(req, "offset", offset); - - cJSON_AddNumberToObject(req, "fh", fi->fh); - cJSON_AddNumberToObject(req, "flags", fi->flags); - }, { - cJSON *resp_buf_item = cJSON_GetObjectItemCaseSensitive(resp, "buf"); - if (!resp_buf_item) return -EIO; - - char *resp_buf = cJSON_GetStringValue(resp_buf_item); - if (!resp_buf) return -EIO; - - size_t resp_buf_len = strlen(resp_buf); - size = resp_buf_len < size ? resp_buf_len : size; - - memcpy(buf, resp_buf, size); - - ret = size; - }); -} - -static int hello_release(const char *path, struct fuse_file_info *fi) { - MAKE_REQ("release", { - cJSON_AddStringToObject(req, "path", path); - cJSON_AddNumberToObject(req, "fh", fi->fh); - }, { - ret = 0; - }); -} - -static int -hello_opendir(const char *path, struct fuse_file_info *fi) -{ - MAKE_REQ("opendir", { - cJSON_AddStringToObject(req, "path", path); - cJSON_AddNumberToObject(req, "flags", fi->flags); - }, { - cJSON *fh_item = cJSON_GetObjectItemCaseSensitive(resp, "fh"); - if (fh_item) fi->fh = fh_item->valueint; - - ret = 0; - }); -} - -static int -hello_readdir(const char *path, void *buf, fuse_fill_dir_t filler, - off_t offset, struct fuse_file_info *fi) -{ - // send {op: "readdir", path} to the websocket handler - MAKE_REQ("readdir", { - cJSON_AddStringToObject(req, "path", path); - }, { - cJSON *entries = cJSON_GetObjectItemCaseSensitive(resp, "entries"); - cJSON *entry; - cJSON_ArrayForEach(entry, entries) { - filler(buf, cJSON_GetStringValue(entry), NULL, 0); - } - - ret = 0; - }); -} - -static int -hello_releasedir(const char *path, struct fuse_file_info *fi) -{ - MAKE_REQ("releasedir", { - cJSON_AddStringToObject(req, "path", path); - cJSON_AddNumberToObject(req, "fh", fi->fh); - }, { - ret = 0; - }); -} - -static struct fuse_operations hello_filesystem_operations = { - .getattr = hello_getattr, /* To provide size, permissions, etc. */ - .readlink = hello_readlink, - .open = hello_open, /* To enforce read-only access. */ - .read = hello_read, /* To provide file content. */ - .release = hello_release, - - .opendir = hello_opendir, - .readdir = hello_readdir, /* To provide directory listing. */ - .releasedir = hello_releasedir -}; - -static int -dispatch(struct wby_con *connection, void *userdata) -{ - return 1; -} - -static int -websocket_connect(struct wby_con *connection, void *userdata) -{ - /* connection bound userdata */ - connection->user_data = NULL; - if (0 == strcmp(connection->request.uri, "/")) - return 0; - return 1; -} - -static void -websocket_connected(struct wby_con *connection, void *userdata) -{ - printf("WebSocket connected\n"); - con = connection; -} - -static int -websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void *userdata) -{ - unsigned char data[131072] = {0}; - - int i = 0; - DEBUG("WebSocket frame incoming\n"); - DEBUG(" Frame OpCode: %d\n", frame->opcode); - DEBUG(" Final frame?: %s\n", (frame->flags & WBY_WSF_FIN) ? "yes" : "no"); - DEBUG(" Masked? : %s\n", (frame->flags & WBY_WSF_MASKED) ? "yes" : "no"); - DEBUG(" Data Length : %d\n", (int) frame->payload_length); - - if ((unsigned long) frame->payload_length > sizeof(data)) { - printf("Data too long!\n"); - exit(1); - } - - while (i < frame->payload_length) { - unsigned char buffer[16]; - int remain = frame->payload_length - i; - size_t read_size = remain > (int) sizeof buffer ? sizeof buffer : (size_t) remain; - size_t k; - - DEBUG("%08x ", (int) i); - if (0 != wby_read(connection, buffer, read_size)) - break; - for (k = 0; k < read_size; ++k) - DEBUG("%02x ", buffer[k]); - for (k = read_size; k < 16; ++k) - DEBUG(" "); - DEBUG(" | "); - for (k = 0; k < read_size; ++k) - DEBUG("%c", isprint(buffer[k]) ? buffer[k] : '?'); - DEBUG("\n"); - for (k = 0; k < read_size; ++k) - data[i + k] = buffer[k]; - i += (int)read_size; - } - - if ((int) strlen((const char *) data) != frame->payload_length) { - printf("Null in data! [%s]\n", data); - } - - // Will be freed at the receiver end. - cJSON *resp = cJSON_Parse((const char *) data); - - cJSON *id_item = cJSON_GetObjectItemCaseSensitive(resp, "id"); - if (id_item == NULL) { - printf("No id in response!\n"); - exit(1); - } - request_id id = id_item->valueint; - - pthread_mutex_lock(&queue_mutex); - - if (queue[id].state != RECEIVE_RESPONSE) { - printf("Got response to request in wrong state!\n"); - exit(1); - } - queue[id].state = HANDLE_RESPONSE; - queue[id].response = resp; - - pthread_cond_signal(&queue_cv); - pthread_mutex_unlock(&queue_mutex); - - return 0; -} - -static void -websocket_closed(struct wby_con *connection, void *userdata) -{ - printf("WebSocket closed\n"); - con = NULL; -} - -static void -test_log(const char* text) -{ - DEBUG("[debug] %s\n", text); -} - -int check_io_demand() { - if (con == NULL) return 1; - - for (request_id id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) { - if (queue[id].state == SEND_REQUEST || queue[id].state == RECEIVE_RESPONSE) { - return 1; - } - } - return 0; -} -void await_io_demand() { - pthread_mutex_lock(&queue_mutex); - while (!check_io_demand()) { - pthread_cond_wait(&queue_cv, &queue_mutex); - } - pthread_mutex_unlock(&queue_mutex); -} - -void *websocket_main(void *threadid) -{ - void *memory = NULL; - wby_size needed_memory = 0; - - struct wby_config config; - memset(&config, 0, sizeof config); - config.userdata = NULL; - config.address = "127.0.0.1"; - config.port = 8888; - config.connection_max = 4; - config.request_buffer_size = 2048; - config.io_buffer_size = 8192; - config.log = test_log; - config.dispatch = dispatch; - config.ws_connect = websocket_connect; - config.ws_connected = websocket_connected; - config.ws_frame = websocket_frame; - config.ws_closed = websocket_closed; - - wby_init(&server, &config, &needed_memory); - memory = calloc(needed_memory, 1); - wby_start(&server, memory); - - printf("Awaiting WebSocket connection from Chrome extension.\n"); - for (;;) { - await_io_demand(); - - send_any_enqueued_requests(); - - wby_update(&server); - } - - wby_stop(&server); - free(memory); -#if defined(_WIN32) - WSACleanup(); -#endif - return 0; -} - -int -main(int argc, char **argv) -{ - pthread_t websocket_thread; - pthread_create(&websocket_thread, NULL, websocket_main, NULL); - return fuse_main(argc, argv, &hello_filesystem_operations, NULL); -} diff --git a/fs/tabfs.c b/fs/tabfs.c new file mode 100644 index 0000000..03cfeea --- /dev/null +++ b/fs/tabfs.c @@ -0,0 +1,210 @@ +#include +#include +#include +#include +#include +#include + +#include "cJSON/cJSON.h" +#include "cJSON/cJSON.c" + +#include "common.h" +#include "ws.h" + +static cJSON *send_request_then_await_response(cJSON *req) { + char *request_data = cJSON_Print(req); // Will be freed on ws side. + common_send_tabfs_to_ws(request_data); + + char *response_data = common_receive_ws_to_tabfs(); + if (response_data == NULL) { + // Connection is dead. + return cJSON_Parse("{ \"error\": 5 }"); + } + + cJSON *resp = cJSON_Parse((const char *) response_data); + free(response_data); + + return resp; +} + +#define MAKE_REQ(op, req_body, resp_handler) \ + do { \ + int ret = -1; \ + cJSON *req = NULL; \ + cJSON *resp = NULL; \ + \ + req = cJSON_CreateObject(); \ + cJSON_AddStringToObject(req, "op", op); \ + req_body \ + \ + resp = send_request_then_await_response(req); \ + \ + cJSON *error_item = cJSON_GetObjectItemCaseSensitive(resp, "error"); \ + if (error_item) { \ + ret = -error_item->valueint; \ + if (ret != 0) goto done; \ + } \ + \ + ret = -1; \ + resp_handler \ + \ +done: \ + if (req != NULL) cJSON_Delete(req); \ + if (resp != NULL) cJSON_Delete(resp); \ + return ret; \ + } while (0) + +#define JSON_GET_PROP_INT(lvalue, key) \ + do { \ + lvalue = cJSON_GetObjectItemCaseSensitive(resp, key)->valueint; \ + } while (0) + +static int +tabfs_getattr(const char *path, struct stat *stbuf) +{ + memset(stbuf, 0, sizeof(struct stat)); + + MAKE_REQ("getattr", { + cJSON_AddStringToObject(req, "path", path); + }, { + JSON_GET_PROP_INT(stbuf->st_mode, "st_mode"); + JSON_GET_PROP_INT(stbuf->st_nlink, "st_nlink"); + JSON_GET_PROP_INT(stbuf->st_size, "st_size"); + + ret = 0; + }); +} + +static int +tabfs_readlink(const char *path, char *buf, size_t size) +{ + MAKE_REQ("readlink", { + cJSON_AddStringToObject(req, "path", path); + }, { + cJSON *resp_buf_item = cJSON_GetObjectItemCaseSensitive(resp, "buf"); + // FIXME: fix + char *resp_buf = cJSON_GetStringValue(resp_buf_item); + size_t resp_buf_len = strlen(resp_buf); + size = resp_buf_len < size ? resp_buf_len : size; + + memcpy(buf, resp_buf, size); + + ret = size; + }); +} + +static int +tabfs_open(const char *path, struct fuse_file_info *fi) +{ + MAKE_REQ("open", { + cJSON_AddStringToObject(req, "path", path); + cJSON_AddNumberToObject(req, "flags", fi->flags); + }, { + cJSON *fh_item = cJSON_GetObjectItemCaseSensitive(resp, "fh"); + if (fh_item) fi->fh = fh_item->valueint; + + ret = 0; + }); +} + +static int +tabfs_read(const char *path, char *buf, size_t size, off_t offset, + struct fuse_file_info *fi) +{ + MAKE_REQ("read", { + cJSON_AddStringToObject(req, "path", path); + cJSON_AddNumberToObject(req, "size", size); + cJSON_AddNumberToObject(req, "offset", offset); + + cJSON_AddNumberToObject(req, "fh", fi->fh); + cJSON_AddNumberToObject(req, "flags", fi->flags); + }, { + cJSON *resp_buf_item = cJSON_GetObjectItemCaseSensitive(resp, "buf"); + if (!resp_buf_item) return -EIO; + + char *resp_buf = cJSON_GetStringValue(resp_buf_item); + if (!resp_buf) return -EIO; + + size_t resp_buf_len = strlen(resp_buf); + size = resp_buf_len < size ? resp_buf_len : size; + + memcpy(buf, resp_buf, size); + + ret = size; + }); +} + +static int tabfs_release(const char *path, struct fuse_file_info *fi) { + MAKE_REQ("release", { + cJSON_AddStringToObject(req, "path", path); + cJSON_AddNumberToObject(req, "fh", fi->fh); + }, { + ret = 0; + }); +} + +static int +tabfs_opendir(const char *path, struct fuse_file_info *fi) +{ + MAKE_REQ("opendir", { + cJSON_AddStringToObject(req, "path", path); + cJSON_AddNumberToObject(req, "flags", fi->flags); + }, { + cJSON *fh_item = cJSON_GetObjectItemCaseSensitive(resp, "fh"); + if (fh_item) fi->fh = fh_item->valueint; + + ret = 0; + }); +} + +static int +tabfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, + off_t offset, struct fuse_file_info *fi) +{ + // send {op: "readdir", path} to the websocket handler + MAKE_REQ("readdir", { + cJSON_AddStringToObject(req, "path", path); + }, { + cJSON *entries = cJSON_GetObjectItemCaseSensitive(resp, "entries"); + cJSON *entry; + cJSON_ArrayForEach(entry, entries) { + filler(buf, cJSON_GetStringValue(entry), NULL, 0); + } + + ret = 0; + }); +} + +static int +tabfs_releasedir(const char *path, struct fuse_file_info *fi) +{ + MAKE_REQ("releasedir", { + cJSON_AddStringToObject(req, "path", path); + cJSON_AddNumberToObject(req, "fh", fi->fh); + }, { + ret = 0; + }); +} + +static struct fuse_operations tabfs_filesystem_operations = { + .getattr = tabfs_getattr, /* To provide size, permissions, etc. */ + .readlink = tabfs_readlink, + .open = tabfs_open, /* To enforce read-only access. */ + .read = tabfs_read, /* To provide file content. */ + .release = tabfs_release, + + .opendir = tabfs_opendir, + .readdir = tabfs_readdir, /* To provide directory listing. */ + .releasedir = tabfs_releasedir +}; + +int +main(int argc, char **argv) +{ + common_init(); + + pthread_t websocket_thread; + pthread_create(&websocket_thread, NULL, websocket_main, NULL); + + return fuse_main(argc, argv, &tabfs_filesystem_operations, NULL); +} diff --git a/fs/ws.c b/fs/ws.c new file mode 100644 index 0000000..c7cb3ec --- /dev/null +++ b/fs/ws.c @@ -0,0 +1,239 @@ +// WebSocket server. +// Side thread that gets spawned. + +#define WBY_STATIC +#define WBY_IMPLEMENTATION +#define WBY_USE_FIXED_TYPES +#define WBY_USE_ASSERT +#include "mmx/web.h" + +#include "common.h" + +static struct wby_server server; +static struct wby_con *con = NULL; + +static int fill_fd_set_with_ws_sockets(fd_set *read_fds, fd_set *write_fds, fd_set *except_fds) { + // Based on web.h:1936 (start of wby_update) + + int max_fd = 0; + FD_SET(server.socket, read_fds); + FD_SET(server.socket, except_fds); + max_fd = WBY_SOCK(server.socket); + + if (con == NULL) { return max_fd; } + + struct wby_connection *conn = (struct wby_connection *) con; + wby_socket socket = WBY_SOCK(conn->socket); + FD_SET(socket, read_fds); + FD_SET(socket, except_fds); + if (conn->state == WBY_CON_STATE_SEND_CONTINUE) { + FD_SET(socket, write_fds); + } + + if (socket > max_fd) { max_fd = socket; } + return max_fd; +} + +static void receive_tabfs_request_then_send_to_browser() { + char *request_data = common_receive_tabfs_to_ws(fill_fd_set_with_ws_sockets); + if (request_data == NULL) { + return; + } + + if (con == NULL) { + common_send_ws_to_tabfs(NULL); + return; + } + + wby_frame_begin(con, WBY_WSOP_TEXT_FRAME); + wby_write(con, request_data, strlen(request_data)); + wby_frame_end(con); + + /* pthread_mutex_lock(&queue_mutex); */ + + /* if (con == NULL) goto done; */ + + /* for (request_id id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) { */ + /* if (queue[id].state == SEND_REQUEST) { */ + /* char *request = queue[id].request; */ + + /* wby_frame_begin(con, WBY_WSOP_TEXT_FRAME); */ + /* wby_write(con, request, strlen(request)); */ + /* wby_frame_end(con); */ + + /* queue[id].state = RECEIVE_RESPONSE; */ + /* free(request); */ + /* queue[id].request = NULL; */ + /* } */ + /* } */ + + /* done: */ + /* pthread_mutex_unlock(&queue_mutex); */ +} + +static int +dispatch(struct wby_con *connection, void *userdata) +{ + return 1; +} + +static int +websocket_connect(struct wby_con *connection, void *userdata) +{ + /* connection bound userdata */ + connection->user_data = NULL; + if (0 == strcmp(connection->request.uri, "/")) + return 0; + return 1; +} + +static void +websocket_connected(struct wby_con *connection, void *userdata) +{ + printf("WebSocket connected\n"); + con = connection; +} + +#define MAX_DATA_LENGTH 131072 + +static int +websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void *userdata) +{ + unsigned char *data = calloc(1, MAX_DATA_LENGTH); // Will be freed at receiver (tabfs). + + int i = 0; + DEBUG("WebSocket frame incoming\n"); + DEBUG(" Frame OpCode: %d\n", frame->opcode); + DEBUG(" Final frame?: %s\n", (frame->flags & WBY_WSF_FIN) ? "yes" : "no"); + DEBUG(" Masked? : %s\n", (frame->flags & WBY_WSF_MASKED) ? "yes" : "no"); + DEBUG(" Data Length : %d\n", (int) frame->payload_length); + + if ((unsigned long) frame->payload_length > MAX_DATA_LENGTH) { + printf("Data too long!\n"); + exit(1); + } + + while (i < frame->payload_length) { + unsigned char buffer[16]; + int remain = frame->payload_length - i; + size_t read_size = remain > (int) sizeof buffer ? sizeof buffer : (size_t) remain; + size_t k; + + DEBUG("%08x ", (int) i); + if (0 != wby_read(connection, buffer, read_size)) + break; + for (k = 0; k < read_size; ++k) + DEBUG("%02x ", buffer[k]); + for (k = read_size; k < 16; ++k) + DEBUG(" "); + DEBUG(" | "); + for (k = 0; k < read_size; ++k) + DEBUG("%c", isprint(buffer[k]) ? buffer[k] : '?'); + DEBUG("\n"); + for (k = 0; k < read_size; ++k) + data[i + k] = buffer[k]; + i += (int)read_size; + } + + if ((int) strlen((const char *) data) != frame->payload_length) { + printf("Null in data! [%s]\n", data); + } + + common_send_ws_to_tabfs((char *) data); + + // Will be freed at the receiver end. + /* cJSON *resp = cJSON_Parse((const char *) data); */ + + /* cJSON *id_item = cJSON_GetObjectItemCaseSensitive(resp, "id"); */ + /* if (id_item == NULL) { */ + /* printf("No id in response!\n"); */ + /* exit(1); */ + /* } */ + /* request_id id = id_item->valueint; */ + + /* pthread_mutex_lock(&queue_mutex); */ + + /* if (queue[id].state != RECEIVE_RESPONSE) { */ + /* printf("Got response to request in wrong state!\n"); */ + /* exit(1); */ + /* } */ + /* queue[id].state = HANDLE_RESPONSE; */ + /* queue[id].response = resp; */ + + /* pthread_cond_signal(&queue_cv); */ + /* pthread_mutex_unlock(&queue_mutex); */ + + return 0; +} + +static void +websocket_closed(struct wby_con *connection, void *userdata) +{ + printf("WebSocket closed\n"); + + if (con == connection) con = NULL; +} + +static void +test_log(const char* text) +{ + DEBUG("[debug] %s\n", text); +} + +void await_io_demand_or_timeout() { + /* pthread_mutex_lock(&queue_mutex); */ + + /* struct timeval now; */ + /* gettimeofday(&now, NULL); */ + + /* struct timespec tsp; */ + /* tsp.tv_sec = now.tv_sec; */ + /* tsp.tv_nsec = now.tv_usec * 1000; */ + + /* tsp.tv_nsec += 200 * 1000000; // wait for 200ms max */ + /* pthread_cond_timedwait(&queue_cv, &queue_mutex, &tsp); */ + + /* pthread_mutex_unlock(&queue_mutex); */ +} + +void *websocket_main(void *threadid) +{ + void *memory = NULL; + wby_size needed_memory = 0; + + struct wby_config config; + memset(&config, 0, sizeof config); + config.userdata = NULL; + config.address = "127.0.0.1"; + config.port = 8888; + config.connection_max = 4; + config.request_buffer_size = 2048; + config.io_buffer_size = 8192; + config.log = test_log; + config.dispatch = dispatch; + config.ws_connect = websocket_connect; + config.ws_connected = websocket_connected; + config.ws_frame = websocket_frame; + config.ws_closed = websocket_closed; + + wby_init(&server, &config, &needed_memory); + memory = calloc(needed_memory, 1); + wby_start(&server, memory); + + printf("Awaiting WebSocket connection from Chrome extension.\n"); + for (;;) { + // FIXME: makes reconnect impossible. :< + /* await_io_demand_or_timeout(); */ + + receive_tabfs_request_then_send_to_browser(); + + wby_update(&server); // We receive stuff during this phase. + } + + wby_stop(&server); + free(memory); +#if defined(_WIN32) + WSACleanup(); +#endif + return 0; +} diff --git a/fs/ws.h b/fs/ws.h new file mode 100644 index 0000000..fad2c63 --- /dev/null +++ b/fs/ws.h @@ -0,0 +1,6 @@ +#ifndef WS_H +#define WS_H + +void *websocket_main(void *threadid); + +#endif