From 8e4e7a2a959d0ad0e615b1fabe8b7e00e4deef2e Mon Sep 17 00:00:00 2001 From: Omar Rizwan Date: Tue, 13 Nov 2018 00:58:12 -0800 Subject: [PATCH] Starting to multithread the server. --- fs/hello.c | 108 +++++++++++++++++++++++++++-------------------------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/fs/hello.c b/fs/hello.c index bf0fe89..96d62a4 100644 --- a/fs/hello.c +++ b/fs/hello.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #define WBY_STATIC @@ -13,14 +14,6 @@ #include "cJSON/cJSON.h" #include "cJSON/cJSON.c" -#define MAX_WSCONN 8 -struct server_state { - int quit; - unsigned frame_counter; - struct wby_con *conn[MAX_WSCONN]; - int conn_count; -}; - struct wby_server server; struct wby_con *con; @@ -47,6 +40,9 @@ struct response { } body; }; +pthread_cond_t response_cv = PTHREAD_COND_INITIALIZER; +pthread_mutex_t response_mutex = PTHREAD_MUTEX_INITIALIZER; + struct response response = (struct response) { .op = NONE }; static const char *file_path = "/hello.txt"; @@ -73,11 +69,18 @@ static void send_req(cJSON *req) { cJSON_Delete(req); \ } while (0) -static void await_response(enum opcode op) { +static struct response await_response(enum opcode op) { + pthread_mutex_lock(&response_mutex); + memset(&response, 0, sizeof response); - do { - wby_update(&server); - } while (response.op == NONE); + while (response.op == NONE) { + pthread_cond_wait(&response_cv, &response_mutex); + } + + struct response ret = response; + pthread_mutex_unlock(&response_mutex); + + return ret; } static int @@ -90,15 +93,15 @@ hello_getattr(const char *path, struct stat *stbuf) cJSON_AddStringToObject(req, "path", path); }); - await_response(GETATTR); - if (response.error != 0) { - printf("error re getattr(%s): %d\n", path, response.error); - return -response.error; + struct response resp = await_response(GETATTR); + if (resp.error != 0) { + printf("error re getattr(%s): %d\n", path, resp.error); + return -resp.error; } - stbuf->st_mode = response.body.getattr.st_mode; - stbuf->st_nlink = response.body.getattr.st_nlink; - stbuf->st_size = response.body.getattr.st_size; + stbuf->st_mode = resp.body.getattr.st_mode; + stbuf->st_nlink = resp.body.getattr.st_nlink; + stbuf->st_size = resp.body.getattr.st_size; printf("returning re getattr(%s)\n", path); /* if (strcmp(path, "/") == 0) { /\* The root directory of our file system. *\/ */ /* stbuf->st_mode = S_IFDIR | 0755; */ @@ -110,6 +113,7 @@ hello_getattr(const char *path, struct stat *stbuf) /* } else /\* We reject everything else. *\/ */ /* return -ENOENT; */ + return 0; } @@ -137,9 +141,10 @@ hello_readdir(const char *path, void *buf, fuse_fill_dir_t filler, }); printf("awaiting response to readdir(%s)\n", path); - await_response(READDIR); - struct readdir *readdir = &response.body.readdir; - printf("response: %d files\n", readdir->num_entries); + struct response resp = await_response(READDIR); + + struct readdir *readdir = &resp.body.readdir; + printf("response: %d files\n", (int) readdir->num_entries); for (size_t i = 0; i < readdir->num_entries; ++i) { filler(buf, readdir->entries[i], NULL, 0); @@ -183,18 +188,16 @@ dispatch(struct wby_con *connection, void *userdata) static int websocket_connect(struct wby_con *connection, void *userdata) { - struct server_state *state = (struct server_state*)userdata; /* connection bound userdata */ connection->user_data = NULL; - if (0 == strcmp(connection->request.uri, "/") && state->conn_count < MAX_WSCONN) + if (0 == strcmp(connection->request.uri, "/")) return 0; - else return 1; + return 1; } static void websocket_connected(struct wby_con *connection, void *userdata) { - struct server_state *state = (struct server_state*)userdata; printf("WebSocket connected\n"); con = connection; } @@ -241,6 +244,8 @@ websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void printf("Null in data! [%s]\n", data); } + pthread_mutex_lock(&response_mutex); + cJSON *ret = cJSON_Parse((const char *) data); cJSON *op_item = cJSON_GetObjectItemCaseSensitive(ret, "op"); @@ -275,23 +280,16 @@ websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void done: if (ret) cJSON_Delete(ret); + + pthread_cond_signal(&response_cv); + pthread_mutex_unlock(&response_mutex); return 0; } static void websocket_closed(struct wby_con *connection, void *userdata) { - int i; - struct server_state *state = (struct server_state*)userdata; printf("WebSocket closed\n"); - for (i = 0; i < state->conn_count; i++) { - if (state->conn[i] == connection) { - int remain = state->conn_count - i; - memmove(state->conn + i, state->conn + i + 1, (size_t)remain * sizeof(struct wby_con*)); - --state->conn_count; - break; - } - } } static void @@ -300,19 +298,17 @@ test_log(const char* text) printf("[debug] %s\n", text); } -int -main(int argc, char **argv) +void *websocket_main(void *threadid) { - void *memory = NULL; + void *memory = NULL; wby_size needed_memory = 0; - struct server_state state; struct wby_config config; memset(&config, 0, sizeof config); - config.userdata = &state; + config.userdata = NULL; config.address = "127.0.0.1"; config.port = 8888; - config.connection_max = 1; + config.connection_max = 4; config.request_buffer_size = 2048; config.io_buffer_size = 8192; config.log = test_log; @@ -326,18 +322,24 @@ main(int argc, char **argv) memory = calloc(needed_memory, 1); wby_start(&server, memory); - memset(&state, 0, sizeof state); - printf("Awaiting WebSocket connection from Chrome extension.\n"); - while (con == NULL) { + for (;;) { wby_update(&server); } - return fuse_main(argc, argv, &hello_filesystem_operations, NULL); -/* wby_stop(&server); */ -/* free(memory); */ -/* #if defined(_WIN32) */ -/* WSACleanup(); */ -/* #endif */ -/* return 0; */ - // + + wby_stop(&server); + free(memory); +#if defined(_WIN32) + WSACleanup(); +#endif + return 0; +} + +int +main(int argc, char **argv) +{ + pthread_t websocket_thread; + pthread_create(&websocket_thread, NULL, websocket_main, NULL); + return fuse_main(argc, argv, &hello_filesystem_operations, NULL); + }