Multithreaded. Is this gonna help?

Only tested with single-thread mode still on so far.
This commit is contained in:
Omar Rizwan 2018-11-24 00:33:04 -08:00
parent 528467a55b
commit c8fad64066
2 changed files with 96 additions and 44 deletions

View file

@ -160,6 +160,7 @@ ws.onmessage = async function(event) {
const req = JSON.parse(event.data);
let response = { op: req.op, error: unix.EIO };
console.time(req.op + ':' + req.path);
try {
if (req.op === 'getattr') {
response = {
@ -195,13 +196,14 @@ ws.onmessage = async function(event) {
op: 'release'
};
}
} catch (e) {
response = {
op: req.op,
error: e instanceof UnixError ? e.error : unix.EIO
}
}
console.timeEnd(req.op + ':' + req.path);
response.id = req.id;
ws.send(JSON.stringify(response));
};

View file

@ -17,52 +17,86 @@
struct wby_server server;
struct wby_con *con = NULL;
pthread_mutex_t request_data_mutex = PTHREAD_MUTEX_INITIALIZER;
char *request_data = NULL;
pthread_cond_t queue_cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t response_cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t response_mutex = PTHREAD_MUTEX_INITIALIZER;
cJSON *response = NULL;
enum request_response_state {
EMPTY = 0,
SEND_REQUEST,
RECEIVE_RESPONSE,
HANDLE_RESPONSE
};
static const char *file_path = "/hello.txt";
static const char file_content[] = "Hello World!\n";
static const size_t file_size = sizeof(file_content)/sizeof(char) - 1;
struct request_response {
enum request_response_state state;
char *request;
cJSON *response;
};
static void dispatch_send_req(cJSON *req) {
pthread_mutex_lock(&request_data_mutex);
#define REQUEST_RESPONSE_QUEUE_SIZE 128
typedef int request_id;
struct request_response queue[REQUEST_RESPONSE_QUEUE_SIZE];
request_data = cJSON_Print(req);
printf("%s\n", request_data);
static request_id enqueue_request(cJSON *req) {
pthread_mutex_lock(&queue_mutex);
pthread_mutex_unlock(&request_data_mutex);
// Look for the first free slot.
request_id id;
for (id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) {
if (queue[id].state == EMPTY) break;
}
if (id >= REQUEST_RESPONSE_QUEUE_SIZE) {
printf("Request-response queue is full!\n");
exit(1);
}
cJSON_AddNumberToObject(req, "id", id);
queue[id].state = SEND_REQUEST;
queue[id].request = cJSON_Print(req);
queue[id].response = NULL;
printf("%s\n", queue[id].request);
pthread_mutex_unlock(&queue_mutex);
return id;
}
void send_req_if_any() {
pthread_mutex_lock(&request_data_mutex);
void send_any_enqueued_requests() {
pthread_mutex_lock(&queue_mutex);
if (con == NULL || request_data == NULL) goto done;
if (con == NULL) goto done;
wby_frame_begin(con, WBY_WSOP_TEXT_FRAME);
wby_write(con, request_data, strlen(request_data));
wby_frame_end(con);
for (request_id id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) {
if (queue[id].state == SEND_REQUEST) {
char *request = queue[id].request;
free(request_data);
request_data = NULL;
wby_frame_begin(con, WBY_WSOP_TEXT_FRAME);
wby_write(con, request, strlen(request));
wby_frame_end(con);
done:
pthread_mutex_unlock(&request_data_mutex);
}
static cJSON *await_response() {
pthread_mutex_lock(&response_mutex);
response = NULL;
while (response == NULL) {
pthread_cond_wait(&response_cv, &response_mutex);
queue[id].state = RECEIVE_RESPONSE;
free(request);
queue[id].request = NULL;
}
}
cJSON *resp = response;
pthread_mutex_unlock(&response_mutex);
done:
pthread_mutex_unlock(&queue_mutex);
}
static cJSON *await_response(request_id id) {
pthread_mutex_lock(&queue_mutex);
while (queue[id].state != HANDLE_RESPONSE) {
pthread_cond_wait(&queue_cv, &queue_mutex);
}
cJSON *resp = queue[id].response;
queue[id].state = EMPTY;
queue[id].response = NULL;
pthread_mutex_unlock(&queue_mutex);
return resp;
}
@ -73,18 +107,17 @@ static cJSON *await_response() {
cJSON *req = NULL; \
cJSON *resp = NULL; \
\
pthread_mutex_lock(&request_data_mutex); \
pthread_mutex_lock(&queue_mutex); \
int disconnected = (con == NULL); \
pthread_mutex_unlock(&request_data_mutex); \
pthread_mutex_unlock(&queue_mutex); \
if (disconnected) { ret = -EIO; goto done; } \
\
req = cJSON_CreateObject(); \
cJSON_AddStringToObject(req, "op", op); \
req_body \
\
dispatch_send_req(req); \
\
resp = await_response();\
request_id id = enqueue_request(req); \
resp = await_response(id); \
\
cJSON *error_item = cJSON_GetObjectItemCaseSensitive(resp, "error"); \
if (error_item) { \
@ -269,10 +302,27 @@ websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void
printf("Null in data! [%s]\n", data);
}
pthread_mutex_lock(&response_mutex);
response = cJSON_Parse((const char *) data);
pthread_cond_signal(&response_cv);
pthread_mutex_unlock(&response_mutex);
// Will be freed at the receiver end.
cJSON *resp = cJSON_Parse((const char *) data);
cJSON *id_item = cJSON_GetObjectItemCaseSensitive(resp, "id");
if (id_item == NULL) {
printf("No id in response!\n");
exit(1);
}
request_id id = id_item->valueint;
pthread_mutex_lock(&queue_mutex);
printf("got resp");
if (queue[id].state != RECEIVE_RESPONSE) {
printf("Got response to request in wrong state!\n");
exit(1);
}
queue[id].state = HANDLE_RESPONSE;
queue[id].response = resp;
pthread_cond_signal(&queue_cv);
pthread_mutex_unlock(&queue_mutex);
return 0;
}
@ -316,7 +366,7 @@ void *websocket_main(void *threadid)
printf("Awaiting WebSocket connection from Chrome extension.\n");
for (;;) {
send_req_if_any();
send_any_enqueued_requests();
wby_update(&server);
}