| /* |
| copyright: Boaz segev, 2016 |
| license: MIT |
| |
| Feel free to copy, use and enjoy according to the license provided. |
| */ |
| #ifndef _GNU_SOURCE |
| #define _GNU_SOURCE |
| #endif |
| #include "libserver.h" |
| #include <string.h> |
| #include <stdatomic.h> |
| #include <signal.h> |
| #include <pthread.h> |
| #include <sys/mman.h> |
| #include <sys/wait.h> |
| #include <errno.h> |
| |
| #ifdef __has_include |
| #if __has_include(<x86intrin.h>) |
| #include <x86intrin.h> |
| #define HAVE_X86Intrin |
| #define sched_yield() _mm_pause() |
| // see: https://software.intel.com/en-us/node/513411 |
| // and: https://software.intel.com/sites/landingpage/IntrinsicsGuide/ |
| #endif |
| #endif |
| |
| /* ***************************************************************************** |
| Connection Data |
| */ |
| typedef struct { |
| protocol_s* protocol; |
| time_t active; |
| uint8_t timeout; |
| atomic_bool lock; |
| } fd_data_s; |
| |
| /* |
| These macros mean we won't need to change code if we change the locking system. |
| */ |
| |
| #define lock_fd_init(fd) atomic_store(&(fd)->lock, 0) |
| #define lock_fd_destroy(fd) atomic_store(&(fd)->lock, 0) |
| /** returns 0 on success, value on failure */ |
| #define try_lock_fd(fd) atomic_exchange(&(fd)->lock, 1) |
| #define lock_fd(fd) \ |
| { \ |
| while (try_lock_fd(fd)) \ |
| sched_yield(); \ |
| } |
| #define unlock_fd(fd) atomic_store(&(fd)->lock, 0) |
| #define clear_fd_data(fd_data) \ |
| { *(fd_data) = (fd_data_s){.lock = (fd_data)->lock}; } |
| |
| /* ***************************************************************************** |
| Server Core Data |
| */ |
| static struct { |
| fd_data_s* fds; |
| time_t last_tick; |
| void (*on_idle)(void); |
| size_t capacity; |
| uint8_t running; |
| } server_data = {.fds = NULL}; |
| |
| /* |
| These macros help prevent code changes when changing the data struct. |
| */ |
| |
| #define valid_uuid(uuid) sock_isvalid(uuid) |
| |
| #define fd_data(fd) server_data.fds[(fd)] |
| |
| #define uuid_data(uuid) fd_data(sock_uuid2fd(uuid)) |
| |
| #define clear_uuid(uuid) clear_fd_data(server_data.fds + sock_uuid2fd(uuid)) |
| |
| #define protocol_fd(fd) (server_data.fds[(fd)].protocol) |
| #define protocol_uuid(uuid) protocol_fd(sock_uuid2fd(uuid)) |
| |
| #define fduuid_get(ifd) (server_data.fds[(ifd)].uuid) |
| |
| #define protocol_is_busy(protocol) \ |
| atomic_load(&(((protocol_s*)(protocol))->callback_lock)) |
| #define protocol_unset_busy(protocol) \ |
| atomic_store(&(((protocol_s*)(protocol))->callback_lock), 0) |
| #define protocol_set_busy(protocol) \ |
| atomic_exchange(&(((protocol_s*)(protocol))->callback_lock), 1) |
| |
| #define try_lock_uuid(uuid) try_lock_fd(server_data.fds + sock_uuid2fd(uuid)) |
| #define lock_uuid(uuid) lock_fd(server_data.fds + sock_uuid2fd(uuid)) |
| #define unlock_uuid(uuid) unlock_fd(server_data.fds + sock_uuid2fd(uuid)) |
| |
| // run through any open sockets and call the shutdown handler |
| static inline void server_on_shutdown(void) { |
| if (server_data.fds && server_data.capacity > 0) { |
| for (size_t i = 0; i < server_data.capacity; i++) { |
| if (server_data.fds[i].protocol == NULL) |
| continue; |
| intptr_t uuid = sock_fd2uuid(i); |
| if (uuid != -1) { |
| if (server_data.fds[i].protocol->on_shutdown != NULL) |
| server_data.fds[i].protocol->on_shutdown(uuid, |
| server_data.fds[i].protocol); |
| sock_close(uuid); |
| sock_flush_strong(uuid); |
| } |
| } |
| } |
| } |
| |
| static void server_cleanup(void) { |
| // run through any open sockets and call the shutdown handler |
| server_on_shutdown(); |
| // free any lock objects (no need to change code if changing locking systems) |
| for (size_t i = 0; i < server_data.capacity - 1; i++) { |
| server_data.fds[i] = (fd_data_s){0}; |
| lock_fd_destroy(server_data.fds + i); |
| } |
| // free memory |
| if (server_data.fds) { |
| munmap(server_data.fds, sizeof(fd_data_s) * server_data.capacity); |
| server_data.fds = NULL; |
| } |
| } |
| |
| static void init_server(void) { |
| pthread_mutex_t inner_lock = PTHREAD_MUTEX_INITIALIZER; |
| pthread_mutex_lock(&inner_lock); |
| if (server_data.fds == NULL) { |
| // initialize libsock, but NOT libreact (which can't be forked) |
| sock_lib_init(); |
| server_data.capacity = sock_max_capacity(); |
| atexit(server_cleanup); |
| server_data.fds = mmap(NULL, sizeof(fd_data_s) * server_data.capacity, |
| PROT_READ | PROT_WRITE | PROT_EXEC, |
| MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); |
| for (size_t i = 0; i < server_data.capacity - 1; i++) { |
| server_data.fds[i] = (fd_data_s){0}; |
| lock_fd_init(server_data.fds + i); |
| } |
| } |
| pthread_mutex_unlock(&inner_lock); |
| } |
| |
| /** initializes the library if it wasn't already initialized. */ |
| #define validate_mem() \ |
| { \ |
| if (server_data.fds == NULL) \ |
| init_server(); \ |
| } |
| |
| /* ***************************************************************************** |
| `libsock` Callback Implementation |
| */ |
| |
| void sock_touch(intptr_t uuid) { |
| if (server_data.fds != NULL) |
| uuid_data(uuid).active = server_data.last_tick; |
| } |
| |
| /* ***************************************************************************** |
| The Reactor Callback Implementation |
| */ |
| |
| void reactor_on_close_async(void* _pr) { |
| if (protocol_set_busy(_pr) == 0) { |
| ((protocol_s*)_pr)->on_close(_pr); |
| return; |
| } |
| async_run(reactor_on_close_async, _pr); |
| } |
| |
| void reactor_on_close(intptr_t uuid) { |
| if (server_data.fds) { |
| // get the currect state |
| lock_uuid(uuid); |
| protocol_s* protocol = protocol_uuid(uuid); |
| // clear state |
| clear_uuid(uuid); |
| unlock_uuid(uuid); |
| // call callback |
| if (protocol && protocol->on_close) |
| reactor_on_close_async(protocol); |
| } |
| } |
| |
| void reactor_on_data_async(void* _fduuid) { |
| intptr_t fduuid = (intptr_t)_fduuid; |
| if (!valid_uuid(fduuid) || protocol_uuid(fduuid) == NULL) |
| return; |
| // try to lock the socket |
| if (try_lock_uuid(fduuid)) |
| goto no_lock; |
| // get current state (protocol might have changed during this time) |
| protocol_s* protocol = protocol_uuid(fduuid); |
| // review protocol and get use privilage |
| if (protocol == NULL || protocol_set_busy(protocol)) { |
| // fprintf(stderr, "fduuid is busy %p\n", _fduuid); |
| unlock_uuid(fduuid); |
| goto no_lock; |
| } |
| // unlock |
| unlock_uuid(fduuid); |
| // fire event |
| if (protocol && protocol->on_data) |
| protocol->on_data(fduuid, protocol); |
| // clear the original busy flag |
| protocol_unset_busy(protocol); |
| return; |
| no_lock: |
| // fprintf(stderr, "no lock for %p\n", _fduuid); |
| // failed to aquire lock / busy |
| async_run(reactor_on_data_async, _fduuid); |
| } |
| |
| void reactor_on_data(intptr_t fd) { |
| async_run(reactor_on_data_async, (void*)fd); |
| } |
| |
| void reactor_on_ready(intptr_t uuid) { |
| uuid_data(uuid).active = server_data.last_tick; |
| lock_uuid(uuid); |
| protocol_s* protocol = protocol_uuid(uuid); |
| unlock_uuid(uuid); |
| if (protocol && protocol->on_ready) |
| protocol->on_ready(uuid, protocol); |
| } |
| |
| /* ***************************************************************************** |
| Zombie Reaping |
| With thanks to Dr Graham D Shaw. |
| http://www.microhowto.info/howto/reap_zombie_processes_using_a_sigchld_handler.html |
| */ |
| |
| void reap_child_handler(int sig) { |
| int old_errno = errno; |
| while (waitpid(-1, NULL, WNOHANG) > 0) |
| ; |
| errno = old_errno; |
| } |
| |
| inline static void reap_children(void) { |
| struct sigaction sa; |
| sa.sa_handler = reap_child_handler; |
| sigemptyset(&sa.sa_mask); |
| sa.sa_flags = SA_RESTART | SA_NOCLDSTOP; |
| if (sigaction(SIGCHLD, &sa, 0) == -1) { |
| perror("Child reaping initialization failed"); |
| exit(1); |
| } |
| } |
| |
| /* ***************************************************************************** |
| Exit Signal Handling |
| */ |
| |
| static void stop_server_handler(int sig) { |
| server_data.running = 0; |
| async_signal(); |
| #if defined(SERVER_PRINT_STATE) && SERVER_PRINT_STATE == 1 |
| fprintf(stderr, " --- Stop signal received ---\n"); |
| #endif |
| signal(sig, SIG_DFL); |
| } |
| |
| inline static void listen_for_stop_signal(void) { |
| struct sigaction sa; |
| sa.sa_handler = stop_server_handler; |
| sigemptyset(&sa.sa_mask); |
| sa.sa_flags = SA_RESTART | SA_NOCLDSTOP; |
| if (sigaction(SIGINT, &sa, 0) || sigaction(SIGTERM, &sa, 0)) { |
| perror("Signal registration failed"); |
| exit(2); |
| } |
| } |
| |
| /* ***************************************************************************** |
| The Listenning Protocol |
| */ |
| |
| static const char* listener_protocol_name = "listening protocol __internal__"; |
| |
| struct ListenerProtocol { |
| protocol_s protocol; |
| protocol_s* (*on_open)(intptr_t uuid, void* udata); |
| void* udata; |
| void (*on_start)(void* udata); |
| void (*on_finish)(void* udata); |
| }; |
| |
| static void listener_on_data(intptr_t uuid, protocol_s* _listener) { |
| intptr_t new_client; |
| struct ListenerProtocol* listener = (void*)_listener; |
| while ((new_client = sock_accept(uuid)) != -1) { |
| // make sure it's a clean slate... although it should be assumed to be. |
| lock_uuid(new_client); |
| clear_uuid(new_client); |
| unlock_uuid(new_client); |
| // assume that sock_accept calls reactor_on_close if needed |
| protocol_uuid(new_client) = listener->on_open(new_client, listener->udata); |
| if (protocol_uuid(new_client)) { |
| uuid_data(new_client).active = server_data.last_tick; |
| atomic_store(&protocol_uuid(new_client)->callback_lock, 0); |
| reactor_add(new_client); |
| continue; |
| } else { |
| sock_close(new_client); |
| } |
| } |
| } |
| |
| static void free_listenner(void* _li) { |
| free(_li); |
| } |
| |
| static void listener_on_close(protocol_s* _listener) { |
| if (((struct ListenerProtocol*)_listener)->on_finish) |
| ((struct ListenerProtocol*)_listener) |
| ->on_finish(((struct ListenerProtocol*)_listener)->udata); |
| free_listenner(_listener); |
| } |
| |
| static inline struct ListenerProtocol* listener_alloc( |
| struct ServerServiceSettings settings) { |
| struct ListenerProtocol* listener = malloc(sizeof(*listener)); |
| if (listener) { |
| *listener = (struct ListenerProtocol){ |
| .protocol.service = listener_protocol_name, |
| .protocol.on_data = listener_on_data, |
| .protocol.on_close = listener_on_close, |
| .on_open = settings.on_open, |
| .udata = settings.udata, |
| .on_start = settings.on_start, |
| .on_finish = settings.on_finish, |
| }; |
| return listener; |
| } |
| return NULL; |
| } |
| |
| inline static void listener_on_server_start(void) { |
| for (size_t i = 0; i < server_data.capacity; i++) { |
| if (protocol_fd(i) && protocol_fd(i)->service == listener_protocol_name) { |
| if (reactor_add(sock_fd2uuid(i))) |
| perror("Couldn't register listenning socket"), exit(4); |
| // call the on_init callback |
| if (((struct ListenerProtocol*)protocol_fd(i))->on_start) |
| ((struct ListenerProtocol*)protocol_fd(i)) |
| ->on_start(((struct ListenerProtocol*)protocol_fd(i))->udata); |
| } |
| } |
| } |
| inline static void listener_on_server_shutdown(void) { |
| for (size_t i = 0; i < server_data.capacity; i++) { |
| if (protocol_fd(i) && protocol_fd(i)->service == listener_protocol_name) { |
| sock_close(sock_fd2uuid(i)); |
| } |
| } |
| } |
| |
| /* ***************************************************************************** |
| * The timer protocol |
| */ |
| |
| /* ******* |
| Timer Protocol |
| ******* */ |
| typedef struct { |
| protocol_s protocol; |
| size_t milliseconds; |
| size_t repetitions; |
| void (*task)(void*); |
| void (*on_finish)(void*); |
| void* arg; |
| } timer_protocol_s; |
| |
| #define prot2timer(protocol) (*((timer_protocol_s*)(protocol))) |
| |
| const char* timer_protocol_name = "timer protocol __internal__"; |
| |
| static void timer_on_data(intptr_t uuid, protocol_s* protocol) { |
| prot2timer(protocol).task(prot2timer(protocol).arg); |
| if (prot2timer(protocol).repetitions) { |
| prot2timer(protocol).repetitions -= 1; |
| if (prot2timer(protocol).repetitions == 0) { |
| // fprintf(stderr, "closing timer?\n"); |
| reactor_remove_timer(uuid); |
| sock_force_close(uuid); |
| } |
| } |
| reactor_reset_timer(uuid); |
| } |
| |
| static void timer_on_close(protocol_s* protocol) { |
| // fprintf(stderr, "timer closed\n"); |
| if (prot2timer(protocol).on_finish) |
| prot2timer(protocol).on_finish(prot2timer(protocol).arg); |
| free(protocol); |
| } |
| |
| static inline timer_protocol_s* timer_alloc(void (*task)(void*), |
| void* arg, |
| size_t milliseconds, |
| size_t repetitions, |
| void (*on_finish)(void*)) { |
| timer_protocol_s* t = malloc(sizeof(*t)); |
| if (t) |
| *t = (timer_protocol_s){ |
| .protocol.service = timer_protocol_name, |
| .protocol.on_data = timer_on_data, |
| .protocol.on_close = timer_on_close, |
| .arg = arg, |
| .task = task, |
| .on_finish = on_finish, |
| .milliseconds = milliseconds, |
| .repetitions = repetitions, |
| }; |
| return t; |
| } |
| |
| inline static void timer_on_server_start(void) { |
| for (size_t i = 0; i < server_data.capacity; i++) { |
| if (protocol_fd(i) && protocol_fd(i)->service == timer_protocol_name) { |
| if (reactor_add_timer(sock_fd2uuid(i), |
| prot2timer(protocol_fd(i)).milliseconds)) |
| perror("Couldn't register a required timed event."), exit(4); |
| } |
| } |
| } |
| |
| /* ***************************************************************************** |
| Reactor cycling and timeout handling |
| */ |
| |
| static inline void timeout_review(void) { |
| static time_t review = 0; |
| if (review >= server_data.last_tick) |
| return; |
| time(&review); |
| for (size_t i = 0; i < server_data.capacity; i++) { |
| if (protocol_fd(i) == NULL) |
| continue; // Protocol objects are required for open connections. |
| if (fd_data(i).timeout == 0) { |
| if (protocol_fd(i) && protocol_fd(i)->service != listener_protocol_name && |
| protocol_fd(i)->service != timer_protocol_name && |
| review - fd_data(i).active > 300) { |
| sock_close(sock_fd2uuid(i)); |
| } |
| continue; |
| } |
| if (fd_data(i).active + fd_data(i).timeout < review) { |
| if (protocol_fd(i)->ping) { |
| protocol_fd(i)->ping(sock_fd2uuid(i), protocol_fd(i)); |
| } else if (!protocol_is_busy(protocol_fd(i)) || |
| (review - fd_data(i).active > 300)) { |
| sock_close(sock_fd2uuid(i)); |
| } |
| } |
| } |
| } |
| |
| static void server_cycle(void* _) { |
| static int8_t perform_idle = 1; |
| time(&server_data.last_tick); |
| if (server_data.running) { |
| timeout_review(); |
| int e_count = reactor_review(); |
| if (e_count < 0) { |
| return; |
| } |
| if (e_count == 0) { |
| if (perform_idle && server_data.on_idle) |
| server_data.on_idle(); |
| perform_idle = 0; |
| } else { |
| perform_idle = 1; |
| } |
| async_run(server_cycle, NULL); |
| } |
| } |
| /* ***************************************************************************** |
| * The Server API |
| * (and helper functions) |
| */ |
| |
| /* ***************************************************************************** |
| * Server actions |
| */ |
| |
| #undef server_listen |
| #undef server_run |
| /** |
| Listens to a server with the following server settings (which MUST include |
| a default protocol). |
| |
| This method blocks the current thread until the server is stopped (either |
| though a `srv_stop` function or when a SIGINT/SIGTERM is received). |
| */ |
| int server_listen(struct ServerServiceSettings settings) { |
| validate_mem(); |
| if (settings.on_open == NULL || settings.port == NULL) |
| return -1; |
| intptr_t fduuid = sock_listen(settings.address, settings.port); |
| if (fduuid == -1) |
| return -1; |
| server_data.fds[sock_uuid2fd(fduuid)].protocol = |
| (void*)listener_alloc(settings); |
| if (server_data.fds[sock_uuid2fd(fduuid)].protocol == NULL) |
| goto error; |
| if (server_data.running && reactor_add(fduuid)) |
| goto error; |
| #if defined(SERVER_PRINT_STATE) && SERVER_PRINT_STATE == 1 |
| fprintf(stderr, "* Listenning on port %s\n", settings.port); |
| #endif |
| return 0; |
| error: |
| sock_close(fduuid); |
| return -1; |
| } |
| /** runs the server, hanging the current process and thread. */ |
| ssize_t server_run(struct ServerSettings settings) { |
| validate_mem(); |
| if (server_data.running) { |
| return -1; |
| } |
| reap_children(); |
| listen_for_stop_signal(); |
| server_data.running = 1; |
| server_data.on_idle = settings.on_idle; |
| if (settings.processes == 0) |
| settings.processes = 1; |
| |
| #if defined(SERVER_PRINT_STATE) && SERVER_PRINT_STATE == 1 |
| if (settings.threads == 0) |
| fprintf(stderr, |
| "* Running %lu processes" |
| " in single thread mode.\n", |
| settings.processes); |
| else |
| fprintf(stderr, |
| "* Running %lu processes" |
| " X %lu threads.\n", |
| settings.processes, settings.threads); |
| #endif |
| |
| pid_t rootpid = getpid(); |
| pid_t* children = NULL; |
| if (settings.processes > 1) { |
| children = malloc(sizeof(*children) * settings.processes); |
| for (size_t i = 0; i < settings.processes - 1; i++) { |
| if (fork() == 0) |
| break; |
| } |
| } |
| if (reactor_init() < 0) |
| perror("Reactor initialization failed"), exit(3); |
| listener_on_server_start(); |
| timer_on_server_start(); |
| #if defined(SERVER_PRINT_STATE) && SERVER_PRINT_STATE == 1 |
| fprintf(stderr, "* [%d] Running.\n", getpid()); |
| #endif |
| async_start(settings.threads); |
| if (settings.on_init) |
| settings.on_init(); |
| async_run(server_cycle, NULL); |
| if (settings.threads > 0) |
| async_join(); |
| else |
| async_perform(); |
| listener_on_server_shutdown(); |
| reactor_review(); |
| server_on_shutdown(); |
| if (settings.on_finish) |
| settings.on_finish(); |
| |
| if (children) { |
| if (rootpid == getpid()) { |
| while (waitpid(-1, NULL, 0) >= 0) |
| ; |
| } |
| free(children); |
| } |
| #if defined(SERVER_PRINT_STATE) && SERVER_PRINT_STATE == 1 |
| fprintf(stderr, "* [%d] Shutdown.\n", getpid()); |
| if (rootpid == getpid()) |
| fprintf(stderr, "* Shutdown process complete.\n"); |
| #endif |
| if (rootpid != getpid()) |
| exit(0); |
| |
| return 0; |
| } |
| |
| void server_stop(void) { |
| server_data.running = 0; |
| } |
| /** |
| Returns the last time the server reviewed any pending IO events. |
| */ |
| time_t server_last_tick(void) { |
| return server_data.last_tick; |
| } |
| |
| /* ***************************************************************************** |
| * Socket actions |
| */ |
| |
| /** |
| Sets a new active protocol object for the requested file descriptor. |
| |
| This also schedules the old protocol's `on_close` callback to run, making sure |
| all resources are released. |
| |
| Returns -1 on error (i.e. connection closed), otherwise returns 0. |
| */ |
| ssize_t server_switch_protocol(intptr_t fd, protocol_s* new_protocol) { |
| if (new_protocol == NULL || valid_uuid(fd) == 0) |
| return -1; |
| protocol_s* old_protocol; |
| lock_uuid(fd); |
| old_protocol = uuid_data(fd).protocol; |
| uuid_data(fd).protocol = new_protocol; |
| unlock_uuid(fd); |
| if (old_protocol && old_protocol->on_close) |
| reactor_on_close_async(old_protocol); |
| return 0; |
| } |
| /** |
| Gets the active protocol object for the requested file descriptor. |
| |
| Returns NULL on error (i.e. connection closed), otherwise returns a `protocol_s` |
| pointer. |
| */ |
| protocol_s* server_get_protocol(intptr_t uuid) { |
| if (valid_uuid(uuid) == 0) |
| return NULL; |
| protocol_s* protocol; |
| lock_uuid(uuid); |
| protocol = uuid_data(uuid).protocol; |
| unlock_uuid(uuid); |
| return protocol; |
| } |
| /** |
| Sets a connection's timeout. |
| |
| Returns -1 on error (i.e. connection closed), otherwise returns 0. |
| */ |
| void server_set_timeout(intptr_t fd, uint8_t timeout) { |
| if (valid_uuid(fd) == 0) |
| return; |
| lock_uuid(fd); |
| uuid_data(fd).timeout = timeout; |
| unlock_uuid(fd); |
| } |
| |
| /** Attaches an existing connection (fd) to the server's reactor and protocol |
| management system, so that the server can be used also to manage connection |
| based resources asynchronously (i.e. database resources etc'). |
| |
| On failure the fduuid_u.data.fd value will be -1. |
| */ |
| intptr_t server_attach(int fd, protocol_s* protocol) { |
| intptr_t uuid = sock_open(fd); |
| if (uuid == -1) |
| return -1; |
| protocol_fd(fd) = protocol; |
| if (reactor_add(uuid)) { |
| sock_close(uuid); |
| return -1; |
| } |
| return uuid; |
| } |
| /** Hijack a socket (file descriptor) from the server, clearing up it's |
| resources. The control of hte socket is totally relinquished. |
| |
| This method will block until all the data in the buffer is sent before |
| releasing control of the socket. |
| |
| The returned value is the fd for the socket, or -1 on error. |
| */ |
| int server_hijack(intptr_t uuid) { |
| if (sock_isvalid(uuid) == 0) |
| return -1; |
| reactor_remove(uuid); |
| sock_flush_strong(uuid); |
| if (sock_isvalid(uuid) == 0) |
| return -1; |
| protocol_s* old_protocol; |
| lock_uuid(uuid); |
| old_protocol = uuid_data(uuid).protocol; |
| uuid_data(uuid).protocol = NULL; |
| unlock_uuid(uuid); |
| if (old_protocol && old_protocol->on_close) |
| reactor_on_close_async(old_protocol); |
| return sock_uuid2fd(uuid); |
| } |
| /** Counts the number of connections for the specified protocol (NULL = all |
| protocols). */ |
| long server_count(char* service) { |
| long count = 0; |
| if (service == NULL) { |
| for (size_t i = 0; i < server_data.capacity; i++) { |
| if (protocol_fd(i) && protocol_fd(i)->service != listener_protocol_name && |
| protocol_fd(i)->service != timer_protocol_name) |
| count++; |
| } |
| } else { |
| for (size_t i = 0; i < server_data.capacity; i++) { |
| if (protocol_fd(i) && protocol_fd(i)->service == service) |
| count++; |
| } |
| } |
| return count; |
| } |
| |
| /* ***************************************************************************** |
| * Connection Tasks (each and deffer tactics implementations) |
| */ |
| |
| /* ******* |
| Task core data |
| ******* */ |
| typedef struct { |
| intptr_t origin; |
| intptr_t target; |
| const char* service; |
| void (*task)(intptr_t fd, protocol_s* protocol, void* arg); |
| void* on_finish; |
| void* arg; |
| } srv_task_s; |
| |
| /* Get task from void pointer. */ |
| #define p2task(task) (*((srv_task_s*)(task))) |
| |
| /* Get fallback callback from the task object. */ |
| #define task2fallback(task) \ |
| ((void (*)(intptr_t, void*))(p2task(task).on_finish)) |
| |
| /* Get on_finished callback from the task object. */ |
| #define task2on_done(task) \ |
| ((void (*)(intptr_t, protocol_s*, void*))(p2task(task).on_finish)) |
| /* allows for later implementation of a task pool with minimal code updates. */ |
| static inline srv_task_s* task_alloc(void) { |
| return malloc(sizeof(srv_task_s)); |
| } |
| |
| /* allows for later implementation of a task pool with minimal code updates. */ |
| static inline void task_free(srv_task_s* task) { |
| return free(task); |
| } |
| |
| /* performs a single connection task. */ |
| static void perform_single_task(void* task) { |
| if (sock_isvalid(p2task(task).target) == 0) { |
| if (p2task(task).on_finish) // an invalid connection fallback |
| task2fallback(task)(p2task(task).origin, p2task(task).arg); |
| task_free(task); |
| return; |
| } |
| if (try_lock_uuid(p2task(task).target) == 0) { |
| // get protocol |
| protocol_s* protocol = protocol_uuid(p2task(task).target); |
| if (protocol_set_busy(protocol) == 0) { |
| // clear the original busy flag |
| unlock_uuid(p2task(task).target); |
| p2task(task).task(p2task(task).target, protocol, p2task(task).arg); |
| protocol_unset_busy(protocol); |
| task_free(task); |
| return; |
| } |
| unlock_uuid(p2task(task).target); |
| } |
| async_run(perform_single_task, task); |
| } |
| |
| /* performs a connection group task. */ |
| static void perform_each_task(void* task) { |
| intptr_t uuid; |
| protocol_s* protocol; |
| while (p2task(task).target < server_data.capacity) { |
| uuid = sock_fd2uuid(p2task(task).target); |
| if (uuid == -1 || uuid == p2task(task).origin) { |
| ++p2task(task).target; |
| continue; |
| } |
| if (try_lock_uuid(uuid) == 0) { |
| protocol = protocol_uuid(uuid); |
| if (protocol == NULL || protocol->service != p2task(task).service) { |
| unlock_uuid(uuid); |
| ++p2task(task).target; |
| continue; |
| } else if (protocol_set_busy(protocol) == 0) { |
| // unlock uuid |
| unlock_uuid(uuid); |
| // perform task |
| p2task(task).task(uuid, protocol, p2task(task).arg); |
| // clear the busy flag |
| protocol_unset_busy(protocol); |
| // step forward |
| ++p2task(task).target; |
| continue; |
| } |
| // it's the right protocol and service, but we couldn't lock the protocol |
| unlock_uuid(uuid); |
| } |
| async_run(perform_each_task, task); |
| return; |
| } |
| if (p2task(task).on_finish) { // finished group task callback |
| task2on_done(task)( |
| p2task(task).origin, |
| (sock_isvalid(p2task(task).origin) ? protocol_uuid(p2task(task).origin) |
| : NULL), |
| p2task(task).arg); |
| } |
| task_free(task); |
| return; |
| } |
| |
| /* ******* |
| API |
| ******* */ |
| |
| /** |
| Schedules a specific task to run asyncronously for each connection (except the |
| origin connection) on a specific protocol. |
| */ |
| void server_each(intptr_t origin_fd, |
| const char* service, |
| void (*task)(intptr_t fd, protocol_s* protocol, void* arg), |
| void* arg, |
| void (*on_finish)(intptr_t fd, |
| protocol_s* protocol, |
| void* arg)) { |
| srv_task_s* t = NULL; |
| if (service == NULL || task == NULL) |
| goto error; |
| t = task_alloc(); |
| if (t == NULL) |
| goto error; |
| *t = (srv_task_s){.service = service, |
| .origin = origin_fd, |
| .task = task, |
| .on_finish = on_finish, |
| .arg = arg}; |
| if (async_run(perform_each_task, t)) |
| goto error; |
| return; |
| error: |
| if (t) |
| task_free(t); |
| if (on_finish) |
| on_finish(origin_fd, |
| (sock_isvalid(origin_fd) ? protocol_uuid(origin_fd) : NULL), arg); |
| } |
| |
| /** Schedules a specific task to run asyncronously for a specific connection. |
| */ |
| void server_task(intptr_t caller_fd, |
| void (*task)(intptr_t fd, protocol_s* protocol, void* arg), |
| void* arg, |
| void (*fallback)(intptr_t fd, void* arg)) { |
| srv_task_s* t = NULL; |
| if (task == NULL) |
| goto error; |
| t = task_alloc(); |
| if (t == NULL) |
| goto error; |
| *t = (srv_task_s){ |
| .target = caller_fd, .task = task, .on_finish = fallback, .arg = arg}; |
| if (async_run(perform_single_task, t)) |
| goto error; |
| return; |
| error: |
| if (t) |
| task_free(t); |
| if (fallback) |
| fallback(caller_fd, arg); |
| } |
| |
| /* ***************************************************************************** |
| * Timed tasks |
| */ |
| |
| /** Creates a system timer (at the cost of 1 file descriptor) and pushes the |
| timer to the reactor. The task will repeat `repetitions` times. if |
| `repetitions` is set to 0, task will repeat forever. Returns -1 on error |
| or the new file descriptor on succeess. |
| */ |
| int server_run_every(size_t milliseconds, |
| size_t repetitions, |
| void (*task)(void*), |
| void* arg, |
| void (*on_finish)(void*)) { |
| validate_mem(); |
| if (task == NULL) |
| return -1; |
| timer_protocol_s* protocol = NULL; |
| intptr_t uuid = -1; |
| int fd = reactor_make_timer(); |
| if (fd == -1) { |
| // perror("couldn't create a timer fd"); |
| goto error; |
| } |
| uuid = sock_open(fd); |
| if (uuid == -1) |
| goto error; |
| clear_uuid(uuid); |
| protocol = timer_alloc(task, arg, milliseconds, repetitions, on_finish); |
| if (protocol == NULL) |
| goto error; |
| protocol_fd(fd) = (protocol_s*)protocol; |
| if (server_data.running && reactor_add_timer(uuid, milliseconds)) |
| goto error; |
| return 0; |
| error: |
| if (uuid != -1) |
| sock_close(uuid); |
| else if (fd != -1) |
| close(fd); |
| |
| if (protocol != NULL) { |
| protocol_fd(fd) = NULL; |
| timer_on_close((protocol_s*)protocol); |
| } |
| return -1; |
| } |