TabFS/fs/ws.c
Omar Rizwan 784ec83696 Rewrite and refactor C half. No more shared memory! It's fast!
Three C modules:
- tabfs (main thread; talks to FUSE)
- common (tabfs<->ws communication helpers)
- ws (side thread; talks to browser over WebSocket)

It's single-threaded, but I don't think that matters anyway.
2019-02-25 13:02:25 -08:00

240 lines
6.3 KiB
C

// WebSocket server.
// Side thread that gets spawned.
#define WBY_STATIC
#define WBY_IMPLEMENTATION
#define WBY_USE_FIXED_TYPES
#define WBY_USE_ASSERT
#include "mmx/web.h"
#include "common.h"
static struct wby_server server;
static struct wby_con *con = NULL;
static int fill_fd_set_with_ws_sockets(fd_set *read_fds, fd_set *write_fds, fd_set *except_fds) {
// Based on web.h:1936 (start of wby_update)
int max_fd = 0;
FD_SET(server.socket, read_fds);
FD_SET(server.socket, except_fds);
max_fd = WBY_SOCK(server.socket);
if (con == NULL) { return max_fd; }
struct wby_connection *conn = (struct wby_connection *) con;
wby_socket socket = WBY_SOCK(conn->socket);
FD_SET(socket, read_fds);
FD_SET(socket, except_fds);
if (conn->state == WBY_CON_STATE_SEND_CONTINUE) {
FD_SET(socket, write_fds);
}
if (socket > max_fd) { max_fd = socket; }
return max_fd;
}
static void receive_tabfs_request_then_send_to_browser() {
char *request_data = common_receive_tabfs_to_ws(fill_fd_set_with_ws_sockets);
if (request_data == NULL) {
return;
}
if (con == NULL) {
common_send_ws_to_tabfs(NULL);
return;
}
wby_frame_begin(con, WBY_WSOP_TEXT_FRAME);
wby_write(con, request_data, strlen(request_data));
wby_frame_end(con);
/* pthread_mutex_lock(&queue_mutex); */
/* if (con == NULL) goto done; */
/* for (request_id id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) { */
/* if (queue[id].state == SEND_REQUEST) { */
/* char *request = queue[id].request; */
/* wby_frame_begin(con, WBY_WSOP_TEXT_FRAME); */
/* wby_write(con, request, strlen(request)); */
/* wby_frame_end(con); */
/* queue[id].state = RECEIVE_RESPONSE; */
/* free(request); */
/* queue[id].request = NULL; */
/* } */
/* } */
/* done: */
/* pthread_mutex_unlock(&queue_mutex); */
}
static int
dispatch(struct wby_con *connection, void *userdata)
{
return 1;
}
static int
websocket_connect(struct wby_con *connection, void *userdata)
{
/* connection bound userdata */
connection->user_data = NULL;
if (0 == strcmp(connection->request.uri, "/"))
return 0;
return 1;
}
static void
websocket_connected(struct wby_con *connection, void *userdata)
{
printf("WebSocket connected\n");
con = connection;
}
#define MAX_DATA_LENGTH 131072
static int
websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void *userdata)
{
unsigned char *data = calloc(1, MAX_DATA_LENGTH); // Will be freed at receiver (tabfs).
int i = 0;
DEBUG("WebSocket frame incoming\n");
DEBUG(" Frame OpCode: %d\n", frame->opcode);
DEBUG(" Final frame?: %s\n", (frame->flags & WBY_WSF_FIN) ? "yes" : "no");
DEBUG(" Masked? : %s\n", (frame->flags & WBY_WSF_MASKED) ? "yes" : "no");
DEBUG(" Data Length : %d\n", (int) frame->payload_length);
if ((unsigned long) frame->payload_length > MAX_DATA_LENGTH) {
printf("Data too long!\n");
exit(1);
}
while (i < frame->payload_length) {
unsigned char buffer[16];
int remain = frame->payload_length - i;
size_t read_size = remain > (int) sizeof buffer ? sizeof buffer : (size_t) remain;
size_t k;
DEBUG("%08x ", (int) i);
if (0 != wby_read(connection, buffer, read_size))
break;
for (k = 0; k < read_size; ++k)
DEBUG("%02x ", buffer[k]);
for (k = read_size; k < 16; ++k)
DEBUG(" ");
DEBUG(" | ");
for (k = 0; k < read_size; ++k)
DEBUG("%c", isprint(buffer[k]) ? buffer[k] : '?');
DEBUG("\n");
for (k = 0; k < read_size; ++k)
data[i + k] = buffer[k];
i += (int)read_size;
}
if ((int) strlen((const char *) data) != frame->payload_length) {
printf("Null in data! [%s]\n", data);
}
common_send_ws_to_tabfs((char *) data);
// Will be freed at the receiver end.
/* cJSON *resp = cJSON_Parse((const char *) data); */
/* cJSON *id_item = cJSON_GetObjectItemCaseSensitive(resp, "id"); */
/* if (id_item == NULL) { */
/* printf("No id in response!\n"); */
/* exit(1); */
/* } */
/* request_id id = id_item->valueint; */
/* pthread_mutex_lock(&queue_mutex); */
/* if (queue[id].state != RECEIVE_RESPONSE) { */
/* printf("Got response to request in wrong state!\n"); */
/* exit(1); */
/* } */
/* queue[id].state = HANDLE_RESPONSE; */
/* queue[id].response = resp; */
/* pthread_cond_signal(&queue_cv); */
/* pthread_mutex_unlock(&queue_mutex); */
return 0;
}
static void
websocket_closed(struct wby_con *connection, void *userdata)
{
printf("WebSocket closed\n");
if (con == connection) con = NULL;
}
static void
test_log(const char* text)
{
DEBUG("[debug] %s\n", text);
}
void await_io_demand_or_timeout() {
/* pthread_mutex_lock(&queue_mutex); */
/* struct timeval now; */
/* gettimeofday(&now, NULL); */
/* struct timespec tsp; */
/* tsp.tv_sec = now.tv_sec; */
/* tsp.tv_nsec = now.tv_usec * 1000; */
/* tsp.tv_nsec += 200 * 1000000; // wait for 200ms max */
/* pthread_cond_timedwait(&queue_cv, &queue_mutex, &tsp); */
/* pthread_mutex_unlock(&queue_mutex); */
}
void *websocket_main(void *threadid)
{
void *memory = NULL;
wby_size needed_memory = 0;
struct wby_config config;
memset(&config, 0, sizeof config);
config.userdata = NULL;
config.address = "127.0.0.1";
config.port = 8888;
config.connection_max = 4;
config.request_buffer_size = 2048;
config.io_buffer_size = 8192;
config.log = test_log;
config.dispatch = dispatch;
config.ws_connect = websocket_connect;
config.ws_connected = websocket_connected;
config.ws_frame = websocket_frame;
config.ws_closed = websocket_closed;
wby_init(&server, &config, &needed_memory);
memory = calloc(needed_memory, 1);
wby_start(&server, memory);
printf("Awaiting WebSocket connection from Chrome extension.\n");
for (;;) {
// FIXME: makes reconnect impossible. :<
/* await_io_demand_or_timeout(); */
receive_tabfs_request_then_send_to_browser();
wby_update(&server); // We receive stuff during this phase.
}
wby_stop(&server);
free(memory);
#if defined(_WIN32)
WSACleanup();
#endif
return 0;
}