/*
copyright: Boaz Segev, 2016
license: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
/** \file
libsock is a socket wrapper library, using a user-land buffer, non-blocking
sockets and some helper functions.
*/
#include "libsock.h"
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <netdb.h>
#include <stdatomic.h> // http://en.cppreference.com/w/c/atomic
#include <sys/uio.h>
#include <sys/ioctl.h> /* FIONBIO fallback in sock_set_non_block */
#ifndef DEBUG_SOCKLIB
#define DEBUG_SOCKLIB 0
#endif
#ifndef NOTIFY_ON_CLOSE
#define NOTIFY_ON_CLOSE 0
#endif
#ifndef USE_SENDFILE
#if defined(__linux__) /* linux sendfile works and is enabled */
#include <sys/sendfile.h>
#define USE_SENDFILE 1
#elif defined(__unix__) /* BSD sendfile should work, but isn't enabled */
#define USE_SENDFILE 0
#elif defined(__APPLE__) /* Apple sendfile was broken and probably still is */
#define USE_SENDFILE 0
#else /* sendfile might not be available - always set to 0 */
#define USE_SENDFILE 0
#endif
#endif
#ifndef BUFFER_ALLOW_MALLOC
/**
Setting `BUFFER_ALLOW_MALLOC` to 1 will allow dynamic buffer packet
allocation.
In some cases (not all), this might improve performance or prevent buffer
packet starvation (depending on how the library is used).
*/
#define BUFFER_ALLOW_MALLOC 0
#endif
#ifndef FD_USE_SPIN_LOCK
/**
Socket `write` / `flush` / `read` contention uses a spinlock instead of a mutex.
*/
#define FD_USE_SPIN_LOCK 1
#endif
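/*
These switches are normally overridden at compile time (e.g. with -D compiler
flags) rather than edited here. A minimal sketch of what such overrides look
like (the values shown are illustrative, not recommendations):

    #define BUFFER_ALLOW_MALLOC 1  // allow a malloc() fallback for packets
    #define FD_USE_SPIN_LOCK 0     // use pthread mutexes instead of spinlocks
    #define DEBUG_SOCKLIB 1        // enable debug logging and the pool test
*/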
/* *****************************************************************************
Support the `libreact` `on_close` callback, if it exists.
*/
#pragma weak reactor_on_close
void reactor_on_close(intptr_t uuid) {}
#pragma weak reactor_remove
int reactor_remove(intptr_t uuid) {
return -1;
}
/* *****************************************************************************
Support timeout setting.
*/
#pragma weak sock_touch
void sock_touch(intptr_t uuid) {}
/* *****************************************************************************
Library Core Data
*/
typedef struct {
#if !defined(FD_USE_SPIN_LOCK) || FD_USE_SPIN_LOCK != 1
pthread_mutex_t lock;
#endif
sock_packet_s* packet;
sock_rw_hook_s* rw_hooks;
fduuid_u fduuid;
uint32_t sent;
#if defined(FD_USE_SPIN_LOCK) && FD_USE_SPIN_LOCK == 1
atomic_flag lock;
#endif
unsigned open : 1;
unsigned close : 1;
unsigned rsv : 6;
} fd_info_s;
static fd_info_s* fd_info = NULL;
static size_t fd_capacity = 0;
static struct {
#if !defined(FD_USE_SPIN_LOCK) || FD_USE_SPIN_LOCK != 1
pthread_mutex_t lock;
#endif
sock_packet_s* pool;
sock_packet_s* allocated;
#if defined(FD_USE_SPIN_LOCK) && FD_USE_SPIN_LOCK == 1
atomic_flag lock;
#endif
} buffer_pool;
#define validate_mem() (fd_info == NULL)
#define validate_connection(fd_uuid) \
(validate_mem() || sock_uuid2fd(fd_uuid) < 0 || \
fd_info[sock_uuid2fd(fd_uuid)].fduuid.uuid != (fd_uuid) || \
fd_info[sock_uuid2fd(fd_uuid)].open == 0)
#if defined(FD_USE_SPIN_LOCK) && FD_USE_SPIN_LOCK == 1
#define lock_fd(ffd) \
while (atomic_flag_test_and_set(&((ffd)->lock))) { \
sched_yield(); \
}
#define unlock_fd(ffd) atomic_flag_clear(&(ffd)->lock)
#define lock_pool() \
while (atomic_flag_test_and_set(&buffer_pool.lock)) { \
sched_yield(); \
}
#define unlock_pool() atomic_flag_clear(&buffer_pool.lock)
#else
#define lock_fd(ffd) pthread_mutex_lock(&(ffd)->lock)
#define unlock_fd(ffd) pthread_mutex_unlock(&(ffd)->lock)
#define lock_pool() pthread_mutex_lock(&buffer_pool.lock)
#define unlock_pool() pthread_mutex_unlock(&buffer_pool.lock)
#endif
#define LIB_SOCK_STATE_OPEN 1
#define LIB_SOCK_STATE_CLOSED 0
/**
Sets / Updates the socket connection data.
*/
static inline void set_fd(int fd, _Bool state) {
//
fd_info_s old_data = fd_info[fd];
// lock and update
lock_fd(fd_info + fd);
fd_info[fd] = (fd_info_s){
.fduuid.data.counter = fd_info[fd].fduuid.data.counter + state,
.fduuid.data.fd = fd,
.lock = fd_info[fd].lock,
.open = state,
};
// should be called within the lock.
if (old_data.rw_hooks && old_data.rw_hooks->on_clear)
old_data.rw_hooks->on_clear(old_data.fduuid.uuid, old_data.rw_hooks);
// unlock
unlock_fd(fd_info + fd);
// clear old data
if (old_data.packet)
sock_free_packet(old_data.packet);
// call callback if exists
if (old_data.open) {
// if (state == LIB_SOCK_STATE_OPEN)
// printf(
// "STRONG FD COLLISION PROTECTION: A new connection was accepted "
// "while the old one was marked as open.\n");
reactor_remove(old_data.fduuid.uuid);
reactor_on_close(old_data.fduuid.uuid);
}
}
/* *****************************************************************************
Machine wide and Helper API
*/
/**
Gets the maximum number of file descriptors this process can be allowed to
access.
Returns -1 on error.
*/
ssize_t sock_max_capacity(void) {
// get current limits
static ssize_t flim = 0;
if (flim)
return flim;
#ifdef _SC_OPEN_MAX
flim = sysconf(_SC_OPEN_MAX);
#elif defined(OPEN_MAX)
flim = OPEN_MAX;
#endif
// try to maximize limits - collect max and set to max
struct rlimit rlim;
getrlimit(RLIMIT_NOFILE, &rlim);
// printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur,
// rlim.rlim_max);
rlim.rlim_cur = rlim.rlim_max;
setrlimit(RLIMIT_NOFILE, &rlim);
getrlimit(RLIMIT_NOFILE, &rlim);
// printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur,
// rlim.rlim_max);
// if the current limit is higher than it was, update
if (flim < rlim.rlim_cur)
flim = rlim.rlim_cur;
// return what we have
return flim;
}
/**
Sets a socket to a non-blocking state.
*/
inline int sock_set_non_block(int fd) // Thanks to Bjorn Reese
{
/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
int flags;
if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
flags = 0;
// printf("flags initial value was %d\n", flags);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
/* Otherwise, use the old way of doing it */
static int flags = 1;
return ioctl(fd, FIONBIO, &flags);
#endif
}
intptr_t sock_listen(const char* address, const char* port) {
int srvfd;
// setup the address
struct addrinfo hints;
struct addrinfo* servinfo; // will point to the results
memset(&hints, 0, sizeof hints); // make sure the struct is empty
hints.ai_family = AF_UNSPEC; // don't care IPv4 or IPv6
hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
hints.ai_flags = AI_PASSIVE; // fill in my IP for me
if (getaddrinfo(address, port, &hints, &servinfo)) {
// perror("addr err");
return -1;
}
// get the file descriptor
srvfd =
socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
if (srvfd <= 0) {
// perror("socket err");
freeaddrinfo(servinfo);
return -1;
}
// make sure the socket is non-blocking
if (sock_set_non_block(srvfd) < 0) {
// perror("couldn't set socket as non blocking! ");
freeaddrinfo(servinfo);
close(srvfd);
return -1;
}
// avoid the "address taken"
{
int optval = 1;
setsockopt(srvfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
// bind the address to the socket
{
int bound = 0;
for (struct addrinfo* p = servinfo; p != NULL; p = p->ai_next) {
if (!bind(srvfd, p->ai_addr, p->ai_addrlen))
bound = 1;
}
if (!bound) {
// perror("bind err");
freeaddrinfo(servinfo);
close(srvfd);
return -1;
}
}
freeaddrinfo(servinfo);
// listen in
if (listen(srvfd, SOMAXCONN) < 0) {
// perror("couldn't start listening");
close(srvfd);
return -1;
}
set_fd(srvfd, LIB_SOCK_STATE_OPEN);
return fd_info[srvfd].fduuid.uuid;
}
/* *****************************************************************************
User land Buffer
*/
// packet sizes
#ifndef BUFFER_PACKET_REAL_SIZE
#define BUFFER_PACKET_REAL_SIZE \
((sizeof(sock_packet_s) * 1) + BUFFER_PACKET_SIZE)
#endif
/**
Checks out a `sock_packet_s` from the packet pool, transferring ownership of
the memory to the calling function. Returns NULL if the pool was empty and
memory allocation failed.
Every checked-out buffer packet comes with an attached buffer of
BUFFER_PACKET_SIZE bytes. This buffer is accessible through the
`packet->buffer` pointer (which can be safely overwritten to point to an
external buffer).
The attached buffer is automatically freed, or returned to the memory pool,
once `sock_send_packet` or `sock_free_packet` is called.
*/
sock_packet_s* sock_checkout_packet(void) {
sock_packet_s* packet;
lock_pool();
packet = buffer_pool.pool;
if (packet)
buffer_pool.pool = packet->metadata.next;
unlock_pool();
#if defined(BUFFER_ALLOW_MALLOC) && BUFFER_ALLOW_MALLOC == 1
if (packet == NULL)
packet = malloc(BUFFER_PACKET_REAL_SIZE);
#endif
// zero out memory and set the buffer location.
if (packet) {
*packet = (sock_packet_s){
.buffer = packet + 1,
};
}
return packet;
}
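/*
A minimal usage sketch for the packet pool (illustration only - `uuid` is
assumed to be a valid socket uuid obtained from `sock_accept`, `sock_connect`
or `sock_open`):

    sock_packet_s* packet = sock_checkout_packet();
    if (packet == NULL)
      return; // pool empty and dynamic allocation disabled or failed
    memcpy(packet->buffer, "hello", 5);
    packet->length = 5;
    // ownership is transferred to the library, even on error:
    sock_send_packet(uuid, packet);

A checked-out packet that is never passed to `sock_send_packet` must be
released with `sock_free_packet(packet)`.
*/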
/**
Use `sock_free_packet` to free unused packets that were checked out using
`sock_checkout_packet`.
NEVER use `free` for any packet checked out using the pool management function
`sock_checkout_packet`.
*/
void sock_free_packet(sock_packet_s* packet) {
sock_packet_s* next = packet;
if (packet == NULL)
return;
for (;;) {
if (next->metadata.is_fd) {
if (next->metadata.keep_open == 0)
close((int)((ssize_t)next->buffer));
} else if (next->metadata.external)
free(next->buffer);
if (next->metadata.next == NULL)
break; /* next now holds the last packet in the chain. */
next = next->metadata.next;
}
#if defined(BUFFER_ALLOW_MALLOC) && BUFFER_ALLOW_MALLOC == 1
/* the pool spans BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL bytes, so the
 * range check is performed in bytes rather than in sock_packet_s units. */
if ((void*)packet >= (void*)buffer_pool.allocated &&
(void*)packet < ((void*)buffer_pool.allocated +
(BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL))) {
lock_pool();
next->metadata.next = buffer_pool.pool;
buffer_pool.pool = packet;
unlock_pool();
// #if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
// fprintf(stderr, "Packet checked in (pool = %p).\n", packet);
// #endif
} else {
free(packet);
}
#else
lock_pool();
next->metadata.next = buffer_pool.pool;
buffer_pool.pool = packet;
unlock_pool();
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "Packet checked in (pool = %p).\n", packet);
#endif
#endif
}
/* *****************************************************************************
Library Initialization
*/
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
/** For development use, no deep testing */
void libsock_test(void);
#endif
static void destroy_lib_data(void) {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "\nDestroying all libsock data\n");
#endif
buffer_pool.pool = NULL;
buffer_pool.allocated = NULL;
#if defined(FD_USE_SPIN_LOCK) && FD_USE_SPIN_LOCK == 1
// no destruction required
#else
pthread_mutex_destroy(&buffer_pool.lock);
#endif
if (fd_info) {
while (fd_capacity--) { // include 0 in countdown
set_fd(fd_capacity, LIB_SOCK_STATE_CLOSED);
#if defined(FD_USE_SPIN_LOCK) && FD_USE_SPIN_LOCK == 1
// no destruction required
#else
pthread_mutex_destroy(&fd_info[fd_capacity].lock);
#endif
}
munmap(fd_info, (BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL) +
(sizeof(fd_info_s) * fd_capacity));
fd_info = NULL;
}
}
int8_t sock_lib_init(void) {
if (fd_capacity)
return (fd_info == NULL) ? -1 : 0;
fd_capacity = sock_max_capacity();
size_t fd_map_mem_size = sizeof(fd_info_s) * fd_capacity;
size_t buffer_mem_size = BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL;
void* buff_mem;
buff_mem = mmap(NULL, fd_map_mem_size + buffer_mem_size,
PROT_READ | PROT_WRITE | PROT_EXEC,
MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
// MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if (buff_mem == MAP_FAILED || buff_mem == NULL) {
return -1;
}
// pthread_mutexattr_init(&mtx_attr);
// pthread_mutexattr_setpshared(&mtx_attr, PTHREAD_PROCESS_SHARED);
// assign memory addresses.
fd_info = buff_mem;
buff_mem += fd_map_mem_size;
buffer_pool.allocated = buff_mem;
// initialize packet buffer
for (size_t i = 0; i < BUFFER_PACKET_POOL - 1; i++) {
*((sock_packet_s*)buff_mem) =
(sock_packet_s){.metadata.next = buff_mem + BUFFER_PACKET_REAL_SIZE};
buff_mem += BUFFER_PACKET_REAL_SIZE;
}
buffer_pool.pool = buffer_pool.allocated;
for (size_t i = 0; i < fd_capacity; i++) {
fd_info[i] = (fd_info_s){0};
#if defined(FD_USE_SPIN_LOCK) && FD_USE_SPIN_LOCK == 1
atomic_flag_clear(&fd_info[i].lock);
#else
pthread_mutex_init(&fd_info[i].lock, NULL);
#endif
// pthread_mutex_init(&fd_info[i].lock, &mtx_attr);
}
#if defined(FD_USE_SPIN_LOCK) && FD_USE_SPIN_LOCK == 1
atomic_flag_clear(&buffer_pool.lock); /* match the per-fd lock initialization */
#else
pthread_mutex_init(&buffer_pool.lock, NULL);
#endif
// pthread_mutexattr_destroy(&mtx_attr);
atexit(destroy_lib_data);
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
libsock_test();
fprintf(stderr,
"\nInitialized fd_info for %lu elements, each one %lu bytes.\n"
"overall: %lu bytes.\n"
"Initialized packet pool for %d elements, each one %lu bytes.\n"
"overall: %lu bytes.\n"
"=== Total: %lu bytes\n==========\n\n",
fd_capacity, sizeof(*fd_info), sizeof(*fd_info) * fd_capacity,
BUFFER_PACKET_POOL, BUFFER_PACKET_REAL_SIZE,
BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL,
(BUFFER_PACKET_REAL_SIZE * BUFFER_PACKET_POOL) +
(sizeof(*fd_info) * fd_capacity));
#endif
return 0;
}
/* *****************************************************************************
Core socket operation API
*/
/**
`sock_accept` accepts a new socket connection from the listening socket
`server_fd`, allowing the use of `sock_` functions with this new file
descriptor.
The reactor is registered as the owner of the client socket and the client
socket is added to the reactor, when possible.
*/
intptr_t sock_accept(intptr_t server_fd) {
if (validate_mem())
return -1;
static socklen_t cl_addrlen = 0;
int client;
#ifdef SOCK_NONBLOCK
client = accept4(sock_uuid2fd(server_fd), NULL, &cl_addrlen, SOCK_NONBLOCK);
if (client <= 0)
return -1;
#else
client = accept(sock_uuid2fd(server_fd), NULL, &cl_addrlen);
if (client <= 0)
return -1;
sock_set_non_block(client);
#endif
set_fd(client, LIB_SOCK_STATE_OPEN);
return fd_info[client].fduuid.uuid;
}
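/*
A minimal accept-loop sketch (illustration only - the port "3000" and the busy
loop are assumptions; a real server would wait for events through the
reactor):

    sock_lib_init();
    intptr_t srv = sock_listen(NULL, "3000");
    if (srv == -1)
      return -1;
    for (;;) {
      intptr_t client = sock_accept(srv);
      if (client == -1)
        continue; // non-blocking: no pending connection at the moment
      // ... use sock_read / sock_write2 / sock_close with `client` ...
    }
*/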
/**
`sock_connect` is similar to `sock_accept` but should be used to initiate a
client connection to the address requested.
Returns the new connection's uuid on success. Returns -1 on error.
*/
intptr_t sock_connect(char* address, char* port) {
if (validate_mem()) {
return -1;
}
int fd;
// setup the address
struct addrinfo hints;
struct addrinfo* addrinfo; // will point to the results
memset(&hints, 0, sizeof hints); // make sure the struct is empty
hints.ai_family = AF_UNSPEC; // don't care IPv4 or IPv6
hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
// ai_flags is left at 0 - AI_PASSIVE only applies to listening sockets.
if (getaddrinfo(address, port, &hints, &addrinfo)) {
return -1;
}
// get the file descriptor
fd =
socket(addrinfo->ai_family, addrinfo->ai_socktype, addrinfo->ai_protocol);
if (fd <= 0) {
freeaddrinfo(addrinfo);
return -1;
}
// make sure the socket is non-blocking
if (sock_set_non_block(fd) < 0) {
freeaddrinfo(addrinfo);
close(fd);
return -1;
}
if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) < 0 &&
errno != EINPROGRESS) {
close(fd);
freeaddrinfo(addrinfo);
return -1;
}
freeaddrinfo(addrinfo);
set_fd(fd, LIB_SOCK_STATE_OPEN);
return fd_info[fd].fduuid.uuid;
}
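/*
Client-side sketch (illustration only - host and port are placeholders):

    intptr_t uuid = sock_connect("localhost", "3000");
    if (uuid == -1)
      return -1;
    // The connection completes asynchronously (EINPROGRESS is expected).
    // Data written before the handshake finishes simply waits in the
    // user-land buffer (ENOTCONN is treated as "try again later").
*/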
/**
`sock_open` takes an existing file descriptor `fd` and initializes its status
as open and available for `sock_API` calls.
If a reactor was initialized, the `fd` will be attached to the reactor.
Returns -1 on error and the socket's uuid on success.
*/
intptr_t sock_open(int fd) {
if (validate_mem())
return -1;
// update local data
set_fd(fd, LIB_SOCK_STATE_OPEN);
return fd_info[fd].fduuid.uuid;
}
/**
`sock_fd2uuid` takes an existing file descriptor `fd` and returns its active
`uuid`.
If the file descriptor is marked as closed (i.e. it wasn't opened / registered
with `libsock`) the function returns -1.
If the file descriptor was closed remotely (or closed without using `libsock`),
a false positive may be returned. However, using that uuid will result in the
fd being closed.
Returns -1 on error. Otherwise returns a valid (non-random) socket UUID.
*/
intptr_t sock_fd2uuid(int fd) {
if (fd_info[fd].open) {
return fd_info[fd].fduuid.uuid;
}
return -1;
}
/**
Returns 1 if the fduuid_u refers to a valid and open socket.
Returns 0 if not.
*/
int sock_isvalid(intptr_t uuid) {
if (validate_connection(uuid))
return 0;
return 1;
}
/* *****************************************************************************
Socket Buffer Management - not MT safe
*/
#define ERR_OK (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN)
#define ERR_TRY_AGAIN (errno == EINTR)
#define ERR_ON_FAILED(sfd) \
{ \
(sfd)->close = 1; \
sock_free_packet((sfd)->packet); \
(sfd)->packet = NULL; \
}
#define FORCE_ROTATE_PACKETS(sfd, pckt) \
{ \
(sfd)->packet = (pckt)->metadata.next; \
(pckt)->metadata.next = NULL; \
sock_free_packet(pckt); \
sfd->sent = 0; \
}
#define ROTATE_PACKETS(sfd, pckt, _sent, ret_val) \
{ \
(sfd)->sent += (_sent); \
if ((sfd)->sent >= (pckt)->length) { \
FORCE_ROTATE_PACKETS((sfd), (pckt)) \
return ret_val; \
} \
}
#if defined(__linux__) /* linux sendfile API */
inline static int send_file_os(int fd, fd_info_s* sfd, sock_packet_s* packet) {
ssize_t sent; /* keep signed - sendfile64 returns ssize_t and -1 on error */
restart:
sent = sendfile64(fd, (int)((ssize_t)packet->buffer),
&packet->metadata.offset, packet->length - sfd->sent);
if (sent == 0) {
FORCE_ROTATE_PACKETS(sfd, packet);
return 0;
} else if (sent < 0) {
if (ERR_OK) {
return 1;
} else if (ERR_TRY_AGAIN) {
goto restart;
} else {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
perror("Linux sendfile failed");
#endif
ERR_ON_FAILED(sfd);
return -1;
}
}
ROTATE_PACKETS(sfd, packet, sent, 0);
return 0;
}
#elif defined(__APPLE__) || defined(__unix__) /* BSD / Apple API */
inline static int send_file_os(int fd, fd_info_s* sfd, sock_packet_s* packet) {
off_t act_sent;
restart:
act_sent = packet->length - sfd->sent;
#if defined(__APPLE__)
if (sendfile((int)((ssize_t)packet->buffer), fd, packet->metadata.offset,
&act_sent, NULL, 0) < 0 &&
act_sent == 0)
#else
if (sendfile((int)((ssize_t)packet->buffer), fd, packet->metadata.offset,
(size_t)act_sent, NULL, &act_sent, 0) < 0 &&
act_sent == 0)
#endif
{
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
perror("Apple/BSD sendfile errno");
#endif
if (ERR_OK) {
packet->metadata.offset += act_sent;
return 1;
} else if (ERR_TRY_AGAIN) {
goto restart;
} else {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
perror("Apple/BSD sendfile failed");
#endif
ERR_ON_FAILED(sfd);
return -1;
}
}
if (act_sent == 0) {
FORCE_ROTATE_PACKETS(sfd, packet);
return 0;
}
packet->metadata.offset += act_sent;
ROTATE_PACKETS(sfd, packet, act_sent, 0);
return 0;
}
#endif
/* returns 0 if the underlying buffer isn't full, and returns a value if the
* underlying buffer is full or unavailable. */
inline static int send_file(int fd, fd_info_s* sfd, sock_packet_s* packet) {
ssize_t sent;
restart:
/* File (fd) Packets */
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr,
"(%d) flushing a file packet with %u/%lu (%lu). start at: %lld\n", fd,
sfd->sent, packet->length, packet->length - sfd->sent,
packet->metadata.offset);
#endif
/* USE_SENDFILE ? */
if (USE_SENDFILE && sfd->rw_hooks == NULL) {
return send_file_os(fd, sfd, packet);
} else {
// how much data are we expecting to send...?
ssize_t i_exp = (BUFFER_FILE_READ_SIZE > packet->length)
? packet->length
: BUFFER_FILE_READ_SIZE;
// flag telling us if the file was read into the internal buffer
if (packet->metadata.internal_flag == 0) {
ssize_t i_read;
i_read = pread((int)((ssize_t)packet->buffer), packet + 1, i_exp,
packet->metadata.offset);
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Read %ld from a file packet, preparing to send.\n",
fd, i_read);
#endif
if (i_read <= 0) {
FORCE_ROTATE_PACKETS(sfd, packet);
return 0;
} else {
packet->metadata.offset += i_read;
packet->metadata.internal_flag = 1;
}
}
// send the data
if (sfd->rw_hooks && sfd->rw_hooks->write)
sent = sfd->rw_hooks->write(sfd->fduuid.uuid,
(((void*)(packet + 1)) + sfd->sent),
i_exp - sfd->sent);
else
sent = write(fd, (((void*)(packet + 1)) + sfd->sent), i_exp - sfd->sent);
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Sent file %ld / %ld bytes from %lu total.\n", fd,
sent, i_exp, packet->length);
#endif
// review result and update packet data
if (sent == 0) {
return 1; // nothing to do?
} else if (sent > 0) {
// did we finish sending the amount of data we wanted to send?
sfd->sent += sent;
if (sfd->sent >= i_exp) {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) rotating file buffer (marking for read).\n", fd);
#endif
packet->metadata.internal_flag = 0;
sfd->sent = 0;
packet->length -= i_exp;
if (packet->length == 0) {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Finished sending file.\n", fd);
#endif
FORCE_ROTATE_PACKETS(sfd, packet);
return 0;
}
}
// return 0;
} else if (ERR_OK) {
return 1;
} else if (ERR_TRY_AGAIN) {
goto restart;
} else {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
perror("send failed");
#endif
ERR_ON_FAILED(sfd);
return -1;
}
}
return 1;
}
/* flushes but doesn't close the socket. flags fd for close on error. */
inline static void sock_flush_unsafe(int fd) {
fd_info_s* sfd = fd_info + fd;
sock_packet_s* packet;
ssize_t sent;
if ((packet = sfd->packet) != NULL)
sock_touch(sfd->fduuid.uuid);
for (;;) {
if ((packet = sfd->packet) == NULL)
return;
packet->metadata.can_interrupt = 0;
if (packet->metadata.is_fd == 0) {
/* Data Packets are more likely */
/* send data */
if (sfd->rw_hooks && sfd->rw_hooks->write)
sent =
sfd->rw_hooks->write(sfd->fduuid.uuid, packet->buffer + sfd->sent,
packet->length - sfd->sent);
else
sent =
write(fd, packet->buffer + sfd->sent, packet->length - sfd->sent);
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "(%d) Sent data %ld / %ld bytes from %lu total.\n", fd,
sent, packet->length - sfd->sent, packet->length);
#endif
/* review and update sent state */
if (sent > 0) {
ROTATE_PACKETS(sfd, packet, sent, ;);
continue;
} else if (sent == 0) {
return; // nothing to do?
} else if (ERR_OK) {
return;
} else if (ERR_TRY_AGAIN) {
continue;
} else {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
perror("send failed");
#endif
ERR_ON_FAILED(sfd);
return;
}
} else {
if (send_file(fd, sfd, packet))
return;
}
}
}
#undef ERR_ON_FAILED
#undef FORCE_ROTATE_PACKETS
#undef ROTATE_PACKETS
/* places a packet in the socket's buffer */
inline static ssize_t sock_send_packet_unsafe(int fd, sock_packet_s* packet) {
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
if (packet->metadata.is_fd)
fprintf(stderr, "(%d) sending a file packet with length %u/%lu\n", fd,
fd_info[fd].sent, packet->length);
else
fprintf(stderr,
"(%d) sending packet with length %lu\n"
"(first char == %c(%d), last char == %c(%d))\n"
"%.*s\n============\n",
fd, packet->length, ((char*)packet->buffer)[0],
((char*)packet->buffer)[0],
((char*)packet->buffer)[packet->length - 1],
((char*)packet->buffer)[packet->length - 1], (int)packet->length,
(char*)packet->buffer);
#endif
fd_info_s* sfd = fd_info + fd;
if (sfd->packet == NULL) {
sfd->packet = packet;
sock_flush_unsafe(fd);
return 0;
} else if (packet->metadata.urgent == 0) {
sock_packet_s* pos = sfd->packet;
while (pos->metadata.next)
pos = pos->metadata.next;
pos->metadata.next = packet;
sock_flush_unsafe(fd);
return 0;
} else {
sock_packet_s* pos = sfd->packet;
if (pos->metadata.can_interrupt) {
sfd->packet = packet;
while (packet->metadata.next)
packet = packet->metadata.next;
packet->metadata.next = pos;
} else {
while (pos->metadata.next &&
pos->metadata.next->metadata.can_interrupt == 0)
pos = pos->metadata.next;
sock_packet_s* tail = pos->metadata.next;
pos->metadata.next = packet;
if (tail) {
while (packet->metadata.next)
packet = packet->metadata.next;
packet->metadata.next = tail;
}
}
sock_flush_unsafe(fd);
return 0;
}
return -1;
}
/* *****************************************************************************
Read/Write socket operation API
*/
/**
`sock_read` attempts to read up to `count` bytes from the file descriptor `fd`
into the buffer starting at `buf`.
Its behavior should conform to the native `read` implementation, except that
some data might be available in the `fd`'s buffer while not yet available to
`sock_read` (i.e. when using a transport layer, such as TLS).
Also, some internal buffering will be used in cases where the transport layer
data available is larger than the data requested.
*/
ssize_t sock_read(intptr_t uuid, void* buf, size_t count) {
if (validate_connection(uuid)) {
errno = ENODEV;
return -1;
}
ssize_t i_read;
fd_info_s* sfd = fd_info + sock_uuid2fd(uuid);
if (sfd->rw_hooks && sfd->rw_hooks->read)
i_read = sfd->rw_hooks->read(uuid, buf, count);
else
i_read = read(sock_uuid2fd(uuid), buf, count);
if (i_read > 0) {
sock_touch(uuid);
return i_read;
}
if (i_read == -1 && (ERR_OK || ERR_TRY_AGAIN))
return 0;
// return value of 0 or -1 with other errors will prompt a closure.
// sfd->close = 1;
#if defined(NOTIFY_ON_CLOSE) && NOTIFY_ON_CLOSE == 1
fprintf(stderr, "Read Error for %lu bytes from fd %d (closing))\n", count,
sock_uuid2fd(uuid));
#endif
sock_close(uuid);
return -1;
}
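/*
An echo sketch combining `sock_read` with `sock_write2_fn` (illustration only -
`uuid` is an open socket uuid and the 1024 byte buffer is an arbitrary size):

    char buf[1024];
    ssize_t len = sock_read(uuid, buf, sizeof(buf));
    if (len > 0)
      sock_write2_fn((sock_write_info_s){
          .fduuid = uuid, .buffer = buf, .length = (size_t)len});
    // len == 0 means "no data available right now"; -1 means the socket was
    // closed (by the peer or due to an error).
*/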
/**
`sock_write2_fn` is the actual function behind the macro `sock_write2`.
*/
ssize_t sock_write2_fn(sock_write_info_s options) {
if (validate_connection(options.fduuid) || !options.buffer)
return -1;
if (!options.length && !options.is_fd)
options.length = strlen(options.buffer);
if (options.length == 0)
return -1;
sock_packet_s* packet = sock_checkout_packet();
while (packet == NULL) {
sock_flush_all();
packet = sock_checkout_packet();
}
packet->metadata.can_interrupt = 1;
packet->metadata.urgent = options.urgent;
if (options.is_fd) {
packet->buffer = (void*)options.buffer;
packet->length = options.length;
packet->metadata.is_fd = options.is_fd;
packet->metadata.offset = options.offset;
return sock_send_packet(options.fduuid, packet);
} else {
if (options.move) {
packet->buffer = (void*)options.buffer;
packet->length = options.length;
packet->metadata.external = 1;
return sock_send_packet(options.fduuid, packet);
} else {
if (options.length <= BUFFER_PACKET_SIZE) {
memcpy(packet->buffer, options.buffer, options.length);
packet->length = options.length;
return sock_send_packet(options.fduuid, packet);
} else {
lock_fd(fd_info + sock_uuid2fd(options.fduuid));
sock_packet_s* last_packet = packet;
size_t counter = 0;
while (options.length > BUFFER_PACKET_SIZE) {
memcpy(last_packet->buffer,
options.buffer + (counter * BUFFER_PACKET_SIZE),
BUFFER_PACKET_SIZE);
last_packet->length = BUFFER_PACKET_SIZE;
options.length -= BUFFER_PACKET_SIZE;
++counter;
last_packet->metadata.next = sock_checkout_packet();
if (last_packet->metadata.next) {
last_packet = last_packet->metadata.next;
continue;
} else {
packet->metadata.can_interrupt = ~options.urgent;
if (sock_send_packet_unsafe(sock_uuid2fd(options.fduuid), packet))
goto multi_send_error;
/* the previous chain was handed to the socket; check out a fresh packet */
packet = sock_checkout_packet();
while (packet == NULL) {
sock_flush_all();
packet = sock_checkout_packet();
}
packet->metadata.urgent = options.urgent;
last_packet = packet;
}
}
memcpy(last_packet->buffer,
options.buffer + (counter * BUFFER_PACKET_SIZE), options.length);
last_packet->length = options.length;
if (sock_send_packet_unsafe(sock_uuid2fd(options.fduuid), packet))
goto multi_send_error;
unlock_fd(fd_info + sock_uuid2fd(options.fduuid));
return 0;
multi_send_error:
unlock_fd(fd_info + sock_uuid2fd(options.fduuid));
return -1;
}
}
}
// how did we get here?
return -1;
}
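/*
Sending a file through the user-land buffer (a sketch; "data.bin" and the
omitted error handling are illustrative assumptions). The file descriptor is
stored in the packet's `buffer` field and is closed by the library once the
packet is freed, unless `metadata.keep_open` is set:

    int file = open("data.bin", O_RDONLY);
    struct stat info;
    fstat(file, &info);
    sock_write2_fn((sock_write_info_s){.fduuid = uuid,
                                       .buffer = (void*)(intptr_t)file,
                                       .length = (size_t)info.st_size,
                                       .is_fd = 1});
*/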
/**
Attaches a packet to a socket's output buffer and calls `sock_flush` for the
socket.
Ownership of the packet's memory and its resources is transferred to the
`sock_API` for automatic management, i.e. if an error occurs, the packet's
memory will be automatically released (or returned to the pool).
Returns -1 on error and 0 on success. The library **always** takes ownership
of the packet's memory and its resources, even on error.
*/
ssize_t sock_send_packet(intptr_t uuid, sock_packet_s* packet) {
if (validate_connection(uuid) || packet->length == 0) {
sock_free_packet(packet);
return -1;
}
fd_info_s* sfd = fd_info + sock_uuid2fd(uuid);
lock_fd(sfd);
sock_send_packet_unsafe(sock_uuid2fd(uuid), packet);
unlock_fd(sfd);
if (sfd->packet == NULL && sfd->close)
goto close_socket;
return 0;
close_socket:
sock_force_close(uuid);
return 0;
}
/**
`sock_flush` writes the data in the internal buffer to the file descriptor fd
and closes the fd once it's marked for closure (and all the data was sent).
Returns 0 on success (even if data is still waiting in the buffer) and -1 if
an error occurred or if the connection was closed.
The connection is closed automatically if an error occurred (and if open).
*/
ssize_t sock_flush(intptr_t uuid) {
if (validate_connection(uuid)) {
return -1;
}
fd_info_s* sfd = fd_info + sock_uuid2fd(uuid);
if (sfd->packet == NULL) {
// make sure the rw_hook finished sending all its data.
if (sfd->rw_hooks && sfd->rw_hooks->flush) {
lock_fd(sfd);
ssize_t val;
if ((val = sfd->rw_hooks->flush(uuid)) > 0) {
unlock_fd(sfd);
return 0;
}
if (val < 0) /* reuse the result of the single flush call above */
sfd->close = 1;
unlock_fd(sfd);
}
if (sfd->close) {
goto close_socket;
}
return 0;
}
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
fprintf(stderr, "\n=========\n(%d) sock_flush called.\n", sock_uuid2fd(uuid));
#endif
lock_fd(sfd);
sock_flush_unsafe(sock_uuid2fd(uuid));
unlock_fd(sfd);
if (sfd->packet == NULL && sfd->close)
goto close_socket;
return 0;
close_socket:
sock_force_close(uuid);
return -1;
}
/**
`sock_flush_strong` performs the same action as `sock_flush`, but returns only
after all the data was sent. This is a busy wait: the function repeatedly
calls `sock_flush` and yields the CPU; no event polling is performed.
*/
void sock_flush_strong(intptr_t uuid) {
fd_info_s* sfd = fd_info + sock_uuid2fd(uuid);
while (validate_connection(uuid) == 0 && sfd->packet) {
sock_flush(uuid);
sched_yield();
}
return;
}
/**
Calls `sock_flush` for each file descriptor whose buffer isn't empty.
*/
void sock_flush_all(void) {
register fd_info_s* fds = fd_info;
for (size_t i = 0; i < fd_capacity; i++) {
if (fds[i].packet == NULL)
continue;
sock_flush(fd_info[i].fduuid.uuid);
}
}
/**
`sock_close` marks the connection for disconnection once all the data was
sent.
The actual disconnection will be managed by the `sock_flush` function.
`sock_flush` will be called automatically.
If a reactor pointer is provided, the reactor API will be used and the
`on_close` callback should be called.
*/
void sock_close(intptr_t uuid) {
#if defined(NOTIFY_ON_CLOSE) && NOTIFY_ON_CLOSE == 1
fprintf(stderr, "called sock_close for %lu (%d)\n", uuid, sock_uuid2fd(uuid));
#endif
if (validate_connection(uuid))
return;
fd_info[sock_uuid2fd(uuid)].close = 1;
sock_flush(uuid);
}
/**
`sock_force_close` closes the connection immediately, without adhering to any
protocol restrictions and without sending any remaining data in the connection
buffer.
If a reactor pointer is provided, the reactor API will be used and the
`on_close` callback should be called.
*/
void sock_force_close(intptr_t uuid) {
#if defined(NOTIFY_ON_CLOSE) && NOTIFY_ON_CLOSE == 1
fprintf(stderr, "called sock_force_close for %lu (%d)\n", uuid,
sock_uuid2fd(uuid));
#endif
if (validate_connection(uuid))
return;
shutdown(sock_uuid2fd(uuid), SHUT_RDWR);
close(sock_uuid2fd(uuid));
set_fd(sock_uuid2fd(uuid), LIB_SOCK_STATE_CLOSED);
}
/* *****************************************************************************
RW Hooks implementation
*/
/** Gets the socket's RW hooks. */
struct sock_rw_hook_s* sock_rw_hook_get(intptr_t uuid) {
if (validate_connection(uuid))
return NULL;
return fd_info[sock_uuid2fd(uuid)].rw_hooks;
}
/** Sets the socket's RW hooks. */
int sock_rw_hook_set(intptr_t uuid, sock_rw_hook_s* rw_hook) {
if (validate_connection(uuid))
return -1;
lock_fd(fd_info + sock_uuid2fd(uuid));
fd_info[sock_uuid2fd(uuid)].rw_hooks = rw_hook;
unlock_fd(fd_info + sock_uuid2fd(uuid));
return 0;
}
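/*
RW hook sketch - e.g. for layering TLS or traffic accounting on top of the raw
fd. The callback signatures below are inferred from how the hooks are invoked
in this file (sock_read, sock_flush_unsafe, sock_flush and set_fd); the
authoritative declarations live in libsock.h:

    static ssize_t my_read(intptr_t uuid, void* buf, size_t count) {
      return read(sock_uuid2fd(uuid), buf, count); // e.g. decrypt here
    }
    static ssize_t my_write(intptr_t uuid, void* buf, size_t count) {
      return write(sock_uuid2fd(uuid), buf, count); // e.g. encrypt here
    }
    static ssize_t my_flush(intptr_t uuid) { return 0; }
    static void my_on_clear(intptr_t uuid, sock_rw_hook_s* hook) {
      (void)uuid;
      (void)hook; // release any per-connection state here
    }

    static sock_rw_hook_s my_hooks = {.read = my_read,
                                      .write = my_write,
                                      .flush = my_flush,
                                      .on_clear = my_on_clear};
    // ...
    sock_rw_hook_set(uuid, &my_hooks);
*/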
/* *****************************************************************************
Minor tests
*/
#if defined(DEBUG_SOCKLIB) && DEBUG_SOCKLIB == 1
#define THREADS_FOR_TEST 128
#define THREADS_EACH_COLLECTS 32
static void* take_in_out(void* _) {
sock_packet_s* p[THREADS_EACH_COLLECTS];
for (size_t i = 0; i < 1024 / THREADS_EACH_COLLECTS; i++) {
for (size_t i = 0; i < THREADS_EACH_COLLECTS; i++) {
p[i] = sock_checkout_packet();
fprintf(stderr, "Thread %ld owns %p\n", (intptr_t)_, p[i]);
}
for (size_t i = 0; i < THREADS_EACH_COLLECTS; i++) {
if (p[i])
sock_free_packet(p[i]);
fprintf(stderr, "Thread %ld frees %p\n", (intptr_t)_, p[i]);
}
}
return _;
}
void libsock_test(void) {
sock_packet_s *p, *pl;
fprintf(stderr, "Testing packet pool ");
for (size_t i = 0; i < BUFFER_PACKET_POOL * 2; i++) {
pl = p = sock_checkout_packet();
while (buffer_pool.pool) {
pl->metadata.next = sock_checkout_packet();
pl = pl->metadata.next;
}
sock_free_packet(p);
}
pthread_t threads[THREADS_FOR_TEST];
for (size_t i = 0; i < THREADS_FOR_TEST; i++) {
pthread_create(threads + i, NULL, take_in_out, (void*)((intptr_t)i));
}
void* res;
for (size_t i = 0; i < THREADS_FOR_TEST; i++) {
pthread_join(threads[i], &res);
}
fprintf(stderr, "***threads finished\n\n\n\n");
size_t count = 1;
pl = p = sock_checkout_packet();
while (buffer_pool.pool) {
++count;
pl->metadata.next = sock_checkout_packet();
pl = pl->metadata.next;
}
sock_free_packet(p);
fprintf(stderr, " - %s (%lu/%d)\n",
(count == BUFFER_PACKET_POOL) ? "Pass" : "Fail", count,
BUFFER_PACKET_POOL);
}
#endif