| /* |
| copyright: Boaz Segev, 2016-2019 |
| license: MIT |
| |
| Feel free to copy, use and enjoy according to the license provided. |
| */ |
| #define FIO_INCLUDE_STR |
| #include <fio.h> |
| |
| /* subscription lists have a long lifetime */ |
| #define FIO_FORCE_MALLOC_TMP 1 |
| #define FIO_INCLUDE_LINKED_LIST |
| #include <fio.h> |
| |
| #include <fiobj.h> |
| |
| #include <http.h> |
| #include <http_internal.h> |
| |
| #include <arpa/inet.h> |
| #include <errno.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <strings.h> |
| |
| #include <websocket_parser.h> |
| |
| #if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) |
| #include <endian.h> |
| #if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) && \ |
| __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ |
| #define __BIG_ENDIAN__ |
| #endif |
| #endif |
| |
| /******************************************************************************* |
| Buffer management - update to change the way the buffer is handled. |
| */ |
| struct buffer_s { |
| void *data; |
| size_t size; |
| }; |
| |
| #pragma weak create_ws_buffer |
| /** returns a buffer_s struct, with a buffer (at least) `size` long. */ |
| struct buffer_s create_ws_buffer(ws_s *owner); |
| |
| #pragma weak resize_ws_buffer |
| /** returns a buffer_s struct, with a buffer (at least) `size` long. */ |
| struct buffer_s resize_ws_buffer(ws_s *owner, struct buffer_s); |
| |
| #pragma weak free_ws_buffer |
| /** releases an existing buffer. */ |
| void free_ws_buffer(ws_s *owner, struct buffer_s); |
| |
| /** Sets the initial buffer size. (4Kb)*/ |
| #define WS_INITIAL_BUFFER_SIZE 4096UL |
| |
| /******************************************************************************* |
| Buffer management - simple implementation... |
| Since Websocket connections have a long life expectancy, optimizing this part of |
| the code probably wouldn't offer a high performance boost. |
| */ |
| |
| // buffer increments by 4,096 Bytes (4Kb) |
| #define round_up_buffer_size(size) (((size) >> 12) + 1) << 12 |
| |
| struct buffer_s create_ws_buffer(ws_s *owner) { |
| (void)(owner); |
| struct buffer_s buff; |
| buff.size = WS_INITIAL_BUFFER_SIZE; |
| buff.data = malloc(buff.size); |
| return buff; |
| } |
| |
| struct buffer_s resize_ws_buffer(ws_s *owner, struct buffer_s buff) { |
| buff.size = round_up_buffer_size(buff.size); |
| void *tmp = realloc(buff.data, buff.size); |
| if (!tmp) { |
| free_ws_buffer(owner, buff); |
| buff.data = NULL; |
| buff.size = 0; |
| } |
| buff.data = tmp; |
| return buff; |
| } |
| void free_ws_buffer(ws_s *owner, struct buffer_s buff) { |
| (void)(owner); |
| free(buff.data); |
| } |
| |
| #undef round_up_buffer_size |
| |
| /******************************************************************************* |
| Create/Destroy the websocket object (prototypes) |
| */ |
| |
| static ws_s *new_websocket(); |
| static void destroy_ws(ws_s *ws); |
| |
| /******************************************************************************* |
| The Websocket object (protocol + parser) |
| */ |
| struct ws_s { |
| /** The Websocket protocol */ |
| fio_protocol_s protocol; |
| /** connection data */ |
| intptr_t fd; |
| /** callbacks */ |
| void (*on_message)(ws_s *ws, fio_str_info_s msg, uint8_t is_text); |
| void (*on_shutdown)(ws_s *ws); |
| void (*on_ready)(ws_s *ws); |
| void (*on_open)(ws_s *ws); |
| void (*on_close)(intptr_t uuid, void *udata); |
| /** Opaque user data. */ |
| void *udata; |
| /** The maximum websocket message size */ |
| size_t max_msg_size; |
| /** active pub/sub subscriptions */ |
| fio_ls_s subscriptions; |
| fio_lock_i sub_lock; |
| /** socket buffer. */ |
| struct buffer_s buffer; |
| /** data length (how much of the buffer actually used). */ |
| size_t length; |
| /** message buffer. */ |
| FIOBJ msg; |
| /** latest text state. */ |
| uint8_t is_text; |
| /** websocket connection type. */ |
| uint8_t is_client; |
| }; |
| |
| /* ***************************************************************************** |
| Create/Destroy the websocket subscription objects |
| ***************************************************************************** */ |
| |
| static inline void clear_subscriptions(ws_s *ws) { |
| fio_lock(&ws->sub_lock); |
| while (fio_ls_any(&ws->subscriptions)) { |
| fio_unsubscribe(fio_ls_pop(&ws->subscriptions)); |
| } |
| fio_unlock(&ws->sub_lock); |
| } |
| |
| /* ***************************************************************************** |
| Callbacks - Required functions for websocket_parser.h |
| ***************************************************************************** */ |
| |
| static void websocket_on_unwrapped(void *ws_p, void *msg, uint64_t len, |
| char first, char last, char text, |
| unsigned char rsv) { |
| ws_s *ws = ws_p; |
| if (last && first) { |
| ws->on_message(ws, (fio_str_info_s){.data = msg, .len = len}, |
| (uint8_t)text); |
| return; |
| } |
| if (first) { |
| ws->is_text = (uint8_t)text; |
| if (ws->msg == FIOBJ_INVALID) |
| ws->msg = fiobj_str_buf(len); |
| fiobj_str_resize(ws->msg, 0); |
| } |
| fiobj_str_write(ws->msg, msg, len); |
| if (last) { |
| ws->on_message(ws, fiobj_obj2cstr(ws->msg), ws->is_text); |
| } |
| |
| (void)rsv; |
| } |
| static void websocket_on_protocol_ping(void *ws_p, void *msg_, uint64_t len) { |
| ws_s *ws = ws_p; |
| if (msg_) { |
| void *buff = malloc(len + 16); |
| len = (((ws_s *)ws)->is_client |
| ? websocket_client_wrap(buff, msg_, len, 10, 1, 1, 0) |
| : websocket_server_wrap(buff, msg_, len, 10, 1, 1, 0)); |
| fio_write2(ws->fd, .data.buffer = buff, .length = len); |
| } else { |
| if (((ws_s *)ws)->is_client) { |
| fio_write2(ws->fd, .data.buffer = "\x89\x80mask", .length = 2, |
| .after.dealloc = FIO_DEALLOC_NOOP); |
| } else { |
| fio_write2(ws->fd, .data.buffer = "\x89\x00", .length = 2, |
| .after.dealloc = FIO_DEALLOC_NOOP); |
| } |
| } |
| } |
| static void websocket_on_protocol_pong(void *ws_p, void *msg, uint64_t len) { |
| (void)len; |
| (void)msg; |
| (void)ws_p; |
| } |
| static void websocket_on_protocol_close(void *ws_p) { |
| ws_s *ws = ws_p; |
| fio_close(ws->fd); |
| } |
| static void websocket_on_protocol_error(void *ws_p) { |
| ws_s *ws = ws_p; |
| fio_close(ws->fd); |
| } |
| |
| /******************************************************************************* |
| The Websocket Protocol implementation |
| */ |
| |
| #define ws_protocol(fd) ((ws_s *)(server_get_protocol(fd))) |
| |
| static void ws_ping(intptr_t fd, fio_protocol_s *ws) { |
| (void)(ws); |
| if (((ws_s *)ws)->is_client) { |
| fio_write2(fd, .data.buffer = "\x89\x80MASK", .length = 6, |
| .after.dealloc = FIO_DEALLOC_NOOP); |
| } else { |
| fio_write2(fd, .data.buffer = "\x89\x00", .length = 2, |
| .after.dealloc = FIO_DEALLOC_NOOP); |
| } |
| } |
| |
| static void on_close(intptr_t uuid, fio_protocol_s *_ws) { |
| destroy_ws((ws_s *)_ws); |
| (void)uuid; |
| } |
| |
| static void on_ready(intptr_t fduuid, fio_protocol_s *ws) { |
| (void)(fduuid); |
| if (((ws_s *)ws)->on_ready) |
| ((ws_s *)ws)->on_ready((ws_s *)ws); |
| } |
| |
| static uint8_t on_shutdown(intptr_t fd, fio_protocol_s *ws) { |
| (void)(fd); |
| if (ws && ((ws_s *)ws)->on_shutdown) |
| ((ws_s *)ws)->on_shutdown((ws_s *)ws); |
| if (((ws_s *)ws)->is_client) { |
| fio_write2(fd, .data.buffer = "\x8a\x80MASK", .length = 6, |
| .after.dealloc = FIO_DEALLOC_NOOP); |
| } else { |
| fio_write2(fd, .data.buffer = "\x8a\x00", .length = 2, |
| .after.dealloc = FIO_DEALLOC_NOOP); |
| } |
| return 0; |
| } |
| |
| static void on_data(intptr_t sockfd, fio_protocol_s *ws_) { |
| ws_s *const ws = (ws_s *)ws_; |
| if (ws == NULL) |
| return; |
| struct websocket_packet_info_s info = |
| websocket_buffer_peek(ws->buffer.data, ws->length); |
| const uint64_t raw_length = info.packet_length + info.head_length; |
| /* test expected data amount */ |
| if (ws->max_msg_size < raw_length) { |
| /* too big */ |
| websocket_close(ws); |
| return; |
| } |
| /* test buffer capacity */ |
| if (raw_length > ws->buffer.size) { |
| ws->buffer.size = (size_t)raw_length; |
| ws->buffer = resize_ws_buffer(ws, ws->buffer); |
| if (!ws->buffer.data) { |
| // no memory. |
| websocket_close(ws); |
| return; |
| } |
| } |
| |
| const ssize_t len = fio_read(sockfd, (uint8_t *)ws->buffer.data + ws->length, |
| ws->buffer.size - ws->length); |
| if (len <= 0) { |
| return; |
| } |
| ws->length = websocket_consume(ws->buffer.data, ws->length + len, ws, |
| (~(ws->is_client) & 1)); |
| |
| fio_force_event(sockfd, FIO_EVENT_ON_DATA); |
| } |
| |
| static void on_data_first(intptr_t sockfd, fio_protocol_s *ws_) { |
| ws_s *const ws = (ws_s *)ws_; |
| if (ws->on_open) |
| ws->on_open(ws); |
| ws->protocol.on_data = on_data; |
| ws->protocol.on_ready = on_ready; |
| |
| if (ws->length) { |
| ws->length = websocket_consume(ws->buffer.data, ws->length, ws, |
| (~(ws->is_client) & 1)); |
| } |
| fio_force_event(sockfd, FIO_EVENT_ON_DATA); |
| fio_force_event(sockfd, FIO_EVENT_ON_READY); |
| } |
| |
| /* later */ |
| static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text, |
| char first, char last, char client); |
| |
| /******************************************************************************* |
| Create/Destroy the websocket object |
| */ |
| |
| static ws_s *new_websocket(intptr_t uuid) { |
| // allocate the protocol object |
| ws_s *ws = malloc(sizeof(*ws)); |
| *ws = (ws_s){ |
| .protocol.ping = ws_ping, |
| .protocol.on_data = on_data_first, |
| .protocol.on_close = on_close, |
| .protocol.on_ready = NULL /* filled in after `on_open` */, |
| .protocol.on_shutdown = on_shutdown, |
| .subscriptions = FIO_LS_INIT(ws->subscriptions), |
| .is_client = 0, |
| .fd = uuid, |
| }; |
| return ws; |
| } |
| static void destroy_ws(ws_s *ws) { |
| if (ws->on_close) |
| ws->on_close(ws->fd, ws->udata); |
| if (ws->msg) |
| fiobj_free(ws->msg); |
| clear_subscriptions(ws); |
| free_ws_buffer(ws, ws->buffer); |
| free(ws); |
| } |
| |
| void websocket_attach(intptr_t uuid, http_settings_s *http_settings, |
| websocket_settings_s *args, void *data, size_t length) { |
| ws_s *ws = new_websocket(uuid); |
| FIO_ASSERT_ALLOC(ws); |
| // we have an active websocket connection - prep the connection buffer |
| ws->buffer = create_ws_buffer(ws); |
| // Setup ws callbacks |
| ws->on_open = args->on_open; |
| ws->on_close = args->on_close; |
| ws->on_message = args->on_message; |
| ws->on_ready = args->on_ready; |
| ws->on_shutdown = args->on_shutdown; |
| // setup any user data |
| ws->udata = args->udata; |
| if (http_settings) { |
| // client mode? |
| ws->is_client = http_settings->is_client; |
| // buffer limits |
| ws->max_msg_size = http_settings->ws_max_msg_size; |
| // update the timeout |
| fio_timeout_set(uuid, http_settings->ws_timeout); |
| } else { |
| ws->max_msg_size = (1024 * 256); |
| fio_timeout_set(uuid, 40); |
| } |
| |
| if (data && length) { |
| if (length > ws->buffer.size) { |
| ws->buffer.size = length; |
| ws->buffer = resize_ws_buffer(ws, ws->buffer); |
| if (!ws->buffer.data) { |
| // no memory. |
| fio_attach(uuid, (fio_protocol_s *)ws); |
| websocket_close(ws); |
| return; |
| } |
| } |
| memcpy(ws->buffer.data, data, length); |
| ws->length = length; |
| } |
| // update the protocol object, cleaning up the old one |
| fio_attach(uuid, (fio_protocol_s *)ws); |
| // allow the on_open and on_data to take over the control. |
| fio_force_event(uuid, FIO_EVENT_ON_DATA); |
| } |
| |
| /******************************************************************************* |
| Writing to the Websocket |
| */ |
| #define WS_MAX_FRAME_SIZE \ |
| (FIO_MEMORY_BLOCK_ALLOC_LIMIT - 4096) // should be less then `unsigned short` |
| |
| static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text, |
| char first, char last, char client) { |
| if (len <= WS_MAX_FRAME_SIZE) { |
| void *buff = fio_malloc(len + 16); |
| len = (client ? websocket_client_wrap(buff, data, len, (text ? 1 : 2), |
| first, last, 0) |
| : websocket_server_wrap(buff, data, len, (text ? 1 : 2), |
| first, last, 0)); |
| fio_write2(fd, .data.buffer = buff, .length = len, |
| .after.dealloc = fio_free); |
| } else { |
| /* frame fragmentation is better for large data then large frames */ |
| while (len > WS_MAX_FRAME_SIZE) { |
| websocket_write_impl(fd, data, WS_MAX_FRAME_SIZE, text, first, 0, client); |
| data = ((uint8_t *)data) + WS_MAX_FRAME_SIZE; |
| first = 0; |
| len -= WS_MAX_FRAME_SIZE; |
| } |
| websocket_write_impl(fd, data, len, text, first, 1, client); |
| } |
| return; |
| } |
| |
| /* ***************************************************************************** |
| Multi-client broadcast optimizations |
| ***************************************************************************** */ |
| |
| static void websocket_optimize_free(fio_msg_s *msg, void *metadata) { |
| fiobj_free((FIOBJ)metadata); |
| (void)msg; |
| } |
| |
| static inline fio_msg_metadata_s websocket_optimize(fio_str_info_s msg, |
| unsigned char opcode) { |
| FIOBJ out = fiobj_str_buf(msg.len + 10); |
| fiobj_str_resize(out, |
| websocket_server_wrap(fiobj_obj2cstr(out).data, msg.data, |
| msg.len, opcode, 1, 1, 0)); |
| fio_msg_metadata_s ret = { |
| .on_finish = websocket_optimize_free, |
| .metadata = (void *)out, |
| }; |
| return ret; |
| } |
| static fio_msg_metadata_s websocket_optimize_generic(fio_str_info_s ch, |
| fio_str_info_s msg, |
| uint8_t is_json) { |
| fio_str_s tmp = FIO_STR_INIT_EXISTING(ch.data, ch.len, 0); // don't free |
| tmp.dealloc = NULL; |
| unsigned char opcode = 2; |
| if (tmp.len <= (2 << 19) && fio_str_utf8_valid(&tmp)) { |
| opcode = 1; |
| } |
| fio_msg_metadata_s ret = websocket_optimize(msg, opcode); |
| ret.type_id = WEBSOCKET_OPTIMIZE_PUBSUB; |
| return ret; |
| (void)ch; |
| (void)is_json; |
| } |
| |
| static fio_msg_metadata_s websocket_optimize_text(fio_str_info_s ch, |
| fio_str_info_s msg, |
| uint8_t is_json) { |
| fio_msg_metadata_s ret = websocket_optimize(msg, 1); |
| ret.type_id = WEBSOCKET_OPTIMIZE_PUBSUB_TEXT; |
| return ret; |
| (void)ch; |
| (void)is_json; |
| } |
| |
| static fio_msg_metadata_s websocket_optimize_binary(fio_str_info_s ch, |
| fio_str_info_s msg, |
| uint8_t is_json) { |
| fio_msg_metadata_s ret = websocket_optimize(msg, 2); |
| ret.type_id = WEBSOCKET_OPTIMIZE_PUBSUB_BINARY; |
| return ret; |
| (void)ch; |
| (void)is_json; |
| } |
| |
| /** |
| * Enables (or disables) broadcast optimizations. |
| * |
| * When using WebSocket pub/sub system is originally optimized for either |
| * non-direct transmission (messages are handled by callbacks) or direct |
| * transmission to 1-3 clients per channel (on average), meaning that the |
| * majority of the messages are meant for a single recipient (or multiple |
| * callback recipients) and only some are expected to be directly transmitted to |
| * a group. |
| * |
| * However, when most messages are intended for direct transmission to more than |
| * 3 clients (on average), certain optimizations can be made to improve memory |
| * consumption (minimize duplication or WebSocket network data). |
| * |
| * This function allows enablement (or disablement) of these optimizations. |
| * These optimizations include: |
| * |
| * * WEBSOCKET_OPTIMIZE_PUBSUB - optimize all direct transmission messages, |
| * best attempt to detect Text vs. Binary data. |
| * * WEBSOCKET_OPTIMIZE_PUBSUB_TEXT - optimize direct pub/sub text messages. |
| * * WEBSOCKET_OPTIMIZE_PUBSUB_BINARY - optimize direct pub/sub binary messages. |
| * |
| * Note: to disable an optimization it should be disabled the same amount of |
| * times it was enabled - multiple optimization enablements for the same type |
| * are merged, but reference counted (disabled when reference is zero). |
| */ |
| void websocket_optimize4broadcasts(intptr_t type, int enable) { |
| static intptr_t generic = 0; |
| static intptr_t text = 0; |
| static intptr_t binary = 0; |
| fio_msg_metadata_s (*callback)(fio_str_info_s, fio_str_info_s, uint8_t); |
| intptr_t *counter; |
| switch ((0 - type)) { |
| case (0 - WEBSOCKET_OPTIMIZE_PUBSUB): |
| counter = &generic; |
| callback = websocket_optimize_generic; |
| break; |
| case (0 - WEBSOCKET_OPTIMIZE_PUBSUB_TEXT): |
| counter = &text; |
| callback = websocket_optimize_text; |
| break; |
| case (0 - WEBSOCKET_OPTIMIZE_PUBSUB_BINARY): |
| counter = &binary; |
| callback = websocket_optimize_binary; |
| break; |
| default: |
| return; |
| } |
| if (enable) { |
| if (fio_atomic_add(counter, 1) == 1) { |
| fio_message_metadata_callback_set(callback, 1); |
| } |
| } else { |
| if (fio_atomic_sub(counter, 1) == 0) { |
| fio_message_metadata_callback_set(callback, 0); |
| } |
| } |
| } |
| |
| /* ***************************************************************************** |
| Subscription handling |
| ***************************************************************************** */ |
| |
| typedef struct { |
| void (*on_message)(ws_s *ws, fio_str_info_s channel, fio_str_info_s msg, |
| void *udata); |
| void (*on_unsubscribe)(void *udata); |
| void *udata; |
| } websocket_sub_data_s; |
| |
| static inline void websocket_on_pubsub_message_direct_internal(fio_msg_s *msg, |
| uint8_t txt) { |
| fio_protocol_s *pr = |
| fio_protocol_try_lock((intptr_t)msg->udata1, FIO_PR_LOCK_WRITE); |
| if (!pr) { |
| if (errno == EBADF) |
| return; |
| fio_message_defer(msg); |
| return; |
| } |
| FIOBJ message = FIOBJ_INVALID; |
| FIOBJ pre_wrapped = FIOBJ_INVALID; |
| if (!((ws_s *)pr)->is_client) { |
| /* pre-wrapping is only for client data */ |
| switch (txt) { |
| case 0: |
| pre_wrapped = |
| (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB_BINARY); |
| break; |
| case 1: |
| pre_wrapped = |
| (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB_TEXT); |
| break; |
| case 2: |
| pre_wrapped = (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB); |
| break; |
| default: |
| break; |
| } |
| if (pre_wrapped) { |
| // FIO_LOG_DEBUG( |
| // "pub/sub WebSocket optimization route for pre-wrapped message."); |
| fiobj_send_free((intptr_t)msg->udata1, fiobj_dup(pre_wrapped)); |
| goto finish; |
| } |
| } |
| if (txt == 2) { |
| /* unknown text state */ |
| fio_str_s tmp = |
| FIO_STR_INIT_STATIC2(msg->msg.data, msg->msg.len); // don't free |
| txt = (tmp.len >= (2 << 14) ? 0 : fio_str_utf8_valid(&tmp)); |
| } |
| websocket_write((ws_s *)pr, msg->msg, txt & 1); |
| fiobj_free(message); |
| finish: |
| fio_protocol_unlock(pr, FIO_PR_LOCK_WRITE); |
| } |
| |
| static void websocket_on_pubsub_message_direct(fio_msg_s *msg) { |
| websocket_on_pubsub_message_direct_internal(msg, 2); |
| } |
| |
| static void websocket_on_pubsub_message_direct_txt(fio_msg_s *msg) { |
| websocket_on_pubsub_message_direct_internal(msg, 1); |
| } |
| |
| static void websocket_on_pubsub_message_direct_bin(fio_msg_s *msg) { |
| websocket_on_pubsub_message_direct_internal(msg, 0); |
| } |
| |
| static void websocket_on_pubsub_message(fio_msg_s *msg) { |
| fio_protocol_s *pr = |
| fio_protocol_try_lock((intptr_t)msg->udata1, FIO_PR_LOCK_TASK); |
| if (!pr) { |
| if (errno == EBADF) |
| return; |
| fio_message_defer(msg); |
| return; |
| } |
| websocket_sub_data_s *d = msg->udata2; |
| |
| if (d->on_message) |
| d->on_message((ws_s *)pr, msg->channel, msg->msg, d->udata); |
| fio_protocol_unlock(pr, FIO_PR_LOCK_TASK); |
| } |
| |
| static void websocket_on_unsubscribe(void *u1, void *u2) { |
| websocket_sub_data_s *d = u2; |
| if (d->on_unsubscribe) { |
| d->on_unsubscribe(d->udata); |
| } |
| |
| if ((intptr_t)d->on_message == (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB) { |
| websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB, 0); |
| } else if ((intptr_t)d->on_message == |
| (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB_TEXT) { |
| websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB_TEXT, 0); |
| } else if ((intptr_t)d->on_message == |
| (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB_BINARY) { |
| websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB_BINARY, 0); |
| } |
| free(d); |
| (void)u1; |
| } |
| |
| /** |
| * Returns a subscription ID on success and 0 on failure. |
| */ |
| #undef websocket_subscribe |
| uintptr_t websocket_subscribe(struct websocket_subscribe_s args) { |
| if (!args.ws || !fio_is_valid(args.ws->fd)) |
| goto error; |
| websocket_sub_data_s *d = malloc(sizeof(*d)); |
| FIO_ASSERT_ALLOC(d); |
| *d = (websocket_sub_data_s){ |
| .udata = args.udata, |
| .on_message = args.on_message, |
| .on_unsubscribe = args.on_unsubscribe, |
| }; |
| void (*handler)(fio_msg_s *) = websocket_on_pubsub_message; |
| if (!args.on_message) { |
| intptr_t br_type; |
| if (args.force_binary) { |
| br_type = WEBSOCKET_OPTIMIZE_PUBSUB_BINARY; |
| handler = websocket_on_pubsub_message_direct_bin; |
| } else if (args.force_text) { |
| br_type = WEBSOCKET_OPTIMIZE_PUBSUB_TEXT; |
| handler = websocket_on_pubsub_message_direct_txt; |
| } else { |
| br_type = WEBSOCKET_OPTIMIZE_PUBSUB; |
| handler = websocket_on_pubsub_message_direct; |
| } |
| websocket_optimize4broadcasts(br_type, 1); |
| d->on_message = |
| (void (*)(ws_s *, fio_str_info_s, fio_str_info_s, void *))br_type; |
| } |
| subscription_s *sub = |
| fio_subscribe(.channel = args.channel, .match = args.match, |
| .on_unsubscribe = websocket_on_unsubscribe, |
| .on_message = handler, .udata1 = (void *)args.ws->fd, |
| .udata2 = d); |
| if (!sub) { |
| /* don't free `d`, return (`d` freed by fio_subscribe) */ |
| return 0; |
| } |
| fio_ls_s *pos; |
| fio_lock(&args.ws->sub_lock); |
| pos = fio_ls_push(&args.ws->subscriptions, sub); |
| fio_unlock(&args.ws->sub_lock); |
| |
| return (uintptr_t)pos; |
| error: |
| if (args.on_unsubscribe) |
| args.on_unsubscribe(args.udata); |
| return 0; |
| } |
| |
| /** |
| * Unsubscribes from a channel. |
| */ |
| void websocket_unsubscribe(ws_s *ws, uintptr_t subscription_id) { |
| fio_unsubscribe((subscription_s *)((fio_ls_s *)subscription_id)->obj); |
| fio_lock(&ws->sub_lock); |
| fio_ls_remove((fio_ls_s *)subscription_id); |
| fio_unlock(&ws->sub_lock); |
| |
| (void)ws; |
| } |
| |
| /******************************************************************************* |
| The API implementation |
| */ |
| |
| /** Returns the opaque user data associated with the websocket. */ |
| void *websocket_udata_get(ws_s *ws) { return ws->udata; } |
| |
| /** Returns the the process specific connection's UUID (see `libsock`). */ |
| intptr_t websocket_uuid(ws_s *ws) { return ws->fd; } |
| |
| /** Sets the opaque user data associated with the websocket. |
| * Returns the old value, if any. */ |
| void *websocket_udata_set(ws_s *ws, void *udata) { |
| void *old = ws->udata; |
| ws->udata = udata; |
| return old; |
| } |
| |
| /** |
| * Returns 1 if the WebSocket connection is in Client mode (connected to a |
| * remote server) and 0 if the connection is in Server mode (a connection |
| * established using facil.io's HTTP server). |
| */ |
| uint8_t websocket_is_client(ws_s *ws) { return ws->is_client; } |
| |
| /** Writes data to the websocket. Returns -1 on failure (0 on success). */ |
| int websocket_write(ws_s *ws, fio_str_info_s msg, uint8_t is_text) { |
| if (fio_is_valid(ws->fd)) { |
| websocket_write_impl(ws->fd, msg.data, msg.len, is_text, 1, 1, |
| ws->is_client); |
| return 0; |
| } |
| return -1; |
| } |
| /** Closes a websocket connection. */ |
| void websocket_close(ws_s *ws) { |
| fio_write2(ws->fd, .data.buffer = "\x88\x00", .length = 2, |
| .after.dealloc = FIO_DEALLOC_NOOP); |
| fio_close(ws->fd); |
| return; |
| } |