blob: ae4833aa50f0133c35741de1c1353e53607d3eb8 [file] [log] [blame] [raw]
/*
copyright: Boaz segev, 2016
license: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
/** \file
The libreact-socket is a socket wrapper, using a user level buffer, non-blocking
sockets and some helper functions.
*/
#include "libsock.h"
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/resource.h>
#include <netdb.h>
#include <stdatomic.h> // http://en.cppreference.com/w/c/atomic
#include <sys/uio.h>
#ifndef DEBUG_SOCKLIB
#define DEBUG_SOCKLIB 0
#endif
#ifndef USE_SENDFILE
#if defined(__linux__) /* linux sendfile works, but isn't implemented */
#include <sys/sendfile.h>
#define USE_SENDFILE 1
#elif defined(__unix__) /* BSD sendfile should work, but isn't implemented */
#define USE_SENDFILE 0
#elif defined(__APPLE__) /* Apple sendfile was broken and probably still is */
#define USE_SENDFILE 1
#else /* sendfile might not be available - always set to 0 */
#define USE_SENDFILE 0
#endif
#endif
/* *****************************************************************************
Avoid including the "libreact.h" file, the following is all we need.
*/
// struct Reactor; // in the libreact_socket.h file
#pragma weak reactor_add
int reactor_add(struct Reactor* reactor, int fd) {
fprintf(stderr, "ERROR: Reactor library bypassed or missing!\n");
return -1;
}
#pragma weak reactor_close
void reactor_close(struct Reactor* resactor, int fd) {}
/* *****************************************************************************
User land Buffer
*/
// packet sizes
#ifndef BUFFER_PACKET_REAL_SIZE
#define BUFFER_PACKET_REAL_SIZE \
((sizeof(sock_packet_s) * 1) + BUFFER_PACKET_SIZE)
#endif
// The global packet container pool
static struct {
sock_packet_s* _Atomic pool;
sock_packet_s* initialized;
} ContainerPool = {NULL};
// grab a packet from the pool
sock_packet_s* sock_checkout_packet(void) {
sock_packet_s* packet = atomic_load(&ContainerPool.pool);
while (packet) {
if (atomic_compare_exchange_weak(&ContainerPool.pool, &packet,
packet->metadata.next))
break;
}
if (packet == NULL) {
packet = malloc(BUFFER_PACKET_REAL_SIZE);
}
if (!packet)
return NULL;
*packet = (sock_packet_s){.buffer = packet + 1, .metadata.next = NULL};
return packet;
}
// return a packet to the pool, or free it (when the pool is full).
void sock_free_packet(sock_packet_s* packet) {
sock_packet_s* next = packet;
while (next) {
if (next->metadata.is_fd)
close((int)((ssize_t)next->buffer));
else if (next->metadata.is_file)
fclose(next->buffer);
else if (next->metadata.external)
free(next->buffer);
next = next->metadata.next;
}
if (ContainerPool.initialized) {
while (packet) {
next = packet->metadata.next;
if (packet >= ContainerPool.initialized &&
packet < (ContainerPool.initialized +
(sizeof(sock_packet_s) * BUFFER_MAX_PACKET_POOL))) {
packet->metadata.next = atomic_load(&ContainerPool.pool);
for (;;) {
if (atomic_compare_exchange_weak(&ContainerPool.pool,
&packet->metadata.next, packet))
break;
}
} else {
free(packet);
}
packet = next;
}
} else {
while (packet) {
next = packet->metadata.next;
free(packet);
packet = next;
}
}
}
/* *****************************************************************************
Socket related data and internal helpers
*/
/** a map for all the active file descriptors that were added to any reactor -
* do NOT edit! */
static struct FDData {
pthread_mutex_t lock;
sock_packet_s* packet;
struct Reactor* owner;
sock_rw_hook_s* rw_hooks;
uint32_t sent;
unsigned open : 1;
unsigned client : 1;
unsigned close : 1;
unsigned rsv : 5;
}* fd_map = NULL;
static size_t fdmax = 0;
void sock_clear(int fd) {
if (!fd_map || fdmax <= fd)
return;
struct FDData* sfd = fd_map + fd;
pthread_mutex_lock(&sfd->lock);
if (sfd->packet)
sock_free_packet(sfd->packet);
if (sfd->rw_hooks && sfd->rw_hooks->on_clear)
sfd->rw_hooks->on_clear(fd, sfd->rw_hooks);
*sfd = (struct FDData){.lock = sfd->lock};
pthread_mutex_unlock(&sfd->lock);
}
static void destroy_all_fd_data(void) {
if (fd_map) {
while (fdmax--) { // include 0 in countdown
sock_clear(fdmax);
pthread_mutex_destroy(&fd_map[fdmax].lock);
}
free(fd_map);
fd_map = NULL;
}
if (ContainerPool.initialized) {
ContainerPool.pool = NULL;
free(ContainerPool.initialized);
ContainerPool.initialized = NULL;
}
}
static void init_pool_data(void) {
ContainerPool.initialized =
calloc(BUFFER_PACKET_REAL_SIZE, BUFFER_MAX_PACKET_POOL);
void* initaddr = ContainerPool.initialized;
for (size_t i = 0; i < (BUFFER_MAX_PACKET_POOL - 1); i++) {
// By filling up the packet pool, we avoid memory fragmentation (unless
// packets are freed).
initaddr += BUFFER_PACKET_REAL_SIZE;
((sock_packet_s*)initaddr)->metadata.next =
(initaddr + BUFFER_PACKET_REAL_SIZE);
}
ContainerPool.pool = ContainerPool.initialized;
}
int8_t init_socklib(size_t max) {
if (!fd_map)
atexit(destroy_all_fd_data);
if (ContainerPool.initialized == NULL)
init_pool_data();
struct FDData* n_map = calloc(sizeof(*n_map), max);
if (!n_map)
return -1;
if (fd_map) {
memcpy(n_map, fd_map, sizeof(*n_map) * fdmax);
}
fd_map = n_map;
while (fdmax < max) {
if (pthread_mutex_init(&fd_map[fdmax++].lock, NULL)) {
perror("init_socklib Couldn't initialize a mutex!");
fdmax--;
exit(1);
}
}
fdmax = max;
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
if (DEBUG_SOCKLIB)
fprintf(stderr,
"\nInitialized fd_map for %lu elements, each one %lu bytes.\n"
"overall: %lu bytes.\n"
"Initialized packet pool for %d elements, each one %lu bytes.\n"
"overall: %lu bytes.\n"
"=== Total: %lu bytes\n==========\n\n",
max, sizeof(*fd_map), sizeof(*fd_map) * max, BUFFER_MAX_PACKET_POOL,
BUFFER_PACKET_REAL_SIZE,
BUFFER_PACKET_REAL_SIZE * BUFFER_MAX_PACKET_POOL,
(BUFFER_PACKET_REAL_SIZE * BUFFER_MAX_PACKET_POOL) +
(sizeof(*fd_map) * max));
#endif
return 0;
}
#define require_mem(fd) \
do { \
if ((fd) >= fdmax && init_socklib((fd))) { \
perror("Couldn't allocate memory for fd_map (sock_API)"); \
exit(1); \
} \
} while (0);
#define require_conn(fd) \
do { \
if ((fd) >= fdmax || fd_map[(fd)].open == 0) \
return -1; \
} while (0);
/* *****************************************************************************
Machine wide and Helper API
*/
/**
Gets the maximum number of file descriptors this process can be allowed to
access.
Returns -1 on error.
*/
ssize_t sock_max_capacity(void) {
// get current limits
static ssize_t flim = 0;
if (flim)
return flim;
#ifdef _SC_OPEN_MAX
flim = sysconf(_SC_OPEN_MAX) - 1;
#elif defined(OPEN_MAX)
flim = OPEN_MAX - 1;
#endif
// try to maximize limits - collect max and set to max
struct rlimit rlim;
getrlimit(RLIMIT_NOFILE, &rlim);
// printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur,
// rlim.rlim_max);
rlim.rlim_cur = rlim.rlim_max;
setrlimit(RLIMIT_NOFILE, &rlim);
getrlimit(RLIMIT_NOFILE, &rlim);
// printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur,
// rlim.rlim_max);
// if the current limit is higher than it was, update
if (flim < rlim.rlim_cur)
flim = rlim.rlim_cur;
// return what we have
return flim;
}
/**
Sets a socket to non blocking state.
*/
inline int sock_set_non_block(int fd) // Thanks to Bjorn Reese
{
/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
int flags;
if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
flags = 0;
// printf("flags initial value was %d\n", flags);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
/* Otherwise, use the old way of doing it */
static int flags = 1;
return ioctl(fd, FIOBIO, &flags);
#endif
}
int sock_listen(char* address, char* port) {
int srvfd;
// setup the address
struct addrinfo hints;
struct addrinfo* servinfo; // will point to the results
memset(&hints, 0, sizeof hints); // make sure the struct is empty
hints.ai_family = AF_UNSPEC; // don't care IPv4 or IPv6
hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
hints.ai_flags = AI_PASSIVE; // fill in my IP for me
if (getaddrinfo(address, port, &hints, &servinfo)) {
perror("addr err");
return -1;
}
// get the file descriptor
srvfd =
socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
if (srvfd <= 0) {
perror("socket err");
freeaddrinfo(servinfo);
return -1;
}
// make sure the socket is non-blocking
if (sock_set_non_block(srvfd) < 0) {
perror("couldn't set socket as non blocking! ");
freeaddrinfo(servinfo);
close(srvfd);
return -1;
}
// avoid the "address taken"
{
int optval = 1;
setsockopt(srvfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
// bind the address to the socket
{
int bound = 0;
for (struct addrinfo* p = servinfo; p != NULL; p = p->ai_next) {
if (!bind(srvfd, p->ai_addr, p->ai_addrlen))
bound = 1;
}
if (!bound) {
perror("bind err");
freeaddrinfo(servinfo);
close(srvfd);
return -1;
}
}
freeaddrinfo(servinfo);
// listen in
if (listen(srvfd, SOMAXCONN) < 0) {
perror("couldn't start listening");
close(srvfd);
return -1;
}
return srvfd;
}
/* *****************************************************************************
Normal (raw) socket operation API
*/
/**
`sock_accept` accepts a new socket connection from the listening socket
`server_fd`, allowing the use of `sock_` functions with this new file
descriptor.
The reactor is registered as the owner of the client socket and the client
socket is added to the reactor, when possible.
*/
int sock_accept(struct Reactor* owner, int server_fd) {
static socklen_t cl_addrlen = 0;
int client;
#ifdef SOCK_NONBLOCK
client = accept4(server_fd, NULL, &cl_addrlen, SOCK_NONBLOCK);
if (client <= 0)
return -1;
#else
client = accept(server_fd, NULL, &cl_addrlen);
if (client <= 0)
return -1;
sock_set_non_block(client);
#endif
// attach the new client socket to the reactor.
if (sock_open(owner, client)) {
close(client);
return -1;
}
return client;
}
/**
`sock_connect` is similar to `sock_accept` but should be used to initiate a
client connection to the address requested.
Returns the new file descriptor fd. Retruns -1 on error.
*/
int sock_connect(struct Reactor* owner, char* address, char* port) {
return -1;
}
/**
`sock_open` takes an existing file decriptor `fd` and initializes it's status as
open and available for `sock_API` calls.
If a reactor pointer `owner` is provided, the `fd` will be attached to the
reactor.
Retruns -1 on error and 0 on success.
*/
int sock_open(struct Reactor* owner, int fd) {
require_mem(fd);
// attach the new client socket to the reactor.
if (owner && reactor_add(owner, fd) < 0) {
return -1;
}
struct FDData* sfd = fd_map + fd;
if (sfd->open) { // remove old connection buffer and data.
sock_clear(fd);
}
sfd->open = 1;
sfd->owner = owner;
return 0;
}
/**
`sock_attach` sets the reactor owner for a socket and attaches the socket to the
reactor.
Use this function when the socket was already opened with no reactor association
and it's data (buffer etc') is already initialized.
This is useful for a delayed attachment, where some more testing is required
before attaching a socket to a reactor.
Retruns -1 on error and 0 on success.
*/
int sock_attach(struct Reactor* owner, int fd) {
require_conn(fd);
fd_map[fd].owner = owner;
return (reactor_add(owner, fd) < 0);
}
/** Returns the state of the socket, similar to calling `fcntl(fd, F_GETFL)`.
* Returns -1 if the connection is closed. */
int sock_status(int fd) {
if (fd < fdmax && fd_map[fd].open)
return fcntl(fd, F_GETFL);
return -1;
}
/**
`sock_read` attempts to read up to count bytes from file descriptor fd into the
buffer starting at buf.
It's behavior should conform to the native `read` implementations, except some
data might be available in the `fd`'s buffer while it is not available to be
read using sock_read (i.e., when using a transport layer, such as TLS).
Also, some internal buffering will be used in cases where the transport layer
data available is larger then the data requested.
*/
ssize_t sock_read(int fd, void* buf, size_t count) {
require_conn(fd);
ssize_t i_read;
struct FDData* fd_d = fd_map + fd;
if (fd_d->rw_hooks && fd_d->rw_hooks->read)
i_read = fd_d->rw_hooks->read(fd, buf, count);
else
i_read = recv(fd, buf, count, 0);
if (i_read > 0)
return i_read;
if (i_read && (errno & (EWOULDBLOCK | EAGAIN)))
return 0;
fd_d->close = 1;
return -1;
}
/**
`sock_write2_fn` is the actual function behind the macro `sock_write2`.
*/
ssize_t sock_write2_fn(sock_write_info_s options) {
if (!options.fd || !options.buffer)
return -1;
require_conn(options.fd);
if (!options.length && !options.file)
options.length = strlen(options.buffer);
sock_packet_s* packet = sock_checkout_packet();
if (!packet) {
if (options.file)
fclose((void*)options.buffer);
return -1;
}
packet->metadata.can_interrupt = 1;
packet->metadata.urgent = options.urgent;
if (options.file || options.is_fd) {
packet->buffer = (void*)options.buffer;
packet->length = options.length;
packet->metadata.is_file = options.file;
packet->metadata.is_fd = options.is_fd;
return sock_send_packet(options.fd, packet);
} else {
if (options.move) {
packet->buffer = (void*)options.buffer;
packet->length = options.length;
packet->metadata.external = 1;
return sock_send_packet(options.fd, packet);
} else {
if (options.length <= BUFFER_PACKET_SIZE) {
memcpy(packet->buffer, options.buffer, options.length);
packet->length = options.length;
return sock_send_packet(options.fd, packet);
} else {
sock_packet_s* last_packet = packet;
size_t counter = 0;
while (options.length > BUFFER_PACKET_SIZE) {
memcpy(last_packet->buffer,
options.buffer + (counter * BUFFER_PACKET_SIZE),
BUFFER_PACKET_SIZE);
last_packet->length = BUFFER_PACKET_SIZE;
options.length -= BUFFER_PACKET_SIZE;
++counter;
last_packet->metadata.next = sock_checkout_packet();
if (last_packet->metadata.next) {
last_packet = last_packet->metadata.next;
continue;
} else {
sock_free_packet(packet);
return -1;
}
}
memcpy(last_packet->buffer,
options.buffer + (counter * BUFFER_PACKET_SIZE), options.length);
last_packet->length = options.length;
return sock_send_packet(options.fd, packet);
}
}
}
// how did we get here?
return -1;
}
/*
Helper function for speed optimizations
We MUST continue sending data until all the data was sent OR we gen an EAGAIN |
EWOULDBLOCK error - otherwise the `on_ready` event will never get called and our
`sock_flush` function son't get called.
*/
inline static ssize_t sock_flush_in_lock(int fd) {
struct FDData* sfd = fd_map + fd;
sock_packet_s* packet;
ssize_t sent;
re_flush:
sent = 0;
packet = sfd->packet;
if (!packet) {
if (sfd->close)
return -1;
return 0;
}
packet->metadata.can_interrupt = 0;
if (USE_SENDFILE && packet->metadata.is_fd && sfd->rw_hooks == NULL) {
#if defined(__linux__) /* linux sendfile API */
sent = sendfile64(fd, (int)((ssize_t)packet->buffer), NULL,
packet->length - sfd->sent);
if (sent == 0) {
sfd->packet = packet->metadata.next;
packet->metadata.next = NULL;
sock_free_packet(packet);
sfd->sent = 0;
goto re_flush;
} else if (sent < 0) {
if (errno & (EAGAIN | EWOULDBLOCK))
return 0;
else {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
perror("linux sendfile error");
#endif
return -1;
}
}
sfd->sent += sent;
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Linux sendfile %ld / %u bytes from %lu total.\n", fd,
sent, sfd->sent, packet->length);
#endif
// fprintf(stderr, "Linux sendfile sent %ld/%u from %lu\n", sent, sfd->sent,
// packet->length);
if (sfd->sent >= packet->length) {
sfd->packet = packet->metadata.next;
packet->metadata.next = NULL;
sock_free_packet(packet);
sfd->sent = 0;
goto re_flush;
}
return 0;
#elif defined(__unix__) /* BSD API */
int sendfile(int fd, int s, off_t offset, size_t nbytes,
struct sf_hdtr* hdtr, off_t* sbytes, int flags);
off_t act_sent = packet->length - sfd->sent;
off_t pos_start = lseek((int)((ssize_t)packet->buffer), 0, SEEK_CUR);
if (sendfile((int)((ssize_t)packet->buffer), fd, pos_start,
(size_t)act_sent, NULL, &act_sent, 0)) {
if ((errno & (EAGAIN | EWOULDBLOCK)) && act_sent == 0)
return 0;
else if ((errno & (EAGAIN | EWOULDBLOCK)) == 0)
return -1;
}
if (act_sent == 0) {
sfd->packet = packet->metadata.next;
packet->metadata.next = NULL;
sock_free_packet(packet);
sfd->sent = 0;
goto re_flush;
}
pos_start += act_sent;
sfd->sent += act_sent;
if (sfd->sent >= packet->length) {
sfd->packet = packet->metadata.next;
packet->metadata.next = NULL;
sfd->sent = 0;
sock_free_packet(packet);
goto re_flush;
}
lseek((int)((ssize_t)packet->buffer), pos_start, SEEK_SET);
goto re_flush;
#elif defined(__APPLE__) /* Apple API */
off_t act_sent = packet->length - sfd->sent;
off_t pos_start = lseek((int)((ssize_t)packet->buffer), 0, SEEK_CUR);
if (sendfile((int)((ssize_t)packet->buffer), fd, pos_start, &act_sent, NULL,
0)) {
if ((errno & (EAGAIN | EWOULDBLOCK)) && act_sent == 0)
return 0;
else if ((errno & (EAGAIN | EWOULDBLOCK)) == 0)
return -1;
}
if (act_sent == 0) {
sfd->packet = packet->metadata.next;
packet->metadata.next = NULL;
sock_free_packet(packet);
sfd->sent = 0;
goto re_flush;
}
pos_start += act_sent;
sfd->sent += act_sent;
if (sfd->sent >= packet->length) {
sfd->packet = packet->metadata.next;
packet->metadata.next = NULL;
sock_free_packet(packet);
sfd->sent = 0;
goto re_flush;
}
lseek((int)((ssize_t)packet->buffer), pos_start, SEEK_SET);
goto re_flush;
#else /* sendfile might not be available - always set to 0 */
};
if (0) {
#endif
} else if (packet->metadata.is_fd || packet->metadata.is_file) {
// how much data are we expecting to send...?
ssize_t i_exp = (BUFFER_FILE_READ_SIZE > packet->length)
? packet->length
: BUFFER_FILE_READ_SIZE;
// flag telling us if the file was read into the internal buffer
if (packet->metadata.internal_flag == 0) {
ssize_t i_read;
if (packet->metadata.is_fd)
i_read = read((int)((ssize_t)packet->buffer), packet + 1, i_exp);
else
i_read = fread(packet + 1, 1, i_exp, packet->buffer);
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Read %ld from a file packet, preparing to send.\n",
fd, i_read);
#endif
if (i_read <= 0) {
sfd->packet = packet->metadata.next;
packet->metadata.next = NULL;
sock_free_packet(packet);
sfd->sent = 0;
goto re_flush;
} else {
packet->metadata.internal_flag = 1;
}
}
// send the data
if (sfd->rw_hooks && sfd->rw_hooks->write)
sent = sfd->rw_hooks->write(fd, (((void*)(packet + 1)) + sfd->sent),
i_exp - sfd->sent);
else
sent = write(fd, (((void*)(packet + 1)) + sfd->sent), i_exp - sfd->sent);
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Sent file %ld / %ld bytes from %lu total.\n", fd,
sent, i_exp, packet->length);
#endif
// review result and update packet data
if (sent == 0) {
return 0; // nothing to do.
} else if (sent > 0) {
// we finished sending the amount of data we wanted to send.
if (sfd->sent + sent >= i_exp) {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) rotating file buffer (marking for read).\n", fd);
#endif
packet->metadata.internal_flag = 0;
sfd->sent = 0;
packet->length -= i_exp;
if (packet->length == 0) {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Finished sending file.\n", fd);
#endif
sfd->packet = packet->metadata.next;
packet->metadata.next = NULL;
sock_free_packet(packet);
}
}
// return 0;
goto re_flush;
} else if (errno & (EAGAIN | EWOULDBLOCK)) {
return 0;
} else
return -1;
}
// send data packets
if (sfd->rw_hooks && sfd->rw_hooks->write)
sent = sfd->rw_hooks->write(fd, packet->buffer + sfd->sent,
packet->length - sfd->sent);
else
sent = write(fd, packet->buffer + sfd->sent, packet->length - sfd->sent);
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Sent data %ld / %ld bytes from %lu total.\n", fd, sent,
packet->length - sfd->sent, packet->length);
#endif
// handle results
if (sent > 0) {
sfd->sent += sent;
if (sfd->sent >= packet->length) {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) rotating data buffer packet.\n", fd);
#endif
sfd->packet = packet->metadata.next;
packet->metadata.next = NULL;
sock_free_packet(packet);
sfd->sent = 0;
if (sfd->packet == NULL && sfd->close)
return -1;
}
// return 0;
goto re_flush;
} else if (sent == 0) {
return 0; // nothing to do.
} else if (errno & (EAGAIN | EWOULDBLOCK)) {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Sending data error -1 is okay (avoid blocking).\n",
fd);
#endif
return 0;
} else
return -1;
return 0;
}
/**
Attches a packet to a socket's output buffer and calls `sock_flush` for the
socket.
The ownership of the packet's memory and it's resources is transferred to the
`sock_API` for automatic management. i.e., if an error occurs, the packet's
memory will be automatically released (or returned to the pool).
Returns -1 on error. Returns 0 on success. **Always** retains ownership of the
packet's memory and it's resources.
*/
ssize_t sock_send_packet(int fd, sock_packet_s* packet) {
if (fd >= fdmax || !fd_map[(fd)].open || packet->length == 0) {
sock_free_packet(packet);
return -1;
}
struct FDData* sfd = fd_map + fd;
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
if (packet->metadata.is_fd || packet->metadata.is_file)
fprintf(stderr, "(%d) sending a file packet with length %lu\n", fd,
packet->length);
else
fprintf(stderr,
"(%d) sending packet with length %lu\n"
"(first char == %c(%d), last char == %c(%d))\n"
"%.*s\n============\n",
fd, packet->length, ((char*)packet->buffer)[0],
((char*)packet->buffer)[0],
((char*)packet->buffer)[packet->length - 1],
((char*)packet->buffer)[packet->length - 1], (int)packet->length,
(char*)packet->buffer);
#endif
// applys the move logic for either urgent or non urgent packets
pthread_mutex_lock(&fd_map[(fd)].lock);
sock_packet_s *tail, **pos = &(fd_map[fd].packet);
if (packet->metadata.urgent) {
while (*pos && (!(*pos)->metadata.next ||
!(*pos)->metadata.next->metadata.can_interrupt)) {
pos = &((*pos)->metadata.next);
}
} else {
while (*pos) {
pos = &((*pos)->metadata.next);
}
}
tail = (*pos);
*pos = packet;
if (tail) {
pos = &(packet->metadata.next);
while (*pos)
pos = &((*pos)->metadata.next);
*pos = tail;
}
ssize_t sent = sock_flush_in_lock(fd);
pthread_mutex_unlock(&sfd->lock);
if (sent < 0 && sfd->open)
goto close_socket;
return 0;
close_socket:
if (sfd->owner)
reactor_close(sfd->owner, fd);
else
close(fd);
sock_clear(fd);
return 0;
}
/**
`sock_flush` writes the data in the internal buffer to the file descriptor fd
and closes the fd once it's marked for closure (and all the data was sent).
The number of bytes actually written to the fd will be returned. 0 will be
returned if no data was written and -1 will be returned if an error occured or
if the connection was closed.
The connection is closed automatically if an error occured (and if open).
*/
ssize_t sock_flush(int fd) {
if (fd >= fdmax || fd_map[fd].open == 0)
return -1;
struct FDData* sfd = fd_map + fd;
if (sfd->packet == NULL) {
if (sfd->close) {
goto close_socket;
}
return 0;
}
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "\n=========\n(%d) sock_flush called.\n", fd);
#endif
ssize_t sent;
pthread_mutex_lock(&sfd->lock);
sent = sock_flush_in_lock(fd);
pthread_mutex_unlock(&sfd->lock);
if (sent < 0 && sfd->open)
goto close_socket;
return 0;
close_socket:
if (sfd->owner)
reactor_close(sfd->owner, fd);
else
close(fd);
sock_clear(fd);
return -1;
}
/**
`sock_close` marks the connection for disconnection once all the data was
sent.
The actual disconnection will be managed by the `sock_flush` function.
`sock_flash` will automatically be called.
If a reactor pointer is provied, the reactor API will be used and the
`on_close`
callback should be called.
*/
void sock_close(struct Reactor* reactor, int fd) {
require_mem(fd);
struct FDData* sfd = fd_map + fd;
if (!sfd->open)
return;
sfd->owner = reactor;
sfd->close = 1;
sock_flush(fd);
}
/**
`sock_force_close` closes the connection immediately, without adhering to any
protocol restrictions and without sending any remaining data in the connection
buffer.
If a reactor pointer is provied, the reactor API will be used and the
`on_close`
callback should be called.
*/
void sock_force_close(struct Reactor* reactor, int fd) {
require_mem(fd);
struct FDData* sfd = fd_map + fd;
if (!sfd->open)
return;
if (!reactor)
reactor = sfd->owner;
if (reactor)
reactor_close(reactor, fd);
else {
shutdown(fd, SHUT_RDWR);
close(fd);
}
sock_clear(fd);
}
/* *****************************************************************************
RW Hooks implementation
*/
/** Gets a socket RW hooks. */
struct sock_rw_hook_s* sock_rw_hook_get(int fd) {
if (fd >= fdmax || fd_map[fd].open == 0)
return NULL;
return fd_map[fd].rw_hooks;
}
/** Sets a socket RW hook. */
int sock_rw_hook_set(int fd, struct sock_rw_hook_s* rw_hook) {
if (fd >= fdmax || fd_map[fd].open == 0)
return -1;
fd_map[fd].rw_hooks = rw_hook;
return 0;
}