/*
copyright: Boaz Segev, 2016-2019
license: MIT

Feel free to copy, use and enjoy according to the license provided.
*/
#define FIO_INCLUDE_STR
#include <fio.h>

/* subscription lists have a long lifetime */
#define FIO_FORCE_MALLOC_TMP 1
#define FIO_INCLUDE_LINKED_LIST
#include <fio.h>

#include <fiobj.h>

#include <http.h>
#include <http_internal.h>

#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <websocket_parser.h>

#if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__)
#include <endian.h>
#if !defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) &&                 \
    __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
#define __BIG_ENDIAN__
#endif
#endif

/*******************************************************************************
Buffer management - update to change the way the buffer is handled.
*/
/** Describes a raw memory buffer: where it starts and how many bytes it holds. */
struct buffer_s {
  /** start of the buffer's memory (NULL after a failed resize). */
  void *data;
  /** the buffer's capacity, in bytes. */
  size_t size;
};

#pragma weak create_ws_buffer
/**
 * Returns a buffer_s struct holding a newly allocated buffer for `owner`.
 *
 * There is no size argument; the implementation in this file allocates
 * WS_INITIAL_BUFFER_SIZE bytes.
 */
struct buffer_s create_ws_buffer(ws_s *owner);

#pragma weak resize_ws_buffer
/**
 * Returns a buffer_s struct, with a buffer (at least) `size` long, where the
 * requested `size` is read from the `size` field of the buffer passed in.
 *
 * In this file's implementation a failed resize releases the old buffer and
 * returns a struct whose `data` is NULL.
 */
struct buffer_s resize_ws_buffer(ws_s *owner, struct buffer_s);

#pragma weak free_ws_buffer
/** releases an existing buffer. */
void free_ws_buffer(ws_s *owner, struct buffer_s);

/** Sets the initial buffer size, in bytes (4Kb). Used by create_ws_buffer. */
#define WS_INITIAL_BUFFER_SIZE 4096UL

/*******************************************************************************
Buffer management - simple implementation...
Since Websocket connections have a long life expectancy, optimizing this part of
the code probably wouldn't offer a high performance boost.
*/

// buffer increments by 4,096 Bytes (4Kb): yields the next multiple of 4,096
// that is strictly greater than `size`. The whole expansion is parenthesized
// so the macro can be used safely inside a larger expression.
#define round_up_buffer_size(size) ((((size) >> 12) + 1) << 12)

/**
 * Allocates the initial (WS_INITIAL_BUFFER_SIZE bytes) connection buffer.
 *
 * `owner` is unused by this implementation.
 *
 * On allocation failure the returned struct has `data == NULL` and
 * `size == 0`, so the size never advertises capacity that does not exist.
 */
struct buffer_s create_ws_buffer(ws_s *owner) {
  (void)(owner);
  struct buffer_s buff;
  buff.data = malloc(WS_INITIAL_BUFFER_SIZE);
  buff.size = buff.data ? WS_INITIAL_BUFFER_SIZE : 0;
  return buff;
}

/**
 * Grows `buff` so it can hold at least `buff.size` bytes (the requested size
 * is rounded up by `round_up_buffer_size`).
 *
 * If the reallocation fails, the existing buffer is released and an empty
 * struct (`data == NULL`, `size == 0`) is returned.
 */
struct buffer_s resize_ws_buffer(ws_s *owner, struct buffer_s buff) {
  buff.size = round_up_buffer_size(buff.size);
  void *grown = realloc(buff.data, buff.size);
  if (grown) {
    buff.data = grown;
    return buff;
  }
  /* realloc left the old block untouched - release it ourselves */
  free_ws_buffer(owner, buff);
  buff.size = 0;
  buff.data = NULL;
  return buff;
}
/** Releases the memory held by `buff`; `owner` is unused here. */
void free_ws_buffer(ws_s *owner, struct buffer_s buff) {
  (void)(owner);
  /* free(NULL) is a no-op, so an empty buffer needs no special handling */
  free(buff.data);
}

#undef round_up_buffer_size

/*******************************************************************************
Create/Destroy the websocket object (prototypes)
*/

/* The parameter list is spelled out so that calls are type-checked and the
 * declaration agrees with the definition (an empty `()` means "no parameters"
 * as of C23, which would conflict with the definition below). */
static ws_s *new_websocket(intptr_t uuid);
static void destroy_ws(ws_s *ws);

/*******************************************************************************
The Websocket object (protocol + parser)
*/
/**
 * Per-connection WebSocket state. The `fio_protocol_s` member comes first and
 * the rest of this file casts between `fio_protocol_s *` and `ws_s *`.
 */
struct ws_s {
  /** The Websocket protocol */
  fio_protocol_s protocol;
  /** connection data (the uuid handed to `new_websocket`) */
  intptr_t fd;
  /** callbacks */
  void (*on_message)(ws_s *ws, fio_str_info_s msg, uint8_t is_text);
  void (*on_shutdown)(ws_s *ws);
  void (*on_ready)(ws_s *ws);
  void (*on_open)(ws_s *ws);
  void (*on_close)(intptr_t uuid, void *udata);
  /** Opaque user data. */
  void *udata;
  /** The maximum websocket message size */
  size_t max_msg_size;
  /** active pub/sub subscriptions (list access is guarded by `sub_lock`) */
  fio_ls_s subscriptions;
  fio_lock_i sub_lock;
  /** socket buffer. */
  struct buffer_s buffer;
  /** data length (how much of the buffer actually used). */
  size_t length;
  /** message buffer (accumulates the frames of a fragmented message). */
  FIOBJ msg;
  /** latest text state (recorded from the first frame of a fragmented message). */
  uint8_t is_text;
  /** websocket connection type. */
  uint8_t is_client;
};

/* *****************************************************************************
Create/Destroy the websocket subscription objects
***************************************************************************** */

/** Cancels every subscription stored on `ws`, holding `sub_lock` throughout. */
static inline void clear_subscriptions(ws_s *ws) {
  fio_lock(&ws->sub_lock);
  for (;;) {
    if (!fio_ls_any(&ws->subscriptions))
      break;
    fio_unsubscribe(fio_ls_pop(&ws->subscriptions));
  }
  fio_unlock(&ws->sub_lock);
}

/* *****************************************************************************
Callbacks - Required functions for websocket_parser.h
***************************************************************************** */

static void websocket_on_unwrapped(void *ws_p, void *msg, uint64_t len,
                                   char first, char last, char text,
                                   unsigned char rsv) {
  ws_s *ws = ws_p;
  if (last && first) {
    ws->on_message(ws, (fio_str_info_s){.data = msg, .len = len},
                   (uint8_t)text);
    return;
  }
  if (first) {
    ws->is_text = (uint8_t)text;
    if (ws->msg == FIOBJ_INVALID)
      ws->msg = fiobj_str_buf(len);
    fiobj_str_resize(ws->msg, 0);
  }
  fiobj_str_write(ws->msg, msg, len);
  if (last) {
    ws->on_message(ws, fiobj_obj2cstr(ws->msg), ws->is_text);
  }

  (void)rsv;
}
static void websocket_on_protocol_ping(void *ws_p, void *msg_, uint64_t len) {
  ws_s *ws = ws_p;
  if (msg_) {
    void *buff = malloc(len + 16);
    len = (((ws_s *)ws)->is_client
               ? websocket_client_wrap(buff, msg_, len, 10, 1, 1, 0)
               : websocket_server_wrap(buff, msg_, len, 10, 1, 1, 0));
    fio_write2(ws->fd, .data.buffer = buff, .length = len);
  } else {
    if (((ws_s *)ws)->is_client) {
      fio_write2(ws->fd, .data.buffer = "\x89\x80mask", .length = 2,
                 .after.dealloc = FIO_DEALLOC_NOOP);
    } else {
      fio_write2(ws->fd, .data.buffer = "\x89\x00", .length = 2,
                 .after.dealloc = FIO_DEALLOC_NOOP);
    }
  }
}
/** Parser callback: incoming pong frames need no action; all arguments are
 * ignored. */
static void websocket_on_protocol_pong(void *ws_p, void *msg, uint64_t len) {
  (void)ws_p;
  (void)msg;
  (void)len;
}
static void websocket_on_protocol_close(void *ws_p) {
  ws_s *ws = ws_p;
  fio_close(ws->fd);
}
static void websocket_on_protocol_error(void *ws_p) {
  ws_s *ws = ws_p;
  fio_close(ws->fd);
}

/*******************************************************************************
The Websocket Protocol implementation
*/

/* Casts the protocol object returned by `server_get_protocol(fd)` to `ws_s *`.
 * NOTE(review): not referenced in the portion of the file reviewed - confirm
 * it is still needed. */
#define ws_protocol(fd) ((ws_s *)(server_get_protocol(fd)))

/**
 * Protocol `ping` callback: writes an empty ping frame to `fd`.
 *
 * Client connections send the 6 byte masked form; server connections send the
 * 2 byte unmasked form.
 */
static void ws_ping(intptr_t fd, fio_protocol_s *ws) {
  const ws_s *const self = (ws_s *)ws;
  if (!self->is_client) {
    fio_write2(fd, .data.buffer = "\x89\x00", .length = 2,
               .after.dealloc = FIO_DEALLOC_NOOP);
    return;
  }
  fio_write2(fd, .data.buffer = "\x89\x80MASK", .length = 6,
             .after.dealloc = FIO_DEALLOC_NOOP);
}

/** Protocol `on_close` callback: tears down the websocket object. */
static void on_close(intptr_t uuid, fio_protocol_s *_ws) {
  (void)uuid;
  destroy_ws((ws_s *)_ws);
}

/** Protocol `on_ready` callback: forwards the event to the user's `on_ready`
 * callback, when one was set. */
static void on_ready(intptr_t fduuid, fio_protocol_s *ws) {
  ws_s *const self = (ws_s *)ws;
  (void)(fduuid);
  if (!self->on_ready)
    return;
  self->on_ready(self);
}

/**
 * Protocol `on_shutdown` callback: calls the user's `on_shutdown` callback
 * (if any) and then writes a final control frame to `fd` (masked for client
 * connections, unmasked otherwise). Always returns 0.
 *
 * A NULL protocol object is tolerated and results in no action.
 *
 * NOTE(review): the frame written uses first byte 0x8A - confirm that this is
 * the intended frame type for a shutdown notice.
 */
static uint8_t on_shutdown(intptr_t fd, fio_protocol_s *ws) {
  ws_s *const self = (ws_s *)ws;
  if (!self)
    return 0;
  if (self->on_shutdown)
    self->on_shutdown(self);
  if (self->is_client) {
    fio_write2(fd, .data.buffer = "\x8a\x80MASK", .length = 6,
               .after.dealloc = FIO_DEALLOC_NOOP);
  } else {
    fio_write2(fd, .data.buffer = "\x8a\x00", .length = 2,
               .after.dealloc = FIO_DEALLOC_NOOP);
  }
  return 0;
}

/**
 * Protocol `on_data` callback: reads from the socket into the connection
 * buffer and hands the buffered bytes to the parser.
 *
 * Steps:
 * 1. peek at the buffered bytes to learn the size of the pending frame;
 * 2. close the connection if that frame exceeds `max_msg_size`;
 * 3. grow the buffer when the frame does not fit;
 * 4. read, consume, and re-schedule another `on_data` event.
 */
static void on_data(intptr_t sockfd, fio_protocol_s *ws_) {
  ws_s *const ws = (ws_s *)ws_;
  if (ws == NULL)
    return;
  struct websocket_packet_info_s info =
      websocket_buffer_peek(ws->buffer.data, ws->length);
  const uint64_t raw_length = info.packet_length + info.head_length;
  /* test expected data amount - a sum that wrapped around is also too big */
  if (raw_length < info.packet_length || ws->max_msg_size < raw_length) {
    /* too big */
    websocket_close(ws);
    return;
  }
  /* test buffer capacity */
  if (raw_length > ws->buffer.size) {
    ws->buffer.size = (size_t)raw_length;
    ws->buffer = resize_ws_buffer(ws, ws->buffer);
    if (!ws->buffer.data) {
      // no memory - the buffered bytes are gone, so forget about them.
      ws->length = 0;
      websocket_close(ws);
      return;
    }
  }

  const ssize_t len = fio_read(sockfd, (uint8_t *)ws->buffer.data + ws->length,
                               ws->buffer.size - ws->length);
  if (len <= 0) {
    return;
  }
  /* the last argument requests masked input unless this end is a client */
  ws->length = websocket_consume(ws->buffer.data, ws->length + len, ws,
                                 (~(ws->is_client) & 1));

  fio_force_event(sockfd, FIO_EVENT_ON_DATA);
}

/**
 * Initial `on_data` callback, used once per connection.
 *
 * Calls the user's `on_open` callback, switches the protocol over to the
 * regular `on_data` / `on_ready` callbacks, consumes any bytes that were
 * placed in the buffer before the first event, and then schedules both an
 * `on_data` and an `on_ready` event.
 */
static void on_data_first(intptr_t sockfd, fio_protocol_s *ws_) {
  ws_s *const self = (ws_s *)ws_;
  if (self->on_open) {
    self->on_open(self);
  }
  self->protocol.on_data = on_data;
  self->protocol.on_ready = on_ready;

  if (self->length != 0) {
    const size_t pending = self->length;
    self->length = websocket_consume(self->buffer.data, pending, self,
                                     (~(self->is_client) & 1));
  }
  fio_force_event(sockfd, FIO_EVENT_ON_DATA);
  fio_force_event(sockfd, FIO_EVENT_ON_READY);
}

/* Forward declaration - the function is defined later in this file, in the
 * "Writing to the Websocket" section. */
static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text,
                                 char first, char last, char client);

/*******************************************************************************
Create/Destroy the websocket object
*/

/**
 * Allocates and initializes a websocket protocol object for `uuid`.
 *
 * All fields not listed in the initializer start zeroed. Returns NULL when
 * the allocation fails, so the caller's allocation check can run.
 */
static ws_s *new_websocket(intptr_t uuid) {
  // allocate the protocol object
  ws_s *ws = malloc(sizeof(*ws));
  if (!ws)
    return NULL;
  *ws = (ws_s){
      .protocol.ping = ws_ping,
      .protocol.on_data = on_data_first,
      .protocol.on_close = on_close,
      .protocol.on_ready = NULL /* filled in after `on_open` */,
      .protocol.on_shutdown = on_shutdown,
      .subscriptions = FIO_LS_INIT(ws->subscriptions),
      .is_client = 0,
      .fd = uuid,
  };
  return ws;
}
/**
 * Destroys a websocket object: notifies the user's `on_close` callback,
 * releases the message buffer, cancels all subscriptions, and frees the
 * connection buffer and the object itself.
 */
static void destroy_ws(ws_s *ws) {
  /* the user is notified before any resource is released */
  if (ws->on_close) {
    ws->on_close(ws->fd, ws->udata);
  }
  if (ws->msg) {
    fiobj_free(ws->msg);
  }
  clear_subscriptions(ws);
  free_ws_buffer(ws, ws->buffer);
  free(ws);
}

/**
 * Attaches a new websocket protocol object to the connection `uuid`.
 *
 * - `http_settings` (optional) supplies the client flag, the message size
 *   limit and the timeout; without it the limit is 256Kb and the timeout 40.
 * - `args` supplies the callbacks and the user data.
 * - `data` / `length` (optional) are bytes that were already read from the
 *   connection; they are copied into the connection buffer.
 *
 * If the connection buffer cannot be allocated, the object is still attached
 * (so it is cleaned up through the normal close path) and the connection is
 * closed.
 */
void websocket_attach(intptr_t uuid, http_settings_s *http_settings,
                      websocket_settings_s *args, void *data, size_t length) {
  ws_s *ws = new_websocket(uuid);
  FIO_ASSERT_ALLOC(ws);
  // we have an active websocket connection - prep the connection buffer
  ws->buffer = create_ws_buffer(ws);
  // Setup ws callbacks
  ws->on_open = args->on_open;
  ws->on_close = args->on_close;
  ws->on_message = args->on_message;
  ws->on_ready = args->on_ready;
  ws->on_shutdown = args->on_shutdown;
  // setup any user data
  ws->udata = args->udata;
  if (http_settings) {
    // client mode?
    ws->is_client = http_settings->is_client;
    // buffer limits
    ws->max_msg_size = http_settings->ws_max_msg_size;
    // update the timeout
    fio_timeout_set(uuid, http_settings->ws_timeout);
  } else {
    ws->max_msg_size = (1024 * 256);
    fio_timeout_set(uuid, 40);
  }

  // the initial allocation may have failed
  if (!ws->buffer.data)
    goto no_memory;

  if (data && length) {
    if (length > ws->buffer.size) {
      ws->buffer.size = length;
      ws->buffer = resize_ws_buffer(ws, ws->buffer);
      if (!ws->buffer.data)
        goto no_memory;
    }
    memcpy(ws->buffer.data, data, length);
    ws->length = length;
  }
  // update the protocol object, cleaning up the old one
  fio_attach(uuid, (fio_protocol_s *)ws);
  // allow the on_open and on_data to take over the control.
  fio_force_event(uuid, FIO_EVENT_ON_DATA);
  return;

no_memory:
  // no memory - record that there is no usable buffer, then attach and close.
  ws->buffer.size = 0;
  ws->length = 0;
  fio_attach(uuid, (fio_protocol_s *)ws);
  websocket_close(ws);
}

/*******************************************************************************
Writing to the Websocket
*/
#define WS_MAX_FRAME_SIZE                                                      \
  (FIO_MEMORY_BLOCK_ALLOC_LIMIT - 4096) // should be less than `unsigned short`

/**
 * Wraps `data` in one or more websocket frames and writes them to `fd`.
 *
 * - `text` selects opcode 1 (text) over opcode 2 (binary).
 * - `first` / `last` are forwarded to the frame wrapper.
 * - `client` selects the client wrapper over the server wrapper.
 *
 * Payloads larger than WS_MAX_FRAME_SIZE are split into several frames.
 * Allocation failure is handled with FIO_ASSERT_ALLOC, matching the other
 * allocations in this file.
 */
static void websocket_write_impl(intptr_t fd, void *data, size_t len, char text,
                                 char first, char last, char client) {
  if (len <= WS_MAX_FRAME_SIZE) {
    void *buff = fio_malloc(len + 16);
    FIO_ASSERT_ALLOC(buff);
    len = (client ? websocket_client_wrap(buff, data, len, (text ? 1 : 2),
                                          first, last, 0)
                  : websocket_server_wrap(buff, data, len, (text ? 1 : 2),
                                          first, last, 0));
    fio_write2(fd, .data.buffer = buff, .length = len,
               .after.dealloc = fio_free);
  } else {
    /* frame fragmentation is better for large data than large frames */
    while (len > WS_MAX_FRAME_SIZE) {
      websocket_write_impl(fd, data, WS_MAX_FRAME_SIZE, text, first, 0, client);
      data = ((uint8_t *)data) + WS_MAX_FRAME_SIZE;
      first = 0;
      len -= WS_MAX_FRAME_SIZE;
    }
    websocket_write_impl(fd, data, len, text, first, 1, client);
  }
}

/* *****************************************************************************
Multi-client broadcast optimizations
***************************************************************************** */

/** Metadata `on_finish` callback: releases the pre-wrapped frame object. */
static void websocket_optimize_free(fio_msg_s *msg, void *metadata) {
  (void)msg;
  fiobj_free((FIOBJ)metadata);
}

/**
 * Wraps `msg` once as a server frame with the given `opcode` and returns it
 * as message metadata; `websocket_optimize_free` releases it when the message
 * is finished.
 */
static inline fio_msg_metadata_s websocket_optimize(fio_str_info_s msg,
                                                    unsigned char opcode) {
  /* room for the payload plus 10 extra bytes for the frame header */
  FIOBJ wrapped = fiobj_str_buf(msg.len + 10);
  fiobj_str_resize(wrapped,
                   websocket_server_wrap(fiobj_obj2cstr(wrapped).data, msg.data,
                                         msg.len, opcode, 1, 1, 0));
  return (fio_msg_metadata_s){
      .on_finish = websocket_optimize_free,
      .metadata = (void *)wrapped,
  };
}
/**
 * Metadata callback for WEBSOCKET_OPTIMIZE_PUBSUB: pre-wraps `msg`, choosing
 * the text opcode (1) when the payload is at most `2 << 19` bytes of valid
 * UTF-8 and the binary opcode (2) otherwise.
 *
 * The test is applied to the message payload; the channel name (`ch`) and
 * `is_json` are not used.
 *
 * NOTE(review): the non-optimized direct path in this file treats payloads of
 * `2 << 14` bytes or more as binary - confirm whether the two size limits
 * are meant to differ.
 */
static fio_msg_metadata_s websocket_optimize_generic(fio_str_info_s ch,
                                                     fio_str_info_s msg,
                                                     uint8_t is_json) {
  (void)ch;
  (void)is_json;
  fio_str_s tmp = FIO_STR_INIT_EXISTING(msg.data, msg.len, 0); // don't free
  tmp.dealloc = NULL;
  unsigned char opcode = 2;
  if (tmp.len <= (2 << 19) && fio_str_utf8_valid(&tmp)) {
    opcode = 1;
  }
  fio_msg_metadata_s ret = websocket_optimize(msg, opcode);
  ret.type_id = WEBSOCKET_OPTIMIZE_PUBSUB;
  return ret;
}

/** Metadata callback for WEBSOCKET_OPTIMIZE_PUBSUB_TEXT: pre-wraps `msg` as a
 * text frame (opcode 1). `ch` and `is_json` are not used. */
static fio_msg_metadata_s websocket_optimize_text(fio_str_info_s ch,
                                                  fio_str_info_s msg,
                                                  uint8_t is_json) {
  (void)ch;
  (void)is_json;
  fio_msg_metadata_s wrapped = websocket_optimize(msg, 1);
  wrapped.type_id = WEBSOCKET_OPTIMIZE_PUBSUB_TEXT;
  return wrapped;
}

/** Metadata callback for WEBSOCKET_OPTIMIZE_PUBSUB_BINARY: pre-wraps `msg` as
 * a binary frame (opcode 2). `ch` and `is_json` are not used. */
static fio_msg_metadata_s websocket_optimize_binary(fio_str_info_s ch,
                                                    fio_str_info_s msg,
                                                    uint8_t is_json) {
  (void)ch;
  (void)is_json;
  fio_msg_metadata_s wrapped = websocket_optimize(msg, 2);
  wrapped.type_id = WEBSOCKET_OPTIMIZE_PUBSUB_BINARY;
  return wrapped;
}

/**
 * Enables (or disables) broadcast optimizations.
 *
 * By default, the WebSocket pub/sub system is optimized for either non-direct
 * transmission (messages are handled by callbacks) or direct transmission to
 * 1-3 clients per channel (on average). In other words, it assumes that most
 * messages are meant for a single recipient (or multiple callback recipients)
 * and that only some are directly transmitted to a group.
 *
 * However, when most messages are intended for direct transmission to more
 * than 3 clients (on average), certain optimizations can be made to improve
 * memory consumption (minimizing the duplication of WebSocket network data).
 *
 * This function enables (or disables) these optimizations, which are:
 *
 * * WEBSOCKET_OPTIMIZE_PUBSUB - optimize all direct transmission messages,
 *                               best attempt to detect Text vs. Binary data.
 * * WEBSOCKET_OPTIMIZE_PUBSUB_TEXT - optimize direct pub/sub text messages.
 * * WEBSOCKET_OPTIMIZE_PUBSUB_BINARY - optimize direct pub/sub binary messages.
 *
 * Any other `type` value is ignored.
 *
 * Note: to disable an optimization it should be disabled the same number of
 * times it was enabled - multiple enablements of the same type are merged,
 * but reference counted (disabled when the reference count reaches zero).
 */
void websocket_optimize4broadcasts(intptr_t type, int enable) {
  /* one reference counter per optimization type */
  static intptr_t generic = 0;
  static intptr_t text = 0;
  static intptr_t binary = 0;
  fio_msg_metadata_s (*callback)(fio_str_info_s, fio_str_info_s, uint8_t);
  intptr_t *counter;
  if (type == WEBSOCKET_OPTIMIZE_PUBSUB) {
    counter = &generic;
    callback = websocket_optimize_generic;
  } else if (type == WEBSOCKET_OPTIMIZE_PUBSUB_TEXT) {
    counter = &text;
    callback = websocket_optimize_text;
  } else if (type == WEBSOCKET_OPTIMIZE_PUBSUB_BINARY) {
    counter = &binary;
    callback = websocket_optimize_binary;
  } else {
    return;
  }
  if (enable) {
    /* only the first enablement installs the callback */
    if (fio_atomic_add(counter, 1) == 1) {
      fio_message_metadata_callback_set(callback, 1);
    }
    return;
  }
  /* only the last disablement removes the callback */
  if (fio_atomic_sub(counter, 1) == 0) {
    fio_message_metadata_callback_set(callback, 0);
  }
}

/* *****************************************************************************
Subscription handling
***************************************************************************** */

/** Per-subscription data, allocated by `websocket_subscribe` and released by
 * `websocket_on_unsubscribe`. */
typedef struct {
  /* the user's message callback; for direct (callback-less) subscriptions
   * this slot stores the broadcast optimization type instead. */
  void (*on_message)(ws_s *ws, fio_str_info_s channel, fio_str_info_s msg,
                     void *udata);
  /* optional callback, called with `udata` when the subscription ends. */
  void (*on_unsubscribe)(void *udata);
  /* opaque user data handed to both callbacks. */
  void *udata;
} websocket_sub_data_s;

/**
 * Writes a pub/sub message directly to the websocket whose uuid is stored in
 * `msg->udata1`.
 *
 * `txt` is 0 for binary, 1 for text and 2 for "unknown" (detect from the
 * payload). If the protocol object cannot be locked, the message is deferred,
 * unless the connection is gone (EBADF), in which case it is dropped.
 *
 * When this end is not a client and a pre-wrapped frame was attached to the
 * message as metadata, that frame is sent as is instead of wrapping the
 * payload again.
 */
static inline void websocket_on_pubsub_message_direct_internal(fio_msg_s *msg,
                                                               uint8_t txt) {
  fio_protocol_s *pr =
      fio_protocol_try_lock((intptr_t)msg->udata1, FIO_PR_LOCK_WRITE);
  if (!pr) {
    if (errno == EBADF)
      return;
    fio_message_defer(msg);
    return;
  }
  FIOBJ pre_wrapped = FIOBJ_INVALID;
  if (!((ws_s *)pr)->is_client) {
    /* pre-wrapped frames are produced with the server wrapper, so they are
     * only used when this end is not a client */
    switch (txt) {
    case 0:
      pre_wrapped =
          (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB_BINARY);
      break;
    case 1:
      pre_wrapped =
          (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB_TEXT);
      break;
    case 2:
      pre_wrapped = (FIOBJ)fio_message_metadata(msg, WEBSOCKET_OPTIMIZE_PUBSUB);
      break;
    default:
      break;
    }
    if (pre_wrapped) {
      // FIO_LOG_DEBUG(
      //     "pub/sub WebSocket optimization route for pre-wrapped message.");
      fiobj_send_free((intptr_t)msg->udata1, fiobj_dup(pre_wrapped));
      goto finish;
    }
  }
  if (txt == 2) {
    /* unknown text state: large payloads are sent as binary, anything else
     * is text only when it is valid UTF-8 */
    fio_str_s tmp =
        FIO_STR_INIT_STATIC2(msg->msg.data, msg->msg.len); // don't free
    txt = (tmp.len >= (2 << 14) ? 0 : fio_str_utf8_valid(&tmp));
  }
  websocket_write((ws_s *)pr, msg->msg, txt & 1);
finish:
  fio_protocol_unlock(pr, FIO_PR_LOCK_WRITE);
}

/** Direct pub/sub handler: text vs. binary is detected per message. */
static void websocket_on_pubsub_message_direct(fio_msg_s *msg) {
  const uint8_t txt_unknown = 2;
  websocket_on_pubsub_message_direct_internal(msg, txt_unknown);
}

/** Direct pub/sub handler: messages are always sent as text. */
static void websocket_on_pubsub_message_direct_txt(fio_msg_s *msg) {
  const uint8_t txt_forced = 1;
  websocket_on_pubsub_message_direct_internal(msg, txt_forced);
}

/** Direct pub/sub handler: messages are always sent as binary. */
static void websocket_on_pubsub_message_direct_bin(fio_msg_s *msg) {
  const uint8_t bin_forced = 0;
  websocket_on_pubsub_message_direct_internal(msg, bin_forced);
}

/**
 * Pub/sub handler for subscriptions that have a user callback.
 *
 * Locks the protocol object of the connection stored in `msg->udata1` and
 * calls the user's `on_message` callback. If the lock cannot be taken, the
 * message is deferred, unless the connection is gone (EBADF), in which case
 * it is dropped.
 */
static void websocket_on_pubsub_message(fio_msg_s *msg) {
  fio_protocol_s *locked =
      fio_protocol_try_lock((intptr_t)msg->udata1, FIO_PR_LOCK_TASK);
  if (!locked) {
    if (errno != EBADF)
      fio_message_defer(msg);
    return;
  }
  websocket_sub_data_s *sub_data = msg->udata2;

  if (sub_data->on_message) {
    sub_data->on_message((ws_s *)locked, msg->channel, msg->msg,
                         sub_data->udata);
  }
  fio_protocol_unlock(locked, FIO_PR_LOCK_TASK);
}

/**
 * Subscription cleanup callback (`u2` is the `websocket_sub_data_s`).
 *
 * Calls the user's `on_unsubscribe` callback, releases the broadcast
 * optimization reference held by direct subscriptions (their `on_message`
 * slot stores the optimization type), and frees the subscription data.
 */
static void websocket_on_unsubscribe(void *u1, void *u2) {
  websocket_sub_data_s *sub_data = u2;
  (void)u1;
  if (sub_data->on_unsubscribe) {
    sub_data->on_unsubscribe(sub_data->udata);
  }

  const intptr_t tag = (intptr_t)sub_data->on_message;
  if (tag == (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB ||
      tag == (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB_TEXT ||
      tag == (intptr_t)WEBSOCKET_OPTIMIZE_PUBSUB_BINARY) {
    /* `tag` equals the matched optimization type */
    websocket_optimize4broadcasts(tag, 0);
  }
  free(sub_data);
}

/**
 * Returns a subscription ID on success and 0 on failure.
 */
#undef websocket_subscribe
uintptr_t websocket_subscribe(struct websocket_subscribe_s args) {
  if (!args.ws || !fio_is_valid(args.ws->fd))
    goto error;
  websocket_sub_data_s *d = malloc(sizeof(*d));
  FIO_ASSERT_ALLOC(d);
  *d = (websocket_sub_data_s){
      .udata = args.udata,
      .on_message = args.on_message,
      .on_unsubscribe = args.on_unsubscribe,
  };
  void (*handler)(fio_msg_s *) = websocket_on_pubsub_message;
  if (!args.on_message) {
    intptr_t br_type;
    if (args.force_binary) {
      br_type = WEBSOCKET_OPTIMIZE_PUBSUB_BINARY;
      handler = websocket_on_pubsub_message_direct_bin;
    } else if (args.force_text) {
      br_type = WEBSOCKET_OPTIMIZE_PUBSUB_TEXT;
      handler = websocket_on_pubsub_message_direct_txt;
    } else {
      br_type = WEBSOCKET_OPTIMIZE_PUBSUB;
      handler = websocket_on_pubsub_message_direct;
    }
    websocket_optimize4broadcasts(br_type, 1);
    d->on_message =
        (void (*)(ws_s *, fio_str_info_s, fio_str_info_s, void *))br_type;
  }
  subscription_s *sub =
      fio_subscribe(.channel = args.channel, .match = args.match,
                    .on_unsubscribe = websocket_on_unsubscribe,
                    .on_message = handler, .udata1 = (void *)args.ws->fd,
                    .udata2 = d);
  if (!sub) {
    /* don't free `d`, return (`d` freed by fio_subscribe) */
    return 0;
  }
  fio_ls_s *pos;
  fio_lock(&args.ws->sub_lock);
  pos = fio_ls_push(&args.ws->subscriptions, sub);
  fio_unlock(&args.ws->sub_lock);

  return (uintptr_t)pos;
error:
  if (args.on_unsubscribe)
    args.on_unsubscribe(args.udata);
  return 0;
}

/**
 * Unsubscribes from a channel.
 *
 * `subscription_id` is the value returned by `websocket_subscribe`. An ID of
 * 0 (the value returned for a failed subscription) or a NULL `ws` is ignored.
 *
 * The list node is read and removed while `sub_lock` is held; the
 * subscription itself is cancelled after the lock is released.
 */
void websocket_unsubscribe(ws_s *ws, uintptr_t subscription_id) {
  fio_ls_s *node = (fio_ls_s *)subscription_id;
  if (!ws || !node)
    return;
  fio_lock(&ws->sub_lock);
  subscription_s *sub = (subscription_s *)node->obj;
  fio_ls_remove(node);
  fio_unlock(&ws->sub_lock);

  fio_unsubscribe(sub);
}

/*******************************************************************************
The API implementation
*/

/** Returns the opaque user data associated with the websocket. */
void *websocket_udata_get(ws_s *ws) {
  void *current = ws->udata;
  return current;
}

/** Returns the process specific connection's UUID (see `libsock`). */
intptr_t websocket_uuid(ws_s *ws) {
  const intptr_t uuid = ws->fd;
  return uuid;
}

/**
 * Replaces the opaque user data associated with the websocket.
 *
 * Returns the previous value, if any.
 */
void *websocket_udata_set(ws_s *ws, void *udata) {
  void *const previous = ws->udata;
  ws->udata = udata;
  return previous;
}

/**
 * Reports the connection's mode: 1 when the WebSocket connection is in Client
 * mode (connected to a remote server) and 0 when it is in Server mode (a
 * connection established using facil.io's HTTP server).
 */
uint8_t websocket_is_client(ws_s *ws) {
  const uint8_t client_mode = ws->is_client;
  return client_mode;
}

/**
 * Writes `msg` to the websocket as a single message (text when `is_text` is
 * set, binary otherwise).
 *
 * Returns 0 on success and -1 when the connection is no longer valid.
 */
int websocket_write(ws_s *ws, fio_str_info_s msg, uint8_t is_text) {
  if (!fio_is_valid(ws->fd))
    return -1;
  websocket_write_impl(ws->fd, msg.data, msg.len, is_text, 1, 1,
                       ws->is_client);
  return 0;
}
/**
 * Closes a websocket connection.
 *
 * Writes an empty close frame and then closes the underlying connection.
 * Client connections send the 6 byte masked form of the frame, consistent
 * with the other control frames written by this file; server connections
 * send the 2 byte unmasked form.
 */
void websocket_close(ws_s *ws) {
  if (ws->is_client) {
    fio_write2(ws->fd, .data.buffer = "\x88\x80MASK", .length = 6,
               .after.dealloc = FIO_DEALLOC_NOOP);
  } else {
    fio_write2(ws->fd, .data.buffer = "\x88\x00", .length = 2,
               .after.dealloc = FIO_DEALLOC_NOOP);
  }
  fio_close(ws->fd);
}
