| /* |
| Copyright: Boaz Segev, 2016-2017 |
| License: MIT |
| |
| Feel free to copy, use and enjoy according to the license provided. |
| */ |
| #include "spnlock.h" |
| |
| #include "evio.h" |
| #include "facil.h" |
| #include "fio_hashmap.h" |
| #include "fio_llist.h" |
| #include "fiobj4sock.h" |
| |
| #include "fio_mem.h" |
| |
| #include <errno.h> |
| #include <pthread.h> |
| #include <signal.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <sys/mman.h> |
| #include <sys/stat.h> |
| #include <sys/wait.h> |
| |
| #if !defined(__GNUC__) && !defined(__clang__) |
| #define __attribute__(...) /* expands to nothing, so attribute annotations are dropped when neither GCC nor Clang is detected */ |
| #endif |
| |
| /* ***************************************************************************** |
| Patch for OSX version < 10.12 from https://stackoverflow.com/a/9781275/4025095 |
| ***************************************************************************** */ |
| #if defined(__MACH__) && !defined(CLOCK_REALTIME) |
| #include <sys/time.h> |
| #define CLOCK_REALTIME 0 |
| #define clock_gettime patch_clock_gettime |
/*
 * Stand-in for `clock_gettime` on older OS X releases (< 10.12), where the
 * function is not implemented. When the system implements it, CLOCK_REALTIME
 * is already defined and this patch is compiled out.
 *
 * Fills `t` from `gettimeofday` (microseconds scaled to nanoseconds).
 * Returns 0 on success or the non-zero `gettimeofday` result on failure.
 * `clk_id` is ignored.
 */
static inline int patch_clock_gettime(int clk_id, struct timespec *t) {
  (void)clk_id;
  struct timeval tv;
  const int err = gettimeofday(&tv, NULL);
  if (err != 0)
    return err;
  t->tv_nsec = tv.tv_usec * 1000;
  t->tv_sec = tv.tv_sec;
  return 0;
}
| #endif |
| |
| /* ***************************************************************************** |
| Data Structures |
| ***************************************************************************** */ |
| typedef struct ProtocolMetadata { /* per-protocol lock state, reached through prt_meta() */ |
| spn_lock_i locks[3]; /* one spinlock per lock type; indexed by `enum facil_protocol_lock_e` in protocol_try_lock / protocol_unlock */ |
| unsigned rsv : 8; /* 8 reserved bits; no use is visible in this part of the file */ |
| } protocol_metadata_s; |
| |
| union protocol_metadata_union_u { /* overlays the lock metadata with a size_t member */ |
| size_t opaque; |
| protocol_metadata_s meta; |
| }; |
| #define prt_meta(prt) (((union protocol_metadata_union_u *)(&(prt)->rsv))->meta) /* views the `rsv` field of `*prt` as a protocol_metadata_s */ |
| |
| struct connection_data_s { /* per-connection record, one per entry of facil_data->conn[] */ |
| protocol_s *protocol; /* attached protocol; NULL when none (sock_on_close resets the record) */ |
| time_t active; /* `tv_sec` value stored on activity (see sock_touch) */ |
| uint8_t timeout; /* compared against elapsed seconds; 0 is treated as 300 by facil_review_timeout */ |
| spn_lock_i scheduled; /* try-locked / unlocked around on_data scheduling (see deferred_on_data, facil_quite) */ |
| spn_lock_i lock; /* held while the `protocol` pointer is read or replaced */ |
| }; |
| |
| static struct facil_data_s { /* library-wide state; mapped by facil_lib_init, unmapped by facil_libcleanup */ |
| spn_lock_i global_lock; /* no use is visible in this part of the file */ |
| uint8_t need_review; /* set to 1 by facil_review_timeout once its scan passes the last slot */ |
| uint8_t spindown; /* purpose not visible in this part of the file */ |
| uint16_t active; /* cleared by facil_stop; tested by sock_touch and sock_on_close */ |
| uint16_t threads; /* presumably the worker thread count - TODO confirm */ |
| pid_t parent; /* pid recorded with getpid() by facil_lib_init */ |
| pool_pt thread_pool; /* presumably the `defer` thread pool handle - TODO confirm */ |
| ssize_t capacity; /* number of entries in conn[], taken from sock_max_capacity() */ |
| size_t connection_count; /* adjusted with spn_add / spn_sub (deferred_on_shutdown, sock_on_close) */ |
| struct timespec last_cycle; /* refreshed with clock_gettime(CLOCK_REALTIME, ...) */ |
| struct connection_data_s conn[]; /* flexible array of records, indexed by file descriptor (see fd_data) */ |
| } * facil_data; |
| |
| #define fd_data(fd) (facil_data->conn[(fd)]) /* connection record for a file descriptor; no bounds check is performed */ |
| #define uuid_data(uuid) fd_data(sock_uuid2fd((uuid))) /* connection record for a socket uuid, resolved through sock_uuid2fd */ |
| // #define uuid_prt_meta(uuid) prt_meta(uuid_data((uuid)).protocol) |
| |
| /** locks a connection's protocol returns a pointer that need to be unlocked. */ |
| inline static protocol_s *protocol_try_lock(intptr_t fd, |
| enum facil_protocol_lock_e type) { |
| errno = 0; |
| if (spn_trylock(&fd_data(fd).lock)) |
| goto would_block; |
| protocol_s *pr = fd_data(fd).protocol; |
| if (!pr) { |
| spn_unlock(&fd_data(fd).lock); |
| errno = EBADF; |
| return NULL; |
| } |
| if (spn_trylock(&prt_meta(pr).locks[type])) { |
| spn_unlock(&fd_data(fd).lock); |
| goto would_block; |
| } |
| spn_unlock(&fd_data(fd).lock); |
| return pr; |
| would_block: |
| errno = EWOULDBLOCK; |
| return NULL; |
| } |
| /** See `facil_protocol_try_lock` for details. */ |
| inline static void protocol_unlock(protocol_s *pr, |
| enum facil_protocol_lock_e type) { |
| spn_unlock(&prt_meta(pr).locks[type]); |
| } |
| |
| /* ***************************************************************************** |
| Internal Protocol Names |
| ***************************************************************************** */ |
/* `service` name assigned to listening-socket protocols (see listener_alloc) */
static const char *LISTENER_PROTOCOL_NAME = "listening protocol __facil_internal__";

/* `service` name assigned to client-connection protocols (see facil_connect) */
static const char *CONNECTOR_PROTOCOL_NAME = "connect protocol __internal__";

/* `service` name assigned to timer protocols (see timer_alloc) */
static const char *TIMER_PROTOCOL_NAME = "timer protocol __facil_internal__";
| |
| /* ***************************************************************************** |
| Event deferring (declarations) |
| ***************************************************************************** */ |
| |
/* Deferred task prototypes. Each task receives the connection uuid, cast to a
 * pointer, as its first argument. */
static void deferred_on_close(void *uuid, void *protocol);
static void deferred_on_shutdown(void *uuid, void *unused);
static void deferred_on_ready(void *uuid, void *unused);
static void deferred_on_data(void *uuid, void *forced);
static void deferred_ping(void *uuid, void *unused);
| |
| /* ***************************************************************************** |
| Overriding `defer` to use `evio` when waiting for events |
| ***************************************************************************** */ |
| |
| /* if the FIO_DEDICATED_SYSTEM is defined threads are activated more often. */ |
| #if FIO_DEDICATED_SYSTEM |
| void defer_thread_wait(pool_pt pool, void *p_thr) { |
| (void)pool; |
| (void)p_thr; |
| evio_wait(EVIO_TICK); |
| } |
| #endif |
| |
| /* ***************************************************************************** |
| Event Handlers (evio) |
| ***************************************************************************** */ |
| void sock_flush_defer(void *arg, void *ignored) { |
| (void)ignored; |
| switch (sock_flush((intptr_t)arg)) { |
| case 1: |
| evio_add_write(sock_uuid2fd((intptr_t)arg), (void *)arg); |
| break; |
| case 0: |
| defer(deferred_on_ready, arg, NULL); |
| break; |
| } |
| } |
| |
| void evio_on_ready(void *arg) { defer(sock_flush_defer, arg, NULL); } |
| void evio_on_close(void *arg) { sock_force_close((intptr_t)arg); } |
| void evio_on_error(void *arg) { sock_force_close((intptr_t)arg); } |
| void evio_on_data(void *arg) { defer(deferred_on_data, arg, NULL); } |
| |
| /* ***************************************************************************** |
| Mock Protocol Callbacks and Service Functions |
| ***************************************************************************** */ |
| static void mock_on_ev(intptr_t uuid, protocol_s *protocol) { |
| (void)uuid; |
| (void)protocol; |
| } |
| |
| static void mock_on_data(intptr_t uuid, protocol_s *protocol) { |
| facil_quite(uuid); |
| (void)protocol; |
| } |
| |
| static void mock_on_close(intptr_t uuid, protocol_s *protocol) { |
| (void)protocol; |
| (void)uuid; |
| } |
| static uint8_t mock_on_shutdown(intptr_t uuid, protocol_s *protocol) { |
| return 0; |
| (void)protocol; |
| (void)uuid; |
| } |
| |
| static uint8_t mock_on_shutdown_internal(intptr_t uuid, protocol_s *protocol) { |
| return 0; |
| (void)protocol; |
| (void)uuid; |
| } |
| |
| static void mock_ping(intptr_t uuid, protocol_s *protocol) { |
| (void)protocol; |
| |
| sock_force_close(uuid); |
| } |
| static void mock_ping2(intptr_t uuid, protocol_s *protocol) { |
| (void)protocol; |
| |
| uuid_data(uuid).active = facil_last_tick().tv_sec; |
| if (uuid_data(uuid).timeout == 255) |
| return; |
| protocol->ping = mock_ping; |
| uuid_data(uuid).timeout = 8; |
| sock_close(uuid); |
| } |
| |
| /* Support for the default pub/sub cluster engine */ |
| #pragma weak pubsub_cluster_init |
| void pubsub_cluster_init(void) {} /* weak no-op default; called by facil_external_root_init */ |
| #pragma weak pubsub_cluster_on_fork_start |
| void pubsub_cluster_on_fork_start(void) {} /* weak no-op default; called by facil_external_init */ |
| #pragma weak pubsub_cluster_on_fork_end |
| void pubsub_cluster_on_fork_end(void) {} /* weak no-op default; called by facil_external_init2 */ |
| #pragma weak pubsub_cluster_cleanup |
| void pubsub_cluster_cleanup(void) {} /* weak no-op default; called by facil_external_root_cleanup */ |
| |
| /* other cleanup concern */ |
| #pragma weak fio_cli_end |
| void fio_cli_end(void) {} /* weak no-op default; called by facil_external_root_cleanup */ |
| |
| /* ***************************************************************************** |
| Core Callbacks for forking / starting up / cleaning up |
| ***************************************************************************** */ |
| |
| typedef struct { /* one registered core callback */ |
| fio_ls_embd_s node; /* embedded list node linking this entry into a collection */ |
| void (*func)(void *); /* the callback */ |
| void *arg; /* argument passed to `func` */ |
| } callback_data_s; |
| |
| typedef struct { /* all callbacks registered for one event type */ |
| spn_lock_i lock; /* held while `callbacks` is read or modified */ |
| fio_ls_embd_s callbacks; /* list head; initialized on first use by facil_core_callback_ensure */ |
| } callback_collection_s; |
| |
| static callback_collection_s callback_collection[FIO_CALL_AT_EXIT + 1]; /* one collection per callback_type_e value, up to FIO_CALL_AT_EXIT */ |
| |
| static inline void facil_core_callback_ensure(callback_collection_s *c) { |
| if (c->callbacks.next) |
| return; |
| c->callbacks = (fio_ls_embd_s)FIO_LS_INIT(c->callbacks); |
| } |
| |
| /** Adds a callback to the list of callbacks to be called for the event. */ |
| void facil_core_callback_add(callback_type_e c_type, void (*func)(void *), |
| void *arg) { |
| if (!func || (int)c_type < 0 || c_type > FIO_CALL_AT_EXIT) |
| return; |
| spn_lock(&callback_collection[c_type].lock); |
| facil_core_callback_ensure(&callback_collection[c_type]); |
| callback_data_s *tmp = malloc(sizeof(*tmp)); |
| *tmp = (callback_data_s){.func = func, .arg = arg}; |
| fio_ls_embd_push(&callback_collection[c_type].callbacks, &tmp->node); |
| spn_unlock(&callback_collection[c_type].lock); |
| } |
| |
| /** Removes a callback from the list of callbacks to be called for the event. */ |
| int facil_core_callback_remove(callback_type_e c_type, void (*func)(void *), |
| void *arg) { |
| if ((int)c_type < 0 || c_type > FIO_CALL_AT_EXIT) |
| return -1; |
| spn_lock(&callback_collection[c_type].lock); |
| FIO_LS_EMBD_FOR(&callback_collection[c_type].callbacks, pos) { |
| callback_data_s *tmp = (FIO_LS_EMBD_OBJ(callback_data_s, node, pos)); |
| if (tmp->func == func && tmp->arg == arg) { |
| fio_ls_embd_remove(&tmp->node); |
| free(tmp); |
| goto success; |
| } |
| } |
| spn_unlock(&callback_collection[c_type].lock); |
| return -1; |
| success: |
| spn_unlock(&callback_collection[c_type].lock); |
| return -0; |
| } |
| |
| /** Forces all the existing callbacks to run, as if the event occured. */ |
| void facil_core_callback_force(callback_type_e c_type) { |
| if ((int)c_type < 0 || c_type > FIO_CALL_AT_EXIT) |
| return; |
| /* copy collection */ |
| fio_ls_embd_s copy = FIO_LS_INIT(copy); |
| spn_lock(&callback_collection[c_type].lock); |
| facil_core_callback_ensure(&callback_collection[c_type]); |
| FIO_LS_EMBD_FOR(&callback_collection[c_type].callbacks, pos) { |
| callback_data_s *tmp = fio_malloc(sizeof(*tmp)); |
| *tmp = *(FIO_LS_EMBD_OBJ(callback_data_s, node, pos)); |
| fio_ls_embd_push(©, &tmp->node); |
| } |
| spn_unlock(&callback_collection[c_type].lock); |
| /* run callbacks + free data */ |
| while (fio_ls_embd_any(©)) { |
| callback_data_s *tmp = |
| FIO_LS_EMBD_OBJ(callback_data_s, node, fio_ls_embd_shift(©)); |
| if (tmp->func) { |
| tmp->func(tmp->arg); |
| } |
| fio_free(tmp); |
| } |
| } |
| |
| /** Clears all the existing callbacks for the event. */ |
| void facil_core_callback_clear(callback_type_e c_type) { |
| if ((int)c_type < 0 || c_type > FIO_CALL_AT_EXIT) |
| return; |
| spn_lock(&callback_collection[c_type].lock); |
| facil_core_callback_ensure(&callback_collection[c_type]); |
| while (fio_ls_embd_any(&callback_collection[c_type].callbacks)) { |
| callback_data_s *tmp = FIO_LS_EMBD_OBJ( |
| callback_data_s, node, |
| fio_ls_embd_shift(&callback_collection[c_type].callbacks)); |
| free(tmp); |
| } |
| spn_unlock(&callback_collection[c_type].lock); |
| } |
| |
| /* ***************************************************************************** |
| External initialization / deconstruction |
| ***************************************************************************** */ |
| |
/*
 * Stage 1 initialization for external services. Calls, in order:
 * `sock_on_fork`, `fio_malloc_after_fork`, `defer_on_fork` and
 * `pubsub_cluster_on_fork_start`.
 */
static void facil_external_init(void)
{
  sock_on_fork();
  fio_malloc_after_fork();
  defer_on_fork();
  pubsub_cluster_on_fork_start();
}
| |
/* Stage 2 initialization for external services: calls
 * `pubsub_cluster_on_fork_end`. */
static void facil_external_init2(void) {
  pubsub_cluster_on_fork_end();
}
| |
/* Cleanup hook for external services; currently there is nothing to do. */
static void facil_external_cleanup(void) {
}
| |
| #pragma weak http_lib_init |
| void http_lib_init(void) {} /* weak no-op default; called by facil_external_root_init */ |
| #pragma weak http_lib_cleanup |
| void http_lib_cleanup(void) {} /* weak no-op default; called by facil_external_root_cleanup */ |
| |
/* One-time initialization for external services, run from facil_lib_init:
 * calls `http_lib_init`, then `pubsub_cluster_init`. */
static void facil_external_root_init(void)
{
  http_lib_init();
  pubsub_cluster_init();
}
/* One-time cleanup for external services, run from facil_libcleanup: calls
 * `http_lib_cleanup`, `pubsub_cluster_cleanup` and `fio_cli_end`, in order. */
static void facil_external_root_cleanup(void)
{
  http_lib_cleanup();
  pubsub_cluster_cleanup();
  fio_cli_end();
}
| |
| /* ***************************************************************************** |
| Deferred event handlers |
| ***************************************************************************** */ |
| static void deferred_on_close(void *uuid_, void *pr_) { |
| protocol_s *pr = pr_; |
| if (pr->rsv) |
| goto postpone; |
| pr->on_close((intptr_t)uuid_, pr); |
| return; |
| postpone: |
| defer(deferred_on_close, uuid_, pr_); |
| } |
| |
| static void deferred_on_shutdown(void *arg, void *arg2) { |
| if (!uuid_data(arg).protocol) { |
| return; |
| } |
| protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_TASK); |
| if (!pr) { |
| if (errno == EBADF) |
| return; |
| goto postpone; |
| } |
| uuid_data(arg).active = facil_data->last_cycle.tv_sec; |
| uint8_t r = pr->on_shutdown((intptr_t)arg, pr); |
| if (r) { |
| if (r == 255) { |
| uuid_data(arg).timeout = 0; |
| } else { |
| spn_add(&facil_data->connection_count, 1); |
| uuid_data(arg).timeout = r; |
| } |
| pr->ping = mock_ping2; |
| protocol_unlock(pr, FIO_PR_LOCK_TASK); |
| } else { |
| spn_add(&facil_data->connection_count, 1); |
| uuid_data(arg).timeout = 8; |
| pr->ping = mock_ping; |
| protocol_unlock(pr, FIO_PR_LOCK_TASK); |
| sock_close((intptr_t)arg); |
| } |
| return; |
| postpone: |
| defer(deferred_on_shutdown, arg, NULL); |
| (void)arg2; |
| } |
| |
| static void deferred_on_ready(void *arg, void *arg2) { |
| if (!uuid_data(arg).protocol) { |
| return; |
| } |
| protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_WRITE); |
| if (!pr) { |
| if (errno == EBADF) |
| return; |
| goto postpone; |
| } |
| pr->on_ready((intptr_t)arg, pr); |
| protocol_unlock(pr, FIO_PR_LOCK_WRITE); |
| return; |
| postpone: |
| defer(deferred_on_ready, arg, NULL); |
| (void)arg2; |
| } |
| |
| static void deferred_on_data(void *uuid, void *arg2) { |
| if (!uuid_data(uuid).protocol || sock_isclosed((intptr_t)uuid)) { |
| return; |
| } |
| protocol_s *pr = protocol_try_lock(sock_uuid2fd(uuid), FIO_PR_LOCK_TASK); |
| if (!pr) { |
| if (errno == EBADF) |
| return; |
| goto postpone; |
| } |
| spn_unlock(&uuid_data(uuid).scheduled); |
| pr->on_data((intptr_t)uuid, pr); |
| protocol_unlock(pr, FIO_PR_LOCK_TASK); |
| if (!spn_trylock(&uuid_data(uuid).scheduled)) { |
| evio_add_read(sock_uuid2fd((intptr_t)uuid), uuid); |
| } |
| return; |
| postpone: |
| if (arg2) { |
| /* the event is being forced, so force rescheduling */ |
| defer(deferred_on_data, (void *)uuid, (void *)1); |
| } else { |
| /* the protocol was locked, so there might not be any need for the event */ |
| evio_add_read(sock_uuid2fd((intptr_t)uuid), uuid); |
| } |
| return; |
| } |
| |
| static void deferred_ping(void *arg, void *arg2) { |
| if (!uuid_data(arg).protocol || |
| (uuid_data(arg).timeout && |
| (uuid_data(arg).timeout > |
| (facil_data->last_cycle.tv_sec - uuid_data(arg).active)))) { |
| return; |
| } |
| protocol_s *pr = protocol_try_lock(sock_uuid2fd(arg), FIO_PR_LOCK_WRITE); |
| if (!pr) |
| goto postpone; |
| pr->ping((intptr_t)arg, pr); |
| protocol_unlock(pr, FIO_PR_LOCK_WRITE); |
| return; |
| postpone: |
| defer(deferred_ping, arg, NULL); |
| (void)arg2; |
| } |
| |
| /* ***************************************************************************** |
| Forcing IO events |
| ***************************************************************************** */ |
| |
| void facil_force_event(intptr_t uuid, enum facil_io_event ev) { |
| switch (ev) { |
| case FIO_EVENT_ON_DATA: |
| spn_trylock(&uuid_data(uuid).scheduled); |
| defer(deferred_on_data, (void *)uuid, (void *)1); |
| break; |
| case FIO_EVENT_ON_TIMEOUT: |
| defer(deferred_ping, (void *)uuid, NULL); |
| break; |
| case FIO_EVENT_ON_READY: |
| evio_on_ready((void *)uuid); |
| break; |
| } |
| } |
| |
| /** |
| * Temporarily prevents `on_data` events from firing. |
| * |
| * The `on_data` event will be automatically rescheduled when (if) the socket's |
| * outgoing buffer fills up or when `facil_force_event` is called with |
| * `FIO_EVENT_ON_DATA`. |
| */ |
| void facil_quite(intptr_t uuid) { |
| if (sock_isvalid(uuid)) |
| spn_trylock(&uuid_data(uuid).scheduled); |
| } |
| |
| /* ***************************************************************************** |
| Socket callbacks |
| ***************************************************************************** */ |
| |
| void sock_on_close(intptr_t uuid) { |
| // fprintf(stderr, "INFO: facil.io, on-close called for %u (set to %p)\n", |
| // (unsigned int)sock_uuid2fd(uuid), (void |
| // *)uuid_data(uuid).protocol); |
| spn_lock(&uuid_data(uuid).lock); |
| struct connection_data_s old_data = uuid_data(uuid); |
| uuid_data(uuid) = (struct connection_data_s){.lock = uuid_data(uuid).lock}; |
| spn_unlock(&uuid_data(uuid).lock); |
| if (old_data.protocol) { |
| defer(deferred_on_close, (void *)uuid, old_data.protocol); |
| if (facil_data->active == 0 && old_data.timeout) { |
| spn_sub(&facil_data->connection_count, 1); |
| } |
| } |
| } |
| |
| void sock_touch(intptr_t uuid) { |
| if (facil_data && facil_data->active) |
| uuid_data(uuid).active = facil_data->last_cycle.tv_sec; |
| } |
| |
| /* ***************************************************************************** |
| Initialization and Cleanup |
| ***************************************************************************** */ |
| static spn_lock_i facil_libinit_lock = SPN_LOCK_INIT; /* held by facil_lib_init and facil_libcleanup while `facil_data` is mapped or unmapped */ |
| |
/** Rounds up any size to the nearest page alignment (assumes 4096 bytes per
 * page). The argument is evaluated exactly once and the result is a
 * `size_t`. */
#define round_size(size) ((((size_t)(size)) + (size_t)4095) & ~(size_t)4095)
| |
| static void facil_libcleanup(void) { |
| /* free memory */ |
| spn_lock(&facil_libinit_lock); |
| facil_core_callback_force(FIO_CALL_AT_EXIT); |
| if (facil_data) { |
| facil_external_root_cleanup(); |
| // defer_perform(); /* perform any lingering cleanup tasks? */ |
| size_t mem_size = sizeof(*facil_data) + ((size_t)facil_data->capacity * |
| sizeof(struct connection_data_s)); |
| munmap(facil_data, round_size(mem_size)); |
| facil_data = NULL; |
| } |
| spn_unlock(&facil_libinit_lock); |
| } |
| |
| static void facil_lib_init(void) { |
| ssize_t capa = sock_max_capacity(); |
| if (capa < 0) { |
| perror("ERROR: socket capacity unknown / failure"); |
| exit(ENOMEM); |
| } |
| size_t mem_size = |
| sizeof(*facil_data) + ((size_t)capa * sizeof(struct connection_data_s)); |
| spn_lock(&facil_libinit_lock); |
| if (facil_data) |
| goto finish; |
| facil_data = mmap(NULL, round_size(mem_size), PROT_READ | PROT_WRITE, |
| MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); |
| if (!facil_data || facil_data == MAP_FAILED) { |
| perror("ERROR: Couldn't initialize the facil.io library"); |
| exit(0); |
| } |
| memset(facil_data, 0, mem_size); |
| *facil_data = (struct facil_data_s){ |
| .capacity = capa, |
| .parent = getpid(), |
| }; |
| facil_external_root_init(); |
| atexit(facil_libcleanup); |
| #ifdef DEBUG |
| if (FACIL_PRINT_STATE) |
| fprintf(stderr, |
| "Initialized the facil.io library.\n" |
| "facil.io's memory footprint per connection == %lu Bytes X %lu\n" |
| "=== facil.io's memory footprint: %lu ===\n\n", |
| (unsigned long)sizeof(struct connection_data_s), |
| (unsigned long)facil_data->capacity, (unsigned long)mem_size); |
| #endif |
| finish: |
| spn_unlock(&facil_libinit_lock); |
| clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle); |
| } |
| |
| /** Sets to shutdown flag for the current process. |
| * |
| * If a cluster is used and this function is called for a worker, a new worker |
| * will respawn. |
| * |
| * This MUST be signal safe (don't call any functions, just use flags). |
| */ |
| static void facil_stop(void) { |
| if (!facil_data) |
| return; |
| facil_data->active = 0; |
| } |
| |
| /* ***************************************************************************** |
| The listening protocol |
| ***************************************************************************** */ |
| #undef facil_listen |
| |
| struct ListenerProtocol { /* protocol object attached to a listening socket */ |
| protocol_s protocol; /* first member, so the struct can be cast to and from protocol_s * */ |
| void (*on_open)(void *uuid, void *udata); /* deferred once for every accepted client (see listener_on_data) */ |
| void *udata; /* opaque user pointer handed to the callbacks */ |
| void (*on_start)(intptr_t uuid, void *udata); /* optional; called by listener_on_start */ |
| void (*on_finish)(intptr_t uuid, void *udata); /* optional; called by listener_on_close */ |
| char *port; /* copy of the port string stored after the struct, or NULL (treated as a Unix socket) */ |
| char *address; /* copy of the address string stored after the struct, or NULL */ |
| uint8_t quite; /* no use is visible in this part of the file */ |
| }; |
| |
| static void listener_ping(intptr_t uuid, protocol_s *plistener) { |
| // fprintf(stderr, "*** Listener Ping Called for %ld\n", sock_uuid2fd(uuid)); |
| uuid_data(uuid).active = facil_data->last_cycle.tv_sec; |
| return; |
| (void)plistener; |
| } |
| |
| static void listener_on_data(intptr_t uuid, protocol_s *plistener) { |
| for (int i = 0; i < 4; ++i) { |
| intptr_t new_client = sock_accept(uuid); |
| if (new_client == -1) { |
| if (errno == EWOULDBLOCK || errno == EAGAIN || errno == ECONNABORTED || |
| errno == ECONNRESET) |
| return; |
| perror("ERROR: socket accept error"); |
| return; |
| } |
| // to defer or not to defer...? TODO: answer the question |
| struct ListenerProtocol *listener = (struct ListenerProtocol *)plistener; |
| defer(listener->on_open, (void *)new_client, listener->udata); |
| } |
| facil_force_event(uuid, FIO_EVENT_ON_DATA); |
| } |
| |
/* Releases a listener allocated by listener_alloc. */
static void free_listenner(void *li) {
  free(li);
}
| |
| static void listener_on_close(intptr_t uuid, protocol_s *plistener) { |
| struct ListenerProtocol *listener = (void *)plistener; |
| if (listener->on_finish) { |
| listener->on_finish(uuid, listener->udata); |
| } |
| if (FACIL_PRINT_STATE && facil_data->parent == getpid()) { |
| if (listener->port) { |
| fprintf(stderr, "* Stopped listening on port %s\n", listener->port); |
| } else { |
| fprintf(stderr, "* Stopped listening on Unix Socket %s\n", |
| listener->address); |
| } |
| } |
| if (!listener->port) { |
| unlink(listener->address); |
| } |
| free_listenner(listener); |
| } |
| |
| static inline struct ListenerProtocol * |
| listener_alloc(struct facil_listen_args settings) { |
| size_t port_len = 0; |
| size_t addr_len = 0; |
| if (settings.port) { |
| port_len = strlen(settings.port) + 1; |
| } |
| if (settings.address) { |
| addr_len = strlen(settings.address) + 1; |
| } |
| struct ListenerProtocol *listener = |
| malloc(sizeof(*listener) + addr_len + port_len); |
| |
| if (listener) { |
| *listener = (struct ListenerProtocol){ |
| .protocol.service = LISTENER_PROTOCOL_NAME, |
| .protocol.on_data = listener_on_data, |
| .protocol.on_close = listener_on_close, |
| .protocol.on_shutdown = mock_on_shutdown_internal, |
| .protocol.ping = listener_ping, |
| .on_open = (void (*)(void *, void *))settings.on_open, |
| .udata = settings.udata, |
| .on_start = settings.on_start, |
| .on_finish = settings.on_finish, |
| }; |
| if (settings.port) { |
| listener->port = (char *)(listener + 1); |
| memcpy(listener->port, settings.port, port_len); |
| } |
| if (settings.address) { |
| listener->address = (char *)(listener + 1); |
| listener->address += port_len; |
| memcpy(listener->address, settings.address, addr_len); |
| } |
| return listener; |
| } |
| return NULL; |
| } |
| |
| inline static void listener_on_start(int fd) { |
| intptr_t uuid = sock_fd2uuid((int)fd); |
| if (uuid < 0) { |
| fprintf(stderr, "ERROR: listening socket dropped?\n"); |
| kill(0, SIGINT); |
| exit(4); |
| } |
| if (evio_add(fd, (void *)uuid) < 0) { |
| perror("Couldn't register listening socket"); |
| kill(0, SIGINT); |
| exit(4); |
| } |
| fd_data(fd).active = facil_data->last_cycle.tv_sec; |
| // call the on_init callback |
| struct ListenerProtocol *listener = |
| (struct ListenerProtocol *)uuid_data(uuid).protocol; |
| if (listener->on_start) { |
| listener->on_start(uuid, listener->udata); |
| } |
| } |
| |
| /** |
| Listens to a server with the following server settings (which MUST include |
| a default protocol). |
| */ |
| intptr_t facil_listen(struct facil_listen_args settings) { |
| if (!facil_data) |
| facil_lib_init(); |
| if (settings.on_open == NULL) { |
| errno = EINVAL; |
| return -1; |
| } |
| if (!settings.port || settings.port[0] == 0 || |
| (settings.port[0] == '0' && settings.port[1] == 0)) { |
| settings.port = NULL; |
| } |
| intptr_t uuid = sock_listen(settings.address, settings.port); |
| if (uuid == -1) { |
| return -1; |
| } |
| protocol_s *protocol = (void *)listener_alloc(settings); |
| facil_attach(uuid, protocol); |
| if (!protocol) { |
| sock_close(uuid); |
| return -1; |
| } |
| if (FACIL_PRINT_STATE && facil_data->parent == getpid()) { |
| if (settings.port) |
| fprintf(stderr, "* Listening on port %s\n", settings.port); |
| else |
| fprintf(stderr, "* Listening on Unix Socket at %s\n", settings.address); |
| } |
| return uuid; |
| } |
| |
| /* ***************************************************************************** |
| Connect (as client) |
| ***************************************************************************** */ |
| |
| struct ConnectProtocol { /* temporary protocol attached to an outgoing connection until it opens */ |
| protocol_s protocol; /* first member, so the struct can be cast to and from protocol_s * */ |
| void (*on_connect)(void *uuid, void *udata); /* called once by connector_on_ready on the first `ready` event */ |
| void (*on_fail)(intptr_t uuid, void *udata); /* optional; called by connector_on_close if the connection never opened */ |
| void *udata; /* opaque user pointer handed to the callbacks */ |
| intptr_t uuid; /* uuid returned by sock_connect */ |
| uint8_t opened; /* 0 until the first `ready` event, then 1 */ |
| }; |
| |
| /* The first `ready` signal is fired when a connection was established */ |
| static void connector_on_ready(intptr_t uuid, protocol_s *_connector) { |
| struct ConnectProtocol *connector = (void *)_connector; |
| sock_touch(uuid); |
| if (connector->opened == 0) { |
| connector->opened = 1; |
| facil_set_timeout(uuid, 0); /* remove connection timeout settings */ |
| connector->on_connect((void *)uuid, connector->udata); |
| evio_add_write(sock_uuid2fd(uuid), (void *)uuid); |
| } |
| return; |
| } |
| |
| /* If data events reach this protocol, delay their execution. */ |
| static void connector_on_data(intptr_t uuid, protocol_s *connector) { |
| (void)connector; |
| (void)uuid; |
| } |
| |
| /* Failed to connect */ |
| static void connector_on_close(intptr_t uuid, protocol_s *pconnector) { |
| struct ConnectProtocol *connector = (void *)pconnector; |
| if (connector->opened == 0 && connector->on_fail) |
| connector->on_fail(connector->uuid, connector->udata); |
| free(connector); |
| (void)uuid; |
| } |
| |
| #undef facil_connect |
| intptr_t facil_connect(struct facil_connect_args opt) { |
| intptr_t uuid = -1; |
| if (!opt.on_connect || (!opt.address && !opt.port)) |
| goto error; |
| if (!opt.timeout) |
| opt.timeout = 30; |
| struct ConnectProtocol *connector = malloc(sizeof(*connector)); |
| if (!connector) |
| goto error; |
| *connector = (struct ConnectProtocol){ |
| .on_connect = (void (*)(void *, void *))opt.on_connect, |
| .on_fail = opt.on_fail, |
| .udata = opt.udata, |
| .protocol.service = CONNECTOR_PROTOCOL_NAME, |
| .protocol.on_data = connector_on_data, |
| .protocol.on_ready = connector_on_ready, |
| .protocol.on_close = connector_on_close, |
| .opened = 0, |
| }; |
| uuid = connector->uuid = sock_connect(opt.address, opt.port); |
| /* check for errors, always invoke the on_fail if required */ |
| if (uuid == -1) { |
| free(connector); |
| goto error; |
| } |
| if (facil_attach(uuid, &connector->protocol) == -1) { |
| /* facil_attach calls the failure / `on_close` callback */ |
| sock_close(uuid); |
| return -1; |
| } |
| uuid_data(uuid).active = facil_last_tick().tv_sec; |
| facil_set_timeout(uuid, opt.timeout); |
| return uuid; |
| error: |
| if (opt.on_fail) |
| opt.on_fail(uuid, opt.udata); |
| return -1; |
| } |
| #define facil_connect(...) \ |
| facil_connect((struct facil_connect_args){__VA_ARGS__}) |
| |
| /* ***************************************************************************** |
| Timers |
| ***************************************************************************** */ |
| |
| /* ******* |
| Timer Protocol |
| ******* */ |
| typedef struct { /* protocol object attached to a timer file descriptor */ |
| protocol_s protocol; /* first member, so the struct can be cast to and from protocol_s * */ |
| size_t milliseconds; /* interval passed to evio_set_timer */ |
| size_t repetitions; /* runs left; 0 means repeat forever (see timer_on_data) */ |
| void (*task)(void *); /* called with `arg` every time the timer fires */ |
| void (*on_finish)(void *); /* called with `arg` by timer_on_close */ |
| void *arg; /* opaque user pointer handed to `task` and `on_finish` */ |
| } timer_protocol_s; |
| |
| #define prot2timer(protocol) (*((timer_protocol_s *)(protocol))) /* views a protocol pointer as the timer object it is embedded in */ |
| |
| static void timer_on_data(intptr_t uuid, protocol_s *protocol) { |
| prot2timer(protocol).task(prot2timer(protocol).arg); |
| if (prot2timer(protocol).repetitions == 0) |
| goto reschedule; |
| prot2timer(protocol).repetitions -= 1; |
| if (prot2timer(protocol).repetitions) |
| goto reschedule; |
| sock_force_close(uuid); |
| return; |
| reschedule: |
| spn_trylock(&uuid_data(uuid).scheduled); |
| evio_set_timer(sock_uuid2fd(uuid), (void *)uuid, |
| prot2timer(protocol).milliseconds); |
| } |
| |
| static void timer_on_close(intptr_t uuid, protocol_s *protocol) { |
| prot2timer(protocol).on_finish(prot2timer(protocol).arg); |
| free(protocol); |
| (void)uuid; |
| } |
| |
| static void timer_ping(intptr_t uuid, protocol_s *protocol) { |
| sock_touch(uuid); |
| (void)protocol; |
| } |
| |
| static inline timer_protocol_s *timer_alloc(void (*task)(void *), void *arg, |
| size_t milliseconds, |
| size_t repetitions, |
| void (*on_finish)(void *)) { |
| if (!on_finish) |
| on_finish = (void (*)(void *))mock_on_close; |
| timer_protocol_s *t = malloc(sizeof(*t)); |
| if (t) |
| *t = (timer_protocol_s){ |
| .protocol.service = TIMER_PROTOCOL_NAME, |
| .protocol.on_data = timer_on_data, |
| .protocol.on_close = timer_on_close, |
| .protocol.on_shutdown = mock_on_shutdown_internal, |
| .protocol.ping = timer_ping, |
| .arg = arg, |
| .task = task, |
| .on_finish = on_finish, |
| .milliseconds = milliseconds, |
| .repetitions = repetitions, |
| }; |
| return t; |
| } |
| |
| inline static void timer_on_server_start(int fd) { |
| if (evio_set_timer(fd, (void *)sock_fd2uuid(fd), |
| prot2timer(fd_data(fd).protocol).milliseconds)) { |
| perror("Couldn't register a required timed event."); |
| kill(0, SIGINT); |
| exit(4); |
| } |
| } |
| |
| /** |
| * Creates a system timer (at the cost of 1 file descriptor). |
| * |
| * The task will repeat `repetitions` times. If `repetitions` is set to 0, task |
| * will repeat forever. |
| * |
| * Returns -1 on error or the new file descriptor on succeess. |
| * |
| * The `on_finish` handler is always called (even on error). |
| */ |
| int facil_run_every(size_t milliseconds, size_t repetitions, |
| void (*task)(void *), void *arg, |
| void (*on_finish)(void *)) { |
| if (task == NULL) |
| goto error_fin; |
| timer_protocol_s *protocol = NULL; |
| intptr_t uuid = -1; |
| int fd = evio_open_timer(); |
| if (fd == -1) { |
| perror("ERROR: couldn't create a timer fd"); |
| goto error; |
| } |
| uuid = sock_open(fd); |
| if (uuid == -1) |
| goto error; |
| protocol = timer_alloc(task, arg, milliseconds, repetitions, on_finish); |
| if (protocol == NULL) |
| goto error; |
| facil_attach(uuid, (protocol_s *)protocol); |
| if (evio_isactive() && evio_set_timer(fd, (void *)uuid, milliseconds) == -1) { |
| /* don't goto error because the protocol is attached. */ |
| const int old = errno; |
| sock_force_close(uuid); |
| errno = old; |
| return -1; |
| } |
| return 0; |
| error: |
| if (uuid != -1) { |
| const int old = errno; |
| sock_close(uuid); |
| errno = old; |
| } else if (fd != -1) { |
| const int old = errno; |
| close(fd); |
| errno = old; |
| } |
| error_fin: |
| if (on_finish) { |
| const int old = errno; |
| on_finish(arg); |
| errno = old; |
| } |
| return -1; |
| } |
| |
| /* ***************************************************************************** |
| Running the server |
| ***************************************************************************** */ |
| |
| volatile uint8_t facil_signal_children_flag = 0; /* when non-zero, facil_internal_poll clears it and calls facil_cluster_signal_children */ |
| |
| /** |
| * Signals all workers to shutdown, which might invoke a respawning of the |
| * workers unless the shutdown signal was received. |
| * |
| * NOT signal safe. |
| */ |
| #pragma weak facil_cluster_signal_children |
| void facil_cluster_signal_children(void) { |
| if (facil_data->parent != getpid()) { |
| facil_stop(); |
| } |
| } |
| |
| static inline void facil_internal_poll(void) { |
| if (facil_signal_children_flag) { |
| facil_signal_children_flag = 0; |
| facil_cluster_signal_children(); |
| } |
| } |
| |
| static inline void facil_internal_poll_reset(void) { |
| facil_signal_children_flag = 0; |
| } |
| |
/* Deferred task: announces the current process id on stderr. */
static void print_pid(void *arg, void *ignr) {
  fprintf(stderr, "* %d is running.\n", getpid());
  /* both task arguments are unused */
  (void)ignr;
  (void)arg;
}
| |
| static void facil_review_timeout(void *arg, void *ignr) { |
| (void)ignr; |
| protocol_s *tmp; |
| time_t review = facil_data->last_cycle.tv_sec; |
| intptr_t fd = (intptr_t)arg; |
| |
| uint16_t timeout = fd_data(fd).timeout; |
| if (!timeout) |
| timeout = 300; /* enforced timout settings */ |
| |
| if (!fd_data(fd).protocol || (fd_data(fd).active + timeout >= review)) |
| goto finish; |
| tmp = protocol_try_lock(fd, FIO_PR_LOCK_STATE); |
| if (!tmp) |
| goto reschedule; |
| if (prt_meta(tmp).locks[FIO_PR_LOCK_TASK] || |
| prt_meta(tmp).locks[FIO_PR_LOCK_WRITE]) |
| goto unlock; |
| defer(deferred_ping, (void *)sock_fd2uuid((int)fd), NULL); |
| unlock: |
| protocol_unlock(tmp, FIO_PR_LOCK_STATE); |
| finish: |
| do { |
| fd++; |
| } while (!fd_data(fd).protocol && (fd < facil_data->capacity)); |
| |
| if (facil_data->capacity <= fd) { |
| facil_data->need_review = 1; |
| return; |
| } |
| reschedule: |
| defer(facil_review_timeout, (void *)fd, NULL); |
| } |
| |
| static void perform_idle(void *arg, void *ignr) { |
| facil_core_callback_force(FIO_CALL_ON_IDLE); |
| (void)arg; |
| (void)ignr; |
| } |
| |
| /* reactor pattern cycling - common */ |
| static void facil_cycle_schedule_events(void) { |
| static int idle = 0; |
| clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle); |
| facil_internal_poll(); |
| int events; |
| if (defer_has_queue()) { |
| events = evio_review(0); |
| if (events < 0) { |
| return; |
| } |
| if (events > 0) { |
| idle = 1; |
| } |
| } else { |
| events = evio_review(EVIO_TICK); |
| if (events < 0) |
| return; |
| if (events > 0) { |
| idle = 1; |
| } else if (idle) { |
| defer(perform_idle, NULL, NULL); |
| idle = 0; |
| } |
| } |
| static time_t last_to_review = 0; |
| if (facil_data->need_review && |
| facil_data->last_cycle.tv_sec != last_to_review) { |
| last_to_review = facil_data->last_cycle.tv_sec; |
| facil_data->need_review = 0; |
| defer(facil_review_timeout, (void *)0, NULL); |
| } |
| } |
| |
| /* reactor pattern cycling during cleanup */ |
| static void facil_cycle_unwind(void *ignr, void *ignr2) { |
| if (facil_data->connection_count) { |
| facil_cycle_schedule_events(); |
| defer(facil_cycle_unwind, ignr, ignr2); |
| return; |
| } |
| pool_pt pool = facil_data->thread_pool; |
| facil_data->thread_pool = NULL; |
| defer_pool_stop(pool); |
| return; |
| (void)ignr; |
| (void)ignr2; |
| } |
| |
| /* reactor pattern cycling */ |
| static void facil_cycle(void *ignr, void *ignr2) { |
| facil_cycle_schedule_events(); |
| if (facil_data->active) { |
| defer(facil_cycle, ignr, ignr2); |
| return; |
| } |
| /* switch to winding down */ |
| if (FACIL_PRINT_STATE && !facil_data->spindown) { |
| pid_t pid = getpid(); |
| if (pid != facil_data->parent) |
| fprintf(stderr, "* (%d) Detected exit signal.\n", getpid()); |
| else |
| fprintf(stderr, "* Server Detected exit signal.\n"); |
| } |
| pool_pt pool = facil_data->thread_pool; |
| facil_data->thread_pool = NULL; |
| defer_pool_stop(pool); |
| return; |
| (void)ignr; |
| (void)ignr2; |
| } |
| |
/**
OVERRIDE THIS to replace the default `fork` implementation or to inject hooks
into the forking function.

Behaves like the system's `fork`: the weak default simply forwards to it and
returns its result as an `int`.
*/
#pragma weak facil_fork
int facil_fork(void) {
  const pid_t result = fork();
  return (int)result;
}
| |
| /** This will be called by child processes, make sure to unlock any existing |
| * locks. |
| * |
| * Known locks: |
| * * `defer` tasks lock. |
| * * `sock` packet pool lock. |
| * * `sock` connection lock (per connection). |
| * * `facil` global lock. |
| * * `facil` pub/sub lock. |
| * * `facil` connection data lock (per connection data). |
| * * `facil` protocol lock (per protocol object, placed in `rsv`). |
| * * `pubsub` pubsub global lock (should be initialized in facil_external_init. |
| * * `pubsub` pubsub client lock (should be initialized in facil_external_init. |
| */ |
| static void facil_worker_startup(uint8_t sentinel) { |
| facil_internal_poll_reset(); |
| evio_create(); |
| clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle); |
| facil_external_init(); |
| if (facil_data->active == 1) { |
| /* single process */ |
| for (int i = 0; i < facil_data->capacity; i++) { |
| errno = 0; |
| fd_data(i).lock = SPN_LOCK_INIT; |
| if (fd_data(i).protocol) { |
| fd_data(i).protocol->rsv = 0; |
| if (fd_data(i).protocol->service == LISTENER_PROTOCOL_NAME) |
| listener_on_start(i); |
| else if (fd_data(i).protocol->service == TIMER_PROTOCOL_NAME) |
| timer_on_server_start(i); |
| else { |
| evio_add(i, (void *)sock_fd2uuid(i)); |
| } |
| } |
| } |
| } else if (sentinel == 0) { |
| /* child process */ |
| for (int i = 0; i < facil_data->capacity; i++) { |
| errno = 0; |
| fd_data(i).lock = SPN_LOCK_INIT; |
| if (fd_data(i).protocol) { |
| fd_data(i).protocol->rsv = 0; |
| if (fd_data(i).protocol->service == LISTENER_PROTOCOL_NAME) |
| listener_on_start(i); |
| else if (fd_data(i).protocol->service == TIMER_PROTOCOL_NAME) |
| timer_on_server_start(i); |
| else { |
| /* prevent normal connections from being shared across workers */ |
| intptr_t uuid = sock_fd2uuid(i); |
| if (uuid != -1) { |
| sock_force_close(uuid); |
| } else { |
| sock_on_close(uuid); |
| } |
| } |
| } |
| } |
| } else { |
| /* sentinel process - ignore listening sockets, but keep them open */ |
| for (int i = 0; i < facil_data->capacity; i++) { |
| fd_data(i).lock = SPN_LOCK_INIT; |
| if (fd_data(i).protocol) { |
| fd_data(i).protocol->rsv = 0; |
| if (fd_data(i).protocol->service == TIMER_PROTOCOL_NAME) |
| timer_on_server_start(i); |
| else if (fd_data(i).protocol->service != LISTENER_PROTOCOL_NAME) { |
| evio_add(i, (void *)sock_fd2uuid(i)); |
| } |
| } |
| } |
| } |
| /* Clear all existing events & flush `facil_cycle` out. */ |
| { |
| facil_data->spindown = 1; |
| uint16_t old_active = facil_data->active; |
| facil_data->active = 0; |
| defer_perform(); |
| facil_data->active = old_active; |
| facil_data->spindown = 0; |
| } |
| /* call any external startup callbacks. */ |
| facil_external_init2(); |
| /* add cycling to the defer queue to setup the reactor pattern. */ |
| facil_data->need_review = 1; |
| defer(facil_cycle, NULL, NULL); |
| /* Call the on_start callbacks. */ |
| facil_core_callback_force(FIO_CALL_ON_START); |
| |
| if (FACIL_PRINT_STATE) { |
| if (sentinel || facil_data->parent == getpid()) { |
| fprintf(stderr, |
| "Server is running %u %s X %u %s, press ^C to stop\n" |
| "* Detected capacity: %zd open file limit\n" |
| "* Root pid: %d\n", |
| facil_data->active, facil_data->active > 1 ? "workers" : "worker", |
| facil_data->threads, |
| facil_data->threads > 1 ? "threads" : "thread", |
| facil_data->capacity, facil_data->parent); |
| } else { |
| defer(print_pid, NULL, NULL); |
| } |
| } |
| facil_data->thread_pool = |
| defer_pool_start((sentinel ? 1 : facil_data->threads)); |
| if (facil_data->thread_pool) |
| defer_pool_wait(facil_data->thread_pool); |
| } |
| |
| static void facil_worker_cleanup(void) { |
| facil_data->active = 0; |
| facil_cluster_signal_children(); |
| facil_core_callback_force(FIO_CALL_ON_SHUTDOWN); |
| for (int i = 0; i <= facil_data->capacity; ++i) { |
| intptr_t uuid; |
| if ((uuid = sock_fd2uuid(i)) >= 0) { |
| defer(deferred_on_shutdown, (void *)uuid, NULL); |
| } |
| } |
| facil_data->thread_pool = defer_pool_start(facil_data->threads); |
| if (facil_data->thread_pool) { |
| defer(facil_cycle_unwind, NULL, NULL); |
| defer_pool_wait(facil_data->thread_pool); |
| facil_data->thread_pool = NULL; |
| } |
| fprintf(stderr, "* %d cleaning up.\n", getpid()); |
| /* close leftovers */ |
| for (int i = 0; i <= facil_data->capacity; ++i) { |
| intptr_t uuid; |
| if (fd_data(i).protocol && (uuid = sock_fd2uuid(i)) >= 0) { |
| sock_force_close(uuid); |
| } |
| } |
| defer_perform(); |
| |
| if (facil_data->parent == getpid()) { |
| kill(0, SIGINT); |
| while (wait(NULL) != -1) |
| ; |
| facil_external_cleanup(); |
| } |
| facil_core_callback_force(FIO_CALL_ON_FINISH); |
| defer_perform(); |
| evio_close(); |
| facil_external_cleanup(); |
| |
| if (facil_data->parent == getpid() && FACIL_PRINT_STATE) { |
| fprintf(stderr, "\n --- Shutdown Complete ---\n"); |
| } |
| } |
| |
| static spn_lock_i fio_fork_lock = SPN_LOCK_INIT; |
| |
| static void facil_sentinel_task(void *arg1, void *arg2); |
| static void *facil_sentinel_worker_thread(void *arg) { |
| errno = 0; |
| pid_t child = facil_fork(); |
| /* release fork lock. */ |
| spn_unlock(&fio_fork_lock); |
| if (child == -1) { |
| perror("FATAL ERROR: couldn't spawn worker."); |
| kill(facil_parent_pid(), SIGINT); |
| facil_stop(); |
| return NULL; |
| } else if (child) { |
| int status; |
| waitpid(child, &status, 0); |
| #if DEBUG |
| if (facil_data->active) { /* !WIFEXITED(status) || WEXITSTATUS(status) */ |
| if (!WIFEXITED(status) || WEXITSTATUS(status)) { |
| fprintf(stderr, |
| "FATAL ERROR: Child worker (%d) crashed. Stopping services in " |
| "DEBUG mode.\n", |
| child); |
| facil_core_callback_force(FIO_CALL_ON_CHILD_CRUSH); |
| } else { |
| fprintf( |
| stderr, |
| "INFO (FATAL): Child worker (%d) shutdown. Stopping services in " |
| "DEBUG mode.\n", |
| child); |
| } |
| kill(0, SIGINT); |
| } |
| #else |
| if (facil_data->active) { |
| /* don't call any functions while forking. */ |
| spn_lock(&fio_fork_lock); |
| if (!WIFEXITED(status) || WEXITSTATUS(status)) { |
| fprintf(stderr, |
| "ERROR: Child worker (%d) crashed. Respawning worker.\n", |
| child); |
| facil_core_callback_force(FIO_CALL_ON_CHILD_CRUSH); |
| } else { |
| fprintf(stderr, |
| "INFO: Child worker (%d) shutdown. Respawning worker.\n", |
| child); |
| } |
| defer(facil_sentinel_task, NULL, NULL); |
| spn_unlock(&fio_fork_lock); |
| } |
| #endif |
| } else { |
| facil_core_callback_force(FIO_CALL_AFTER_FORK); |
| facil_core_callback_force(FIO_CALL_IN_CHILD); |
| facil_worker_startup(0); |
| facil_worker_cleanup(); |
| exit(0); |
| } |
| return NULL; |
| (void)arg; |
| } |
| |
| #if FIO_SENTINEL_USE_PTHREAD |
| static void facil_sentinel_task(void *arg1, void *arg2) { |
| if (!facil_data->active) |
| return; |
| spn_lock(&fio_fork_lock); |
| pthread_t sentinel; |
| if (pthread_create(&sentinel, NULL, facil_sentinel_worker_thread, |
| (void *)&fio_fork_lock)) { |
| perror("FATAL ERROR: couldn't start sentinel thread"); |
| exit(errno); |
| } |
| pthread_detach(sentinel); |
| spn_lock(&fio_fork_lock); /* will wait for worker thread to release lock. */ |
| spn_unlock(&fio_fork_lock); |
| facil_core_callback_force(FIO_CALL_AFTER_FORK); |
| (void)arg1; |
| (void)arg2; |
| } |
| #else |
| static void facil_sentinel_task(void *arg1, void *arg2) { |
| if (!facil_data->active) |
| return; |
| facil_core_callback_force(FIO_CALL_BEFORE_FORK); |
| spn_lock(&fio_fork_lock); /* will wait for worker thread to release lock. */ |
| void *thrd = |
| defer_new_thread(facil_sentinel_worker_thread, (void *)&fio_fork_lock); |
| defer_free_thread(thrd); |
| spn_lock(&fio_fork_lock); /* will wait for worker thread to release lock. */ |
| spn_unlock(&fio_fork_lock); |
| facil_core_callback_force(FIO_CALL_AFTER_FORK); |
| (void)arg1; |
| (void)arg2; |
| } |
| #endif |
| |
| /* handles the SIGUSR1, SIGINT and SIGTERM signals. */ |
| static void sig_int_handler(int sig) { |
| switch (sig) { |
| #if !FACIL_DISABLE_HOT_RESTART |
| case SIGUSR1: |
| facil_signal_children_flag = 1; |
| break; |
| #endif |
| case SIGINT: /* fallthrough */ |
| case SIGTERM: /* fallthrough */ |
| facil_stop(); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| /* setup handling for the SIGUSR1, SIGPIPE, SIGINT and SIGTERM signals. */ |
| static void facil_setup_signal_handler(void) { |
| /* setup signal handling */ |
| struct sigaction act, old; |
| |
| act.sa_handler = sig_int_handler; |
| sigemptyset(&act.sa_mask); |
| act.sa_flags = SA_RESTART | SA_NOCLDSTOP; |
| |
| if (sigaction(SIGINT, &act, &old)) { |
| perror("couldn't set signal handler"); |
| return; |
| }; |
| |
| if (sigaction(SIGTERM, &act, &old)) { |
| perror("couldn't set signal handler"); |
| return; |
| }; |
| #if !FACIL_DISABLE_HOT_RESTART |
| if (sigaction(SIGUSR1, &act, &old)) { |
| perror("couldn't set signal handler"); |
| return; |
| }; |
| #endif |
| |
| act.sa_handler = SIG_IGN; |
| if (sigaction(SIGPIPE, &act, &old)) { |
| perror("couldn't set signal handler"); |
| return; |
| }; |
| } |
| |
/*
 * Zombie Reaping
 * With thanks to Dr Graham D Shaw.
 * http://www.microhowto.info/howto/reap_zombie_processes_using_a_sigchld_handler.html
 *
 * Signal handler: collects every child that already exited, without
 * blocking, and leaves `errno` exactly as it found it.
 */
static void reap_child_handler(int sig) {
  const int saved_errno = errno;
  pid_t reaped;
  do {
    reaped = waitpid(-1, NULL, WNOHANG);
  } while (reaped > 0);
  errno = saved_errno;
  (void)sig;
}
| |
| /* initializes zombie reaping for the process */ |
| void facil_reap_children(void) { |
| struct sigaction sa; |
| sa.sa_handler = reap_child_handler; |
| sigemptyset(&sa.sa_mask); |
| sa.sa_flags = SA_RESTART | SA_NOCLDSTOP; |
| if (sigaction(SIGCHLD, &sa, 0) == -1) { |
| perror("Child reaping initialization failed"); |
| kill(0, SIGINT); |
| exit(errno); |
| } |
| } |
| |
| /** returns facil.io's parent (root) process pid. */ |
| pid_t facil_parent_pid(void) { |
| if (!facil_data) |
| facil_lib_init(); |
| return facil_data->parent; |
| } |
| |
| static inline size_t facil_detect_cpu_cores(void) { |
| ssize_t cpu_count = 0; |
| #ifdef _SC_NPROCESSORS_ONLN |
| cpu_count = sysconf(_SC_NPROCESSORS_ONLN); |
| if (cpu_count < 0) { |
| if (FACIL_PRINT_STATE) { |
| fprintf(stderr, "WARNING: CPU core count auto-detection failed.\n"); |
| } |
| return 0; |
| } |
| #else |
| if (1 || FACIL_PRINT_STATE) { |
| fprintf(stderr, "WARNING: CPU core count auto-detection failed.\n"); |
| } |
| #endif |
| return cpu_count; |
| } |
| |
/**
 * Returns the number of expected threads / processes to be used by facil.io.
 *
 * The pointers should start with valid values that match the expected threads /
 * processes values passed to `facil_run`.
 *
 * The data in the pointers will be overwritten with the result.
 *
 * Rules, as implemented below:
 * - both values 0: use the detected core count for both (capped by
 *   `FACIL_CPU_CORES_LIMIT` when that is non-zero);
 * - a negative value -N: use (detected cores / N);
 * - a 0 value next to a negative one: use the absolute value of the other
 *   option as it was passed in;
 * - on machines with more than a handful of cores, one process is given up so
 *   a core remains available for the kernel;
 * - the results are never less than 1.
 *
 * Nothing happens when either pointer is NULL.
 */
void facil_expected_concurrency(int16_t *threads, int16_t *processes) {
  /* Core count above which one process slot is left for the kernel. The test
   * used to compare against FACIL_CPU_CORES_LIMIT, which could never be
   * exceeded right after capping the count to that same limit. */
  enum { SPARE_CORE_THRESHOLD = 3 };

  if (!threads || !processes)
    return;
  if (!*threads && !*processes) {
    /* both options set to 0 - default to cores*cores matrix */
    ssize_t cpu_count = facil_detect_cpu_cores();
#if FACIL_CPU_CORES_LIMIT
    if (cpu_count > FACIL_CPU_CORES_LIMIT) {
      static int print_cores_warning = 1;
      if (print_cores_warning) {
        fprintf(
            stderr,
            "INFO: Detected %zu cores. Capping auto-detection of cores "
            "to %zu.\n"
            " Avoid this message by setting threads / workers manually.\n"
            " To increase auto-detection limit, recompile with:\n"
            " -DFACIL_CPU_CORES_LIMIT=%zu \n",
            (size_t)cpu_count, (size_t)FACIL_CPU_CORES_LIMIT,
            (size_t)cpu_count);
        print_cores_warning = 0;
      }
      cpu_count = FACIL_CPU_CORES_LIMIT;
    }
#endif
    *threads = *processes = (int16_t)cpu_count;
    if (cpu_count > SPARE_CORE_THRESHOLD) {
      /* leave a core available for the kernel */
      --(*processes);
    }
  } else if (*threads < 0 || *processes < 0) {
    /* Set any option that is less than 0 be equal to cores/value */
    /* Set any option equal to 0 be equal to the other option in value */
    ssize_t cpu_count = facil_detect_cpu_cores();
    size_t cpu_adjust = (*processes <= 0 ? 1 : 0);

    if (cpu_count > 0) {
      int16_t tmp_threads = 0;
      if (*threads < 0)
        tmp_threads = (int16_t)(cpu_count / (*threads * -1));
      else if (*threads == 0)
        tmp_threads = -1 * *processes;
      else
        tmp_threads = *threads;
      if (*processes < 0)
        *processes = (int16_t)(cpu_count / (*processes * -1));
      else if (*processes == 0)
        *processes = -1 * *threads;
      *threads = tmp_threads;
      if (cpu_adjust && (*processes * tmp_threads) >= cpu_count &&
          cpu_count > SPARE_CORE_THRESHOLD) {
        /* leave a core available for the kernel */
        --*processes;
      }
    }
  }

  /* make sure we have at least one process and at least one thread */
  if (*processes <= 0)
    *processes = 1;
  if (*threads <= 0)
    *threads = 1;
}
| |
| #undef facil_run |
| void facil_run(struct facil_run_args args) { |
| signal(SIGPIPE, SIG_IGN); |
| if (!facil_data) { |
| facil_lib_init(); |
| } |
| |
| /* compute actual concurrency */ |
| facil_expected_concurrency(&args.threads, &args.processes); |
| |
| /* listen to SIGINT / SIGTERM */ |
| facil_setup_signal_handler(); |
| |
| /* activate facil, fork if needed */ |
| facil_data->active = (uint16_t)args.processes; |
| facil_data->threads = (uint16_t)args.threads; |
| |
| /* call any pre-start callbacks*/ |
| facil_core_callback_force(FIO_CALL_PRE_START); |
| |
| /* initialize cluster */ |
| if (args.processes > 1) { |
| for (int i = 0; i < args.processes && facil_data->active; ++i) { |
| facil_sentinel_task(NULL, NULL); |
| } |
| facil_worker_startup(1); |
| } else { |
| facil_worker_startup(0); |
| } |
| facil_worker_cleanup(); |
| } |
| |
| /** |
| * Returns the number of worker processes if facil.io is running. |
| * |
| * (1 is returned when in single process mode, otherwise the number of workers) |
| */ |
| int16_t facil_is_running(void) { return (facil_data ? facil_data->active : 0); } |
| |
| /* ***************************************************************************** |
| Setting the protocol |
| ***************************************************************************** */ |
| |
| /* managing the protocol pointer array and the `on_close` callback */ |
| static int facil_attach_state(intptr_t uuid, protocol_s *protocol, |
| protocol_metadata_s state) { |
| if (!facil_data) |
| facil_lib_init(); |
| if (protocol) { |
| if (!protocol->on_close) { |
| protocol->on_close = mock_on_close; |
| } |
| if (!protocol->on_data) { |
| protocol->on_data = mock_on_data; |
| } |
| if (!protocol->on_ready) { |
| protocol->on_ready = mock_on_ev; |
| } |
| if (!protocol->ping) { |
| protocol->ping = mock_ping; |
| } |
| if (!protocol->on_shutdown) { |
| protocol->on_shutdown = mock_on_shutdown; |
| } |
| prt_meta(protocol) = state; |
| } |
| spn_lock(&uuid_data(uuid).lock); |
| if (!sock_isvalid(uuid)) { |
| spn_unlock(&uuid_data(uuid).lock); |
| if (protocol) |
| defer(deferred_on_close, (void *)uuid, protocol); |
| if (uuid == -1) |
| errno = EBADF; |
| else |
| errno = ENOTCONN; |
| return -1; |
| } |
| struct connection_data_s old_data = uuid_data(uuid); |
| uuid_data(uuid).protocol = protocol; |
| uuid_data(uuid).active = facil_data->last_cycle.tv_sec; |
| spn_unlock(&uuid_data(uuid).lock); |
| if (old_data.protocol) { |
| defer(deferred_on_close, (void *)uuid, old_data.protocol); |
| } else if (evio_isactive() && protocol) { |
| return evio_add(sock_uuid2fd(uuid), (void *)uuid); |
| } |
| return 0; |
| } |
| |
| /** Attaches (or updates) a protocol object to a socket UUID. |
| * Returns -1 on error and 0 on success. |
| */ |
| int facil_attach(intptr_t uuid, protocol_s *protocol) { |
| return facil_attach_state(uuid, protocol, (protocol_metadata_s){.rsv = 0}); |
| } |
| |
| /** |
| * Attaches (or updates) a LOCKED protocol object to a socket UUID. |
| */ |
| int facil_attach_locked(intptr_t uuid, protocol_s *protocol) { |
| { |
| protocol_metadata_s state = {.rsv = 0}; |
| spn_lock(state.locks + FIO_PR_LOCK_TASK); |
| return facil_attach_state(uuid, protocol, state); |
| } |
| } |
| |
| /** Sets a timeout for a specific connection (only when running and valid). */ |
| void facil_set_timeout(intptr_t uuid, uint8_t timeout) { |
| if (sock_isvalid(uuid) && facil_data && facil_data->active) { |
| uuid_data(uuid).active = facil_data->last_cycle.tv_sec; |
| uuid_data(uuid).timeout = timeout; |
| } |
| } |
| /** Gets a timeout for a specific connection. Returns 0 if there's no set |
| * timeout or the connection is inactive. */ |
| uint8_t facil_get_timeout(intptr_t uuid) { return uuid_data(uuid).timeout; } |
| |
| /* ***************************************************************************** |
| Misc helpers |
| ***************************************************************************** */ |
| |
| /** |
| Returns the last time the server reviewed any pending IO events. |
| */ |
| struct timespec facil_last_tick(void) { |
| if (!facil_data) { |
| facil_lib_init(); |
| clock_gettime(CLOCK_REALTIME, &facil_data->last_cycle); |
| } |
| return facil_data->last_cycle; |
| } |
| |
| /** |
| * This function allows out-of-task access to a connection's `protocol_s` |
| * object by attempting to lock it. |
| */ |
| protocol_s *facil_protocol_try_lock(intptr_t uuid, |
| enum facil_protocol_lock_e type) { |
| if (!sock_isvalid(uuid) || !uuid_data(uuid).protocol) { |
| errno = EBADF; |
| return NULL; |
| } |
| return protocol_try_lock(sock_uuid2fd(uuid), type); |
| } |
| /** See `facil_protocol_try_lock` for details. */ |
| void facil_protocol_unlock(protocol_s *pr, enum facil_protocol_lock_e type) { |
| if (!pr) |
| return; |
| protocol_unlock(pr, type); |
| } |
| /** Counts all the connections of a specific type. */ |
| size_t facil_count(void *service) { |
| size_t count = 0; |
| for (intptr_t i = 0; i < facil_data->capacity; i++) { |
| void *tmp = NULL; |
| spn_lock(&fd_data(i).lock); |
| if (fd_data(i).protocol && fd_data(i).protocol->service) |
| tmp = (void *)fd_data(i).protocol->service; |
| spn_unlock(&fd_data(i).lock); |
| if (tmp != LISTENER_PROTOCOL_NAME && tmp != TIMER_PROTOCOL_NAME && |
| (!service || (tmp == service))) |
| count++; |
| } |
| return count; |
| } |
| |
| /* ***************************************************************************** |
| Task Management - `facil_defer`, `facil_each` |
| ***************************************************************************** */ |
| |
| struct task { |
| intptr_t origin; |
| void (*func)(intptr_t uuid, protocol_s *, void *arg); |
| void *arg; |
| void (*on_done)(intptr_t uuid, void *arg); |
| const void *service; |
| uint32_t count; |
| enum facil_protocol_lock_e task_type; |
| spn_lock_i lock; |
| }; |
| |
| static inline struct task *alloc_facil_task(void) { |
| return fio_malloc(sizeof(struct task)); |
| } |
| |
/* Releases a task object obtained from `alloc_facil_task`. */
static inline void free_facil_task(struct task *task) {
  fio_free(task);
}
| |
/* No-op completion / fallback callback, used when the caller supplied none. */
static void mock_on_task_done(intptr_t uuid, void *arg) {
  (void)arg;
  (void)uuid;
}
| |
| static void perform_single_task(void *v_uuid, void *v_task) { |
| struct task *task = v_task; |
| if (!uuid_data(v_uuid).protocol) |
| goto fallback; |
| protocol_s *pr = protocol_try_lock(sock_uuid2fd(v_uuid), task->task_type); |
| if (!pr) |
| goto defer; |
| if (pr->service == CONNECTOR_PROTOCOL_NAME) { |
| protocol_unlock(pr, task->task_type); |
| goto defer; |
| } |
| task->func((intptr_t)v_uuid, pr, task->arg); |
| protocol_unlock(pr, task->task_type); |
| free_facil_task(task); |
| return; |
| fallback: |
| task->on_done((intptr_t)v_uuid, task->arg); |
| free_facil_task(task); |
| return; |
| defer: |
| defer(perform_single_task, v_uuid, v_task); |
| return; |
| } |
| |
| static void finish_multi_task(void *v_fd, void *v_task) { |
| struct task *task = v_task; |
| if (spn_trylock(&task->lock)) |
| goto reschedule; |
| task->count--; |
| if (task->count) { |
| spn_unlock(&task->lock); |
| return; |
| } |
| task->on_done(task->origin, task->arg); |
| free_facil_task(task); |
| return; |
| reschedule: |
| defer(finish_multi_task, v_fd, v_task); |
| } |
| |
| static void perform_multi_task(void *v_fd, void *v_task) { |
| if (!fd_data((intptr_t)v_fd).protocol) { |
| finish_multi_task(v_fd, v_task); |
| return; |
| } |
| struct task *task = v_task; |
| protocol_s *pr = protocol_try_lock((intptr_t)v_fd, task->task_type); |
| if (!pr) |
| goto reschedule; |
| if (pr->service == task->service) { |
| const intptr_t uuid = sock_fd2uuid((int)(intptr_t)v_fd); |
| task->func(uuid, pr, task->arg); |
| } |
| protocol_unlock(pr, task->task_type); |
| defer(finish_multi_task, v_fd, v_task); |
| return; |
| reschedule: |
| // fprintf(stderr, "rescheduling multi for %p\n", v_fd); |
| defer(perform_multi_task, v_fd, v_task); |
| } |
| |
| static void schedule_multi_task(void *v_fd, void *v_task) { |
| struct task *task = v_task; |
| intptr_t fd = (intptr_t)v_fd; |
| for (size_t i = 0; i < 64; i++) { |
| if (!fd_data(fd).protocol) |
| goto finish; |
| if (spn_trylock(&fd_data(fd).lock)) |
| goto reschedule; |
| if (!fd_data(fd).protocol || |
| fd_data(fd).protocol->service != task->service || fd == task->origin) { |
| spn_unlock(&fd_data(fd).lock); |
| goto finish; |
| } |
| spn_unlock(&fd_data(fd).lock); |
| spn_lock(&task->lock); |
| task->count++; |
| spn_unlock(&task->lock); |
| defer(perform_multi_task, (void *)fd, task); |
| finish: |
| do { |
| fd++; |
| } while (!fd_data(fd).protocol && (fd < facil_data->capacity)); |
| if (fd >= (intptr_t)facil_data->capacity) |
| goto complete; |
| } |
| reschedule: |
| schedule_multi_task((void *)fd, v_task); |
| return; |
| complete: |
| defer(finish_multi_task, NULL, v_task); |
| } |
| /** |
| * Schedules a protected connection task. The task will run within the |
| * connection's lock. |
| * |
| * If the connection is closed before the task can run, the |
| * `fallback` task wil be called instead, allowing for resource cleanup. |
| */ |
| #undef facil_defer |
| void facil_defer(struct facil_defer_args_s args) { |
| if (!args.fallback) |
| args.fallback = mock_on_task_done; |
| if (!args.type) |
| args.type = FIO_PR_LOCK_TASK; |
| if (!args.task || !uuid_data(args.uuid).protocol || args.uuid < 0 || |
| !sock_isvalid(args.uuid)) |
| goto error; |
| struct task *task = alloc_facil_task(); |
| if (!task) |
| goto error; |
| *task = (struct task){ |
| .func = args.task, .arg = args.arg, .on_done = args.fallback}; |
| defer(perform_single_task, (void *)args.uuid, task); |
| return; |
| error: |
| defer((void (*)(void *, void *))args.fallback, (void *)args.uuid, args.arg); |
| } |
| |
| /** |
| * Schedules a protected connection task for each `service` connection. |
| * The tasks will run within each of the connection's locks. |
| * |
| * Once all the tasks were performed, the `on_complete` callback will be called. |
| */ |
| #undef facil_each |
| int facil_each(struct facil_each_args_s args) { |
| if (!args.on_complete) |
| args.on_complete = mock_on_task_done; |
| if (!args.task_type) |
| args.task_type = FIO_PR_LOCK_TASK; |
| if (!args.task) |
| goto error; |
| struct task *task = alloc_facil_task(); |
| if (!task) |
| goto error; |
| *task = (struct task){.origin = args.origin, |
| .func = args.task, |
| .arg = args.arg, |
| .on_done = args.on_complete, |
| .service = args.service, |
| .task_type = args.task_type, |
| .count = 1}; |
| defer(schedule_multi_task, (void *)0, task); |
| return 0; |
| error: |
| defer((void (*)(void *, void *))args.on_complete, (void *)args.origin, |
| args.arg); |
| return -1; |
| } |