/*
Copyright: Boaz Segev, 2016-2017
License: MIT

Feel free to copy, use and enjoy according to the license provided.
*/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include "libsock.h"

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <netdb.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>

/* *****************************************************************************
Use spinlocks "spnlock.h".

For portability, it's possible to copy "spnlock.h" directly after this line.
*/
#include "spnlock.h"

/* *****************************************************************************
Support `libreact` on_close callback, if it exists.
*/

/* Weak no-op fallbacks: any strong definition linked into the program replaces
 * them. */
#pragma weak reactor_on_close
void reactor_on_close(intptr_t uuid) {
  (void)uuid; /* nothing to notify */
}
#pragma weak reactor_remove
int reactor_remove(intptr_t uuid) {
  (void)uuid;
  return -1; /* nothing was removed */
}

/* *****************************************************************************
Support timeout setting.
*/
/* Weak no-op fallback, used when no other `sock_touch` is linked in. */
#pragma weak sock_touch
void sock_touch(intptr_t uuid) {
  (void)uuid;
}

/* *****************************************************************************
Support event based `write` scheduling.
*/
/* Weak fallback: reports failure (-1) so the caller runs the task itself. */
#pragma weak async_run
int async_run(void (*task)(void *), void *arg) {
  (void)task;
  (void)arg;
  return -1;
}

/* *****************************************************************************
OS Sendfile settings.

When `USE_SENDFILE` is 1, file-backed packets on sockets without read/write
hooks are sent with the OS `sendfile`. Otherwise the file is read into the
packet buffer and written with `write`. The build may predefine `USE_SENDFILE`
to override the defaults chosen below.
*/

#ifndef USE_SENDFILE

#if defined(__linux__) /* linux sendfile works  */
#include <sys/sendfile.h>
#define USE_SENDFILE 1
#elif defined(__unix__) /* BSD sendfile should work, but isn't tested */
#include <sys/uio.h>
#define USE_SENDFILE 0
#elif defined(__APPLE__) /* Is the apple sendfile still broken? */
#include <sys/uio.h>
#define USE_SENDFILE 1
#else /* sendfile might not be available - always set to 0 */
#define USE_SENDFILE 0
#endif

#endif

/* *****************************************************************************
Buffer and socket map memory allocation. Defaults to mmap.

Define `USE_MALLOC` as 1 to allocate the socket map and the packet pool with
`malloc` instead of an anonymous `mmap`.
*/
#ifndef USE_MALLOC
#define USE_MALLOC 0
#endif

/* *****************************************************************************
The system call to `write` (non-blocking) can be deferred when using `libasync`.

However, this will not prevent `sock_write` from cycling through the sockets and
flushing them (block emulation) when both the system and the user level buffers
are full.

If `async_run` reports failure (-1), the flush is performed immediately instead.

Defaults to 1 (deferred).
*/
#ifndef SOCK_DELAY_WRITE
#define SOCK_DELAY_WRITE 1
#endif

/* *****************************************************************************
Library related helper functions
*/

/**
Sets a socket (or any file descriptor) to non-blocking mode.

Returns -1 on failure (with `errno` set by the failing call), otherwise the
result of the call that set the flag (0 on common systems).

The definition is deliberately not a bare C99 `inline`. An `inline` definition
with external linkage is not guaranteed to emit a callable symbol, which breaks
linking whenever a call isn't inlined.
*/
int sock_set_non_block(int fd) // Thanks to Bjorn Reese
{
/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
  /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
  int flags = fcntl(fd, F_GETFL, 0);
  /* Without the current flags, F_SETFL would silently clear all other flags
   * (e.g. O_APPEND), so fail instead of guessing. */
  if (flags == -1)
    return -1;
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
  /* Otherwise, use the old way of doing it (FIONBIO, declared in
   * <sys/ioctl.h>). */
  int flags = 1;
  return ioctl(fd, FIONBIO, &flags);
#endif
}

/**
Gets the maximum number of file descriptors this process can be allowed to
access (== maximum fd value + 1).

Side effect: it tries to raise the soft `RLIMIT_NOFILE` limit to the hard
limit. A positive result is cached and returned by later calls. Returns 0 if no
limit could be determined.
*/
ssize_t sock_max_capacity(void) {
  static ssize_t flim = 0;
  if (flim > 0)
    return flim;

  ssize_t limit = 0;
#ifdef _SC_OPEN_MAX
  limit = sysconf(_SC_OPEN_MAX);
#elif defined(OPEN_MAX)
  limit = OPEN_MAX;
#endif
  if (limit < 0) /* sysconf reports errors / "indeterminate" as -1 */
    limit = 0;

  // try to maximize limits - collect max and set to max
  struct rlimit rlim;
  /* Only adjust the limit when it could be read. A failed getrlimit leaves
   * `rlim` meaningless, and passing zeros on to setrlimit would lower the fd
   * limit to nothing. */
  if (getrlimit(RLIMIT_NOFILE, &rlim) == 0) {
#if defined(__APPLE__) /* Apple's getrlimit is broken. */
    rlim.rlim_cur = rlim.rlim_max >= OPEN_MAX ? OPEN_MAX : rlim.rlim_max;
#else
    rlim.rlim_cur = rlim.rlim_max;
#endif
    (void)setrlimit(RLIMIT_NOFILE, &rlim); /* best effort */
    // if the current limit is higher than what we have, use it
    if (getrlimit(RLIMIT_NOFILE, &rlim) == 0 &&
        rlim.rlim_cur != RLIM_INFINITY && (ssize_t)rlim.rlim_cur > limit)
      limit = (ssize_t)rlim.rlim_cur;
  }
  flim = limit;
  return flim;
}

/* *****************************************************************************
Library Core Data
*/

/**
Per-file-descriptor state. `fd_info` holds one entry per possible fd and is
indexed by the fd itself.
*/
typedef struct {
  /** write buffer - a linked list (through `metadata.next`); the head is the
   * packet currently being sent. */
  sock_packet_s *packet;
  /** The fd UUID for the current connection: the fd plus a counter that
   * `set_fd` bumps whenever the fd is opened. */
  fduuid_u fduuid;
  /** the amount of data sent from the current buffer packet (for files sent
   * via read + write: from the current chunk).
   * This is `size_t`, not `uint32_t`. With `sendfile` it accumulates over the
   * whole file, and a 32-bit counter wraps for files over 4 GiB, after which
   * the packet could never be recognized as complete. */
  size_t sent;
  /** state lock - guards the output queue and the hooks pointer. */
  spn_lock_i lock;
  /* -- state flags -- */
  /** Connection is open */
  unsigned open : 1;
  /** indicates that the connection should be closed. */
  unsigned close : 1;
  /** indicates that the connection experienced an error. */
  unsigned err : 1;
  /** future flags. */
  unsigned rsv : 5;
  /* -- placement enforces padding to guarantee memory alignment -- */
  /** Read/Write hooks (NULL: plain `read` / `write`). */
  sock_rw_hook_s *rw_hooks;
} fd_info_s;

#define LIB_SOCK_STATE_OPEN 1
#define LIB_SOCK_STATE_CLOSED 0

/** The socket map: `fd_capacity` entries, indexed by file descriptor. */
static fd_info_s *fd_info = NULL;
static size_t fd_capacity = 0;

/** The socket map entry of a UUID's fd (unchecked - see `is_valid`). */
#define uuid2info(uuid) fd_info[sock_uuid2fd(uuid)]
/**
Tests that a UUID's fd is inside the socket map, that the entry's counter still
matches the UUID's counter (the fd wasn't reopened since the UUID was issued),
and that the entry is open.

The fd must be strictly below `fd_capacity`: the map holds exactly
`fd_capacity` entries, so `fd == fd_capacity` is already out of bounds.
`uuid` must be an lvalue (its address is taken).
*/
#define is_valid(uuid)                                                         \
  (sock_uuid2fd(uuid) >= 0 && (size_t)sock_uuid2fd(uuid) < fd_capacity &&      \
   fd_info[sock_uuid2fd(uuid)].fduuid.data.counter ==                          \
       ((fduuid_u *)(&(uuid)))->data.counter &&                                \
   uuid2info(uuid).open)

/**
The packet pool: a free list of fixed-size packets carved out of the same
allocation as the socket map (see `sock_lib_init`).

- `pool` - head of the free list (linked through `metadata.next`).
- `allocated` - start of the packet memory region.
- `lock` - guards `pool`.
*/
static struct {
  sock_packet_s *pool;
  sock_packet_s *allocated;
  spn_lock_i lock;
} buffer_pool = {.lock = SPN_LOCK_INIT};

/* Each pooled packet is a `sock_packet_s` header immediately followed by its
 * BUFFER_PACKET_SIZE byte data buffer (the buffer starts at `packet + 1`). */
#define BUFFER_PACKET_REAL_SIZE (sizeof(sock_packet_s) + BUFFER_PACKET_SIZE)

/* reset a socket state

Resets the socket map entry of `fd` to `state` (LIB_SOCK_STATE_OPEN or
LIB_SOCK_STATE_CLOSED). The UUID counter is incremented by `state`, i.e. on
every open, so UUIDs issued for an earlier use of the fd stop passing
`is_valid`.

The previous state is cleaned up after the lock is released:
- the old hooks' `on_clear` is called,
- any queued packets are freed,
- if the fd was open, it's passed to `reactor_remove` and `reactor_on_close`
  with its old UUID.
*/
static inline void set_fd(int fd, unsigned int state) {
  fd_info_s old_data;
  // lock and update
  spn_lock(&fd_info[fd].lock);
  old_data = fd_info[fd];
  // The new entry copies the lock value while it's held, so the lock stays
  // held until the explicit unlock below; all other fields are zeroed.
  fd_info[fd] = (fd_info_s){
      .fduuid.data.counter = fd_info[fd].fduuid.data.counter + state,
      .fduuid.data.fd = fd,
      .lock = fd_info[fd].lock,
      .open = state,
  };
  // unlock
  spn_unlock(&fd_info[fd].lock);
  // should be called within the lock? - no function calling within a
  // spinlock.
  if (old_data.rw_hooks && old_data.rw_hooks->on_clear)
    old_data.rw_hooks->on_clear(old_data.fduuid.uuid, old_data.rw_hooks);
  // clear old data
  if (old_data.packet)
    sock_free_packet(old_data.packet);
  // call callback if exists
  if (old_data.open) {
    // if (state == LIB_SOCK_STATE_OPEN)
    //   printf(
    //       "STRONG FD COLLISION PROTECTION: A new connection was accepted "
    //       "while the old one was marked as open.\n");
    reactor_remove(old_data.fduuid.uuid);
    reactor_on_close(old_data.fduuid.uuid);
  }
}

/**
Destroys the library data.

Call this function before calling any `libsock` functions.
*/
static void destroy_lib_data(void) {
  if (fd_info) {
    while (fd_capacity--) { // include 0 in countdown
      // if (fd_info[fd_capacity].open) {
      //   fprintf(stderr, "Socket %lu is marked as open\n", fd_capacity);
      // }
      set_fd(fd_capacity, LIB_SOCK_STATE_CLOSED);
    }
#if USE_MALLOC == 1
    free(fd_info);
#else
    munmap(fd_info,
           (BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL) +
               (sizeof(fd_info_s) * fd_capacity));
#endif
  }
  fd_info = NULL;
  buffer_pool.pool = NULL;
  buffer_pool.allocated = NULL;
  buffer_pool.lock = SPN_LOCK_INIT;
}

/**
Initializes the library.

Call this function before calling any `libsock` functions.
*/
static void sock_lib_init(void) {
  if (fd_info)
    return;

  fd_capacity = sock_max_capacity();
  size_t fd_map_mem_size = sizeof(fd_info_s) * fd_capacity;
  size_t buffer_mem_size = BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL;

  void *buff_mem;
#if USE_MALLOC == 1
  buff_mem = malloc(fd_map_mem_size + buffer_mem_size);
  if (buff_mem == NULL) {
    perror("Couldn't initialize libsock - not enough memory? ");
    exit(1);
  }
#else
  buff_mem = mmap(NULL, fd_map_mem_size + buffer_mem_size,
                  PROT_READ | PROT_WRITE | PROT_EXEC,
                  MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  // MAP_SHARED | MAP_ANONYMOUS, -1, 0);
  if (buff_mem == MAP_FAILED || buff_mem == NULL) {
    perror("Couldn't initialize libsock - not enough memory? ");
    exit(1);
  }
#endif
  fd_info = buff_mem;
  for (size_t i = 0; i < fd_capacity; i++) {
    fd_info[i] = (fd_info_s){.lock = SPN_LOCK_INIT};
    spn_unlock(&fd_info[i].lock);
  }
  /* initialize pool */
  buffer_pool.allocated = (void *)((uintptr_t)buff_mem + fd_map_mem_size);
  buffer_pool.pool = buffer_pool.allocated;
  sock_packet_s *pos = buffer_pool.pool;
  for (size_t i = 0; i < BUFFER_PACKET_POOL - 1; i++) {
    *pos = (sock_packet_s){
        .metadata.next = (void *)(((uintptr_t)pos) + BUFFER_PACKET_REAL_SIZE),
    };
    pos = pos->metadata.next;
  }
  pos->metadata.next = 0;
  /* deallocate and manage on exit */
  atexit(destroy_lib_data);
#ifdef DEBUG
  fprintf(stderr, "\nInitialized libsock for %lu sockets, "
                  "each one requires %lu bytes.\n"
                  "overall ovearhead: %lu bytes.\n"
                  "Initialized packet pool for %d elements, "
                  "each one %lu bytes.\n"
                  "overall buffer ovearhead: %lu bytes.\n"
                  "=== Total: %lu bytes ===\n\n",
          fd_capacity, sizeof(*fd_info), sizeof(*fd_info) * fd_capacity,
          BUFFER_PACKET_POOL, BUFFER_PACKET_REAL_SIZE,
          BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL,
          (BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL) +
              (sizeof(*fd_info) * fd_capacity));
#endif
}

/* Lazily initializes the library on first use. The do/while(0) wrapper makes
 * the macro a single statement, so it's safe in an unbraced if/else. */
#define review_lib()                                                           \
  do {                                                                         \
    if (fd_info == NULL)                                                       \
      sock_lib_init();                                                         \
  } while (0)

/* *****************************************************************************
Read / Write internals
*/

/* After a failed write/sendfile: the socket can't take more data right now, so
 * the flush functions stop and report -1. ENOTCONN is grouped here too -
 * presumably for a non-blocking connect that hasn't completed yet (TODO
 * confirm). */
#define ERR_OK (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN)
/* After an interrupted call: the flush functions report 0 and are retried. */
#define ERR_TRY_AGAIN (errno == EINTR)

static inline int sock_flush_fd_failed(int fd) {
  sock_free_packet(fd_info[fd].packet);
  fd_info[fd].packet = NULL;
  fd_info[fd].close = 1;
  fd_info[fd].err = 1;
  return 0;
}

#if USE_SENDFILE == 1

#if defined(__linux__) /* linux sendfile API */
/* Sends the rest of `fd`'s head packet (a file) with Linux `sendfile`, which
 * advances `packet->metadata.offset` itself. `fd_info[fd].sent` accumulates the
 * bytes sent. A return of 0 bytes (end of file) marks the packet complete.
 * Returns -1 when the socket can't take more data (ERR_OK), otherwise 0. */
static inline int sock_flush_os_sendfile(int fd) {
  ssize_t sent;
  sock_packet_s *packet = fd_info[fd].packet;
  sent =
      sendfile64(fd, (int)((ssize_t)packet->buffer), &packet->metadata.offset,
                 packet->length - fd_info[fd].sent);

  if (sent < 0) {
    if (ERR_OK)
      return -1;
    else if (ERR_TRY_AGAIN)
      return 0;
    else
      return sock_flush_fd_failed(fd);
  }
  if (sent == 0)
    fd_info[fd].sent = packet->length;
  fd_info[fd].sent += sent;
  return 0;
}

#elif defined(__APPLE__) || defined(__unix__) /* BSD / Apple API */

/* BSD / macOS variant. `act_sent` starts as the byte count to send and ends up
 * holding the bytes actually sent. Errors only count when nothing was sent;
 * the file offset is advanced here by hand. Same return values as above. */
static inline int sock_flush_os_sendfile(int fd) {
  off_t act_sent;
  sock_packet_s *packet = fd_info[fd].packet;
  act_sent = packet->length - fd_info[fd].sent;

#if defined(__APPLE__)
  if (sendfile((int)((ssize_t)packet->buffer), fd, packet->metadata.offset,
               &act_sent, NULL, 0) < 0 &&
      act_sent == 0)
#else
  if (sendfile((int)((ssize_t)packet->buffer), fd, packet->metadata.offset,
               (size_t)act_sent, NULL, &act_sent, 0) < 0 &&
      act_sent == 0)
#endif
  {
    if (ERR_OK)
      return -1;
    else if (ERR_TRY_AGAIN)
      return 0;
    else
      return sock_flush_fd_failed(fd);
  }
  if (act_sent == 0) {
    fd_info[fd].sent = packet->length;
    return 0;
  }
  packet->metadata.offset += act_sent;
  fd_info[fd].sent += act_sent;
  return 0;
}
#endif

#else

/* Placeholder: `sock_flush_fd` still references this function in a branch
 * that is compile-time dead when USE_SENDFILE is 0. */
static inline int sock_flush_os_sendfile(int fd) { return -1; }

#endif

/*
Sends (part of) the file-backed packet at the head of `fd`'s queue.

Uses the OS `sendfile` when it's enabled and the socket has no hooks.
Otherwise it copies the file one chunk (at most BUFFER_PACKET_SIZE bytes) at a
time into the packet's own buffer (`packet + 1`) and writes it from there:
- `metadata.internal_flag` marks a chunk that was read but not fully written,
- `fd_info[fd].sent` counts the chunk bytes already written,
- `packet->length` counts the file bytes still to send.

Returns -1 when the socket can't take more data (stop flushing), 0 otherwise.
*/
static inline int sock_flush_fd(int fd) {
  if (USE_SENDFILE && fd_info[fd].rw_hooks == NULL)
    return sock_flush_os_sendfile(fd);
  ssize_t sent;
  sock_packet_s *packet = fd_info[fd].packet;
  // how much data are we expecting to send...?
  ssize_t i_exp = (BUFFER_PACKET_SIZE > packet->length) ? packet->length
                                                        : BUFFER_PACKET_SIZE;

  // read the next chunk of the file into the internal buffer
  if (packet->metadata.internal_flag == 0) {
    ssize_t i_read = pread((int)((ssize_t)packet->buffer), packet + 1, i_exp,
                           packet->metadata.offset);
    if (i_read < 0 && ERR_TRY_AGAIN)
      return 0; /* interrupted before anything was read - retry */
    if (i_read <= 0) {
      /* end of file or read error: nothing more to send, drop the packet */
      fd_info[fd].sent = fd_info[fd].packet->length;
      return 0;
    }
    if (i_read < i_exp) {
      /* The file ended early, so only `i_read` bytes of the buffer are valid.
       * Shrink the packet so the rest of this pooled, reused buffer (stale
       * data, possibly from another connection) is never sent. */
      packet->length = i_read;
      i_exp = i_read;
    }
    packet->metadata.offset += i_read;
    packet->metadata.internal_flag = 1;
  }
  // send the data
  if (fd_info[fd].rw_hooks && fd_info[fd].rw_hooks->write)
    sent = fd_info[fd].rw_hooks->write(
        fd_info[fd].fduuid.uuid,
        (void *)(((uintptr_t)(packet + 1)) + fd_info[fd].sent),
        i_exp - fd_info[fd].sent);
  else
    sent = write(fd, (void *)(((uintptr_t)(packet + 1)) + fd_info[fd].sent),
                 i_exp - fd_info[fd].sent);
  // review result and update packet data
  if (sent < 0) {
    if (ERR_OK)
      return -1;
    else if (ERR_TRY_AGAIN)
      return 0;
    else
      return sock_flush_fd_failed(fd);
  }
  fd_info[fd].sent += sent;
  if (fd_info[fd].sent >= i_exp) {
    /* chunk complete: the next call reads a fresh one */
    packet->metadata.internal_flag = 0;
    fd_info[fd].sent = 0;
    packet->length -= i_exp;
  }
  return 0;
}

static inline int sock_flush_data(int fd) {
  ssize_t sent;
  if (fd_info[fd].rw_hooks && fd_info[fd].rw_hooks->write)
    sent = fd_info[fd].rw_hooks->write(
        fd_info[fd].fduuid.uuid,
        (void *)((uintptr_t)fd_info[fd].packet->buffer + fd_info[fd].sent),
        fd_info[fd].packet->length - fd_info[fd].sent);
  else
    sent = write(
        fd, (void *)((uintptr_t)fd_info[fd].packet->buffer + fd_info[fd].sent),
        fd_info[fd].packet->length - fd_info[fd].sent);
  if (sent < 0) {
    if (ERR_OK)
      return -1;
    else if (ERR_TRY_AGAIN)
      return 0;
    else
      return sock_flush_fd_failed(fd);
  }
  fd_info[fd].sent += sent;
  return 0;
}

static void sock_flush_unsafe(int fd) {
  while (fd_info[fd].packet) {
    if (fd_info[fd].packet->metadata.is_fd == 0) {
      if (sock_flush_data(fd))
        return;
    } else {
      if (sock_flush_fd(fd))
        return;
    }
    if (fd_info[fd].packet && fd_info[fd].packet->length <= fd_info[fd].sent) {
      sock_packet_s *packet = fd_info[fd].packet;
      fd_info[fd].packet = packet->metadata.next;
      packet->metadata.next = NULL;
      fd_info[fd].sent = 0;
      sock_free_packet(packet);
    }
  }
}

#if SOCK_DELAY_WRITE == 1

/* `async_run` task that flushes the socket whose UUID was packed into `arg`.
 * A wrapper with the exact `void (*)(void *)` signature is needed: calling
 * `sock_flush` (which takes an `intptr_t` and returns `ssize_t`) through a cast
 * function pointer is undefined behavior. */
static void sock_flush_task(void *arg) { sock_flush((intptr_t)arg); }

/* Schedules a flush of `uuid` through `async_run`. If no task could be
 * scheduled (-1), the socket is flushed immediately instead. */
static inline void sock_flush_schd(intptr_t uuid) {
  if (async_run(sock_flush_task, (void *)uuid) == -1)
    sock_flush_unsafe(sock_uuid2fd(uuid));
}

#define _write_to_sock() sock_flush_schd(sfd->fduuid.uuid)

#else

#define _write_to_sock() sock_flush_unsafe(fd)

#endif

/*
Queues `packet` (possibly a chain) on `fd`'s output queue and triggers a write.

Non-urgent packets go last. An urgent packet goes in front of the first queued
packet marked `can_interrupt`, but never in front of the head. The head may
already be partially sent (`sfd->sent` counts bytes into it), and inserting
before it would apply that progress to the urgent packet and resend the start
of the interrupted one.

Callers hold the fd's lock.
*/
static inline void sock_send_packet_unsafe(int fd, sock_packet_s *packet) {
  fd_info_s *sfd = fd_info + fd;
  if (sfd->packet == NULL) {
    /* no queue, nothing to check */
    sfd->packet = packet;
  } else if (packet->metadata.urgent == 0) {
    /* not urgent, last in line */
    sock_packet_s *pos = sfd->packet;
    while (pos->metadata.next)
      pos = pos->metadata.next;
    pos->metadata.next = packet;
  } else {
    /* urgent: find a spot we can interrupt, after the (possibly in-flight)
     * head packet */
    sock_packet_s **pos = &sfd->packet->metadata.next;
    while (*pos && (*pos)->metadata.can_interrupt == 0)
      pos = &(*pos)->metadata.next;
    sock_packet_s *tail = *pos;
    *pos = packet;
    if (tail) {
      /* re-attach the rest of the queue after the urgent chain */
      pos = &packet->metadata.next;
      while (*pos)
        pos = &(*pos)->metadata.next;
      *pos = tail;
    }
  }
  _write_to_sock();
}

/* *****************************************************************************
Listen
*/

/**
Opens a listening non-blocking socket. Returns the socket's UUID, or -1 on
failure.

Each address resolved for `address`:`port` is tried in turn, with its own
socket, until one binds.
*/
intptr_t sock_listen(const char *address, const char *port) {
  review_lib();
  int srvfd = -1;
  // setup the address
  struct addrinfo hints;
  struct addrinfo *servinfo;       // will point to the results
  memset(&hints, 0, sizeof hints); // make sure the struct is empty
  hints.ai_family = AF_UNSPEC;     // don't care IPv4 or IPv6
  hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
  hints.ai_flags = AI_PASSIVE;     // fill in my IP for me
  if (getaddrinfo(address, port, &hints, &servinfo)) {
    return -1;
  }
  /* A socket is created per candidate, using that candidate's family. A
   * single socket created for the first result can't bind e.g. an IPv6
   * address when the first result was IPv4. */
  for (struct addrinfo *p = servinfo; p != NULL; p = p->ai_next) {
    srvfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
    if (srvfd < 0) /* note: 0 is a valid descriptor */
      continue;
    // make sure the socket is non-blocking
    if (sock_set_non_block(srvfd) < 0) {
      close(srvfd);
      srvfd = -1;
      continue;
    }
    // avoid the "address taken"
    int optval = 1;
    setsockopt(srvfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
    if (bind(srvfd, p->ai_addr, p->ai_addrlen) == 0)
      break;
    close(srvfd);
    srvfd = -1;
  }
  freeaddrinfo(servinfo);
  if (srvfd < 0)
    return -1;
  /* The socket map only covers fds below `fd_capacity`. */
  if ((size_t)srvfd >= fd_capacity || listen(srvfd, SOMAXCONN) < 0) {
    close(srvfd);
    return -1;
  }
  set_fd(srvfd, LIB_SOCK_STATE_OPEN);
  return fd_info[srvfd].fduuid.uuid;
}

/* *****************************************************************************
Accept
*/

/**
Accepts a pending connection on the listening socket `srv_uuid`, sets it to
non-blocking mode and returns its UUID. Returns -1 if nothing could be accepted
(including when no connection is pending on a non-blocking listener).
*/
intptr_t sock_accept(intptr_t srv_uuid) {
  review_lib();
  int client;
  /* The peer address isn't needed, so address and length are both NULL (no
   * shared length variable is touched by concurrent callers). */
#ifdef SOCK_NONBLOCK
  client = accept4(sock_uuid2fd(srv_uuid), NULL, NULL, SOCK_NONBLOCK);
  if (client < 0) /* note: 0 is a valid descriptor */
    return -1;
#else
  client = accept(sock_uuid2fd(srv_uuid), NULL, NULL);
  if (client < 0)
    return -1;
  sock_set_non_block(client);
#endif
  /* Descriptors outside the socket map can't be tracked. */
  if ((size_t)client >= fd_capacity) {
    close(client);
    return -1;
  }
  set_fd(client, LIB_SOCK_STATE_OPEN);
  return fd_info[client].fduuid.uuid;
}

/* *****************************************************************************
Connect
*/
/**
Starts a non-blocking TCP connection to `address`:`port` and returns the new
socket's UUID, or -1 on failure. The connection may still be in progress
(EINPROGRESS) when this returns. Resolved addresses are tried in order until a
connection attempt starts.
*/
intptr_t sock_connect(char *address, char *port) {
  review_lib();
  int fd = -1;
  // setup the address
  struct addrinfo hints;
  struct addrinfo *addrinfo;       // will point to the results
  memset(&hints, 0, sizeof hints); // make sure the struct is empty
  hints.ai_family = AF_UNSPEC;     // don't care IPv4 or IPv6
  hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
  /* No AI_PASSIVE: it requests addresses for bind(). With a NULL host it
   * yields the wildcard address instead of the loopback a client wants. */
  if (getaddrinfo(address, port, &hints, &addrinfo)) {
    return -1;
  }
  for (struct addrinfo *p = addrinfo; p != NULL; p = p->ai_next) {
    fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
    if (fd < 0) /* note: 0 is a valid descriptor */
      continue;
    // make sure the socket is non-blocking, then start connecting
    if (sock_set_non_block(fd) >= 0 &&
        (connect(fd, p->ai_addr, p->ai_addrlen) == 0 || errno == EINPROGRESS))
      break;
    close(fd);
    fd = -1;
  }
  freeaddrinfo(addrinfo);
  if (fd < 0)
    return -1;
  /* The socket map only covers fds below `fd_capacity`. */
  if ((size_t)fd >= fd_capacity) {
    close(fd);
    return -1;
  }
  set_fd(fd, LIB_SOCK_STATE_OPEN);
  return fd_info[fd].fduuid.uuid;
}

/* *****************************************************************************
Open existing
*/

intptr_t sock_open(int fd) {
  review_lib();
  set_fd(fd, LIB_SOCK_STATE_OPEN);
  return fd_info[fd].fduuid.uuid;
}

/* *****************************************************************************
Information about the socket
*/

/**
Returns 1 if the uuid refers to a valid and open, socket.

Returns 0 if not.
*/
int sock_isvalid(intptr_t uuid) { return fd_info && is_valid(uuid); }

/**
`sock_fd2uuid` takes an existing file decriptor `fd` and returns it's active
`uuid`.
*/
intptr_t sock_fd2uuid(int fd) {
  return (fd_info && fd_info[fd].open) ? fd_info[fd].fduuid.uuid : -1;
}

/* *****************************************************************************
Buffer API.
*/

/* Non-blocking variant of `sock_checkout_packet`. It takes a packet from the
 * pool and resets it, or returns NULL right away if the pool is empty. */
static inline sock_packet_s *sock_try_checkout_packet(void) {
  spn_lock(&buffer_pool.lock);
  sock_packet_s *packet = buffer_pool.pool;
  if (packet)
    buffer_pool.pool = packet->metadata.next;
  spn_unlock(&buffer_pool.lock);
  if (packet) {
    /* Reset exactly like `sock_checkout_packet`, including the default
     * `dealloc`. Otherwise a packet from this path that is later marked
     * `external` would call a NULL `dealloc` when freed. */
    *packet = (sock_packet_s){
        .buffer = packet + 1, .metadata.next = NULL, .metadata.dealloc = free,
    };
  }
  return packet;
}

/**
Checks out a `sock_packet_s` from the packet pool, transferring the ownership of
the memory to the calling function. The function will hang until a packet
becomes available, so never check out more than a single packet at a time.

While it waits, it yields the thread and flushes all sockets, which returns
sent packets to the pool.
*/
sock_packet_s *sock_checkout_packet(void) {
  review_lib();
  for (;;) {
    spn_lock(&buffer_pool.lock);
    sock_packet_s *const taken = buffer_pool.pool;
    if (taken == NULL) {
      spn_unlock(&buffer_pool.lock);
      reschedule_thread();
      sock_flush_all();
      continue;
    }
    buffer_pool.pool = taken->metadata.next;
    spn_unlock(&buffer_pool.lock);
    *taken = (sock_packet_s){
        .buffer = taken + 1, .metadata.next = NULL, .metadata.dealloc = free,
    };
    return taken;
  }
}
/**
Attaches a packet to a socket's output buffer and calls `sock_flush` for the
socket.

The library always takes ownership of `packet`. If `uuid` isn't valid, the
packet is freed and -1 is returned; otherwise the call returns 0.
*/
ssize_t sock_send_packet(intptr_t uuid, sock_packet_s *packet) {
  if (fd_info != NULL && is_valid(uuid)) {
    fd_info_s *const info = &uuid2info(uuid);
    spn_lock(&info->lock);
    sock_send_packet_unsafe(sock_uuid2fd(uuid), packet);
    spn_unlock(&info->lock);
    return 0;
  }
  sock_free_packet(packet);
  return -1;
}

/**
Returns TRUE (non 0) if there is data waiting to be written to the socket in the
user-land buffer.

An invalid or stale UUID reports no pending data. Its fd is never used to index
the socket map unchecked, since it may lie outside the map.
*/
int sock_packets_pending(intptr_t uuid) {
  return fd_info && is_valid(uuid) && uuid2info(uuid).packet != NULL;
}

/**
Use `sock_free_packet` to free unused packets that were checked-out using
`sock_checkout_packet`.

Frees the whole chain that starts at `packet`:
- file packets close their fd unless `keep_open` is set,
- `external` buffers are handed to the packet's `dealloc`,
- the packets themselves go back to the pool.

NULL is ignored.
*/
void sock_free_packet(sock_packet_s *packet) {
  if (!packet)
    return;
  sock_packet_s *last = packet;
  for (;;) {
    if (!last->metadata.is_fd) {
      if (last->metadata.external)
        last->metadata.dealloc(last->buffer);
    } else if (!last->metadata.keep_open) {
      close((int)((ssize_t)last->buffer));
    }
    if (!last->metadata.next)
      break; /* `last` now points to the final packet of the chain */
    last = last->metadata.next;
  }
  /* splice the whole chain back onto the free list */
  spn_lock(&buffer_pool.lock);
  last->metadata.next = buffer_pool.pool;
  buffer_pool.pool = packet;
  spn_unlock(&buffer_pool.lock);
}

/* *****************************************************************************
Reading
*/
/**
Reads up to `count` bytes from `uuid` into `buf`, through the `read` hook when
one is set.

Returns:
- the number of bytes read;
- 0 when no data is available right now, or when `count` is 0;
- -1 if `uuid` is invalid (errno = ENODEV);
- -1 if the connection ended or failed, in which case `sock_close` was called.
*/
ssize_t sock_read(intptr_t uuid, void *buf, size_t count) {
  if (!fd_info || !is_valid(uuid)) {
    errno = ENODEV;
    return -1;
  }
  /* A zero-length read returns 0, which would be mistaken for EOF below and
   * close a healthy connection. */
  if (count == 0)
    return 0;
  ssize_t i_read;
  fd_info_s *sfd = fd_info + sock_uuid2fd(uuid);
  if (sfd->rw_hooks && sfd->rw_hooks->read)
    i_read = sfd->rw_hooks->read(uuid, buf, count);
  else
    i_read = read(sock_uuid2fd(uuid), buf, count);

  if (i_read > 0) {
    sock_touch(uuid);
    return i_read;
  }
  if (i_read == -1 && (ERR_OK || ERR_TRY_AGAIN))
    return 0;
  /* EOF (0) or a hard error: close the connection */
  sock_close(uuid);
  return -1;
}

/* *****************************************************************************
Flushing
*/

/**
Writes as much of `uuid`'s queued data as the socket accepts right now.

A socket marked for closing (by `sock_close`, or after a write error) is closed
once nothing is left to send. Returns -1 when `uuid` is invalid or the socket
was closed by this call, and 0 otherwise.
*/
ssize_t sock_flush(intptr_t uuid) {
  if (!fd_info || !is_valid(uuid))
    return -1;
  if (uuid2info(uuid).packet == NULL)
    goto no_packet;
  spn_lock(&uuid2info(uuid).lock);
  sock_flush_unsafe(sock_uuid2fd(uuid));
  spn_unlock(&uuid2info(uuid).lock);
no_packet:
  /* Close only once the queue is drained; closing earlier would silently drop
   * the pending data. This matches the check in `sock_write2_fn`, and
   * `sock_force_close` remains the way to discard data. A write error empties
   * the queue, so failed sockets still close right away. */
  if (uuid2info(uuid).close && uuid2info(uuid).packet == NULL) {
    sock_force_close(uuid);
    return -1;
  }
  return 0;
}
/**
`sock_flush_strong` performs the same action as `sock_flush` but returns only
after all the data was sent. This is an "active" wait, polling isn't
performed.
*/
void sock_flush_strong(intptr_t uuid) {
  if (!fd_info)
    return;
  while (is_valid(uuid) && uuid2info(uuid).packet)
    sock_flush(uuid);
}
/**
Calls `sock_flush` for each file descriptor whose buffer isn't empty. Sockets
whose lock is currently held are skipped.

Does nothing before the library is initialized, or after its data was
destroyed.
*/
void sock_flush_all(void) {
  if (fd_info == NULL)
    return;
  for (size_t i = 0; i < fd_capacity; i++) {
    if (fd_info[i].packet == NULL || spn_is_locked(&fd_info[i].lock))
      continue;
    sock_flush(fd_info[i].fduuid.uuid);
  }
}

/* *****************************************************************************
Writing
*/

/**
Queues the data described by `options` for writing to `options.fduuid`.

- `is_fd`: `buffer` carries a file descriptor (cast to a pointer). `length`
  bytes are sent from it, starting at `offset`.
- `move`: the packet takes over `buffer`. It's marked `external`, so it's
  handed to the packet's `dealloc` when the packet is freed.
- otherwise the data is copied into pool packets. Data larger than one packet
  buffer is split over several packets, queued while the fd lock is held. If
  the pool runs dry, sockets are flushed until a packet is returned. Urgency is
  ignored for such data.

For non-fd data, a zero `length` means `buffer` is a NUL-terminated string.
Returns 0 once the data was queued. Returns -1 on error (errno = ENODEV for an
invalid UUID) or when the socket was closed in the process.
*/
ssize_t sock_write2_fn(sock_write_info_s options) {
  if (!fd_info || !is_valid(options.fduuid)) {
    errno = ENODEV;
    return -1;
  }
  if (options.buffer == NULL)
    return -1;
  if (!options.length && !options.is_fd)
    options.length = strlen(options.buffer);
  if (options.length == 0)
    return -1;
  sock_packet_s *packet = sock_checkout_packet();
  // Urgent packets may be queued in front of this (first) packet. Follow-up
  // chunks of a large write are checked out with can_interrupt == 0,
  // presumably so urgent data can't land in the middle of this write.
  packet->metadata.can_interrupt = 1;
  packet->metadata.urgent = options.urgent;

  if (options.is_fd) {
    packet->buffer = (void *)options.buffer;
    packet->length = options.length;
    packet->metadata.is_fd = options.is_fd;
    packet->metadata.offset = options.offset;
    return sock_send_packet(options.fduuid, packet);
  } else {
    if (options.move) {
      packet->buffer = (void *)options.buffer;
      packet->length = options.length;
      packet->metadata.external = 1;
      return sock_send_packet(options.fduuid, packet);
    } else {
      if (options.length <= BUFFER_PACKET_SIZE) {
        memcpy(packet->buffer, options.buffer, options.length);
        packet->length = options.length;
        return sock_send_packet(options.fduuid, packet);
      } else {
        if (packet->metadata.urgent) {
          fprintf(stderr, "Socket err:"
                          "Large data cannot be sent as an urgent packet.\n"
                          "Urgency silently ignored\n");
          packet->metadata.urgent = 0;
        }
        size_t to_cpy;
        // The fd lock is held for the whole copy, presumably so no other
        // writer's packets are queued between the chunks.
        spn_lock(&uuid2info(options.fduuid).lock);
        for (;;) {
          to_cpy = options.length > BUFFER_PACKET_SIZE ? BUFFER_PACKET_SIZE
                                                       : options.length;
          memcpy(packet->buffer, options.buffer, to_cpy);
          packet->length = to_cpy;
          options.length -= to_cpy;
          options.buffer = (void *)((uintptr_t)options.buffer + to_cpy);
          sock_send_packet_unsafe(sock_uuid2fd(options.fduuid), packet);
          // stop when all data is queued, or the socket failed / is invalid
          if (!is_valid(options.fduuid) || uuid2info(options.fduuid).err == 1 ||
              options.length == 0)
            break;
          packet = sock_try_checkout_packet();
          while (packet == NULL) {
            // `sock_flush_all` skips locked fds - including this one, whose
            // lock is held here - so this socket is flushed explicitly.
            sock_flush_all();
            sock_flush_unsafe(sock_uuid2fd(options.fduuid));
            packet = sock_try_checkout_packet();
          }
        }
        spn_unlock(&uuid2info(options.fduuid).lock);
        // honor a pending close once everything was sent (or dropped on error)
        if (uuid2info(options.fduuid).packet == NULL &&
            uuid2info(options.fduuid).close) {
          sock_force_close(options.fduuid);
          return -1;
        }
        return is_valid(options.fduuid) ? 0 : -1;
      }
    }
  }
  // how did we get here?
  return -1;
}

/* *****************************************************************************
Closing.
*/

void sock_close(intptr_t uuid) {
  // fprintf(stderr, "called sock_close for %lu (%d)\n", uuid,
  // sock_uuid2fd(uuid));
  if (!fd_info || !is_valid(uuid))
    return;
  fd_info[sock_uuid2fd(uuid)].close = 1;
  sock_flush(uuid);
}

/**
Closes `uuid` immediately and resets its socket map entry. The reset frees any
queued data and fires the close callbacks. Invalid UUIDs are ignored.
*/
void sock_force_close(intptr_t uuid) {
  if (fd_info == NULL || !is_valid(uuid))
    return;
  const int fd = sock_uuid2fd(uuid);
  shutdown(fd, SHUT_RDWR);
  close(fd);
  set_fd(fd, LIB_SOCK_STATE_CLOSED);
}

/* *****************************************************************************
RW hooks implementation
*/

/** Gets a socket hook state (a pointer to the struct), or NULL when `uuid`
 * isn't valid. */
struct sock_rw_hook_s *sock_rw_hook_get(intptr_t uuid) {
  return (fd_info && is_valid(uuid)) ? uuid2info(uuid).rw_hooks : NULL;
}

/** Sets a socket hook state (a pointer to the struct). Returns 0 on success,
 * or -1 when `uuid` isn't valid. The update is made under the fd lock. */
int sock_rw_hook_set(intptr_t uuid, sock_rw_hook_s *rw_hooks) {
  if (fd_info == NULL || !is_valid(uuid))
    return -1;
  fd_info_s *const info = &uuid2info(uuid);
  spn_lock(&info->lock);
  info->rw_hooks = rw_hooks;
  spn_unlock(&info->lock);
  return 0;
}

/* *****************************************************************************
test
*/
#ifdef DEBUG
/* Debug self-test of the packet pool. It repeatedly drains the pool one
 * checkout at a time and frees the collected chain, then reports whether the
 * last drain collected exactly BUFFER_PACKET_POOL packets. */
void sock_libtest(void) {
  sock_lib_init();
  sock_packet_s *p, *pl;
  size_t count = 0;
  fprintf(stderr, "Testing packet pool\n");
  for (size_t i = 0; i < BUFFER_PACKET_POOL * 2; i++) {
    count = 1;
    pl = p = sock_checkout_packet();
    while (buffer_pool.pool) {
      count++;
      pl->metadata.next = sock_checkout_packet();
      pl = pl->metadata.next;
    }
    sock_free_packet(p);
  }
  /* %zu: `count` is a size_t (%lu is wrong wherever size_t isn't unsigned
   * long, e.g. on 32-bit or LLP64 targets). */
  fprintf(stderr,
          "liniar (no-contention) packet checkout + free shows %zu packets. "
          "test %s\n",
          count, count == BUFFER_PACKET_POOL ? "passed." : "FAILED!");
}
#endif
