/*
Copyright: Boaz Segev, 2016-2017
License: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#include "spnlock.h"
#include "evio.h"
#include "facil.h"
#include "fio_hashmap.h"
#include "fio_llist.h"
#include "fiobj4sock.h"
#include "fio_mem.h"
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/wait.h>
#if !defined(__GNUC__) && !defined(__clang__)
#define __attribute__(...)
#endif
/* *****************************************************************************
Patch for OSX version < 10.12 from https://stackoverflow.com/a/9781275/4025095
***************************************************************************** */
#if defined(__MACH__) && !defined(CLOCK_REALTIME)
#include <sys/time.h>
#define CLOCK_REALTIME 0
#define clock_gettime patch_clock_gettime
// clock_gettime is not implemented on older versions of OS X (< 10.12).
// If implemented, CLOCK_REALTIME will have already been defined.
static inline int patch_clock_gettime(int clk_id, struct timespec *t) {
struct timeval now;
int rv = gettimeofday(&now, NULL);
if (rv)
return rv;
t->tv_sec = now.tv_sec;
t->tv_nsec = now.tv_usec * 1000;
return 0;
(void)clk_id;
}
#endif
/* *****************************************************************************
Data Structures
***************************************************************************** */
typedef struct ProtocolMetadata {
spn_lock_i locks[3];
unsigned rsv : 8;
} protocol_metadata_s;
union protocol_metadata_union_u {
size_t opaque;
protocol_metadata_s meta;
};
#define prt_meta(prt) (((union protocol_metadata_union_u *)(&(prt)->rsv))->meta)
struct connection_data_s {
protocol_s *protocol;
time_t active;
uint8_t timeout;
spn_lock_i scheduled;
spn_lock_i lock;
};
static struct facil_data_s {
spn_lock_i global_lock;
uint8_t need_review;
uint8_t spindown;
uint16_t active;
uint16_t threads;
pid_t parent;
pool_pt thread_pool;
ssize_t capacity;
size_t connection_count;
struct timespec last_cycle;
struct connection_data_s conn[];
} * facil_data;
#define fd_data(fd) (facil_data->conn[(fd)])
#define uuid_data(uuid) fd_data(sock_uuid2fd((uuid)))
// #define uuid_prt_meta(uuid) prt_meta(uuid_data((uuid)).protocol)
/** Locks a connection's protocol and returns a pointer that needs to be
 * unlocked. */
inline static protocol_s *protocol_try_lock(intptr_t fd,
enum facil_protocol_lock_e type) {
errno = 0;
if (spn_trylock(&fd_data(fd).lock))
goto would_block;
protocol_s *pr = fd_data(fd).protocol;
if (!pr) {
spn_unlock(&fd_data(fd).lock);
errno = EBADF;
return NULL;
}
if (spn_trylock(&prt_meta(pr).locks[type])) {
spn_unlock(&fd_data(fd).lock);
goto would_block;
}
spn_unlock(&fd_data(fd).lock);
return pr;
would_block:
errno = EWOULDBLOCK;
return NULL;
}
/** See `facil_protocol_try_lock` for details. */
inline static void protocol_unlock(protocol_s *pr,
enum facil_protocol_lock_e type) {
spn_unlock(&prt_meta(pr).locks[type]);
}
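/* Example (documentation only): a minimal sketch of how the public
* `facil_protocol_try_lock` wrapper (defined further below) is meant to be
* used, assuming `uuid` references a live connection:
*
*   protocol_s *pr = facil_protocol_try_lock(uuid, FIO_PR_LOCK_TASK);
*   if (!pr) {
*     if (errno == EWOULDBLOCK) {
*       // the lock is busy - reschedule / retry later
*     }
*     return; // EBADF: no protocol is attached to the uuid
*   }
*   // ... safely access the protocol object ...
*   facil_protocol_unlock(pr, FIO_PR_LOCK_TASK);
*/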
/* *****************************************************************************
Internal Protocol Names
***************************************************************************** */
static const char *LISTENER_PROTOCOL_NAME =
"listening protocol __facil_internal__";
static const char *CONNECTOR_PROTOCOL_NAME = "connect protocol __internal__";
static const char *TIMER_PROTOCOL_NAME = "timer protocol __facil_internal__";
/* *****************************************************************************
Event deferring (declarations)
***************************************************************************** */
static void deferred_on_close(void *uuid_, void *pr_);
static void deferred_on_shutdown(void *arg, void *arg2);
static void deferred_on_ready(void *arg, void *arg2);
static void deferred_on_data(void *uuid, void *arg2);
static void deferred_ping(void *arg, void *arg2);
/* *****************************************************************************
Overriding `defer` to use `evio` when waiting for events
***************************************************************************** */
/* If FIO_DEDICATED_SYSTEM is defined, threads are activated more often. */
#if FIO_DEDICATED_SYSTEM
void defer_thread_wait(pool_pt pool, void *p_thr) {
(void)pool;
(void)p_thr;
evio_wait(EVIO_TICK);
}
#endif
/* *****************************************************************************
Event Handlers (evio)
***************************************************************************** */
void sock_flush_defer(void *arg, void *ignored) {
(void)ignored;
switch (sock_flush((intptr_t)arg)) {
case 1:
evio_add_write(sock_uuid2fd((intptr_t)arg), (void *)arg);
break;
case 0:
defer(deferred_on_ready, arg, NULL);
break;
}
}
void evio_on_ready(void *arg) { defer(sock_flush_defer, arg, NULL); }
void evio_on_close(void *arg) { sock_force_close((intptr_t)arg); }
void evio_on_error(void *arg) { sock_force_close((intptr_t)arg); }
void evio_on_data(void *arg) { defer(deferred_on_data, arg, NULL); }
/* *****************************************************************************
Mock Protocol Callbacks and Service Functions
***************************************************************************** */
static void mock_on_ev(intptr_t uuid, protocol_s *protocol) {
(void)uuid;
(void)protocol;
}
static void mock_on_data(intptr_t uuid, protocol_s *protocol) {
facil_quite(uuid);
(void)protocol;
}
static void mock_on_close(intptr_t uuid, protocol_s *protocol) {
(void)protocol;
(void)uuid;
}
static uint8_t mock_on_shutdown(intptr_t uuid, protocol_s *protocol) {
return 0;
(void)protocol;
(void)uuid;
}
static uint8_t mock_on_shutdown_internal(intptr_t uuid, protocol_s *protocol) {
return 0;
(void)protocol;
(void)uuid;
}
static void mock_ping(intptr_t uuid, protocol_s *protocol) {
(void)protocol;
sock_force_close(uuid);
}
static void mock_ping2(intptr_t uuid, protocol_s *protocol) {
(void)protocol;
uuid_data(uuid).active = facil_last_tick().tv_sec;
if (uuid_data(uuid).timeout == 255)
return;
protocol->ping = mock_ping;
uuid_data(uuid).timeout = 8;
sock_close(uuid);
}
/* Support for the default pub/sub cluster engine */
#pragma weak pubsub_cluster_init
void pubsub_cluster_init(void) {}
#pragma weak pubsub_cluster_on_fork_start
void pubsub_cluster_on_fork_start(void) {}
#pragma weak pubsub_cluster_on_fork_end
void pubsub_cluster_on_fork_end(void) {}
#pragma weak pubsub_cluster_cleanup
void pubsub_cluster_cleanup(void) {}
/* other cleanup concern */
#pragma weak fio_cli_end
void fio_cli_end(void) {}
/* *****************************************************************************
Core Callbacks for forking / starting up / cleaning up
***************************************************************************** */
typedef struct {
fio_ls_embd_s node;
void (*func)(void *);
void *arg;
} callback_data_s;
typedef struct {
spn_lock_i lock;
fio_ls_embd_s callbacks;
} callback_collection_s;
static callback_collection_s callback_collection[FIO_CALL_AT_EXIT + 1];
static inline void facil_core_callback_ensure(callback_collection_s *c) {
if (c->callbacks.next)
return;
c->callbacks = (fio_ls_embd_s)FIO_LS_INIT(c->callbacks);
}
/** Adds a callback to the list of callbacks to be called for the event. */
void facil_core_callback_add(callback_type_e c_type, void (*func)(void *),
void *arg) {
if (!func || (int)c_type < 0 || c_type > FIO_CALL_AT_EXIT)
return;
spn_lock(&callback_collection[c_type].lock);
facil_core_callback_ensure(&callback_collection[c_type]);
callback_data_s *tmp = malloc(sizeof(*tmp));
*tmp = (callback_data_s){.func = func, .arg = arg};
fio_ls_embd_push(&callback_collection[c_type].callbacks, &tmp->node);
spn_unlock(&callback_collection[c_type].lock);
}
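/* Example (documentation only): a minimal sketch of registering a core
* callback before calling `facil_run`. `print_ready` is a hypothetical user
* function, not part of the library:
*
*   static void print_ready(void *arg) {
*     fprintf(stderr, "INFO: %s\n", (char *)arg);
*   }
*   // ...
*   facil_core_callback_add(FIO_CALL_ON_START, print_ready, "workers up");
*/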
/** Removes a callback from the list of callbacks to be called for the event. */
int facil_core_callback_remove(callback_type_e c_type, void (*func)(void *),
void *arg) {
if ((int)c_type < 0 || c_type > FIO_CALL_AT_EXIT)
return -1;
spn_lock(&callback_collection[c_type].lock);
FIO_LS_EMBD_FOR(&callback_collection[c_type].callbacks, pos) {
callback_data_s *tmp = (FIO_LS_EMBD_OBJ(callback_data_s, node, pos));
if (tmp->func == func && tmp->arg == arg) {
fio_ls_embd_remove(&tmp->node);
free(tmp);
goto success;
}
}
spn_unlock(&callback_collection[c_type].lock);
return -1;
success:
spn_unlock(&callback_collection[c_type].lock);
return 0;
}
/** Forces all the existing callbacks to run, as if the event occurred. */
void facil_core_callback_force(callback_type_e c_type) {
if ((int)c_type < 0 || c_type > FIO_CALL_AT_EXIT)
return;
/* copy collection */
fio_ls_embd_s copy = FIO_LS_INIT(copy);
spn_lock(&callback_collection[c_type].lock);
facil_core_callback_ensure(&callback_collection[c_type]);
FIO_LS_EMBD_FOR(&callback_collection[c_type].callbacks, pos) {
callback_data_s *tmp = fio_malloc(sizeof(*tmp));
*tmp = *(FIO_LS_EMBD_OBJ(callback_data_s, node, pos));
fio_ls_embd_push(&copy, &tmp->node);
}
spn_unlock(&callback_collection[c_type].lock);
/* run callbacks + free data */
while (fio_ls_embd_any(&copy)) {
callback_data_s *tmp =
FIO_LS_EMBD_OBJ(callback_data_s, node, fio_ls_embd_shift(&copy));
if (tmp->func) {
tmp->func(tmp->arg);
}
fio_free(tmp);
}
}
/** Clears all the existing callbacks for the event. */
void facil_core_callback_clear(callback_type_e c_type) {
if ((int)c_type < 0 || c_type > FIO_CALL_AT_EXIT)
return;
spn_lock(&callback_collection[c_type].lock);
facil_core_callback_ensure(&callback_collection[c_type]);
while (fio_ls_embd_any(&callback_collection[c_type].callbacks)) {
callback_data_s *tmp = FIO_LS_EMBD_OBJ(
callback_data_s, node,
fio_ls_embd_shift(&callback_collection[c_type].callbacks));
free(tmp);
}
spn_unlock(&callback_collection[c_type].lock);
}
/* *****************************************************************************
External initialization / deconstruction
***************************************************************************** */
/* perform initialization for external services. */
static void facil_external_init(void) {
sock_on_fork();
fio_malloc_after_fork();
defer_on_fork();
pubsub_cluster_on_fork_start();
}
/* perform stage 2 initialization for external services. */
static void facil_external_init2(void) { pubsub_cluster_on_fork_end(); }
/* perform cleanup for external services. */
static void facil_external_cleanup(void) {}
#pragma weak http_lib_init
void http_lib_init(void) {}
#pragma weak http_lib_cleanup
void http_lib_cleanup(void) {}
/* perform initialization for external services. */
static void facil_external_root_init(void) {
http_lib_init();
pubsub_cluster_init();
}
/* perform cleanup for external services. */
static void facil_external_root_cleanup(void) {
http_lib_cleanup();
pubsub_cluster_cleanup();
fio_cli_end();
}
/* *****************************************************************************
Deferred event handlers
***************************************************************************** */
static void deferred_on_close(void *uuid_, void *pr_) {
protocol_s *pr = pr_;
if (pr->rsv)
goto postpone;
pr->on_close((intptr_t)uuid_, pr);
return;
postpone:
defer(deferred_on_close, uuid_, pr_);
}
static void deferred_on_shutdown(void *arg, void *arg2) {
if (!uuid_data(arg).protocol) {
return;
}
protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_TASK);
if (!pr) {
if (errno == EBADF)
return;
goto postpone;
}
uuid_data(arg).active = facil_data->last_cycle.tv_sec;
uint8_t r = pr->on_shutdown((intptr_t)arg, pr);
if (r) {
if (r == 255) {
uuid_data(arg).timeout = 0;
} else {
spn_add(&facil_data->connection_count, 1);
uuid_data(arg).timeout = r;
}
pr->ping = mock_ping2;
protocol_unlock(pr, FIO_PR_LOCK_TASK);
} else {
spn_add(&facil_data->connection_count, 1);
uuid_data(arg).timeout = 8;
pr->ping = mock_ping;
protocol_unlock(pr, FIO_PR_LOCK_TASK);
sock_close((intptr_t)arg);
}
return;
postpone:
defer(deferred_on_shutdown, arg, NULL);
(void)arg2;
}
static void deferred_on_ready(void *arg, void *arg2) {
if (!uuid_data(arg).protocol) {
return;
}
protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_WRITE);
if (!pr) {
if (errno == EBADF)
return;
goto postpone;
}
pr->on_ready((intptr_t)arg, pr);
protocol_unlock(pr, FIO_PR_LOCK_WRITE);
return;
postpone:
defer(deferred_on_ready, arg, NULL);
(void)arg2;
}
static void deferred_on_data(void *uuid, void *arg2) {
if (!uuid_data(uuid).protocol || sock_isclosed((intptr_t)uuid)) {
return;
}
protocol_s *pr = protocol_try_lock(sock_uuid2fd(uuid), FIO_PR_LOCK_TASK);
if (!pr) {
if (errno == EBADF)
return;
goto postpone;
}
spn_unlock(&uuid_data(uuid).scheduled);
pr->on_data((intptr_t)uuid, pr);
protocol_unlock(pr, FIO_PR_LOCK_TASK);
if (!spn_trylock(&uuid_data(uuid).scheduled)) {
evio_add_read(sock_uuid2fd((intptr_t)uuid), uuid);
}
return;
postpone:
if (arg2) {
/* the event is being forced, so force rescheduling */
defer(deferred_on_data, (void *)uuid, (void *)1);
} else {
/* the protocol was locked, so there might not be any need for the event */
evio_add_read(sock_uuid2fd((intptr_t)uuid), uuid);
}
return;
}
static void deferred_ping(void *arg, void *arg2) {
if (!uuid_data(arg).protocol ||
(uuid_data(arg).timeout &&
(uuid_data(arg).timeout >
(facil_data->last_cycle.tv_sec - uuid_data(arg).active)))) {
return;
}
protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_WRITE);
if (!pr)
goto postpone;
pr->ping((intptr_t)arg, pr);
protocol_unlock(pr, FIO_PR_LOCK_WRITE);
return;
postpone:
defer(deferred_ping, arg, NULL);
(void)arg2;
}
/* *****************************************************************************
Forcing IO events
***************************************************************************** */
void facil_force_event(intptr_t uuid, enum facil_io_event ev) {
switch (ev) {
case FIO_EVENT_ON_DATA:
spn_trylock(&uuid_data(uuid).scheduled);
defer(deferred_on_data, (void *)uuid, (void *)1);
break;
case FIO_EVENT_ON_TIMEOUT:
defer(deferred_ping, (void *)uuid, NULL);
break;
case FIO_EVENT_ON_READY:
evio_on_ready((void *)uuid);
break;
}
}
/**
* Temporarily prevents `on_data` events from firing.
*
* The `on_data` event will be automatically rescheduled when (if) the socket's
* outgoing buffer fills up or when `facil_force_event` is called with
* `FIO_EVENT_ON_DATA`.
*/
void facil_quite(intptr_t uuid) {
if (sock_isvalid(uuid))
spn_trylock(&uuid_data(uuid).scheduled);
}
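/* Example (documentation only): a sketch of pausing `on_data` events for
* back-pressure and resuming them later, assuming `uuid` is a valid
* connection:
*
*   facil_quite(uuid);                          // pause on_data events
*   // ... drain outgoing buffers / throttle the peer ...
*   facil_force_event(uuid, FIO_EVENT_ON_DATA); // reschedule on_data
*/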
/* *****************************************************************************
Socket callbacks
***************************************************************************** */
void sock_on_close(intptr_t uuid) {
// fprintf(stderr, "INFO: facil.io, on-close called for %u (set to %p)\n",
// (unsigned int)sock_uuid2fd(uuid), (void
// *)uuid_data(uuid).protocol);
spn_lock(&uuid_data(uuid).lock);
struct connection_data_s old_data = uuid_data(uuid);
uuid_data(uuid) = (struct connection_data_s){.lock = uuid_data(uuid).lock};
spn_unlock(&uuid_data(uuid).lock);
if (old_data.protocol) {
defer(deferred_on_close, (void *)uuid, old_data.protocol);
if (facil_data->active == 0 && old_data.timeout) {
spn_sub(&facil_data->connection_count, 1);
}
}
}
void sock_touch(intptr_t uuid) {
if (facil_data && facil_data->active)
uuid_data(uuid).active = facil_data->last_cycle.tv_sec;
}
/* *****************************************************************************
Initialization and Cleanup
***************************************************************************** */
static spn_lock_i facil_libinit_lock = SPN_LOCK_INIT;
/** Rounds up any size to the nearest page alignment (assumes 4096 bytes per
* page) */
#define round_size(size) (((size) & (~4095)) + (4096 * (!!((size)&4095))))
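/* Worked example: round_size(5000) == (5000 & ~4095) + (4096 * 1) == 4096 +
* 4096 == 8192, while round_size(8192) == 8192 + (4096 * 0) == 8192. */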
static void facil_libcleanup(void) {
/* free memory */
spn_lock(&facil_libinit_lock);
facil_core_callback_force(FIO_CALL_AT_EXIT);
if (facil_data) {
facil_external_root_cleanup();
// defer_perform(); /* perform any lingering cleanup tasks? */
size_t mem_size = sizeof(*facil_data) + ((size_t)facil_data->capacity *
sizeof(struct connection_data_s));
munmap(facil_data, round_size(mem_size));
facil_data = NULL;
}
spn_unlock(&facil_libinit_lock);
}
static void facil_lib_init(void) {
ssize_t capa = sock_max_capacity();
if (capa < 0) {
perror("ERROR: socket capacity unknown / failure");
exit(ENOMEM);
}
size_t mem_size =
sizeof(*facil_data) + ((size_t)capa * sizeof(struct connection_data_s));
spn_lock(&facil_libinit_lock);
if (facil_data)
goto finish;
facil_data = mmap(NULL, round_size(mem_size), PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (!facil_data || facil_data == MAP_FAILED) {
perror("ERROR: Couldn't initialize the facil.io library");
exit(ENOMEM);
}
memset(facil_data, 0, mem_size);
*facil_data = (struct facil_data_s){
.capacity = capa,
.parent = getpid(),
};
facil_external_root_init();
atexit(facil_libcleanup);
#ifdef DEBUG
if (FACIL_PRINT_STATE)
fprintf(stderr,
"Initialized the facil.io library.\n"
"facil.io's memory footprint per connection == %lu Bytes X %lu\n"
"=== facil.io's memory footprint: %lu ===\n\n",
(unsigned long)sizeof(struct connection_data_s),
(unsigned long)facil_data->capacity, (unsigned long)mem_size);
#endif
finish:
spn_unlock(&facil_libinit_lock);
clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle);
}
/** Sets the shutdown flag for the current process.
*
* If a cluster is used and this function is called for a worker, a new worker
* will respawn.
*
* This MUST be signal safe (don't call any functions, just use flags).
*/
static void facil_stop(void) {
if (!facil_data)
return;
facil_data->active = 0;
}
/* *****************************************************************************
The listening protocol
***************************************************************************** */
#undef facil_listen
struct ListenerProtocol {
protocol_s protocol;
void (*on_open)(void *uuid, void *udata);
void *udata;
void (*on_start)(intptr_t uuid, void *udata);
void (*on_finish)(intptr_t uuid, void *udata);
char *port;
char *address;
uint8_t quite;
};
static void listener_ping(intptr_t uuid, protocol_s *plistener) {
// fprintf(stderr, "*** Listener Ping Called for %ld\n", sock_uuid2fd(uuid));
uuid_data(uuid).active = facil_data->last_cycle.tv_sec;
return;
(void)plistener;
}
static void listener_on_data(intptr_t uuid, protocol_s *plistener) {
for (int i = 0; i < 4; ++i) {
intptr_t new_client = sock_accept(uuid);
if (new_client == -1) {
if (errno == EWOULDBLOCK || errno == EAGAIN || errno == ECONNABORTED ||
errno == ECONNRESET)
return;
perror("ERROR: socket accept error");
return;
}
// to defer or not to defer...? TODO: answer the question
struct ListenerProtocol *listener = (struct ListenerProtocol *)plistener;
defer(listener->on_open, (void *)new_client, listener->udata);
}
facil_force_event(uuid, FIO_EVENT_ON_DATA);
}
static void free_listener(void *li) { free(li); }
static void listener_on_close(intptr_t uuid, protocol_s *plistener) {
struct ListenerProtocol *listener = (void *)plistener;
if (listener->on_finish) {
listener->on_finish(uuid, listener->udata);
}
if (FACIL_PRINT_STATE && facil_data->parent == getpid()) {
if (listener->port) {
fprintf(stderr, "* Stopped listening on port %s\n", listener->port);
} else {
fprintf(stderr, "* Stopped listening on Unix Socket %s\n",
listener->address);
}
}
if (!listener->port) {
unlink(listener->address);
}
free_listener(listener);
}
static inline struct ListenerProtocol *
listener_alloc(struct facil_listen_args settings) {
size_t port_len = 0;
size_t addr_len = 0;
if (settings.port) {
port_len = strlen(settings.port) + 1;
}
if (settings.address) {
addr_len = strlen(settings.address) + 1;
}
struct ListenerProtocol *listener =
malloc(sizeof(*listener) + addr_len + port_len);
if (listener) {
*listener = (struct ListenerProtocol){
.protocol.service = LISTENER_PROTOCOL_NAME,
.protocol.on_data = listener_on_data,
.protocol.on_close = listener_on_close,
.protocol.on_shutdown = mock_on_shutdown_internal,
.protocol.ping = listener_ping,
.on_open = (void (*)(void *, void *))settings.on_open,
.udata = settings.udata,
.on_start = settings.on_start,
.on_finish = settings.on_finish,
};
if (settings.port) {
listener->port = (char *)(listener + 1);
memcpy(listener->port, settings.port, port_len);
}
if (settings.address) {
listener->address = (char *)(listener + 1);
listener->address += port_len;
memcpy(listener->address, settings.address, addr_len);
}
return listener;
}
return NULL;
}
inline static void listener_on_start(int fd) {
intptr_t uuid = sock_fd2uuid((int)fd);
if (uuid < 0) {
fprintf(stderr, "ERROR: listening socket dropped?\n");
kill(0, SIGINT);
exit(4);
}
if (evio_add(fd, (void *)uuid) < 0) {
perror("Couldn't register listening socket");
kill(0, SIGINT);
exit(4);
}
fd_data(fd).active = facil_data->last_cycle.tv_sec;
// call the on_init callback
struct ListenerProtocol *listener =
(struct ListenerProtocol *)uuid_data(uuid).protocol;
if (listener->on_start) {
listener->on_start(uuid, listener->udata);
}
}
/**
Starts listening for incoming connections with the given settings (which MUST
include an `on_open` callback).
*/
intptr_t facil_listen(struct facil_listen_args settings) {
if (!facil_data)
facil_lib_init();
if (settings.on_open == NULL) {
errno = EINVAL;
return -1;
}
if (!settings.port || settings.port[0] == 0 ||
(settings.port[0] == '0' && settings.port[1] == 0)) {
settings.port = NULL;
}
intptr_t uuid = sock_listen(settings.address, settings.port);
if (uuid == -1) {
return -1;
}
protocol_s *protocol = (void *)listener_alloc(settings);
if (!protocol) {
sock_close(uuid);
return -1;
}
facil_attach(uuid, protocol);
if (FACIL_PRINT_STATE && facil_data->parent == getpid()) {
if (settings.port)
fprintf(stderr, "* Listening on port %s\n", settings.port);
else
fprintf(stderr, "* Listening on Unix Socket at %s\n", settings.address);
}
return uuid;
}
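/* Example (documentation only): a minimal sketch of a listening socket. The
* `on_client_open` callback and the protocol factory it calls are
* hypothetical; `facil_listen` accepts designated initializers through its
* macro wrapper (see the #undef above):
*
*   static void on_client_open(intptr_t uuid, void *udata) {
*     facil_attach(uuid, my_protocol_create()); // hypothetical factory
*     (void)udata;
*   }
*   // facil_listen(.port = "3000", .on_open = on_client_open);
*/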
/* *****************************************************************************
Connect (as client)
***************************************************************************** */
struct ConnectProtocol {
protocol_s protocol;
void (*on_connect)(void *uuid, void *udata);
void (*on_fail)(intptr_t uuid, void *udata);
void *udata;
intptr_t uuid;
uint8_t opened;
};
/* The first `ready` signal is fired when a connection was established */
static void connector_on_ready(intptr_t uuid, protocol_s *_connector) {
struct ConnectProtocol *connector = (void *)_connector;
sock_touch(uuid);
if (connector->opened == 0) {
connector->opened = 1;
facil_set_timeout(uuid, 0); /* remove connection timeout settings */
connector->on_connect((void *)uuid, connector->udata);
evio_add_write(sock_uuid2fd(uuid), (void *)uuid);
}
return;
}
/* If data events reach this protocol, delay their execution. */
static void connector_on_data(intptr_t uuid, protocol_s *connector) {
(void)connector;
(void)uuid;
}
/* Failed to connect */
static void connector_on_close(intptr_t uuid, protocol_s *pconnector) {
struct ConnectProtocol *connector = (void *)pconnector;
if (connector->opened == 0 && connector->on_fail)
connector->on_fail(connector->uuid, connector->udata);
free(connector);
(void)uuid;
}
#undef facil_connect
intptr_t facil_connect(struct facil_connect_args opt) {
intptr_t uuid = -1;
if (!opt.on_connect || (!opt.address && !opt.port))
goto error;
if (!opt.timeout)
opt.timeout = 30;
struct ConnectProtocol *connector = malloc(sizeof(*connector));
if (!connector)
goto error;
*connector = (struct ConnectProtocol){
.on_connect = (void (*)(void *, void *))opt.on_connect,
.on_fail = opt.on_fail,
.udata = opt.udata,
.protocol.service = CONNECTOR_PROTOCOL_NAME,
.protocol.on_data = connector_on_data,
.protocol.on_ready = connector_on_ready,
.protocol.on_close = connector_on_close,
.opened = 0,
};
uuid = connector->uuid = sock_connect(opt.address, opt.port);
/* check for errors, always invoke the on_fail if required */
if (uuid == -1) {
free(connector);
goto error;
}
if (facil_attach(uuid, &connector->protocol) == -1) {
/* facil_attach calls the failure / `on_close` callback */
sock_close(uuid);
return -1;
}
uuid_data(uuid).active = facil_last_tick().tv_sec;
facil_set_timeout(uuid, opt.timeout);
return uuid;
error:
if (opt.on_fail)
opt.on_fail(uuid, opt.udata);
return -1;
}
#define facil_connect(...) \
facil_connect((struct facil_connect_args){__VA_ARGS__})
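/* Example (documentation only): a minimal client connection sketch.
* `on_connected` and `on_failed` are hypothetical user callbacks:
*
*   static void on_connected(intptr_t uuid, void *udata) {
*     facil_attach(uuid, my_protocol_create()); // hypothetical factory
*     (void)udata;
*   }
*   static void on_failed(intptr_t uuid, void *udata) {
*     fprintf(stderr, "ERROR: connection failed\n");
*     (void)uuid;
*     (void)udata;
*   }
*   // facil_connect(.address = "localhost", .port = "3000",
*   //               .on_connect = on_connected, .on_fail = on_failed);
*/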
/* *****************************************************************************
Timers
***************************************************************************** */
/* *******
Timer Protocol
******* */
typedef struct {
protocol_s protocol;
size_t milliseconds;
size_t repetitions;
void (*task)(void *);
void (*on_finish)(void *);
void *arg;
} timer_protocol_s;
#define prot2timer(protocol) (*((timer_protocol_s *)(protocol)))
static void timer_on_data(intptr_t uuid, protocol_s *protocol) {
prot2timer(protocol).task(prot2timer(protocol).arg);
if (prot2timer(protocol).repetitions == 0)
goto reschedule;
prot2timer(protocol).repetitions -= 1;
if (prot2timer(protocol).repetitions)
goto reschedule;
sock_force_close(uuid);
return;
reschedule:
spn_trylock(&uuid_data(uuid).scheduled);
evio_set_timer(sock_uuid2fd(uuid), (void *)uuid,
prot2timer(protocol).milliseconds);
}
static void timer_on_close(intptr_t uuid, protocol_s *protocol) {
prot2timer(protocol).on_finish(prot2timer(protocol).arg);
free(protocol);
(void)uuid;
}
static void timer_ping(intptr_t uuid, protocol_s *protocol) {
sock_touch(uuid);
(void)protocol;
}
static inline timer_protocol_s *timer_alloc(void (*task)(void *), void *arg,
size_t milliseconds,
size_t repetitions,
void (*on_finish)(void *)) {
if (!on_finish)
on_finish = (void (*)(void *))mock_on_close;
timer_protocol_s *t = malloc(sizeof(*t));
if (t)
*t = (timer_protocol_s){
.protocol.service = TIMER_PROTOCOL_NAME,
.protocol.on_data = timer_on_data,
.protocol.on_close = timer_on_close,
.protocol.on_shutdown = mock_on_shutdown_internal,
.protocol.ping = timer_ping,
.arg = arg,
.task = task,
.on_finish = on_finish,
.milliseconds = milliseconds,
.repetitions = repetitions,
};
return t;
}
inline static void timer_on_server_start(int fd) {
if (evio_set_timer(fd, (void *)sock_fd2uuid(fd),
prot2timer(fd_data(fd).protocol).milliseconds)) {
perror("Couldn't register a required timed event.");
kill(0, SIGINT);
exit(4);
}
}
/**
* Creates a system timer (at the cost of 1 file descriptor).
*
* The task will repeat `repetitions` times. If `repetitions` is set to 0, task
* will repeat forever.
*
* Returns 0 on success, or -1 on error.
*
* The `on_finish` handler is always called (even on error).
*/
int facil_run_every(size_t milliseconds, size_t repetitions,
void (*task)(void *), void *arg,
void (*on_finish)(void *)) {
if (task == NULL)
goto error_fin;
timer_protocol_s *protocol = NULL;
intptr_t uuid = -1;
int fd = evio_open_timer();
if (fd == -1) {
perror("ERROR: couldn't create a timer fd");
goto error;
}
uuid = sock_open(fd);
if (uuid == -1)
goto error;
protocol = timer_alloc(task, arg, milliseconds, repetitions, on_finish);
if (protocol == NULL)
goto error;
facil_attach(uuid, (protocol_s *)protocol);
if (evio_isactive() && evio_set_timer(fd, (void *)uuid, milliseconds) == -1) {
/* don't goto error because the protocol is attached. */
const int old = errno;
sock_force_close(uuid);
errno = old;
return -1;
}
return 0;
error:
if (uuid != -1) {
const int old = errno;
sock_close(uuid);
errno = old;
} else if (fd != -1) {
const int old = errno;
close(fd);
errno = old;
}
error_fin:
if (on_finish) {
const int old = errno;
on_finish(arg);
errno = old;
}
return -1;
}
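/* Example (documentation only): a sketch of a repeating timer. `tick` is a
* hypothetical task; passing 0 repetitions would repeat forever and a NULL
* `on_finish` is replaced with a no-op:
*
*   static void tick(void *arg) {
*     fprintf(stderr, "tick\n");
*     (void)arg;
*   }
*   // run `tick` once a second, 10 times:
*   facil_run_every(1000, 10, tick, NULL, NULL);
*/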
/* *****************************************************************************
Running the server
***************************************************************************** */
volatile uint8_t facil_signal_children_flag = 0;
/**
* Signals all workers to shut down, which might trigger a respawn of the
* workers unless the shutdown signal was received.
*
* NOT signal safe.
*/
#pragma weak facil_cluster_signal_children
void facil_cluster_signal_children(void) {
if (facil_data->parent != getpid()) {
facil_stop();
}
}
static inline void facil_internal_poll(void) {
if (facil_signal_children_flag) {
facil_signal_children_flag = 0;
facil_cluster_signal_children();
}
}
static inline void facil_internal_poll_reset(void) {
facil_signal_children_flag = 0;
}
static void print_pid(void *arg, void *ignr) {
(void)arg;
(void)ignr;
fprintf(stderr, "* %d is running.\n", getpid());
}
static void facil_review_timeout(void *arg, void *ignr) {
(void)ignr;
protocol_s *tmp;
time_t review = facil_data->last_cycle.tv_sec;
intptr_t fd = (intptr_t)arg;
uint16_t timeout = fd_data(fd).timeout;
if (!timeout)
timeout = 300; /* enforced timeout settings */
if (!fd_data(fd).protocol || (fd_data(fd).active + timeout >= review))
goto finish;
tmp = protocol_try_lock(fd, FIO_PR_LOCK_STATE);
if (!tmp)
goto reschedule;
if (prt_meta(tmp).locks[FIO_PR_LOCK_TASK] ||
prt_meta(tmp).locks[FIO_PR_LOCK_WRITE])
goto unlock;
defer(deferred_ping, (void *)sock_fd2uuid((int)fd), NULL);
unlock:
protocol_unlock(tmp, FIO_PR_LOCK_STATE);
finish:
do {
fd++;
} while ((fd < facil_data->capacity) && !fd_data(fd).protocol);
if (facil_data->capacity <= fd) {
facil_data->need_review = 1;
return;
}
reschedule:
defer(facil_review_timeout, (void *)fd, NULL);
}
static void perform_idle(void *arg, void *ignr) {
facil_core_callback_force(FIO_CALL_ON_IDLE);
(void)arg;
(void)ignr;
}
/* reactor pattern cycling - common */
static void facil_cycle_schedule_events(void) {
static int idle = 0;
clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle);
facil_internal_poll();
int events;
if (defer_has_queue()) {
events = evio_review(0);
if (events < 0) {
return;
}
if (events > 0) {
idle = 1;
}
} else {
events = evio_review(EVIO_TICK);
if (events < 0)
return;
if (events > 0) {
idle = 1;
} else if (idle) {
defer(perform_idle, NULL, NULL);
idle = 0;
}
}
static time_t last_to_review = 0;
if (facil_data->need_review &&
facil_data->last_cycle.tv_sec != last_to_review) {
last_to_review = facil_data->last_cycle.tv_sec;
facil_data->need_review = 0;
defer(facil_review_timeout, (void *)0, NULL);
}
}
/* reactor pattern cycling during cleanup */
static void facil_cycle_unwind(void *ignr, void *ignr2) {
if (facil_data->connection_count) {
facil_cycle_schedule_events();
defer(facil_cycle_unwind, ignr, ignr2);
return;
}
pool_pt pool = facil_data->thread_pool;
facil_data->thread_pool = NULL;
defer_pool_stop(pool);
return;
(void)ignr;
(void)ignr2;
}
/* reactor pattern cycling */
static void facil_cycle(void *ignr, void *ignr2) {
facil_cycle_schedule_events();
if (facil_data->active) {
defer(facil_cycle, ignr, ignr2);
return;
}
/* switch to winding down */
if (FACIL_PRINT_STATE && !facil_data->spindown) {
pid_t pid = getpid();
if (pid != facil_data->parent)
fprintf(stderr, "* (%d) Detected exit signal.\n", getpid());
else
fprintf(stderr, "* Server Detected exit signal.\n");
}
pool_pt pool = facil_data->thread_pool;
facil_data->thread_pool = NULL;
defer_pool_stop(pool);
return;
(void)ignr;
(void)ignr2;
}
/**
OVERRIDE THIS to replace the default `fork` implementation or to inject hooks
into the forking function.
Behaves like the system's `fork`.
*/
#pragma weak facil_fork
int facil_fork(void) { return (int)fork(); }
/** This will be called by child processes, make sure to unlock any existing
* locks.
*
* Known locks:
* * `defer` tasks lock.
* * `sock` packet pool lock.
* * `sock` connection lock (per connection).
* * `facil` global lock.
* * `facil` pub/sub lock.
* * `facil` connection data lock (per connection data).
* * `facil` protocol lock (per protocol object, placed in `rsv`).
* `pubsub` pubsub global lock (should be initialized in facil_external_init).
* `pubsub` pubsub client lock (should be initialized in facil_external_init).
*/
static void facil_worker_startup(uint8_t sentinel) {
facil_internal_poll_reset();
evio_create();
clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle);
facil_external_init();
if (facil_data->active == 1) {
/* single process */
for (int i = 0; i < facil_data->capacity; i++) {
errno = 0;
fd_data(i).lock = SPN_LOCK_INIT;
if (fd_data(i).protocol) {
fd_data(i).protocol->rsv = 0;
if (fd_data(i).protocol->service == LISTENER_PROTOCOL_NAME)
listener_on_start(i);
else if (fd_data(i).protocol->service == TIMER_PROTOCOL_NAME)
timer_on_server_start(i);
else {
evio_add(i, (void *)sock_fd2uuid(i));
}
}
}
} else if (sentinel == 0) {
/* child process */
for (int i = 0; i < facil_data->capacity; i++) {
errno = 0;
fd_data(i).lock = SPN_LOCK_INIT;
if (fd_data(i).protocol) {
fd_data(i).protocol->rsv = 0;
if (fd_data(i).protocol->service == LISTENER_PROTOCOL_NAME)
listener_on_start(i);
else if (fd_data(i).protocol->service == TIMER_PROTOCOL_NAME)
timer_on_server_start(i);
else {
/* prevent normal connections from being shared across workers */
intptr_t uuid = sock_fd2uuid(i);
if (uuid != -1) {
sock_force_close(uuid);
} else {
sock_on_close(uuid);
}
}
}
}
} else {
/* sentinel process - ignore listening sockets, but keep them open */
for (int i = 0; i < facil_data->capacity; i++) {
fd_data(i).lock = SPN_LOCK_INIT;
if (fd_data(i).protocol) {
fd_data(i).protocol->rsv = 0;
if (fd_data(i).protocol->service == TIMER_PROTOCOL_NAME)
timer_on_server_start(i);
else if (fd_data(i).protocol->service != LISTENER_PROTOCOL_NAME) {
evio_add(i, (void *)sock_fd2uuid(i));
}
}
}
}
/* Clear all existing events & flush `facil_cycle` out. */
{
facil_data->spindown = 1;
uint16_t old_active = facil_data->active;
facil_data->active = 0;
defer_perform();
facil_data->active = old_active;
facil_data->spindown = 0;
}
/* call any external startup callbacks. */
facil_external_init2();
/* add cycling to the defer queue to setup the reactor pattern. */
facil_data->need_review = 1;
defer(facil_cycle, NULL, NULL);
/* Call the on_start callbacks. */
facil_core_callback_force(FIO_CALL_ON_START);
if (FACIL_PRINT_STATE) {
if (sentinel || facil_data->parent == getpid()) {
fprintf(stderr,
"Server is running %u %s X %u %s, press ^C to stop\n"
"* Detected capacity: %zd open file limit\n"
"* Root pid: %d\n",
facil_data->active, facil_data->active > 1 ? "workers" : "worker",
facil_data->threads,
facil_data->threads > 1 ? "threads" : "thread",
facil_data->capacity, facil_data->parent);
} else {
defer(print_pid, NULL, NULL);
}
}
facil_data->thread_pool =
defer_pool_start((sentinel ? 1 : facil_data->threads));
if (facil_data->thread_pool)
defer_pool_wait(facil_data->thread_pool);
}
static void facil_worker_cleanup(void) {
facil_data->active = 0;
facil_cluster_signal_children();
facil_core_callback_force(FIO_CALL_ON_SHUTDOWN);
for (int i = 0; i < facil_data->capacity; ++i) {
intptr_t uuid;
if ((uuid = sock_fd2uuid(i)) >= 0) {
defer(deferred_on_shutdown, (void *)uuid, NULL);
}
}
facil_data->thread_pool = defer_pool_start(facil_data->threads);
if (facil_data->thread_pool) {
defer(facil_cycle_unwind, NULL, NULL);
defer_pool_wait(facil_data->thread_pool);
facil_data->thread_pool = NULL;
}
fprintf(stderr, "* %d cleaning up.\n", getpid());
/* close leftovers */
for (int i = 0; i < facil_data->capacity; ++i) {
intptr_t uuid;
if (fd_data(i).protocol && (uuid = sock_fd2uuid(i)) >= 0) {
sock_force_close(uuid);
}
}
defer_perform();
if (facil_data->parent == getpid()) {
kill(0, SIGINT);
while (wait(NULL) != -1)
;
facil_external_cleanup();
}
facil_core_callback_force(FIO_CALL_ON_FINISH);
defer_perform();
evio_close();
facil_external_cleanup();
if (facil_data->parent == getpid() && FACIL_PRINT_STATE) {
fprintf(stderr, "\n --- Shutdown Complete ---\n");
}
}
static spn_lock_i fio_fork_lock = SPN_LOCK_INIT;
static void facil_sentinel_task(void *arg1, void *arg2);
static void *facil_sentinel_worker_thread(void *arg) {
errno = 0;
pid_t child = facil_fork();
/* release fork lock. */
spn_unlock(&fio_fork_lock);
if (child == -1) {
perror("FATAL ERROR: couldn't spawn worker.");
kill(facil_parent_pid(), SIGINT);
facil_stop();
return NULL;
} else if (child) {
int status;
waitpid(child, &status, 0);
#if DEBUG
if (facil_data->active) { /* !WIFEXITED(status) || WEXITSTATUS(status) */
if (!WIFEXITED(status) || WEXITSTATUS(status)) {
fprintf(stderr,
"FATAL ERROR: Child worker (%d) crashed. Stopping services in "
"DEBUG mode.\n",
child);
facil_core_callback_force(FIO_CALL_ON_CHILD_CRUSH);
} else {
fprintf(
stderr,
"INFO (FATAL): Child worker (%d) shutdown. Stopping services in "
"DEBUG mode.\n",
child);
}
kill(0, SIGINT);
}
#else
if (facil_data->active) {
/* don't call any functions while forking. */
spn_lock(&fio_fork_lock);
if (!WIFEXITED(status) || WEXITSTATUS(status)) {
fprintf(stderr,
"ERROR: Child worker (%d) crashed. Respawning worker.\n",
child);
facil_core_callback_force(FIO_CALL_ON_CHILD_CRUSH);
} else {
fprintf(stderr,
"INFO: Child worker (%d) shutdown. Respawning worker.\n",
child);
}
defer(facil_sentinel_task, NULL, NULL);
spn_unlock(&fio_fork_lock);
}
#endif
} else {
facil_core_callback_force(FIO_CALL_AFTER_FORK);
facil_core_callback_force(FIO_CALL_IN_CHILD);
facil_worker_startup(0);
facil_worker_cleanup();
exit(0);
}
return NULL;
(void)arg;
}
#if FIO_SENTINEL_USE_PTHREAD
static void facil_sentinel_task(void *arg1, void *arg2) {
if (!facil_data->active)
return;
facil_core_callback_force(FIO_CALL_BEFORE_FORK);
spn_lock(&fio_fork_lock);
pthread_t sentinel;
if (pthread_create(&sentinel, NULL, facil_sentinel_worker_thread,
(void *)&fio_fork_lock)) {
perror("FATAL ERROR: couldn't start sentinel thread");
exit(errno);
}
pthread_detach(sentinel);
spn_lock(&fio_fork_lock); /* will wait for worker thread to release lock. */
spn_unlock(&fio_fork_lock);
facil_core_callback_force(FIO_CALL_AFTER_FORK);
(void)arg1;
(void)arg2;
}
#else
static void facil_sentinel_task(void *arg1, void *arg2) {
if (!facil_data->active)
return;
facil_core_callback_force(FIO_CALL_BEFORE_FORK);
spn_lock(&fio_fork_lock); /* will wait for worker thread to release lock. */
void *thrd =
defer_new_thread(facil_sentinel_worker_thread, (void *)&fio_fork_lock);
defer_free_thread(thrd);
spn_lock(&fio_fork_lock); /* will wait for worker thread to release lock. */
spn_unlock(&fio_fork_lock);
facil_core_callback_force(FIO_CALL_AFTER_FORK);
(void)arg1;
(void)arg2;
}
#endif
/* handles the SIGUSR1, SIGINT and SIGTERM signals. */
static void sig_int_handler(int sig) {
switch (sig) {
#if !FACIL_DISABLE_HOT_RESTART
case SIGUSR1:
facil_signal_children_flag = 1;
break;
#endif
case SIGINT: /* fallthrough */
case SIGTERM: /* fallthrough */
facil_stop();
break;
default:
break;
}
}
/* setup handling for the SIGUSR1, SIGPIPE, SIGINT and SIGTERM signals. */
static void facil_setup_signal_handler(void) {
/* setup signal handling */
struct sigaction act, old;
act.sa_handler = sig_int_handler;
sigemptyset(&act.sa_mask);
act.sa_flags = SA_RESTART | SA_NOCLDSTOP;
if (sigaction(SIGINT, &act, &old)) {
perror("couldn't set signal handler");
return;
};
if (sigaction(SIGTERM, &act, &old)) {
perror("couldn't set signal handler");
return;
};
#if !FACIL_DISABLE_HOT_RESTART
if (sigaction(SIGUSR1, &act, &old)) {
perror("couldn't set signal handler");
return;
};
#endif
act.sa_handler = SIG_IGN;
if (sigaction(SIGPIPE, &act, &old)) {
perror("couldn't set signal handler");
return;
};
}
/*
* Zombie Reaping
* With thanks to Dr Graham D Shaw.
* http://www.microhowto.info/howto/reap_zombie_processes_using_a_sigchld_handler.html
*/
static void reap_child_handler(int sig) {
(void)(sig);
int old_errno = errno;
while (waitpid(-1, NULL, WNOHANG) > 0)
;
errno = old_errno;
}
/* initializes zombie reaping for the process */
void facil_reap_children(void) {
struct sigaction sa;
sa.sa_handler = reap_child_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
if (sigaction(SIGCHLD, &sa, 0) == -1) {
perror("Child reaping initialization failed");
kill(0, SIGINT);
exit(errno);
}
}
/** returns facil.io's parent (root) process pid. */
pid_t facil_parent_pid(void) {
if (!facil_data)
facil_lib_init();
return facil_data->parent;
}
static inline size_t facil_detect_cpu_cores(void) {
ssize_t cpu_count = 0;
#ifdef _SC_NPROCESSORS_ONLN
cpu_count = sysconf(_SC_NPROCESSORS_ONLN);
if (cpu_count < 0) {
if (FACIL_PRINT_STATE) {
fprintf(stderr, "WARNING: CPU core count auto-detection failed.\n");
}
return 0;
}
#else
fprintf(stderr, "WARNING: CPU core count auto-detection failed.\n");
#endif
return cpu_count;
}
/**
* Returns the number of expected threads / processes to be used by facil.io.
*
* The pointers should start with valid values that match the expected threads /
* processes values passed to `facil_run`.
*
* The data in the pointers will be overwritten with the result.
*/
void facil_expected_concurrency(int16_t *threads, int16_t *processes) {
if (!threads || !processes)
return;
if (!*threads && !*processes) {
/* both options set to 0 - default to cores*cores matrix */
ssize_t cpu_count = facil_detect_cpu_cores();
#if FACIL_CPU_CORES_LIMIT
if (cpu_count > FACIL_CPU_CORES_LIMIT) {
static int print_cores_warning = 1;
if (print_cores_warning) {
fprintf(
stderr,
"INFO: Detected %zu cores. Capping auto-detection of cores "
"to %zu.\n"
" Avoid this message by setting threads / workers manually.\n"
" To increase auto-detection limit, recompile with:\n"
" -DFACIL_CPU_CORES_LIMIT=%zu \n",
(size_t)cpu_count, (size_t)FACIL_CPU_CORES_LIMIT,
(size_t)cpu_count);
print_cores_warning = 0;
}
cpu_count = FACIL_CPU_CORES_LIMIT;
}
#endif
*threads = *processes = (int16_t)cpu_count;
if (cpu_count > FACIL_CPU_CORES_LIMIT) {
/* leave a core available for the kernel */
--(*processes);
}
} else if (*threads < 0 || *processes < 0) {
/* Set any option that is less than 0 be equal to cores/value */
/* Set any option equal to 0 be equal to the other option in value */
ssize_t cpu_count = facil_detect_cpu_cores();
size_t cpu_adjust = (*processes <= 0 ? 1 : 0);
if (cpu_count > 0) {
int16_t tmp_threads = 0;
if (*threads < 0)
tmp_threads = (int16_t)(cpu_count / (*threads * -1));
else if (*threads == 0)
tmp_threads = -1 * *processes;
else
tmp_threads = *threads;
if (*processes < 0)
*processes = (int16_t)(cpu_count / (*processes * -1));
else if (*processes == 0)
*processes = -1 * *threads;
*threads = tmp_threads;
if (cpu_adjust && (*processes * tmp_threads) >= cpu_count &&
cpu_count > FACIL_CPU_CORES_LIMIT) {
/* leave a core available for the kernel */
--*processes;
}
}
}
/* make sure we have at least one process and at least one thread */
if (*processes <= 0)
*processes = 1;
if (*threads <= 0)
*threads = 1;
}
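/* Worked example: assuming facil_detect_cpu_cores() reports 8 cores and
* FACIL_CPU_CORES_LIMIT is at least 8, `*threads == -2, *processes == 0`
* resolves to 4 threads (8 / 2) in each of 2 worker processes (the 0 mirrors
* the other option: -1 * -2); negative values divide the core count. */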
#undef facil_run
void facil_run(struct facil_run_args args) {
signal(SIGPIPE, SIG_IGN);
if (!facil_data) {
facil_lib_init();
}
/* compute actual concurrency */
facil_expected_concurrency(&args.threads, &args.processes);
/* listen to SIGINT / SIGTERM */
facil_setup_signal_handler();
/* activate facil, fork if needed */
facil_data->active = (uint16_t)args.processes;
facil_data->threads = (uint16_t)args.threads;
/* call any pre-start callbacks*/
facil_core_callback_force(FIO_CALL_PRE_START);
/* initialize cluster */
if (args.processes > 1) {
for (int i = 0; i < args.processes && facil_data->active; ++i) {
facil_sentinel_task(NULL, NULL);
}
facil_worker_startup(1);
} else {
facil_worker_startup(0);
}
facil_worker_cleanup();
}
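/* Example (documentation only): a minimal startup sketch; `facil_run`
* accepts designated initializers through its macro wrapper (see the #undef
* above) and blocks until shutdown:
*
*   // facil_listen(.port = "3000", .on_open = on_client_open);
*   // facil_run(.threads = 4, .processes = 2);
*/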
/**
* Returns the number of worker processes if facil.io is running.
*
* (1 is returned when in single process mode, otherwise the number of workers)
*/
int16_t facil_is_running(void) { return (facil_data ? facil_data->active : 0); }
/* *****************************************************************************
Setting the protocol
***************************************************************************** */
/* managing the protocol pointer array and the `on_close` callback */
static int facil_attach_state(intptr_t uuid, protocol_s *protocol,
protocol_metadata_s state) {
if (!facil_data)
facil_lib_init();
if (protocol) {
if (!protocol->on_close) {
protocol->on_close = mock_on_close;
}
if (!protocol->on_data) {
protocol->on_data = mock_on_data;
}
if (!protocol->on_ready) {
protocol->on_ready = mock_on_ev;
}
if (!protocol->ping) {
protocol->ping = mock_ping;
}
if (!protocol->on_shutdown) {
protocol->on_shutdown = mock_on_shutdown;
}
prt_meta(protocol) = state;
}
spn_lock(&uuid_data(uuid).lock);
if (!sock_isvalid(uuid)) {
spn_unlock(&uuid_data(uuid).lock);
if (protocol)
defer(deferred_on_close, (void *)uuid, protocol);
if (uuid == -1)
errno = EBADF;
else
errno = ENOTCONN;
return -1;
}
struct connection_data_s old_data = uuid_data(uuid);
uuid_data(uuid).protocol = protocol;
uuid_data(uuid).active = facil_data->last_cycle.tv_sec;
spn_unlock(&uuid_data(uuid).lock);
if (old_data.protocol) {
defer(deferred_on_close, (void *)uuid, old_data.protocol);
} else if (evio_isactive() && protocol) {
return evio_add(sock_uuid2fd(uuid), (void *)uuid);
}
return 0;
}
/** Attaches (or updates) a protocol object to a socket UUID.
* Returns -1 on error and 0 on success.
*/
int facil_attach(intptr_t uuid, protocol_s *protocol) {
return facil_attach_state(uuid, protocol, (protocol_metadata_s){.rsv = 0});
}
/**
* Attaches (or updates) a LOCKED protocol object to a socket UUID.
*/
int facil_attach_locked(intptr_t uuid, protocol_s *protocol) {
{
protocol_metadata_s state = {.rsv = 0};
spn_lock(state.locks + FIO_PR_LOCK_TASK);
return facil_attach_state(uuid, protocol, state);
}
}
/** Sets a timeout for a specific connection (only when running and valid). */
void facil_set_timeout(intptr_t uuid, uint8_t timeout) {
if (sock_isvalid(uuid) && facil_data && facil_data->active) {
uuid_data(uuid).active = facil_data->last_cycle.tv_sec;
uuid_data(uuid).timeout = timeout;
}
}
/** Gets a timeout for a specific connection. Returns 0 if there's no set
* timeout or the connection is inactive. */
uint8_t facil_get_timeout(intptr_t uuid) { return uuid_data(uuid).timeout; }
/* *****************************************************************************
Misc helpers
***************************************************************************** */
/**
Returns the last time the server reviewed any pending IO events.
*/
struct timespec facil_last_tick(void) {
if (!facil_data) {
facil_lib_init();
clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle);
}
return facil_data->last_cycle;
}
/**
* This function allows out-of-task access to a connection's `protocol_s`
* object by attempting to lock it.
*/
protocol_s *facil_protocol_try_lock(intptr_t uuid,
enum facil_protocol_lock_e type) {
if (!sock_isvalid(uuid) || !uuid_data(uuid).protocol) {
errno = EBADF;
return NULL;
}
return protocol_try_lock(sock_uuid2fd(uuid), type);
}
/** See `facil_protocol_try_lock` for details. */
void facil_protocol_unlock(protocol_s *pr, enum facil_protocol_lock_e type) {
if (!pr)
return;
protocol_unlock(pr, type);
}
/** Counts all the connections of a specific type. */
size_t facil_count(void *service) {
size_t count = 0;
for (intptr_t i = 0; i < facil_data->capacity; i++) {
void *tmp = NULL;
spn_lock(&fd_data(i).lock);
if (fd_data(i).protocol && fd_data(i).protocol->service)
tmp = (void *)fd_data(i).protocol->service;
spn_unlock(&fd_data(i).lock);
if (tmp != LISTENER_PROTOCOL_NAME && tmp != TIMER_PROTOCOL_NAME &&
(!service || (tmp == service)))
count++;
}
return count;
}
/* *****************************************************************************
Task Management - `facil_defer`, `facil_each`
***************************************************************************** */
struct task {
intptr_t origin;
void (*func)(intptr_t uuid, protocol_s *, void *arg);
void *arg;
void (*on_done)(intptr_t uuid, void *arg);
const void *service;
uint32_t count;
enum facil_protocol_lock_e task_type;
spn_lock_i lock;
};
static inline struct task *alloc_facil_task(void) {
return fio_malloc(sizeof(struct task));
}
static inline void free_facil_task(struct task *task) { fio_free(task); }
static void mock_on_task_done(intptr_t uuid, void *arg) {
(void)uuid;
(void)arg;
}
static void perform_single_task(void *v_uuid, void *v_task) {
struct task *task = v_task;
if (!uuid_data(v_uuid).protocol)
goto fallback;
protocol_s *pr = protocol_try_lock(sock_uuid2fd(v_uuid), task->task_type);
if (!pr)
goto defer;
if (pr->service == CONNECTOR_PROTOCOL_NAME) {
protocol_unlock(pr, task->task_type);
goto defer;
}
task->func((intptr_t)v_uuid, pr, task->arg);
protocol_unlock(pr, task->task_type);
free_facil_task(task);
return;
fallback:
task->on_done((intptr_t)v_uuid, task->arg);
free_facil_task(task);
return;
defer:
defer(perform_single_task, v_uuid, v_task);
return;
}
static void finish_multi_task(void *v_fd, void *v_task) {
struct task *task = v_task;
if (spn_trylock(&task->lock))
goto reschedule;
task->count--;
if (task->count) {
spn_unlock(&task->lock);
return;
}
task->on_done(task->origin, task->arg);
free_facil_task(task);
return;
reschedule:
defer(finish_multi_task, v_fd, v_task);
}
static void perform_multi_task(void *v_fd, void *v_task) {
if (!fd_data((intptr_t)v_fd).protocol) {
finish_multi_task(v_fd, v_task);
return;
}
struct task *task = v_task;
protocol_s *pr = protocol_try_lock((intptr_t)v_fd, task->task_type);
if (!pr)
goto reschedule;
if (pr->service == task->service) {
const intptr_t uuid = sock_fd2uuid((int)(intptr_t)v_fd);
task->func(uuid, pr, task->arg);
}
protocol_unlock(pr, task->task_type);
defer(finish_multi_task, v_fd, v_task);
return;
reschedule:
// fprintf(stderr, "rescheduling multi for %p\n", v_fd);
defer(perform_multi_task, v_fd, v_task);
}
static void schedule_multi_task(void *v_fd, void *v_task) {
struct task *task = v_task;
intptr_t fd = (intptr_t)v_fd;
for (size_t i = 0; i < 64; i++) {
if (!fd_data(fd).protocol)
goto finish;
if (spn_trylock(&fd_data(fd).lock))
goto reschedule;
if (!fd_data(fd).protocol ||
fd_data(fd).protocol->service != task->service || fd == task->origin) {
spn_unlock(&fd_data(fd).lock);
goto finish;
}
spn_unlock(&fd_data(fd).lock);
spn_lock(&task->lock);
task->count++;
spn_unlock(&task->lock);
defer(perform_multi_task, (void *)fd, task);
finish:
do {
fd++;
} while ((fd < facil_data->capacity) && !fd_data(fd).protocol);
if (fd >= (intptr_t)facil_data->capacity)
goto complete;
}
reschedule:
schedule_multi_task((void *)fd, v_task);
return;
complete:
defer(finish_multi_task, NULL, v_task);
}
/**
* Schedules a protected connection task. The task will run within the
* connection's lock.
*
* If the connection is closed before the task can run, the
* `fallback` task will be called instead, allowing for resource cleanup.
*/
#undef facil_defer
void facil_defer(struct facil_defer_args_s args) {
if (!args.fallback)
args.fallback = mock_on_task_done;
if (!args.type)
args.type = FIO_PR_LOCK_TASK;
if (!args.task || !uuid_data(args.uuid).protocol || args.uuid < 0 ||
!sock_isvalid(args.uuid))
goto error;
struct task *task = alloc_facil_task();
if (!task)
goto error;
*task = (struct task){
.func = args.task, .arg = args.arg, .on_done = args.fallback};
defer(perform_single_task, (void *)args.uuid, task);
return;
error:
defer((void (*)(void *, void *))args.fallback, (void *)args.uuid, args.arg);
}
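/* Example (documentation only): a sketch of scheduling a protected task on a
* single connection. `write_greeting` and `greeting_failed` are hypothetical:
*
*   static void write_greeting(intptr_t uuid, protocol_s *pr, void *arg) {
*     sock_write(uuid, "hello", 5);
*     (void)pr;
*     (void)arg;
*   }
*   static void greeting_failed(intptr_t uuid, void *arg) {
*     // the connection was closed before the task could run
*     (void)uuid;
*     (void)arg;
*   }
*   // facil_defer(.uuid = uuid, .task = write_greeting,
*   //             .fallback = greeting_failed);
*/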
/**
* Schedules a protected connection task for each `service` connection.
* The tasks will run within each of the connection's locks.
*
* Once all the tasks have been performed, the `on_complete` callback will be
* called.
*/
#undef facil_each
int facil_each(struct facil_each_args_s args) {
if (!args.on_complete)
args.on_complete = mock_on_task_done;
if (!args.task_type)
args.task_type = FIO_PR_LOCK_TASK;
if (!args.task)
goto error;
struct task *task = alloc_facil_task();
if (!task)
goto error;
*task = (struct task){.origin = args.origin,
.func = args.task,
.arg = args.arg,
.on_done = args.on_complete,
.service = args.service,
.task_type = args.task_type,
.count = 1};
defer(schedule_multi_task, (void *)0, task);
return 0;
error:
defer((void (*)(void *, void *))args.on_complete, (void *)args.origin,
args.arg);
return -1;
}
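/* Example (documentation only): a sketch of broadcasting a task to every
* connection sharing a service name. `MY_SERVICE` (a protocol `service`
* string) and `notify` are hypothetical:
*
*   static void notify(intptr_t uuid, protocol_s *pr, void *arg) {
*     sock_write(uuid, (char *)arg, strlen((char *)arg));
*     (void)pr;
*   }
*   // facil_each(.service = MY_SERVICE, .task = notify, .arg = "update!",
*   //            .origin = sender_uuid);
*/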