blob: a58543d4d8633aea72638f1c7219e9d187504bf2 [file] [log] [blame] [raw]
/*
Copyright: Boaz Segev, 2016-2017
License: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#include "spnlock.inc"
#include "evio.h"
#include "facil.h"
#include "fio_hashmap.h"
#include "fiobj4sock.h"
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/wait.h>
/* *****************************************************************************
Data Structures
***************************************************************************** */
/* Lock metadata overlaid on a protocol's `rsv` field: one spin-lock per
 * `facil_protocol_lock_e` lock type. */
typedef struct ProtocolMetadata {
  spn_lock_i locks[3];
  unsigned rsv : 8;
} protocol_metadata_s;
/* allows the metadata to be read / written as one opaque word. */
union protocol_metadata_union_u {
  size_t opaque;
  protocol_metadata_s meta;
};
/* Access the metadata stored inside a protocol's `rsv` field (type punning).
 * NOTE(review): assumes `protocol_s.rsv` is wide/aligned enough to back the
 * union - verify against the declaration in facil.h. */
#define prt_meta(prt) (((union protocol_metadata_union_u *)(&(prt)->rsv))->meta)
/* Per-connection bookkeeping; one slot per file descriptor. */
struct connection_data_s {
  protocol_s *protocol; /* attached protocol; NULL == no live connection */
  time_t active;        /* last-activity timestamp (seconds) */
  uint8_t timeout;      /* timeout in seconds; 0 == never times out */
  spn_lock_i scheduled; /* locked == an `on_data` event is queued/suppressed */
  spn_lock_i lock;      /* guards swaps of the `protocol` pointer */
};
/* Global library state, allocated once (see `facil_lib_init`); `conn` is a
 * flexible array member sized to `sock_max_capacity()` slots. */
static struct facil_data_s {
  spn_lock_i global_lock;
  uint8_t need_review;
  pool_pt thread_pool;
  pid_t parent;     /* pid of the root (pre-fork) process */
  uint16_t active;  /* 0 == shutting down (see `facil_stop`) */
  uint16_t threads;
  ssize_t capacity; /* number of entries in `conn` */
  void (*on_idle)(void);
  void (*on_finish)(void);
  struct timespec last_cycle; /* timestamp of the last reactor cycle */
  struct connection_data_s conn[];
} * facil_data;
/* accessors for a connection slot, by raw fd or by socket uuid. */
#define fd_data(fd) (facil_data->conn[(fd)])
#define uuid_data(uuid) fd_data(sock_uuid2fd((uuid)))
// #define uuid_prt_meta(uuid) prt_meta(uuid_data((uuid)).protocol)
// #define uuid_prt_meta(uuid) prt_meta(uuid_data((uuid)).protocol)
/* Resets a connection slot (new protocol, fresh activity stamp) while
 * preserving its `lock` value. "unsafe": caller must hold the slot's lock. */
static inline void clear_connection_data_unsafe(intptr_t uuid,
                                                protocol_s *protocol) {
  uuid_data(uuid) =
      (struct connection_data_s){.active = facil_data->last_cycle.tv_sec,
                                 .protocol = protocol,
                                 .lock = uuid_data(uuid).lock};
}
/** locks a connection's protocol returns a pointer that need to be unlocked. */
/* Non-blocking: errno == EBADF when no protocol is attached, EWOULDBLOCK
 * when either the slot lock or the requested protocol lock is busy. */
inline static protocol_s *protocol_try_lock(intptr_t fd,
                                            enum facil_protocol_lock_e type) {
  errno = 0;
  /* take the slot lock first, so the protocol pointer can be read safely */
  if (spn_trylock(&fd_data(fd).lock))
    goto would_block;
  protocol_s *pr = fd_data(fd).protocol;
  if (!pr) {
    spn_unlock(&fd_data(fd).lock);
    errno = EBADF;
    return NULL;
  }
  if (spn_trylock(&prt_meta(pr).locks[type])) {
    spn_unlock(&fd_data(fd).lock);
    goto would_block;
  }
  /* protocol lock held - the slot lock is no longer needed */
  spn_unlock(&fd_data(fd).lock);
  return pr;
would_block:
  errno = EWOULDBLOCK;
  return NULL;
}
/** See `facil_protocol_try_lock` for details. */
/* Releases the type-specific lock taken by `protocol_try_lock`. */
inline static void protocol_unlock(protocol_s *pr,
                                   enum facil_protocol_lock_e type) {
  spn_unlock(&prt_meta(pr).locks[type]);
}
/* *****************************************************************************
Deferred event handlers
***************************************************************************** */
/* Calls the protocol's `on_close` once its lock metadata (`rsv`) reads as
 * free; otherwise re-defers itself until all protocol locks are released. */
static void deferred_on_close(void *uuid_, void *pr_) {
  protocol_s *pr = pr_;
  if (pr->rsv)
    goto postpone; /* protocol still locked by some task - retry later */
  pr->on_close((intptr_t)uuid_, pr);
  return;
postpone:
  defer(deferred_on_close, uuid_, pr_);
}
/* Invokes the protocol's `on_shutdown`, then closes the socket gracefully.
 * Re-defers itself while the protocol is busy (EWOULDBLOCK). */
static void deferred_on_shutdown(void *arg, void *arg2) {
  if (!uuid_data(arg).protocol) {
    return;
  }
  protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_WRITE);
  if (!pr) {
    if (errno == EBADF)
      return; /* connection already gone - nothing to shut down */
    goto postpone;
  }
  pr->on_shutdown((intptr_t)arg, pr);
  protocol_unlock(pr, FIO_PR_LOCK_WRITE);
  sock_close((intptr_t)arg);
  return;
postpone:
  defer(deferred_on_shutdown, arg, NULL);
  (void)arg2;
}
/* Invokes the protocol's `on_ready`; if output remains queued on the socket,
 * re-arms the write event. Re-defers itself while the protocol is busy. */
static void deferred_on_ready(void *arg, void *arg2) {
  if (!uuid_data(arg).protocol) {
    return;
  }
  protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_WRITE);
  if (!pr) {
    if (errno == EBADF)
      return;
    goto postpone;
  }
  pr->on_ready((intptr_t)arg, pr);
  if (sock_has_pending((intptr_t)arg))
    evio_add_write(sock_uuid2fd((intptr_t)arg), arg);
  protocol_unlock(pr, FIO_PR_LOCK_WRITE);
  return;
postpone:
  defer(deferred_on_ready, arg, NULL);
  (void)arg2;
}
/* Invokes the protocol's `on_data` under the TASK lock. The `scheduled` flag
 * is released before the callback and re-acquired afterwards; only if the
 * re-acquire succeeds (no newer event was queued meanwhile) is the read
 * event re-armed, preventing duplicate scheduling. */
static void deferred_on_data(void *arg, void *arg2) {
  if (!uuid_data(arg).protocol) {
    return;
  }
  protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_TASK);
  if (!pr) {
    if (errno == EBADF)
      return;
    goto postpone;
  }
  spn_unlock(&uuid_data(arg).scheduled);
  pr->on_data((intptr_t)arg, pr);
  protocol_unlock(pr, FIO_PR_LOCK_TASK);
  if (!spn_trylock(&uuid_data(arg).scheduled)) {
    evio_add_read(sock_uuid2fd((intptr_t)arg), arg);
  }
  // else
  //   fprintf(stderr, "skipped evio_add\n");
  return;
postpone:
  defer(deferred_on_data, arg, NULL);
  (void)arg2;
}
/* Invokes the protocol's `ping` when the connection's timeout has elapsed.
 * Skips connections with no protocol or whose timeout hasn't expired yet
 * (a timeout of 0 never suppresses the ping). Re-defers while busy. */
static void deferred_ping(void *arg, void *arg2) {
  if (!uuid_data(arg).protocol ||
      (uuid_data(arg).timeout &&
       (uuid_data(arg).timeout >
        (facil_data->last_cycle.tv_sec - uuid_data(arg).active)))) {
    return;
  }
  protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_WRITE);
  if (!pr)
    goto postpone;
  pr->ping((intptr_t)arg, pr);
  protocol_unlock(pr, FIO_PR_LOCK_WRITE);
  return;
postpone:
  defer(deferred_ping, arg, NULL);
  (void)arg2;
}
/* *****************************************************************************
Event Handlers (evio)
***************************************************************************** */
/* defer-compatible adapter around `sock_flush` (second argument unused). */
static void sock_flush_defer(void *arg, void *ignored) {
  sock_flush((intptr_t)arg);
  (void)ignored;
}
/* evio: socket became writable - flush queued output, then fire `on_ready`. */
void evio_on_ready(void *arg) {
  defer(sock_flush_defer, arg, NULL);
  defer(deferred_on_ready, arg, NULL);
}
/* evio: hangup / error - force-close the socket (no graceful shutdown). */
void evio_on_close(void *arg) { sock_force_close((intptr_t)arg); }
void evio_on_error(void *arg) { sock_force_close((intptr_t)arg); }
/* evio: incoming data - schedule the protocol's `on_data`. */
void evio_on_data(void *arg) { defer(deferred_on_data, arg, NULL); }
/* *****************************************************************************
Forcing IO events
***************************************************************************** */
/* Schedules an IO event for `uuid` even though no actual IO occurred. */
void facil_force_event(intptr_t uuid, enum facil_io_event ev) {
  switch (ev) {
  case FIO_EVENT_ON_DATA:
    /* mark an event as scheduled so `deferred_on_data` won't re-arm evio */
    spn_trylock(&uuid_data(uuid).scheduled);
    evio_on_data((void *)uuid);
    break;
  case FIO_EVENT_ON_TIMEOUT:
    defer(deferred_ping, (void *)uuid, NULL);
    break;
  case FIO_EVENT_ON_READY:
    evio_on_ready((void *)uuid);
    break;
  }
}
/**
 * Temporarily prevents `on_data` events from firing.
 *
 * The `on_data` event will be automatically rescheduled when (if) the socket's
 * outgoing buffer fills up or when `facil_force_event` is called with
 * `FIO_EVENT_ON_DATA`.
 *
 * NOTE: the name is a historical misspelling of "quiet", kept because it is
 * public API.
 */
void facil_quite(intptr_t uuid) {
  if (sock_isvalid(uuid))
    spn_trylock(&uuid_data(uuid).scheduled);
}
/* *****************************************************************************
Socket callbacks
***************************************************************************** */
/* sock: a socket closed - detach its protocol under the slot lock, then
 * defer the protocol's `on_close` (it may still be locked elsewhere). */
void sock_on_close(intptr_t uuid) {
  spn_lock(&uuid_data(uuid).lock);
  protocol_s *old_protocol = uuid_data(uuid).protocol;
  clear_connection_data_unsafe(uuid, NULL);
  spn_unlock(&uuid_data(uuid).lock);
  if (old_protocol)
    defer(deferred_on_close, (void *)uuid, old_protocol);
}
/* sock: refresh a connection's activity stamp (postpones its timeout). */
void sock_touch(intptr_t uuid) {
  uuid_data(uuid).active = facil_data->last_cycle.tv_sec;
}
/* *****************************************************************************
Mock Protocol Callbacks and Service Funcions
***************************************************************************** */
/* no-op event callback, used when a protocol omits a handler. */
static void mock_on_ev(intptr_t uuid, protocol_s *protocol) {
  (void)uuid;
  (void)protocol;
}
/* no-op `on_close` callback. */
static void mock_on_close(intptr_t uuid, protocol_s *protocol) {
  (void)(protocol);
  (void)(uuid);
}
/* no-op `on_start` / `on_finish` callback (listener signature). */
static void mock_on_finish(intptr_t uuid, void *udata) {
  (void)(udata);
  (void)(uuid);
}
/* default `ping`: a connection that reached its timeout gets closed. */
static void mock_ping(intptr_t uuid, protocol_s *protocol) {
  (void)(protocol);
  sock_force_close(uuid);
}
/* no-op idle handler. */
static void mock_idle(void) {}
/* Support for the default pub/sub cluster engine */
/* weak no-op stubs, overridden when the pub/sub engine is linked in. */
#pragma weak pubsub_cluster_init
void pubsub_cluster_init(void) {}
#pragma weak pubsub_cluster_on_fork
void pubsub_cluster_on_fork(void) {}
/* perform initialization for external services. */
static void facil_external_init(void) {
  sock_on_fork();
  pubsub_cluster_on_fork();
}
/* perform cleanup for external services. */
static void facil_external_cleanup(void) {}
/* weak no-op stubs, overridden when the HTTP library is linked in. */
#pragma weak http_lib_init
void http_lib_init(void) {}
#pragma weak http_lib_cleanup
void http_lib_cleanup(void) {}
/* perform initialization for external services. */
static void facil_external_root_init(void) {
  http_lib_init();
  pubsub_cluster_init();
}
/* perform cleanup for external services. */
static void facil_external_root_cleanup(void) { http_lib_cleanup(); }
/* *****************************************************************************
Initialization and Cleanup
***************************************************************************** */
/* guards allocation / release of the global `facil_data` state. */
static spn_lock_i facil_libinit_lock = SPN_LOCK_INIT;
/* atexit handler: releases the mapped global state exactly once. */
static void facil_libcleanup(void) {
  /* free memory */
  spn_lock(&facil_libinit_lock);
  if (facil_data) {
    munmap(facil_data,
           sizeof(*facil_data) + ((size_t)facil_data->capacity *
                                  sizeof(struct connection_data_s)));
    facil_external_root_cleanup();
    facil_data = NULL;
  }
  spn_unlock(&facil_libinit_lock);
}
/* Lazily allocates and zeroes the global `facil_data` state (one connection
 * slot per possible file descriptor). Thread-safe via `facil_libinit_lock`. */
static void facil_lib_init(void) {
  ssize_t capa = sock_max_capacity();
  if (capa < 0) {
    perror("ERROR: socket capacity unknown / failure");
    exit(ENOMEM);
  }
  size_t mem_size =
      sizeof(*facil_data) + ((size_t)capa * sizeof(struct connection_data_s));
  spn_lock(&facil_libinit_lock);
  if (facil_data)
    goto finish;
  /* FIX: dropped PROT_EXEC - this mapping holds plain data and must not be
   * executable (W^X hygiene). */
  facil_data = mmap(NULL, mem_size, PROT_READ | PROT_WRITE,
                    MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  /* FIX: mmap reports failure with MAP_FAILED, not NULL; also exit with a
   * non-zero status (the original called exit(0) on a fatal error). */
  if (facil_data == MAP_FAILED) {
    facil_data = NULL;
    perror("ERROR: Couldn't initialize the facil.io library");
    exit(1);
  }
  memset(facil_data, 0, mem_size);
  *facil_data = (struct facil_data_s){.capacity = capa, .parent = getpid()};
  facil_external_root_init();
  atexit(facil_libcleanup);
#ifdef DEBUG
  if (FACIL_PRINT_STATE)
    fprintf(stderr,
            "Initialized the facil.io library.\n"
            "facil.io's memory footprint per connection == %lu Bytes X %lu\n"
            "=== facil.io's memory footprint: %lu ===\n\n",
            (unsigned long)sizeof(struct connection_data_s),
            (unsigned long)facil_data->capacity, (unsigned long)mem_size);
#endif
finish:
  spn_unlock(&facil_libinit_lock);
  clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle);
}
/** Sets to shutdown flag for the current process.
 *
 * If a cluster is used and this function is called for a worker, a new worker
 * will respawn.
 */
static void facil_stop(void) {
  if (!facil_data)
    return;
  facil_data->active = 0; /* signals the reactor / workers to wind down */
  if (facil_data->thread_pool)
    defer_pool_stop(facil_data->thread_pool);
}
/* *****************************************************************************
The listenning protocol
***************************************************************************** */
#undef facil_listen
static const char *listener_protocol_name =
    "listening protocol __facil_internal__";
/* Internal protocol attached to listening sockets. The `port` and `address`
 * strings live inside the same allocation (see `listener_alloc`). */
struct ListenerProtocol {
  protocol_s protocol;
  void (*on_open)(void *uuid, void *udata); /* called per accepted client */
  void *udata;
  void (*on_start)(intptr_t uuid, void *udata);
  void (*on_finish)(intptr_t uuid, void *udata);
  char *port;    /* NULL when listening on a Unix socket */
  char *address;
  uint8_t quite;
};
/* listeners never time out - the ping only refreshes the activity stamp. */
static void listener_ping(intptr_t uuid, protocol_s *plistener) {
  // fprintf(stderr, "*** Listener Ping Called for %ld\n", sock_uuid2fd(uuid));
  uuid_data(uuid).active = facil_data->last_cycle.tv_sec;
  return;
  (void)plistener;
}
/* Accepts one pending connection and hands it to the listener's `on_open`.
 * On success (or transient accept errors) the data event is re-forced so any
 * remaining pending connections are accepted on a later cycle. */
static void listener_on_data(intptr_t uuid, protocol_s *plistener) {
  intptr_t new_client;
  if ((new_client = sock_accept(uuid)) == -1) {
    if (errno == ECONNABORTED || errno == ECONNRESET)
      goto reschedule; /* transient error - keep accepting */
    else if (errno != EWOULDBLOCK && errno != EAGAIN)
      perror("ERROR: socket accept error");
    return;
  }
  // to defer or not to defer...? TODO: answer the question
  struct ListenerProtocol *listener = (struct ListenerProtocol *)plistener;
  defer(listener->on_open, (void *)new_client, listener->udata);
reschedule:
  facil_force_event(uuid, FIO_EVENT_ON_DATA);
  return;
  (void)plistener; /* unreachable; kept from the original */
}
/* releases a listener allocation (name keeps its historical spelling). */
static void free_listenner(void *li) { free(li); }
/* Listener teardown: notify `on_finish`, log (root process only), unlink
 * Unix socket files and free the listener allocation. */
static void listener_on_close(intptr_t uuid, protocol_s *plistener) {
  struct ListenerProtocol *listener = (void *)plistener;
  listener->on_finish(uuid, listener->udata);
  if (FACIL_PRINT_STATE && facil_data->parent == getpid()) {
    if (listener->port) {
      fprintf(stderr, "* Stopped listening on port %s\n", listener->port);
    } else {
      fprintf(stderr, "* Stopped listening on Unix Socket %s\n",
              listener->address);
    }
  }
  if (!listener->port) {
    unlink(listener->address); /* remove the Unix socket file */
  }
  free_listenner(listener);
}
/* Allocates and initializes a listener protocol. The port / address strings
 * are copied into the same allocation, directly after the struct. Returns
 * NULL on allocation failure. */
static inline struct ListenerProtocol *
listener_alloc(struct facil_listen_args settings) {
  if (!settings.on_start)
    settings.on_start = mock_on_finish; /* same signature as on_finish */
  if (!settings.on_finish)
    settings.on_finish = mock_on_finish;
  size_t port_len = 0;
  size_t addr_len = 0;
  if (settings.port) {
    port_len = strlen(settings.port) + 1; /* +1 for the NUL terminator */
  }
  if (settings.address) {
    addr_len = strlen(settings.address) + 1;
  }
  struct ListenerProtocol *listener =
      malloc(sizeof(*listener) + addr_len + port_len);
  if (listener) {
    *listener = (struct ListenerProtocol){
        .protocol.service = listener_protocol_name,
        .protocol.on_data = listener_on_data,
        .protocol.on_close = listener_on_close,
        .protocol.ping = listener_ping,
        .on_open = (void (*)(void *, void *))settings.on_open,
        .udata = settings.udata,
        .on_start = settings.on_start,
        .on_finish = settings.on_finish,
    };
    /* copy the strings into the tail of the allocation: port first,
     * then address */
    if (settings.port) {
      listener->port = (char *)(listener + 1);
      memcpy(listener->port, settings.port, port_len);
    }
    if (settings.address) {
      listener->address = (char *)(listener + 1);
      listener->address += port_len;
      memcpy(listener->address, settings.address, addr_len);
    }
    return listener;
  }
  return NULL;
}
/* Registers a listening socket with the reactor and invokes the user's
 * `on_start`. Failure here is fatal for the whole process group. */
inline static void listener_on_start(int fd) {
  intptr_t uuid = sock_fd2uuid((int)fd);
  if (uuid < 0) {
    fprintf(stderr, "ERROR: listening socket dropped?\n");
    kill(0, SIGINT); /* signal the whole process group before exiting */
    exit(4);
  }
  if (evio_add(fd, (void *)uuid) < 0) {
    perror("Couldn't register listening socket");
    kill(0, SIGINT);
    exit(4);
  }
  fd_data(fd).active = facil_data->last_cycle.tv_sec;
  // call the on_init callback
  struct ListenerProtocol *listener =
      (struct ListenerProtocol *)uuid_data(uuid).protocol;
  listener->on_start(uuid, listener->udata);
}
/**
Listens to a server with the following server settings (which MUST include
a default protocol).

This method blocks the current thread until the server is stopped (either
though a `srv_stop` function or when a SIGINT/SIGTERM is received).

Returns 0 on success; -1 on error (errno == EINVAL when `on_open` is missing).
*/
int facil_listen(struct facil_listen_args settings) {
  if (!facil_data)
    facil_lib_init();
  if (settings.on_open == NULL) {
    errno = EINVAL;
    return -1;
  }
  /* an empty or "0" port string means a Unix socket address */
  if (!settings.port || settings.port[0] == 0 ||
      (settings.port[0] == '0' && settings.port[1] == 0)) {
    settings.port = NULL;
  }
  intptr_t uuid = sock_listen(settings.address, settings.port);
  if (uuid == -1) {
    return -1;
  }
  protocol_s *protocol = (void *)listener_alloc(settings);
  /* FIX: test the allocation *before* attaching it - the original called
   * `facil_attach(uuid, NULL)` first and only then checked the pointer. */
  if (!protocol) {
    sock_close(uuid);
    return -1;
  }
  facil_attach(uuid, protocol);
  if (FACIL_PRINT_STATE && facil_data->parent == getpid()) {
    if (settings.port)
      fprintf(stderr, "* Listening on port %s\n", settings.port);
    else
      fprintf(stderr, "* Listening on Unix Socket at %s\n", settings.address);
  }
  return 0;
}
/* *****************************************************************************
Connect (as client)
***************************************************************************** */
static const char *connector_protocol_name = "connect protocol __internal__";
/* Transient protocol used while a client connection is being established;
 * the user's real protocol is expected to replace it from `on_connect`. */
struct ConnectProtocol {
  protocol_s protocol;
  void (*on_connect)(void *uuid, void *udata);
  void (*on_fail)(intptr_t uuid, void *udata);
  void *udata;
  intptr_t uuid;
  uint8_t opened; /* set once the first `on_ready` event fired */
};
/* The first `ready` signal is fired when a connection was established */
static void connector_on_ready(intptr_t uuid, protocol_s *_connector) {
  struct ConnectProtocol *connector = (void *)_connector;
  sock_touch(uuid);
  if (connector->opened == 0) {
    connector->opened = 1;      /* only report the first readiness event */
    facil_set_timeout(uuid, 0); /* remove connection timeout settings */
    connector->on_connect((void *)uuid, connector->udata);
  }
  return;
}
/* If data events reach this protocol, delay their execution. */
/* NOTE(review): this is a no-op - incoming data stays in the socket buffer
 * until the user's protocol (attached from `on_connect`) reads it. */
static void connector_on_data(intptr_t uuid, protocol_s *connector) {
  (void)connector;
  (void)uuid;
}
/* Failed to connect */
/* Closed before the first `on_ready`: report via `on_fail` (if set) and
 * release the connector allocation. */
static void connector_on_close(intptr_t uuid, protocol_s *pconnector) {
  struct ConnectProtocol *connector = (void *)pconnector;
  if (connector->opened == 0 && connector->on_fail)
    connector->on_fail(connector->uuid, connector->udata);
  free(connector);
  (void)uuid;
}
#undef facil_connect
/**
 * Creates a client connection. Returns the new connection's uuid, or -1 on
 * error; when an error occurs and `on_fail` is set, `on_fail` is invoked.
 */
intptr_t facil_connect(struct facil_connect_args opt) {
  intptr_t uuid = -1;
  if (!opt.on_connect || (!opt.address && !opt.port))
    goto error;
  if (!opt.timeout)
    opt.timeout = 30; /* default connection timeout, in seconds */
  struct ConnectProtocol *connector = malloc(sizeof(*connector));
  if (!connector)
    goto error;
  *connector = (struct ConnectProtocol){
      .on_connect = (void (*)(void *, void *))opt.on_connect,
      .on_fail = opt.on_fail,
      .udata = opt.udata,
      .protocol.service = connector_protocol_name,
      .protocol.on_data = connector_on_data,
      .protocol.on_ready = connector_on_ready,
      .protocol.on_close = connector_on_close,
      .opened = 0,
  };
  uuid = connector->uuid = sock_connect(opt.address, opt.port);
  /* check for errors, always invoke the on_fail if required */
  if (uuid == -1) {
    /* FIX: the original leaked `connector` here - it was never attached, so
     * no `on_close` would ever free it. */
    free(connector);
    goto error;
  }
  if (facil_attach(uuid, &connector->protocol) == -1) {
    /* NOTE(review): if `facil_attach` defers the protocol's `on_close` on
     * failure, `on_fail` may fire twice (there and below) - verify
     * facil_attach's failure contract. */
    sock_close(uuid);
    goto error;
  }
  facil_set_timeout(uuid, opt.timeout);
  return uuid;
error:
  if (opt.on_fail)
    opt.on_fail(uuid, opt.udata);
  return -1;
}
#define facil_connect(...)                                                     \
  facil_connect((struct facil_connect_args){__VA_ARGS__})
/* *****************************************************************************
Timers
***************************************************************************** */
/* *******
Timer Protocol
******* */
/* Protocol wrapper for a timer file descriptor. */
typedef struct {
  protocol_s protocol;
  size_t milliseconds;       /* firing interval */
  size_t repetitions;        /* remaining runs; 0 == repeat forever */
  void (*task)(void *);      /* user task, invoked each time the timer fires */
  void (*on_finish)(void *); /* invoked when the timer is destroyed */
  void *arg;
} timer_protocol_s;
/* view a generic protocol pointer as a timer protocol. */
#define prot2timer(protocol) (*((timer_protocol_s *)(protocol)))
static const char *timer_protocol_name = "timer protocol __facil_internal__";
/* Timer fired: run the task, then either re-arm the timer or - once the
 * repetition count is exhausted - close the timer's fd (which triggers
 * `timer_on_close`). */
static void timer_on_data(intptr_t uuid, protocol_s *protocol) {
  prot2timer(protocol).task(prot2timer(protocol).arg);
  if (prot2timer(protocol).repetitions == 0)
    goto reschedule; /* 0 == infinite repetitions */
  prot2timer(protocol).repetitions -= 1;
  if (prot2timer(protocol).repetitions)
    goto reschedule;
  sock_force_close(uuid);
  return;
reschedule:
  /* mark as scheduled so the generic on_data logic won't double-arm it */
  spn_trylock(&uuid_data(uuid).scheduled);
  evio_set_timer(sock_uuid2fd(uuid), (void *)uuid,
                 prot2timer(protocol).milliseconds);
}
/* Timer destroyed: report through `on_finish`, then free the protocol. */
static void timer_on_close(intptr_t uuid, protocol_s *protocol) {
  prot2timer(protocol).on_finish(prot2timer(protocol).arg);
  free(protocol);
  (void)uuid;
}
/* timers should never time out - refresh the activity stamp instead. */
static void timer_ping(intptr_t uuid, protocol_s *protocol) {
  sock_touch(uuid);
  (void)protocol;
}
/* Default `on_finish` with the correct signature. FIX: the original cast
 * `mock_on_close` - a `void (*)(intptr_t, protocol_s *)` - to
 * `void (*)(void *)` and later called it through that incompatible type,
 * which is undefined behavior in C. */
static void timer_mock_on_finish(void *arg) { (void)arg; }
/* Allocates and initializes a timer protocol; returns NULL on allocation
 * failure. `repetitions == 0` means "repeat forever". */
static inline timer_protocol_s *timer_alloc(void (*task)(void *), void *arg,
                                            size_t milliseconds,
                                            size_t repetitions,
                                            void (*on_finish)(void *)) {
  if (!on_finish)
    on_finish = timer_mock_on_finish;
  timer_protocol_s *t = malloc(sizeof(*t));
  if (t)
    *t = (timer_protocol_s){
        .protocol.service = timer_protocol_name,
        .protocol.on_data = timer_on_data,
        .protocol.on_close = timer_on_close,
        .protocol.ping = timer_ping,
        .arg = arg,
        .task = task,
        .on_finish = on_finish,
        .milliseconds = milliseconds,
        .repetitions = repetitions,
    };
  return t;
}
/* Re-arms a timer fd when the reactor (re)starts; failure is fatal for the
 * whole process group. */
inline static void timer_on_server_start(int fd) {
  if (evio_set_timer(fd, (void *)sock_fd2uuid(fd),
                     prot2timer(fd_data(fd).protocol).milliseconds)) {
    perror("Couldn't register a required timed event.");
    kill(0, SIGINT);
    exit(4);
  }
}
/**
 * Creates a system timer (at the cost of 1 file descriptor).
 *
 * The task will repeat `repetitions` times. If `repetitions` is set to 0, task
 * will repeat forever.
 *
 * Returns -1 on error or the new file descriptor on succeess.
 *
 * The `on_finish` handler is always called (even on error).
 */
int facil_run_every(size_t milliseconds, size_t repetitions,
                    void (*task)(void *), void *arg,
                    void (*on_finish)(void *)) {
  if (task == NULL)
    goto error_fin;
  timer_protocol_s *protocol = NULL;
  intptr_t uuid = -1;
  int fd = evio_open_timer();
  if (fd == -1) {
    perror("ERROR: couldn't create a timer fd");
    goto error;
  }
  uuid = sock_open(fd);
  if (uuid == -1)
    goto error;
  protocol = timer_alloc(task, arg, milliseconds, repetitions, on_finish);
  if (protocol == NULL)
    goto error;
  /* NOTE(review): the `facil_attach` result is unchecked - on failure the
   * timer silently never fires. Check facil_attach's failure contract (it
   * may defer `on_close` itself) before adding handling here. */
  facil_attach(uuid, (protocol_s *)protocol);
  if (evio_isactive() && evio_set_timer(fd, (void *)uuid, milliseconds) == -1)
    goto error;
  return 0;
error:
  /* close whichever resource was opened, preserving the original errno */
  if (uuid != -1) {
    const int old = errno;
    sock_close(uuid);
    errno = old;
  } else if (fd != -1) {
    const int old = errno;
    close(fd);
    errno = old;
  }
error_fin:
  /* NOTE(review): if the protocol was already attached, closing the socket
   * also runs `timer_on_close` -> `on_finish`, so `on_finish` may be invoked
   * twice on late failures - verify before relying on single delivery. */
  if (on_finish) {
    const int old = errno;
    on_finish(arg);
    errno = old;
  }
  return -1;
}
/* *****************************************************************************
Cluster Messaging - using Unix Sockets
***************************************************************************** */
#ifdef __BIG_ENDIAN__
/* big-endian hosts serialize header words in little-endian byte order (the
 * cluster uses local Unix sockets only, so both peers share the host's
 * convention). */
inline static uint32_t cluster_str2uint32(uint8_t *str) {
  uint32_t v = 0;
  for (int b = 3; b >= 0; b--)
    v = (v << 8) | str[b];
  return v;
}
inline static void cluster_uint2str(uint8_t *dest, uint32_t i) {
  for (int b = 0; b < 4; b++) {
    dest[b] = i & 0xFF;
    i >>= 8;
  }
}
#else
/* little-endian hosts serialize header words in big-endian (network) byte
 * order. */
inline static uint32_t cluster_str2uint32(uint8_t *str) {
  uint32_t v = 0;
  for (int b = 0; b < 4; b++)
    v = (v << 8) | str[b];
  return v;
}
inline static void cluster_uint2str(uint8_t *dest, uint32_t i) {
  for (int b = 3; b >= 0; b--) {
    dest[b] = i & 0xFF;
    i >>= 8;
  }
}
#endif
#define CLUSTER_READ_BUFFER 16384
/* Parser state for one cluster connection. Wire format per message: a
 * 16-byte header (channel length, message length, type, filter - four words
 * encoded by `cluster_uint2str`) followed by the channel and message
 * payloads. */
typedef struct {
  protocol_s pr;
  FIOBJ channel;
  FIOBJ msg;
  uint32_t exp_channel; /* channel payload bytes still expected */
  uint32_t exp_msg;     /* message payload bytes still expected */
  uint32_t type;        /* enum cluster_message_type_e */
  int32_t filter;       /* handler filter / id */
  uint32_t length;      /* bytes currently held in `buffer` */
  uint8_t buffer[];
} cluster_pr_s;
/* boxed arguments for a deferred user-handler invocation. */
typedef struct {
  void (*on_message)(int32_t filter, FIOBJ, FIOBJ);
  FIOBJ channel;
  FIOBJ msg;
  int32_t filter;
} cluster_msg_data_s;
static void cluster_on_new_peer(intptr_t srv, protocol_s *pr);
static void cluster_on_listening_ping(intptr_t srv, protocol_s *pr);
static void cluster_on_listening_close(intptr_t srv, protocol_s *pr);
/* Global cluster state: the root's listening socket (or the worker's
 * upstream socket), connected peers and registered handlers, guarded by
 * `lock`. */
static struct {
  protocol_s listening;
  intptr_t root;          /* cluster socket uuid; -1 when unset */
  fio_hash_s clients;     /* uuid -> uuid, peers connected to the root */
  fio_hash_s handlers;    /* filter -> handler function pointer */
  spn_lock_i lock;
  uint8_t client_mode;    /* 1 == worker process, 0 == root process */
  char cluster_name[128]; /* Unix socket path */
} facil_cluster_data = {
    .lock = SPN_LOCK_INIT,
    .root = -1,
    .listening =
        {
            .on_close = cluster_on_listening_close,
            .ping = cluster_on_listening_ping,
            .on_data = cluster_on_new_peer,
        },
};
/* message types carried in the `type` field of the wire header. */
enum cluster_message_type_e {
  CLUSTER_MESSAGE_FORWARD,
  CLUSTER_MESSAGE_JSON,
  CLUSTER_MESSAGE_SHUTDOWN,
  CLUSTER_MESSAGE_ERROR,
  CLUSTER_MESSAGE_PING,
};
/* Runs a user handler outside the parser loop, then releases the boxed
 * message data (which holds its own FIOBJ references). */
static void cluster_deferred_handler(void *msg_data_, void *ignr) {
  cluster_msg_data_s *data = msg_data_;
  data->on_message(data->filter, data->channel, data->msg);
  fiobj_free(data->channel);
  fiobj_free(data->msg);
  free(data);
  (void)ignr;
}
/* Schedules the registered handler (if any) for this message's filter. */
static void cluster_forward_msg2handlers(cluster_pr_s *c) {
  spn_lock(&facil_cluster_data.lock);
  void *target_ =
      fio_hash_find(&facil_cluster_data.handlers, (FIO_HASH_KEY_TYPE)c->filter);
  spn_unlock(&facil_cluster_data.lock);
  // fprintf(stderr, "handler for %d: %p\n", c->filter, target_);
  if (target_) {
    cluster_msg_data_s *data = malloc(sizeof(*data));
    if (!data) {
      perror("FATAL ERROR: (facil.io cluster) couldn't allocate memory");
      exit(errno);
    }
    *data = (cluster_msg_data_s){
        /* the hash stores the handler as a `void *`; reading it back through
         * the struct's first member recovers the function-pointer type */
        .on_message = ((cluster_msg_data_s *)(&target_))->on_message,
        .channel = fiobj_dup(c->channel),
        .msg = fiobj_dup(c->msg),
        .filter = c->filter,
    };
    defer(cluster_deferred_handler, data, NULL);
  }
}
/* Serializes one message into a FIOBJ string: 16-byte header followed by the
 * channel and message payloads (either may be NULL / zero-length). */
static inline FIOBJ cluster_wrap_message(uint32_t ch_len, uint32_t msg_len,
                                         uint32_t type, int32_t id,
                                         uint8_t *ch_data, uint8_t *msg_data) {
  FIOBJ buf = fiobj_str_buf(ch_len + msg_len + 16);
  fio_cstr_s f = fiobj_obj2cstr(buf);
  cluster_uint2str(f.bytes, ch_len);
  cluster_uint2str(f.bytes + 4, msg_len);
  cluster_uint2str(f.bytes + 8, type);
  cluster_uint2str(f.bytes + 12, (uint32_t)id);
  if (ch_data) {
    memcpy(f.bytes + 16, ch_data, ch_len);
  }
  if (msg_data) {
    memcpy(f.bytes + 16 + ch_len, msg_data, msg_len);
  }
  fiobj_str_resize(buf, ch_len + msg_len + 16);
  return buf;
}
/* Root side: broadcasts a message to every connected peer except `uuid`
 * (pass 0 to reach all peers). */
static inline void cluster_send2clients(uint32_t ch_len, uint32_t msg_len,
                                        uint32_t type, int32_t id,
                                        uint8_t *ch_data, uint8_t *msg_data,
                                        intptr_t uuid) {
  if (facil_cluster_data.clients.count == 0)
    return;
  FIOBJ forward =
      cluster_wrap_message(ch_len, msg_len, type, id, ch_data, msg_data);
  spn_lock(&facil_cluster_data.lock);
  FIO_HASH_FOR_LOOP(&facil_cluster_data.clients, i) {
    if (i->obj) {
      if ((intptr_t)i->key != uuid)
        fiobj_send_free((intptr_t)i->key, fiobj_dup(forward));
    }
  }
  spn_unlock(&facil_cluster_data.lock);
  fiobj_free(forward); /* drop the extra reference held by this function */
}
/* Routes a message "outwards": workers send it to the root socket, the root
 * broadcasts to all workers. (The name keeps its historical misspelling of
 * "target".) */
static inline void cluster_send2traget(uint32_t ch_len, uint32_t msg_len,
                                       uint32_t type, int32_t id,
                                       uint8_t *ch_data, uint8_t *msg_data) {
  if (facil_cluster_data.client_mode) {
    FIOBJ forward =
        cluster_wrap_message(ch_len, msg_len, type, id, ch_data, msg_data);
    fiobj_send_free(facil_cluster_data.root, fiobj_dup(forward));
  } else {
    cluster_send2clients(ch_len, msg_len, type, id, ch_data, msg_data, 0);
  }
}
/* Worker side: dispatches one complete message received from the root. */
static void cluster_on_client_message(cluster_pr_s *c, intptr_t uuid) {
  switch ((enum cluster_message_type_e)c->type) {
  case CLUSTER_MESSAGE_JSON: {
    /* decode channel / msg payloads in place; invalid JSON keeps the raw
     * string and only logs a warning */
    fio_cstr_s s = fiobj_obj2cstr(c->channel);
    FIOBJ tmp = FIOBJ_INVALID;
    if (fiobj_json2obj(&tmp, s.bytes, s.len)) {
      fiobj_free(c->channel);
      c->channel = tmp;
      tmp = FIOBJ_INVALID;
    } else {
      fprintf(stderr,
              "WARNING: (facil.io cluster) JSON message isn't valid JSON.\n");
    }
    s = fiobj_obj2cstr(c->msg);
    if (fiobj_json2obj(&tmp, s.bytes, s.len)) {
      fiobj_free(c->msg);
      c->msg = tmp;
    } else {
      fprintf(stderr,
              "WARNING: (facil.io cluster) JSON message isn't valid JSON.\n");
    }
  }
  /* fallthrough */
  case CLUSTER_MESSAGE_FORWARD:
    cluster_forward_msg2handlers(c);
    break;
  case CLUSTER_MESSAGE_ERROR:
  case CLUSTER_MESSAGE_SHUTDOWN:
    /* the root signaled shutdown (or an error): stop and disconnect */
    facil_stop();
    facil_cluster_data.root = -1;
    sock_close(uuid);
    break;
  case CLUSTER_MESSAGE_PING:
    /* do nothing, really. */
    break;
  }
}
/* Root side: relays a worker's message to all other workers, then handles it
 * locally (JSON payloads are decoded only after relaying the raw bytes). */
static void cluster_on_server_message(cluster_pr_s *c, intptr_t uuid) {
  switch ((enum cluster_message_type_e)c->type) {
  case CLUSTER_MESSAGE_JSON:
  case CLUSTER_MESSAGE_FORWARD: {
    if (fio_hash_count(&facil_cluster_data.clients)) {
      fio_cstr_s cs = fiobj_obj2cstr(c->channel);
      fio_cstr_s ms = fiobj_obj2cstr(c->msg);
      cluster_send2clients((uint32_t)cs.len, (uint32_t)ms.len, c->type,
                           c->filter, cs.bytes, ms.bytes, uuid);
    }
    if (c->type == CLUSTER_MESSAGE_JSON) {
      fio_cstr_s s = fiobj_obj2cstr(c->channel);
      FIOBJ tmp = FIOBJ_INVALID;
      if (fiobj_json2obj(&tmp, s.bytes, s.len)) {
        fiobj_free(c->channel);
        c->channel = tmp;
        tmp = FIOBJ_INVALID;
      } else {
        fprintf(stderr,
                "WARNING: (facil.io cluster) JSON message isn't valid JSON.\n");
      }
      s = fiobj_obj2cstr(c->msg);
      if (fiobj_json2obj(&tmp, s.bytes, s.len)) {
        fiobj_free(c->msg);
        c->msg = tmp;
      } else {
        fprintf(stderr,
                "WARNING: (facil.io cluster) JSON message isn't valid JSON.\n");
      }
    }
    cluster_forward_msg2handlers(c);
    break;
  }
  case CLUSTER_MESSAGE_SHUTDOWN:
    facil_stop();
    break;
  case CLUSTER_MESSAGE_ERROR:
  case CLUSTER_MESSAGE_PING:
    /* do nothing, really. */
    break;
  }
}
/* Worker side: the upstream socket closed. A close without a SHUTDOWN
 * message while still active means the root process died - exit. */
static void cluster_on_client_close(intptr_t uuid, protocol_s *pr_) {
  cluster_pr_s *c = (cluster_pr_s *)pr_;
  /* no shutdown message received - parent crashed. */
  if (facil_cluster_data.root == uuid && c->type != CLUSTER_MESSAGE_SHUTDOWN &&
      facil_data->active) {
    if (FACIL_PRINT_STATE)
      fprintf(stderr,
              "* (%d) Parent Process crash detected, signaling for exit.\n",
              getpid());
    facil_stop();
    unlink(facil_cluster_data.cluster_name);
  }
  fiobj_free(c->msg);
  fiobj_free(c->channel);
  free(c);
  if (facil_cluster_data.root == uuid)
    facil_cluster_data.root = -1;
}
/* Root side: a peer disconnected - deregister it (insert NULL) and free its
 * parser. After a respawn this process may actually be a worker, in which
 * case the worker-side handler is delegated to. */
static void cluster_on_server_close(intptr_t uuid, protocol_s *pr_) {
  if (facil_cluster_data.client_mode || facil_parent_pid() != getpid()) {
    cluster_on_client_close(uuid, pr_); /* we respawned. */
    return;
  }
  spn_lock(&facil_cluster_data.lock);
  fio_hash_insert(&facil_cluster_data.clients, (FIO_HASH_KEY_TYPE)uuid, NULL);
  // fio_hash_compact(&facil_cluster_data.clients);
  spn_unlock(&facil_cluster_data.lock);
  cluster_pr_s *c = (cluster_pr_s *)pr_;
  fiobj_free(c->msg);
  fiobj_free(c->channel);
  free(c);
}
/* On shutdown: broadcast a SHUTDOWN message, then force a ready event so the
 * outgoing buffer gets flushed before the socket dies. */
static void cluster_on_shutdown(intptr_t uuid, protocol_s *pr_) {
  cluster_send2traget(0, 0, CLUSTER_MESSAGE_SHUTDOWN, 0, NULL, NULL);
  facil_force_event(uuid, FIO_EVENT_ON_READY);
  (void)pr_;
  (void)uuid;
}
/* Incremental parser for the cluster wire format: reads whatever is
 * available, then repeatedly consumes a 16-byte header plus its payloads,
 * dispatching each completed message. Partial payloads accumulate in
 * `c->channel` / `c->msg` across calls; leftover bytes are shifted to the
 * start of the buffer. */
static void cluster_on_data(intptr_t uuid, protocol_s *pr_) {
  cluster_pr_s *c = (cluster_pr_s *)pr_;
  ssize_t i =
      sock_read(uuid, c->buffer + c->length, CLUSTER_READ_BUFFER - c->length);
  if (i <= 0)
    return;
  c->length += i;
  i = 0;
  do {
    if (!c->exp_channel && !c->exp_msg) {
      /* start of a new message - wait for the complete 16-byte header */
      if (c->length - i < 16)
        break;
      c->exp_channel = cluster_str2uint32(c->buffer + i);
      c->exp_msg = cluster_str2uint32(c->buffer + i + 4);
      c->type = cluster_str2uint32(c->buffer + i + 8);
      c->filter = (int32_t)cluster_str2uint32(c->buffer + i + 12);
      if (c->exp_channel)
        c->channel = fiobj_str_buf(c->exp_channel);
      if (c->exp_msg)
        c->msg = fiobj_str_buf(c->exp_msg);
      i += 16;
    }
    if (c->exp_channel) {
      if (c->exp_channel + i > c->length) {
        /* FIX: subtract the bytes actually consumed (c->length - i) BEFORE
         * overwriting `i`. The original assigned `i = c->length` first and
         * then subtracted the *new* `i`, corrupting (underflowing) the
         * remaining-bytes counter whenever a channel spanned two reads. */
        fiobj_str_write(c->channel, (char *)c->buffer + i,
                        (size_t)(c->length - i));
        c->exp_channel -= (uint32_t)(c->length - i);
        i = c->length;
        break;
      } else {
        fiobj_str_write(c->channel, (char *)c->buffer + i, c->exp_channel);
        i += c->exp_channel;
        c->exp_channel = 0;
      }
    }
    if (c->exp_msg) {
      if (c->exp_msg + i > c->length) {
        /* FIX: same consumed-bytes accounting fix as for the channel. */
        fiobj_str_write(c->msg, (char *)c->buffer + i, (size_t)(c->length - i));
        c->exp_msg -= (uint32_t)(c->length - i);
        i = c->length;
        break;
      } else {
        fiobj_str_write(c->msg, (char *)c->buffer + i, c->exp_msg);
        i += c->exp_msg;
        c->exp_msg = 0;
      }
    }
    /* a complete message - dispatch it, then reset for the next one */
    if (facil_cluster_data.client_mode) {
      cluster_on_client_message(c, uuid);
    } else {
      cluster_on_server_message(c, uuid);
    }
    fiobj_free(c->msg);
    fiobj_free(c->channel);
    c->msg = FIOBJ_INVALID;
    c->channel = FIOBJ_INVALID;
  } while (c->length > i);
  c->length -= i;
  if (c->length) {
    memmove(c->buffer, c->buffer + i, c->length);
  }
  (void)pr_;
}
/* Keep-alive: sends a PING frame. FIX: the original wrote only 12 bytes,
 * placing the type in the message-length slot; `cluster_on_data` requires a
 * 16-byte header (ch_len, msg_len, type, filter), so the short frame
 * desynchronized the stream framing. */
static void cluster_ping(intptr_t uuid, protocol_s *pr_) {
  static uint8_t buffer[16];
  cluster_uint2str(buffer, 0);                        /* channel length */
  cluster_uint2str(buffer + 4, 0);                    /* message length */
  cluster_uint2str(buffer + 8, CLUSTER_MESSAGE_PING); /* type */
  cluster_uint2str(buffer + 12, 0);                   /* filter */
  sock_write2(.uuid = uuid, .buffer = buffer, .length = 16,
              .dealloc = SOCK_DEALLOC_NOOP);
  (void)pr_;
}
/* Attaches the cluster protocol to a cluster socket (root accept or worker
 * connect). Root processes also register the peer in the clients map. */
static void cluster_on_open(intptr_t fd, void *udata) {
  cluster_pr_s *pr = malloc(sizeof(*pr) + CLUSTER_READ_BUFFER);
  if (!pr) {
    /* FIX: the original dereferenced the allocation without checking it. */
    perror("FATAL ERROR: (facil.io cluster) couldn't allocate a protocol");
    exit(errno);
  }
  *pr = (cluster_pr_s){
      .pr =
          {
              .service = "facil_io_cluster_protocol",
              .on_data = cluster_on_data,
              .on_shutdown = cluster_on_shutdown,
              .on_close =
                  (facil_cluster_data.client_mode ? cluster_on_client_close
                                                  : cluster_on_server_close),
              .ping = cluster_ping,
          },
  };
  if (facil_cluster_data.root >= 0 && facil_cluster_data.root != fd) {
    /* root process: track the peer so broadcasts can reach it */
    spn_lock(&facil_cluster_data.lock);
    fio_hash_insert(&facil_cluster_data.clients, (FIO_HASH_KEY_TYPE)fd,
                    (void *)fd);
    spn_unlock(&facil_cluster_data.lock);
  }
  if (facil_attach(fd, &pr->pr) == -1) {
    fprintf(stderr, "(%d) ", getpid());
    perror("ERROR: (facil.io cluster) couldn't attach connection");
  }
  (void)udata;
}
/* Root: accept a worker's connection and attach the cluster protocol. */
static void cluster_on_new_peer(intptr_t srv, protocol_s *pr) {
  intptr_t client = sock_accept(srv);
  if (client == -1) {
    // fprintf(stderr,
    //         "ERROR: (facil.io cluster) couldn't accept connection\n");
  } else {
    cluster_on_open(client, NULL);
  }
  (void)pr;
}
/* Root: the cluster listener closed - remove the Unix socket file and clear
 * the root uuid (only in the original root process). */
static void cluster_on_listening_close(intptr_t srv, protocol_s *pr) {
  if (facil_parent_pid() == getpid()) {
    unlink(facil_cluster_data.cluster_name);
    if (facil_cluster_data.root == srv)
      facil_cluster_data.root = -1;
  }
  (void)srv;
  (void)pr;
}
/* the cluster listener never times out. */
static void cluster_on_listening_ping(intptr_t srv, protocol_s *pr) {
  sock_touch(srv);
  (void)pr;
}
/* Per-process cluster startup (after forking): the root attaches the
 * listening protocol; workers drop inherited state and connect to the
 * root's Unix socket. Returns 0 on success, -1 on fatal failure. */
static int cluster_on_start(void) {
  if (facil_data->active <= 1)
    return 0; /* single-process mode needs no cluster */
  if (facil_parent_pid() == getpid()) {
    facil_cluster_data.client_mode = 0;
    if (facil_attach(facil_cluster_data.root, &facil_cluster_data.listening)) {
      perror("FATAL ERROR: (facil.io) couldn't attach cluster socket");
    }
  } else {
    facil_cluster_data.client_mode = 1;
    /* discard the client map inherited from the parent */
    fio_hash_free(&facil_cluster_data.clients);
    // FIO_HASH_FOR_FREE(&facil_cluster_data.clients, pos) {
    //   if (!pos->obj)
    //     continue;
    //   close(sock_uuid2fd(pos->key));
    //   sock_force_close(pos->key);
    // }
    facil_cluster_data.clients = (fio_hash_s)FIO_HASH_INIT;
    if (facil_cluster_data.root != -1) {
      close(sock_uuid2fd(facil_cluster_data.root)); /* prevent `shutdown` */
      sock_force_close(facil_cluster_data.root);
    }
    facil_cluster_data.root =
        facil_connect(.address = facil_cluster_data.cluster_name,
                      .on_connect = cluster_on_open);
    if (facil_cluster_data.root == -1) {
      perror(
          "FATAL ERROR: (facil.io cluster) couldn't connect to cluster socket");
      fprintf(stderr, "        socket: %s\n", facil_cluster_data.cluster_name);
      facil_stop();
      return -1;
    }
  }
  return 0;
}
/* Builds a per-process Unix-socket path in the temp folder and starts the
 * cluster's listening socket. Returns 0 on success, -1 on failure.
 * NOTE(review): assumes `cluster_name` holds at least ~115 bytes + pid digits
 * (tmp folder is capped at 100 chars) - confirm against the struct size. */
static int facil_cluster_init(void) {
  if (facil_data->active <= 1)
    return 0; /* no workers - no cluster socket needed */
  /* create a unique socket name */
  char *tmp_folder = getenv("TMPDIR");
  uint32_t tmp_folder_len = 0;
  /* fall back to the compile-time temp dir when TMPDIR is unset or too long */
  if (!tmp_folder || ((tmp_folder_len = (uint32_t)strlen(tmp_folder)) > 100)) {
#ifdef P_tmpdir
    tmp_folder = P_tmpdir;
    if (tmp_folder)
      tmp_folder_len = (uint32_t)strlen(tmp_folder);
#else
    tmp_folder = "/tmp/";
    tmp_folder_len = 5;
#endif
  }
  if (tmp_folder_len >= 100)
    tmp_folder_len = 0;
  if (tmp_folder_len) {
    memcpy(facil_cluster_data.cluster_name, tmp_folder, tmp_folder_len);
    /* guarantee a trailing path separator */
    if (facil_cluster_data.cluster_name[tmp_folder_len - 1] != '/')
      facil_cluster_data.cluster_name[tmp_folder_len++] = '/';
  }
  memcpy(facil_cluster_data.cluster_name + tmp_folder_len, "facil-io-sock-",
         14);
  tmp_folder_len += 14;
  /* append the pid, rendered in base 8 - presumably only uniqueness matters */
  tmp_folder_len +=
      fio_ltoa(facil_cluster_data.cluster_name + tmp_folder_len, getpid(), 8);
  facil_cluster_data.cluster_name[tmp_folder_len] = 0;
  /* remove if existing */
  unlink(facil_cluster_data.cluster_name);
  /* create, bind, listen */
  facil_cluster_data.root = sock_listen(facil_cluster_data.cluster_name, NULL);
  if (facil_cluster_data.root == -1) {
    perror("FATAL ERROR: (facil.io cluster) failed to open cluster socket.\n"
           " check file permissions");
    return -1;
  }
  return 0;
}
/** Registers (or replaces) the cluster message handler for `filter`. */
void facil_cluster_set_handler(int32_t filter,
                               void (*on_message)(int32_t id, FIOBJ ch,
                                                  FIOBJ msg)) {
  /* the handler map is shared - guard the insert with the cluster lock */
  uint64_t key = (uint64_t)filter;
  spn_lock(&facil_cluster_data.lock);
  fio_hash_insert(&facil_cluster_data.handlers, key,
                  (void *)(uintptr_t)on_message);
  spn_unlock(&facil_cluster_data.lock);
}
/** Broadcasts a message to all cluster processes. String (or missing)
 * channel/message objects are forwarded as-is (an extra reference is taken);
 * any other FIOBJ type is serialized to JSON first, and the JSON copies are
 * owned (and freed) here. Returns 0 on success, -1 if facil.io wasn't
 * initialized. */
int facil_cluster_send(int32_t filter, FIOBJ ch, FIOBJ msg) {
  if (!facil_data) {
    fprintf(stderr, "ERROR: cluster inactive, can't send message.\n");
    return -1;
  }
  uint32_t type = CLUSTER_MESSAGE_FORWARD;
  if ((!ch || FIOBJ_TYPE_IS(ch, FIOBJ_T_STRING)) &&
      (!msg || FIOBJ_TYPE_IS(msg, FIOBJ_T_STRING))) {
    /* take a reference so the `fiobj_free` below doesn't kill caller copies;
     * presumably `fiobj_dup(0)` is a safe no-op - confirm in fiobj docs */
    fiobj_dup(ch);
    fiobj_dup(msg);
  } else {
    type = CLUSTER_MESSAGE_JSON;
    ch = fiobj_obj2json(ch, 0);
    msg = fiobj_obj2json(msg, 0);
  }
  fio_cstr_s cs = fiobj_obj2cstr(ch);
  fio_cstr_s ms = fiobj_obj2cstr(msg);
  /* NOTE(review): `cluster_send2traget` (sic) is the project's own spelling */
  cluster_send2traget((uint32_t)cs.len, (uint32_t)ms.len, type, filter,
                      cs.bytes, ms.bytes);
  fiobj_free(ch);
  fiobj_free(msg);
  return 0;
}
/* *****************************************************************************
Running the server
***************************************************************************** */
/* Deferred task: logs this worker's pid to stderr. */
static void print_pid(void *arg, void *ignr) {
  fprintf(stderr, "* %d is running.\n", getpid());
  (void)arg;
  (void)ignr;
}
/* Reviews one connection (`arg` is the fd) for timeout - pinging it when the
 * timeout elapsed and no task/write is in flight - then walks forward and
 * reschedules itself for the next occupied fd.
 * FIX: the scan loop now bounds-checks `fd < capacity` BEFORE reading
 * `fd_data(fd)`; the original dereferenced `conn[capacity]`, one slot past
 * the end of the connection array, before testing the limit. */
static void facil_review_timeout(void *arg, void *ignr) {
  (void)ignr;
  protocol_s *tmp;
  time_t review = facil_data->last_cycle.tv_sec;
  intptr_t fd = (intptr_t)arg;
  uint16_t timeout = fd_data(fd).timeout;
  if (!timeout)
    timeout = 300; /* enforced timeout settings */
  if (!fd_data(fd).protocol || (fd_data(fd).active + timeout >= review))
    goto finish;
  /* grab the protocol's state lock; if busy, retry this fd later */
  tmp = protocol_try_lock(fd, FIO_PR_LOCK_STATE);
  if (!tmp)
    goto reschedule;
  /* don't ping while a task or write currently holds the connection */
  if (prt_meta(tmp).locks[FIO_PR_LOCK_TASK] ||
      prt_meta(tmp).locks[FIO_PR_LOCK_WRITE])
    goto unlock;
  defer(deferred_ping, (void *)sock_fd2uuid((int)fd), NULL);
unlock:
  protocol_unlock(tmp, FIO_PR_LOCK_STATE);
finish:
  /* advance to the next fd with a protocol attached (bounds check first) */
  do {
    fd++;
  } while ((fd < facil_data->capacity) && !fd_data(fd).protocol &&
           facil_data->active);
  if (facil_data->capacity <= fd || facil_data->active == 0) {
    /* walked off the end (or shutting down) - request a fresh review */
    facil_data->need_review = 1;
    return;
  }
reschedule:
  defer(facil_review_timeout, (void *)fd, NULL);
}
/* Deferred wrapper around the user's `on_idle` callback. */
static void perform_idle(void *arg, void *ignr) {
  (void)arg;
  (void)ignr;
  facil_data->on_idle();
}
/* The reactor heartbeat: polls IO (non-blocking when deferred tasks are
 * pending, up to 512ms otherwise), fires `on_idle` exactly once when activity
 * settles, kicks off a timeout review at most once per second, and re-defers
 * itself while the service is active. */
static void facil_cycle(void *arg, void *ignr) {
  (void)ignr;
  static int idle = 0;
  clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle);
  int events;
  if (defer_has_queue()) {
    /* tasks are waiting - poll without blocking */
    events = evio_review(0);
    if (events < 0) {
      goto error;
    }
    if (events > 0)
      idle = 1;
  } else {
    /* queue is empty - allow a longer IO wait */
    events = evio_review(512);
    if (events < 0)
      goto error;
    if (events > 0) {
      idle = 1;
    } else if (idle) {
      /* active -> idle transition: run the user's `on_idle` once */
      defer(perform_idle, arg, ignr);
      idle = 0;
    }
  }
  if (!facil_data->active)
    return;
  /* schedule a timeout review at most once per wall-clock second */
  static time_t last_to_review = 0;
  if (facil_data->need_review &&
      facil_data->last_cycle.tv_sec != last_to_review) {
    last_to_review = facil_data->last_cycle.tv_sec;
    facil_data->need_review = 0;
    defer(facil_review_timeout, (void *)0, NULL);
  }
  defer(facil_cycle, arg, ignr);
  return;
error:
  /* IO review failed (e.g. a fork/shutdown race) - keep cycling while the
   * service is still marked active */
  if (facil_data->active)
    defer(facil_cycle, arg, ignr);
  (void)1;
}
/**
OVERRIDE THIS to replace the default `fork` implementation or to inject hooks
into the forking function.
Behaves like the system's `fork`.
*/
#pragma weak facil_fork
int facil_fork(void) { return (int)fork(); }
/** This will be called by child processes, make sure to unlock any existing
 * locks.
 *
 * Known locks:
 * * `defer` tasks lock.
 * * `sock` packet pool lock.
 * * `sock` connection lock (per connection).
 * * `facil` global lock.
 * * `facil` pub/sub lock.
 * * `facil` connection data lock (per connection data).
 * * `facil` protocol lock (per protocol object, placed in `rsv`).
 * * `pubsub` pubsub global lock (should be initialized in
 *   facil_external_init).
 * * `pubsub` pubsub client lock (should be initialized in
 *   facil_external_init).
 */
/* Per-process startup: resets fork-inherited locks, (re)creates the IO
 * reactor, wires up each existing fd according to the process role
 * (single-process / worker child / sentinel root), starts the cluster and the
 * event cycle, then launches the thread pool. */
static void facil_worker_startup(uint8_t sentinel) {
  /* locks may have been captured mid-operation by `fork` - reset them */
  facil_cluster_data.lock = facil_data->global_lock = SPN_LOCK_INIT;
  defer_on_fork();
  evio_create();
  clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle);
  facil_external_init();
  if (facil_data->active == 1) {
    /* single process - this process owns every connection */
    for (int i = 0; i < facil_data->capacity; i++) {
      errno = 0;
      fd_data(i).lock = SPN_LOCK_INIT;
      if (fd_data(i).protocol) {
        fd_data(i).protocol->rsv = 0;
        if (fd_data(i).protocol->service == listener_protocol_name)
          listener_on_start(i);
        else if (fd_data(i).protocol->service == timer_protocol_name)
          timer_on_server_start(i);
        else {
          evio_add(i, (void *)sock_fd2uuid(i));
        }
      }
    }
  } else if (sentinel == 0) {
    /* child process */
    for (int i = 0; i < facil_data->capacity; i++) {
      errno = 0;
      fd_data(i).lock = SPN_LOCK_INIT;
      if (fd_data(i).protocol) {
        fd_data(i).protocol->rsv = 0;
        if (fd_data(i).protocol->service == listener_protocol_name)
          listener_on_start(i);
        else if (fd_data(i).protocol->service == timer_protocol_name)
          timer_on_server_start(i);
        else {
          /* prevent normal connections from being shared across workers */
          close(i);
          sock_force_close(sock_fd2uuid(i));
        }
      }
    }
  } else {
    /* sentinel process - ignore listening sockets, but keep them open */
    for (int i = 0; i < facil_data->capacity; i++) {
      fd_data(i).lock = SPN_LOCK_INIT;
      if (fd_data(i).protocol) {
        fd_data(i).protocol->rsv = 0;
        if (fd_data(i).protocol->service == timer_protocol_name)
          timer_on_server_start(i);
        else if (fd_data(i).protocol->service != listener_protocol_name)
          evio_add(i, (void *)sock_fd2uuid(i));
      }
    }
  }
  /* called after connection cleanup, as it will close any connections. */
  if (cluster_on_start()) {
    facil_data->thread_pool = NULL;
    return;
  }
  /* called after `evio_create` but before actually reacting to events. */
  facil_data->need_review = 1;
  defer(facil_cycle, NULL, NULL);
  if (FACIL_PRINT_STATE) {
    if (sentinel || facil_data->parent == getpid()) {
      fprintf(stderr,
              "Server is running %u %s X %u %s, press ^C to stop\n"
              "* Root pid: %d\n",
              facil_data->active, facil_data->active > 1 ? "workers" : "worker",
              facil_data->threads,
              facil_data->threads > 1 ? "threads" : "thread",
              facil_data->parent);
    } else {
      defer(print_pid, NULL, NULL);
    }
  }
  /* sentinels keep a single bookkeeping thread; workers get the full pool */
  facil_data->thread_pool =
      defer_pool_start((sentinel ? 1 : facil_data->threads));
}
/* Drains and shuts down this process: schedules `on_shutdown` for every live
 * connection, flushes sockets and pending events, runs the user's `on_finish`,
 * closes the reactor, and - in the root process - reaps all children before
 * announcing completion.
 * FIX: corrected the typo "cleanning" -> "cleaning" in the log message. */
static void facil_worker_cleanup(void) {
  facil_data->active = 0;
  fprintf(stderr, "* %d cleaning up.\n", getpid());
  /* notify every attached connection of the shutdown */
  for (int i = 0; i < facil_data->capacity; i++) {
    intptr_t uuid;
    if (fd_data(i).protocol && (uuid = sock_fd2uuid(i)) >= 0) {
      defer(deferred_on_shutdown, (void *)uuid, NULL);
    }
  }
  /* drain events and pending writes before tearing the reactor down */
  evio_review(100);
  defer_perform();
  sock_flush_all();
  evio_review(0);
  sock_flush_all();
  facil_data->on_finish();
  defer_perform();
  evio_close();
  facil_external_cleanup();
  if (facil_data->parent == getpid()) {
    /* root process: wait for every child worker to exit */
    while (wait(NULL) != -1)
      ;
    if (FACIL_PRINT_STATE) {
      fprintf(stderr, "\n --- Completed Shutdown ---\n");
    }
  }
}
static void facil_sentinel_task(void *arg1, void *arg2);
/* Sentinel thread body: forks a worker process. In the parent it blocks on
 * `waitpid` and - outside DEBUG builds - schedules a replacement worker when
 * the child dies while the service is still active. In the child this thread
 * BECOMES the worker: it runs startup, waits on the thread pool, cleans up,
 * and never returns (calls exit(0)). */
static void *facil_sentinel_worker_thread(void *arg) {
  errno = 0;
  pid_t child = facil_fork();
  if (child == -1) {
    perror("FATAL ERROR: couldn't spawn workers at startup");
    kill(facil_parent_pid(), SIGINT);
    facil_stop();
    return NULL;
  } else if (child) {
    /* parent: supervise the worker until it exits */
    int status;
    waitpid(child, &status, 0);
#if DEBUG
    if (facil_data->active) { /* !WIFEXITED(status) || WEXITSTATUS(status) */
      /* in DEBUG builds a crashed worker halts everything for inspection */
      fprintf(stderr,
              "FATAL ERROR: Child worker (%d) crashed. Stopping services in "
              "DEBUG mode.\n",
              child);
      kill(0, SIGINT);
    }
#else
    if (facil_data->active) {
      fprintf(stderr, "ERROR: Child worker (%d) crashed. Respawning worker.\n",
              child);
      /* arg == 1 marks the respawn (clears stale deferred tasks in the new
       * child) */
      defer(facil_sentinel_task, (void *)1, NULL);
    }
#endif
  } else {
    /* child: this process is now a worker */
    defer_on_fork();
    if (arg) {
      /* respawn */
      defer_clear_queue();
    }
    facil_worker_startup(0);
    if (facil_data->thread_pool)
      defer_pool_wait(facil_data->thread_pool);
    else if (facil_parent_pid() != getpid()) {
      /* startup failed - ask the root process to shut everything down */
      kill(facil_parent_pid(), SIGINT);
    }
    facil_data->thread_pool = NULL;
    facil_worker_cleanup();
    exit(0);
  }
  (void)arg;
  return NULL;
}
/* Spawns a sentinel thread that forks/supervises one worker; in the root
 * process, also polls the cluster's listening socket so the new worker's
 * connection is accepted promptly. */
static void facil_sentinel_task(void *arg1, void *arg2) {
  (void)arg2;
  void *thrd = defer_new_thread(facil_sentinel_worker_thread, arg1);
  defer_free_thread(thrd);
  if (getpid() == facil_parent_pid())
    facil_cluster_data.listening.on_data(facil_cluster_data.root,
                                         &facil_cluster_data.listening);
  (void)arg1;
}
/* handles the SIGINT and SIGTERM signals by shutting down workers */
static void sig_int_handler(int sig) {
  switch (sig) {
  case SIGINT: /* fallthrough */
  case SIGTERM:
    /* NOTE(review): assumes facil_stop() is async-signal-safe - confirm */
    facil_stop();
    break;
  default:
    break;
  }
}
/* Installs SIGINT/SIGTERM handlers (graceful shutdown) and ignores SIGPIPE.
 * FIX: zero the `struct sigaction` before use - the original passed a struct
 * with uninitialized fields (e.g. `sa_sigaction` on platforms where it shares
 * a union with `sa_handler`) to sigaction(), which reads the whole struct. */
static void facil_setp_signal_handler(void) {
  /* setup signal handling */
  struct sigaction act, old;
  memset(&act, 0, sizeof(act));
  act.sa_handler = sig_int_handler;
  sigemptyset(&act.sa_mask);
  act.sa_flags = SA_RESTART | SA_NOCLDSTOP;
  if (sigaction(SIGINT, &act, &old)) {
    perror("couldn't set signal handler");
    return;
  };
  if (sigaction(SIGTERM, &act, &old)) {
    perror("couldn't set signal handler");
    return;
  };
  act.sa_handler = SIG_IGN;
  if (sigaction(SIGPIPE, &act, &old)) {
    perror("couldn't set signal handler");
    return;
  };
}
/*
 * Zombie Reaping
 * With thanks to Dr Graham D Shaw.
 * http://www.microhowto.info/howto/reap_zombie_processes_using_a_sigchld_handler.html
 */
static void reap_child_handler(int sig) {
  (void)(sig);
  /* reap every exited child without blocking; `waitpid` clobbers errno, so
   * save and restore it for the interrupted code */
  int saved_errno = errno;
  while (waitpid(-1, NULL, WNOHANG) > 0)
    ;
  errno = saved_errno;
}
/* initializes zombie reaping for the process (installs a SIGCHLD handler
 * that reaps exited children without blocking).
 * FIX: zero the `struct sigaction` before use - the original passed
 * uninitialized fields to sigaction(). */
void facil_reap_children(void) {
  struct sigaction sa;
  memset(&sa, 0, sizeof(sa));
  sa.sa_handler = reap_child_handler;
  sigemptyset(&sa.sa_mask);
  sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
  if (sigaction(SIGCHLD, &sa, 0) == -1) {
    /* without reaping, every dead worker would linger as a zombie - abort */
    perror("Child reaping initialization failed");
    kill(0, SIGINT);
    exit(errno);
  }
}
/** returns facil.io's parent (root) process pid. */
pid_t facil_parent_pid(void) {
  /* lazily initialize the library so the stored parent pid is valid */
  if (facil_data == NULL)
    facil_lib_init();
  return facil_data->parent;
}
#undef facil_run
/** Starts the facil.io reactor: resolves thread/process counts (zero or
 * negative values fall back to the detected CPU core count), installs signal
 * handlers, spawns worker processes via sentinel threads when `processes > 1`,
 * then blocks in the event loop until `facil_stop` is called.
 * FIX: cast `cpu_count` (ssize_t) to `size_t` for the `%zu` format specifier -
 * passing a signed argument for `%zu` is undefined behavior. */
void facil_run(struct facil_run_args args) {
  signal(SIGPIPE, SIG_IGN);
  if (!facil_data)
    facil_lib_init();
  if (!args.on_idle)
    args.on_idle = mock_idle;
  if (!args.on_finish)
    args.on_finish = mock_idle;
#ifdef _SC_NPROCESSORS_ONLN
  if (!args.threads && !args.processes) {
    /* nothing requested - use one thread and one process per core */
    ssize_t cpu_count = sysconf(_SC_NPROCESSORS_ONLN);
    if (cpu_count > 0)
      args.threads = args.processes = (int16_t)cpu_count;
  } else if (args.threads < 0 || args.processes < 0) {
    /* negative values mean "use the core count" for that dimension */
    ssize_t cpu_count = sysconf(_SC_NPROCESSORS_ONLN);
#if FACIL_CPU_CORES_LIMIT
    if (cpu_count > FACIL_CPU_CORES_LIMIT) {
      fprintf(stderr,
              "WARNING: facil.io detected %zu cores. Capping number of cores "
              "at %zu\n",
              (size_t)cpu_count, (size_t)FACIL_CPU_CORES_LIMIT);
      cpu_count = FACIL_CPU_CORES_LIMIT;
    }
#endif
    if (cpu_count > 0) {
      if (args.threads < 0)
        args.threads = (int16_t)cpu_count;
      if (args.processes < 0)
        args.processes = (int16_t)cpu_count;
    }
  }
#endif
  /* guarantee at least one process and one thread */
  if (args.processes <= 0)
    args.processes = 1;
  if (args.threads <= 0)
    args.threads = 1;
  /* listen to SIGINT / SIGTERM */
  facil_setp_signal_handler();
  /* activate facil, fork if needed */
  facil_data->active = (uint16_t)args.processes;
  facil_data->threads = (uint16_t)args.threads;
  facil_data->on_finish = args.on_finish;
  facil_data->on_idle = args.on_idle;
  /* initialize cluster */
  if (args.processes > 1) {
    if (facil_cluster_init()) {
      kill(0, SIGINT);
      if (FACIL_PRINT_STATE) {
        fprintf(stderr, "\n !!! Crashed trying to "
                        "start the service !!!\n");
      }
      exit(-1);
    }
    /* one sentinel (fork + supervise) per worker process */
    for (int i = 0; i < args.processes && facil_data->active; ++i) {
      facil_sentinel_task(NULL, NULL);
    }
    facil_worker_startup(1);
  } else {
    facil_worker_startup(0);
  }
  /* block until the pool shuts down (i.e. `facil_stop` was called) */
  if (facil_data->thread_pool)
    defer_pool_wait(facil_data->thread_pool);
  facil_data->thread_pool = NULL;
  if (args.processes > 1) {
    facil_stop();
    kill(0, SIGINT);
  }
  facil_worker_cleanup();
}
/**
 * returns true (1) if the facil.io engine is already running.
 * FIX: guard against a NULL `facil_data` (library not yet initialized) - the
 * other accessors in this file check or initialize it first; here "not
 * initialized" simply means "not running".
 */
int facil_is_running(void) { return facil_data && facil_data->active > 0; }
/* *****************************************************************************
Setting the protocol
***************************************************************************** */
/* Couples a protocol object with a socket uuid: fills missing callbacks with
 * mocks, stores the lock `state` in the protocol's `rsv` metadata, swaps the
 * protocol under the connection lock, and schedules `on_close` for any
 * replaced protocol. Sets errno (EBADF / ENOTCONN) and returns -1 on failure;
 * even then, the new protocol's `on_close` is scheduled so resources are
 * reclaimed. */
static int facil_attach_state(intptr_t uuid, protocol_s *protocol,
                              protocol_metadata_s state) {
  if (uuid == -1) {
    errno = EBADF;
    return -1;
  }
  if (!facil_data)
    facil_lib_init();
  if (protocol) {
    /* fill in missing callbacks so the reactor may call them unconditionally */
    if (!protocol->on_close)
      protocol->on_close = mock_on_close;
    if (!protocol->on_data)
      protocol->on_data = mock_on_ev;
    if (!protocol->on_ready)
      protocol->on_ready = mock_on_ev;
    if (!protocol->ping)
      protocol->ping = mock_ping;
    if (!protocol->on_shutdown)
      protocol->on_shutdown = mock_on_ev;
    prt_meta(protocol) = state;
  }
  spn_lock(&uuid_data(uuid).lock);
  if (!sock_isvalid(uuid)) {
    spn_unlock(&uuid_data(uuid).lock);
    /* the socket died first - reclaim the new protocol via its on_close */
    if (protocol)
      defer(deferred_on_close, (void *)uuid, protocol);
    errno = ENOTCONN;
    return -1;
  }
  protocol_s *old_protocol = uuid_data(uuid).protocol;
  uuid_data(uuid).protocol = protocol;
  uuid_data(uuid).active = facil_data->last_cycle.tv_sec;
  spn_unlock(&uuid_data(uuid).lock);
  if (old_protocol)
    defer(deferred_on_close, (void *)uuid, old_protocol);
  else if (evio_isactive()) {
    /* first protocol on this uuid - start monitoring its IO events */
    return evio_add(sock_uuid2fd(uuid), (void *)uuid);
  }
  return 0;
}
/** Attaches (or updates) a protocol object to a socket UUID.
 * Returns -1 on error and 0 on success.
 */
int facil_attach(intptr_t uuid, protocol_s *protocol) {
  /* attach with all protocol locks in their unlocked state */
  protocol_metadata_s unlocked = {.rsv = 0};
  return facil_attach_state(uuid, protocol, unlocked);
}
/**
 * Attaches (or updates) a LOCKED protocol object to a socket UUID.
 */
int facil_attach_locked(intptr_t uuid, protocol_s *protocol) {
  /* pre-lock the task lock so the caller owns the protocol on return */
  protocol_metadata_s state = {.rsv = 0};
  spn_lock(state.locks + FIO_PR_LOCK_TASK);
  return facil_attach_state(uuid, protocol, state);
}
/** Sets a timeout for a specific connection (if active). */
void facil_set_timeout(intptr_t uuid, uint8_t timeout) {
  if (!sock_isvalid(uuid))
    return;
  /* refresh the activity stamp so the new timeout counts from "now" */
  uuid_data(uuid).timeout = timeout;
  uuid_data(uuid).active = facil_data->last_cycle.tv_sec;
}
/** Gets a timeout for a specific connection. Returns 0 if there's no set
 * timeout or the connection is inactive. */
/* NOTE(review): unlike `facil_set_timeout` there is no `sock_isvalid` check
 * here, so a recycled slot may report a stale timeout - confirm callers only
 * query live uuids. */
uint8_t facil_get_timeout(intptr_t uuid) { return uuid_data(uuid).timeout; }
/* *****************************************************************************
Misc helpers
***************************************************************************** */
/**
Returns the last time the server reviewed any pending IO events.
*/
struct timespec facil_last_tick(void) {
  if (facil_data)
    return facil_data->last_cycle;
  /* lazily initialize so a meaningful timestamp can be returned */
  facil_lib_init();
  clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle);
  return facil_data->last_cycle;
}
/**
 * This function allows out-of-task access to a connection's `protocol_s`
 * object by attempting to lock it.
 */
protocol_s *facil_protocol_try_lock(intptr_t uuid,
                                    enum facil_protocol_lock_e type) {
  if (sock_isvalid(uuid) && uuid_data(uuid).protocol)
    return protocol_try_lock(sock_uuid2fd(uuid), type);
  /* dead socket or no protocol attached */
  errno = EBADF;
  return NULL;
}
/** See `facil_protocol_try_lock` for details. */
void facil_protocol_unlock(protocol_s *pr, enum facil_protocol_lock_e type) {
  if (pr)
    protocol_unlock(pr, type);
}
/** Counts all the connections of a specific type. */
size_t facil_count(void *service) {
  size_t total = 0;
  for (intptr_t i = 0; i < facil_data->capacity; i++) {
    /* snapshot the service pointer under the per-fd lock */
    const void *srv = NULL;
    spn_lock(&fd_data(i).lock);
    if (fd_data(i).protocol && fd_data(i).protocol->service)
      srv = fd_data(i).protocol->service;
    spn_unlock(&fd_data(i).lock);
    /* internal listener/timer connections are never counted */
    if (srv == listener_protocol_name || srv == timer_protocol_name)
      continue;
    if (!service || srv == service)
      total++;
  }
  return total;
}
/* *****************************************************************************
Task Management - `facil_defer`, `facil_each`
***************************************************************************** */
/* A `facil_defer` / `facil_each` task descriptor (heap allocated). */
struct task {
  intptr_t origin; /* origin uuid (excluded from `facil_each` runs) */
  void (*func)(intptr_t uuid, protocol_s *, void *arg); /* the task itself */
  void *arg;                                 /* opaque user argument */
  void (*on_done)(intptr_t uuid, void *arg); /* fallback / completion */
  const void *service; /* service filter for `facil_each` */
  uint32_t count;      /* pending sub-task reference count (`facil_each`) */
  enum facil_protocol_lock_e task_type; /* which protocol lock to hold */
  spn_lock_i lock;                      /* protects `count` */
};
/* Allocates a task descriptor; returns NULL on allocation failure (all
 * callers check). Uses the `sizeof *ptr` idiom so the size can't drift from
 * the pointer's type. */
static inline struct task *alloc_facil_task(void) {
  struct task *task = malloc(sizeof *task);
  return task;
}
/* Releases a task descriptor allocated by `alloc_facil_task`. */
static inline void free_facil_task(struct task *task) { free(task); }
/* Default no-op fallback / completion callback for deferred tasks. */
static void mock_on_task_done(intptr_t uuid, void *arg) {
  (void)arg;
  (void)uuid;
}
/* Runs a `facil_defer` task: locks the protocol, invokes the task, frees the
 * descriptor. If the connection is gone the fallback (`on_done`) runs
 * instead; if the lock is busy or the connection is still mid-connect the
 * task is re-deferred. */
static void perform_single_task(void *v_uuid, void *v_task) {
  struct task *task = v_task;
  if (!uuid_data(v_uuid).protocol)
    goto fallback;
  protocol_s *pr = protocol_try_lock(sock_uuid2fd(v_uuid), task->task_type);
  if (!pr)
    goto defer;
  if (pr->service == connector_protocol_name) {
    /* the connection is still connecting - try again later */
    protocol_unlock(pr, task->task_type);
    goto defer;
  }
  task->func((intptr_t)v_uuid, pr, task->arg);
  protocol_unlock(pr, task->task_type);
  free_facil_task(task);
  return;
fallback:
  task->on_done((intptr_t)v_uuid, task->arg);
  free_facil_task(task);
  return;
defer:
  defer(perform_single_task, v_uuid, v_task);
  return;
}
/* Decrements a `facil_each` task's pending-subtask counter; the final
 * decrement fires `on_done` and frees the descriptor. A busy counter lock
 * reschedules this call. */
static void finish_multi_task(void *v_fd, void *v_task) {
  struct task *task = v_task;
  if (spn_trylock(&task->lock))
    goto reschedule;
  task->count--;
  if (task->count) {
    spn_unlock(&task->lock);
    return;
  }
  /* last reference - run completion (the lock dies with the descriptor) */
  task->on_done(task->origin, task->arg);
  free_facil_task(task);
  return;
reschedule:
  defer(finish_multi_task, v_fd, v_task);
}
/* Runs one `facil_each` subtask against a single fd: locks the protocol,
 * invokes the task if the service still matches, then reports completion via
 * `finish_multi_task`. A busy protocol lock reschedules this call. */
static void perform_multi_task(void *v_fd, void *v_task) {
  if (!fd_data((intptr_t)v_fd).protocol) {
    /* the connection is gone - just drop this subtask's reference */
    finish_multi_task(v_fd, v_task);
    return;
  }
  struct task *task = v_task;
  protocol_s *pr = protocol_try_lock((intptr_t)v_fd, task->task_type);
  if (!pr)
    goto reschedule;
  if (pr->service == task->service)
    task->func(sock_fd2uuid((int)(intptr_t)v_fd), pr, task->arg);
  protocol_unlock(pr, task->task_type);
  defer(finish_multi_task, v_fd, v_task);
  return;
reschedule:
  // fprintf(stderr, "rescheduling multi for %p\n", v_fd);
  defer(perform_multi_task, v_fd, v_task);
}
/* Walks the connection map scheduling `perform_multi_task` for every fd whose
 * service matches the task's filter (skipping the origin), handling up to 64
 * fds per invocation before recursing (also the busy-lock retry path).
 * FIX: the scan loop now bounds-checks `fd < capacity` BEFORE reading
 * `fd_data(fd)`; the original dereferenced one slot past the end of the
 * connection array before testing the limit. */
static void schedule_multi_task(void *v_fd, void *v_task) {
  struct task *task = v_task;
  intptr_t fd = (intptr_t)v_fd;
  for (size_t i = 0; i < 64; i++) {
    if (!fd_data(fd).protocol)
      goto finish;
    if (spn_trylock(&fd_data(fd).lock))
      goto reschedule;
    if (!fd_data(fd).protocol ||
        fd_data(fd).protocol->service != task->service || fd == task->origin) {
      spn_unlock(&fd_data(fd).lock);
      goto finish;
    }
    spn_unlock(&fd_data(fd).lock);
    /* count the new subtask before scheduling it */
    spn_lock(&task->lock);
    task->count++;
    spn_unlock(&task->lock);
    defer(perform_multi_task, (void *)fd, task);
  finish:
    /* advance to the next occupied slot (bounds check first) */
    do {
      fd++;
    } while ((fd < facil_data->capacity) && !fd_data(fd).protocol);
    if (fd >= (intptr_t)facil_data->capacity)
      goto complete;
  }
reschedule:
  /* batch limit reached or a connection lock was busy - continue from `fd` */
  schedule_multi_task((void *)fd, v_task);
  return;
complete:
  /* release the scheduler's own +1 reference (set by `facil_each`) */
  defer(finish_multi_task, NULL, v_task);
}
/**
* Schedules a protected connection task. The task will run within the
* connection's lock.
*
* If the connection is closed before the task can run, the
* `fallback` task wil be called instead, allowing for resource cleanup.
*/
#undef facil_defer
void facil_defer(struct facil_defer_args_s args) {
if (!args.fallback)
args.fallback = mock_on_task_done;
if (!args.type)
args.type = FIO_PR_LOCK_TASK;
if (!args.task || !uuid_data(args.uuid).protocol || args.uuid < 0 ||
!sock_isvalid(args.uuid))
goto error;
struct task *task = alloc_facil_task();
if (!task)
goto error;
*task = (struct task){
.func = args.task, .arg = args.arg, .on_done = args.fallback};
defer(perform_single_task, (void *)args.uuid, task);
return;
error:
defer((void (*)(void *, void *))args.fallback, (void *)args.uuid, args.arg);
}
/**
* Schedules a protected connection task for each `service` connection.
* The tasks will run within each of the connection's locks.
*
* Once all the tasks were performed, the `on_complete` callback will be called.
*/
#undef facil_each
int facil_each(struct facil_each_args_s args) {
if (!args.on_complete)
args.on_complete = mock_on_task_done;
if (!args.task_type)
args.task_type = FIO_PR_LOCK_TASK;
if (!args.task)
goto error;
struct task *task = alloc_facil_task();
if (!task)
goto error;
*task = (struct task){.origin = args.origin,
.func = args.task,
.arg = args.arg,
.on_done = args.on_complete,
.service = args.service,
.task_type = args.task_type,
.count = 1};
defer(schedule_multi_task, (void *)0, task);
return 0;
error:
defer((void (*)(void *, void *))args.on_complete, (void *)args.origin,
args.arg);
return -1;
}