| /* |
copyright: Boaz Segev, 2016
| license: MIT |
| |
| Feel free to copy, use and enjoy according to the license provided. |
| */ |
| |
// lib-server is based on and requires the following libraries:
| #include "lib-server.h" |
| #include "libbuffer.h" |
| |
| // #include <ssl.h> |
| // sys includes |
| #include <stdlib.h> |
| #include <stdio.h> |
| #include <signal.h> |
| #include <unistd.h> |
| #include <sys/resource.h> |
| #include <sys/socket.h> |
| #include <sys/types.h> |
| #include <sys/wait.h> |
| #include <netdb.h> |
| #include <string.h> |
| #include <fcntl.h> |
| #include <pthread.h> |
| #include <errno.h> |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // socket binding and server limits helpers |
| static int bind_server_socket(struct Server*); |
| static int set_non_blocking_socket(int fd); |
| static long srv_capacity(void); |
| |
| // // async data sending buffer and helpers |
| // struct Buffer { |
| // struct Buffer* next; |
| // int fd; |
| // void* data; |
| // int len; |
| // int sent; |
| // unsigned notification : 1; |
| // unsigned moved : 1; |
| // unsigned final : 1; |
| // unsigned urgent : 1; |
| // unsigned destroy : 4; |
| // unsigned rsv : 3; |
| // }; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // The server data object container |
| struct Server { |
  // inherit the reactor (must be first in the struct, so that a server pointer
  // can be safely cast to a reactor pointer).
| struct Reactor reactor; |
| // a pointer to the server's settings object. |
| struct ServerSettings* settings; |
| // a pointer to the thread pool object (libasync). |
| struct Async* async; |
| // a mutex for server data integrity |
| pthread_mutex_t lock; |
  // maps each connection to its protocol.
| struct Protocol* volatile* protocol_map; |
| // Used to send the server data + sockfd number to any worker threads. |
| // pointer offset calculations are used to calculate the sockfd. |
| struct Server** server_map; |
  // maps each connection fd to its associated udata (opaque user data).
| void** udata_map; |
| // reading hook maps |
| ssize_t (**reading_hooks)(server_pt srv, int fd, void* buffer, size_t size); |
| /// maps a connection's "busy" flag, preventing the same connection from |
| /// running `on_data` on two threads. use busy[fd] to get the status of the |
| /// flag. |
| volatile char* busy; |
| /// maps all connection's timeout values. use tout[fd] to get/set the |
| /// timeout. |
| unsigned char* tout; |
| /// maps all connection's idle cycle count values. use idle[fd] to get/set |
| /// the count. |
| unsigned char* idle; |
| // a buffer map. |
| void** buffer_map; |
| // socket capacity |
| long capacity; |
| /// the last timeout review |
| time_t last_to; |
| /// the server socket |
| int srvfd; |
| /// the original process pid |
| pid_t root_pid; |
| /// the flag that tells the server to stop |
| volatile char run; |
| }; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Server API gateway |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| //////////////////////////////////////////////////////////////////////////////// |
| //////////////////////////////////////////////////////////////////////////////// |
// Server API declarations
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Server settings and objects |
| |
| /** returns the originating process pid */ |
| static pid_t root_pid(struct Server* server); |
| /// allows direct access to the reactor object. use with care. |
| static struct Reactor* srv_reactor(struct Server* server); |
| /// allows direct access to the server's original settings. use with care. |
| static struct ServerSettings* srv_settings(struct Server* server); |
| /// returns the computed capacity for any server instance on the system. |
| static long srv_capacity(void); |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Server actions |
| |
/** Starts a server with the requested server settings (which MUST include
a default protocol). Blocks until the server is stopped. */
| static int srv_listen(struct ServerSettings); |
| /// stops a specific server, closing any open connections. |
| static void srv_stop(struct Server* server); |
| /// stops any and all server instances, closing any open connections. |
| static void srv_stop_all(void); |
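
/* A minimal usage sketch (not part of the API): starting an echo server.
   `echo_on_data` is a hypothetical `on_data` callback (see the sketch after
   the `Server` API struct below); the fields shown are the ones srv_listen
   reads from the settings:

     struct Protocol echo_proto = {.on_data = echo_on_data, .service = "echo"};
     struct ServerSettings settings = {
         .protocol = &echo_proto, .port = "3000", .threads = 4, .timeout = 5};
     srv_listen(settings);  // blocks until the server is stopped
*/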
| |
| // helpers |
| static void srv_cycle_core(server_pt server); |
| static int set_to_busy(server_pt server, int fd); |
| static void async_on_data(server_pt* p_server); |
| static void on_ready(struct Reactor* reactor, int fd); |
| static void on_shutdown(struct Reactor* reactor, int fd); |
| static void on_close(struct Reactor* reactor, int fd); |
| static void clear_conn_data(server_pt server, int fd); |
| static void accept_async(server_pt server); |
| |
| // signal management |
| static void register_server(struct Server* server); |
| static void on_signal(int sig); |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Socket settings and data |
| |
| /** Returns true if a specific connection's protected callback is running. |
| |
Protected callbacks include only the `on_data` callback and tasks forwarded
to the connection using the `fd_task` or `each` functions.
| */ |
| static unsigned char is_busy(struct Server* server, int sockfd); |
/// retrieves the active protocol object for the requested file descriptor.
| static struct Protocol* get_protocol(struct Server* server, int sockfd); |
| /// sets the active protocol object for the requested file descriptor. |
| static int set_protocol(struct Server* server, |
| int sockfd, |
| struct Protocol* new_protocol); |
/** retrieves an opaque pointer set by `set_udata` and associated with the
| connection. |
| |
| since no new connections are expected on fd == 0..2, it's possible to store |
| global data in these locations. */ |
| static void* get_udata(struct Server* server, int sockfd); |
| /** Sets the opaque pointer to be associated with the connection. returns the |
| old pointer, if any. */ |
| static void* set_udata(struct Server* server, int sockfd, void* udata); |
/** Sets the timeout limit for the specified connection, in seconds, up to
| 255 seconds (the maximum allowed timeout count). */ |
| static void set_timeout(struct Server* server, |
| int sockfd, |
| unsigned char timeout); |
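
/* A usage sketch: storing per-connection state in `udata` from a hypothetical
   `on_open` callback (`struct my_state` is illustrative, not part of the
   library):

     void my_on_open(server_pt srv, int fd) {
       Server.set_udata(srv, fd, calloc(1, sizeof(struct my_state)));
       Server.set_timeout(srv, fd, 30);
     }

   The matching `on_close` should `free` whatever `get_udata` returns.
*/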
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Socket actions |
| |
| /** Attaches an existing connection (fd) to the server's reactor and protocol |
management system, so that the server can also be used to manage connection
based resources asynchronously (e.g., database connections).
| */ |
| static int srv_attach(struct Server* server, |
| int sockfd, |
| struct Protocol* protocol); |
| /** Closes the connection. |
| |
If any data is waiting to be written, close will
return immediately and the connection will only be closed once all the data
has been sent. */
| static void srv_close(struct Server* server, int sockfd); |
/** Hijacks a socket (file descriptor) from the server, clearing up its
resources. Control of the socket is totally relinquished.
| |
| This method will block until all the data in the buffer is sent before |
| releasing control of the socket. */ |
| static int srv_hijack(struct Server* server, int sockfd); |
| /** Counts the number of connections for the specified protocol (NULL = all |
| protocols). */ |
| static long srv_count(struct Server* server, char* service); |
| /// "touches" a socket, reseting it's timeout counter. |
| static void srv_touch(struct Server* server, int sockfd); |
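
/* A usage sketch: attaching an externally created file descriptor (here, a
   pipe's read end) so the reactor manages it; `pipe_protocol` is hypothetical:

     int fds[2];
     if (!pipe(fds)) {
       set_non_blocking_socket(fds[0]);
       Server.attach(srv, fds[0], &pipe_protocol);
     }
*/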
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Read and Write |
| |
| /** |
Sets up the read/write hooks, allowing for transport layer extensions (e.g.,
SSL/TLS) or monitoring extensions.
| */ |
| void rw_hooks( |
| server_pt srv, |
| int sockfd, |
| ssize_t (*writing_hook)(server_pt srv, int fd, void* data, size_t len), |
| ssize_t (*reading_hook)(server_pt srv, int fd, void* buffer, size_t size)); |
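
/* A sketch of a reading hook: a pass-through with the shape a TLS layer would
   take (the decryption itself is omitted). It mirrors srv_read's return
   convention below: bytes read, 0 when no data is available, -1 on error.

     ssize_t my_reading_hook(server_pt srv, int fd, void* buffer, size_t size) {
       ssize_t r = recv(fd, buffer, size, 0);  // a TLS hook would decrypt here
       if (r > 0)
         return r;
       if (r < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
         return 0;
       return -1;
     }
*/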
| |
/** Reads up to `max_len` bytes of data from a socket. The data is stored in
`buffer` and the number of bytes received is returned.
| |
| Returns -1 if an error was raised and the connection was closed. |
| |
| Returns the number of bytes written to the buffer. Returns 0 if no data was |
| available. |
| */ |
| static ssize_t srv_read(server_pt srv, int fd, void* buffer, size_t max_len); |
/** Copies & writes data to the socket, managing an asynchronous buffer.

Returns 0 on success. Success means the data is in a buffer, waiting to be
written. If the socket is forced to close at this point, the buffer will be
destroyed (never sent).

Returns -1 on error.
*/
| static ssize_t srv_write(struct Server* server, |
| int sockfd, |
| void* data, |
| size_t len); |
/** Writes data to the socket, moving the data's pointer directly to the buffer.

Once the data was written, `free` will be called to release the data's memory.

Returns 0 on success. Success means the data is in a buffer, waiting to be
written. If the socket is forced to close at this point, the buffer will be
destroyed (never sent).

Returns -1 on error.
*/
| static ssize_t srv_write_move(struct Server* server, |
| int sockfd, |
| void* data, |
| size_t len); |
/** Copies & writes data to the socket, managing an asynchronous buffer.

Each call to a `write` function considers its data atomic (a single package).

The `urgent` variant will send the data as soon as possible, without disrupting
any pending data packages (data written using `write` will not be interrupted
in the middle).

Returns 0 on success. Success means the data is in a buffer, waiting to be
written. If the socket is forced to close at this point, the buffer will be
destroyed (never sent).

Returns -1 on error.
*/
| static ssize_t srv_write_urgent(struct Server* server, |
| int sockfd, |
| void* data, |
| size_t len); |
/** Writes data to the socket, moving the data's pointer directly to the buffer.

Once the data was written, `free` will be called to release the data's memory.

Each call to a `write` function considers its data atomic (a single package).

The `urgent` variant will send the data as soon as possible, without disrupting
any pending data packages (data written using `write` will not be interrupted
in the middle).

Returns 0 on success. Success means the data is in a buffer, waiting to be
written. If the socket is forced to close at this point, the buffer will be
destroyed (never sent).

Returns -1 on error.
*/
| static ssize_t srv_write_move_urgent(struct Server* server, |
| int sockfd, |
| void* data, |
| size_t len); |
| /** |
| Sends a whole file as if it were a single atomic packet. |
| |
Once the file is sent, the `FILE *` will be closed using `fclose`.

The file will be buffered to the socket chunk by chunk, so that memory
consumption is capped at roughly 64KB.
| */ |
| static ssize_t srv_sendfile(struct Server* server, int sockfd, FILE* file); |
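
/* A usage sketch: serving a static file from within a hypothetical `on_data`
   callback. The 19-byte header is copied by `write`; the `FILE*` is owned by
   the server once `sendfile` is called:

     FILE* f = fopen("index.html", "r");
     if (f) {
       Server.write(srv, fd, "HTTP/1.1 200 OK\r\n\r\n", 19);
       Server.sendfile(srv, fd, f);
     }
*/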
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Tasks + Async |
| |
/** Schedules a specific task to run asynchronously for each connection.
A NULL service identifier == all connections (all protocols). */
| static int each(struct Server* server, |
| char* service, |
| void (*task)(struct Server* server, int fd, void* arg), |
| void* arg, |
| void (*fallback)(struct Server* server, int fd, void* arg)); |
| /** Schedules a specific task to run for each connection. The tasks will be |
 * performed sequentially, in a blocking manner. The method will only return
| * once all the tasks were completed. A NULL service identifier == all |
| * connections (all protocols). |
| */ |
| static int each_block(struct Server* server, |
| char* service, |
| void (*task)(struct Server* server, int fd, void* arg), |
| void* arg); |
/** Schedules a specific task to run asynchronously for a specific connection.
| |
| returns -1 on failure, 0 on success (success being scheduling or performing |
| the task). |
| */ |
| static int fd_task(struct Server* server, |
| int sockfd, |
| void (*task)(struct Server* server, int fd, void* arg), |
| void* arg, |
| void (*fallback)(struct Server* server, int fd, void* arg)); |
/** Runs an asynchronous task, IF threading is enabled (set the
`threads` setting to 1 (the default) or greater).

If threading is disabled, the current thread will perform the task and return.

Returns -1 on error or 0 on success.
| */ |
| static int run_async(struct Server* self, void task(void*), void* arg); |
| /** Creates a system timer (at the cost of 1 file descriptor) and pushes the |
timer to the reactor. The task will NOT repeat. Returns -1 on error or 0
on success.
| |
| NOTICE: Do NOT create timers from within the on_close callback, as this might |
| block resources from being properly freed (if the timer and the on_close |
| object share the same fd number). |
| */ |
| static int run_after(struct Server* self, |
| long milliseconds, |
| void task(void*), |
| void* arg); |
| /** Creates a system timer (at the cost of 1 file descriptor) and pushes the |
timer to the reactor. The task will repeat `repetitions` times. If
`repetitions` is set to 0, the task will repeat forever. Returns -1 on error
or 0 on success.
| |
| NOTICE: Do NOT create timers from within the on_close callback, as this might |
| block resources from being properly freed (if the timer and the on_close |
| object share the same fd number). |
| */ |
| static int run_every(struct Server* self, |
| long milliseconds, |
| int repetitions, |
| void task(void*), |
| void* arg); |
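
/* A usage sketch: a repeating timer and a one-shot delay. `heartbeat_task`
   and `shutdown_task` are hypothetical `void task(void*)` functions:

     Server.run_every(srv, 1000, 0, heartbeat_task, NULL);  // 1/sec, forever
     Server.run_after(srv, 5000, shutdown_task, NULL);      // once, after 5s
*/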
| |
| // helpers |
| |
| // the data-type for async messages |
| struct ConnTask { |
| struct Server* server; |
| int fd; |
| void (*task)(struct Server* server, int fd, void* arg); |
| void* arg; |
| void (*fallback)(struct Server* server, int fd, void* arg); |
| }; |
| // the async handler |
| static void perform_each_task(struct ConnTask* task); |
| void make_a_task_async(struct Server* server, int fd, void* arg); |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // The actual Server API access setup |
| // The following allows access to helper functions and defines a namespace |
| // for the API in this library. |
| const struct Server__API____ Server = { |
| .reactor = srv_reactor, // |
| .settings = srv_settings, // |
| .capacity = srv_capacity, // |
| .listen = srv_listen, // |
| .stop = srv_stop, // |
| .stop_all = srv_stop_all, // |
| .is_busy = is_busy, // |
| .get_protocol = get_protocol, // |
| .set_protocol = set_protocol, // |
| .get_udata = get_udata, // |
| .set_udata = set_udata, // |
| .set_timeout = set_timeout, // |
| .attach = srv_attach, // |
| .close = srv_close, // |
| .hijack = srv_hijack, // |
| .count = srv_count, // |
| .touch = srv_touch, // |
| .rw_hooks = rw_hooks, // |
| .read = srv_read, // |
| .write = srv_write, // |
| .write_move = srv_write_move, // |
| .write_urgent = srv_write_urgent, // |
| .write_move_urgent = srv_write_move_urgent, // |
| .sendfile = srv_sendfile, // |
| .each = each, // |
| .each_block = each_block, // |
| .fd_task = fd_task, // |
| .run_async = run_async, // |
| .run_after = run_after, // |
| .run_every = run_every, // |
| .root_pid = root_pid, // |
| }; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // timer protocol |
| |
| // service name |
| static char* timer_protocol_name = "timer"; |
| // a timer protocol, will be allocated for each timer. |
| struct TimerProtocol { |
  // must be first for pointer compatibility
| struct Protocol parent; |
| void (*task)(void*); |
| void* arg; |
| // how many repeats? |
| int repeat; |
| }; |
| |
| // the timer's on_data callback |
| static void tp_perform_on_data(struct Server* self, int fd) { |
| struct TimerProtocol* timer = |
| (struct TimerProtocol*)Server.get_protocol(self, fd); |
| if (timer) { |
    // set local data copy, to avoid race conditions related to `free`.
| void (*task)(void*) = (void (*)(void*))timer->task; |
| void* arg = timer->arg; |
| // perform the task |
| if (task) |
| task(arg); |
    // reset the timer
    reactor_reset_timer(fd);
    // a negative `repeat` value means the timer runs forever
    if (timer->repeat < 0)
      return;
    // close the file descriptor once the repetitions are exhausted
    if (timer->repeat == 0) {
      reactor_close((struct Reactor*)self, fd);
      return;
    }
    timer->repeat--;
| } |
| } |
| |
| // the timer's on_close (cleanup) |
| static void tp_perform_on_close(struct Server* self, int fd) { |
| // free the protocol object when it was "aborted" using `close`. |
| struct TimerProtocol* timer = |
| (struct TimerProtocol*)Server.get_protocol(self, fd); |
| if (timer) |
| free(timer); |
| } |
| |
// creates a new TimerProtocol object.
// use: TimerProtocol(task, arg, rep)
static struct TimerProtocol* TimerProtocol(void* task,
                                           void* arg,
                                           int repetitions) {
  struct TimerProtocol* tp = malloc(sizeof(struct TimerProtocol));
  if (tp == NULL)
    return NULL;
| |
| *tp = (struct TimerProtocol){.parent.on_data = tp_perform_on_data, |
| .parent.on_close = tp_perform_on_close, |
| .parent.service = timer_protocol_name, |
| .task = task, |
| .arg = arg, |
| .repeat = repetitions - 1}; |
| return tp; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Server settings and objects |
| |
| /** returns the originating process pid */ |
| static pid_t root_pid(struct Server* server) { |
| return server->root_pid; |
| } |
| /// allows direct access to the reactor object. use with care. |
| static struct Reactor* srv_reactor(struct Server* server) { |
| return (struct Reactor*)server; |
| } |
| /// allows direct access to the server's original settings. use with care. |
| static struct ServerSettings* srv_settings(struct Server* server) { |
| return server->settings; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Socket settings and data |
| |
| /** Returns true if a specific connection's protected callback is running. |
| |
Protected callbacks include only the `on_data` callback and tasks forwarded
to the connection using the `fd_task` or `each` functions.
| */ |
| static unsigned char is_busy(struct Server* server, int sockfd) { |
| return server->busy[sockfd]; |
| } |
/// retrieves the active protocol object for the requested file descriptor.
| static struct Protocol* get_protocol(struct Server* server, int sockfd) { |
| return server->protocol_map[sockfd]; |
| } |
| /// sets the active protocol object for the requested file descriptor. |
| static int set_protocol(struct Server* server, |
| int sockfd, |
| struct Protocol* new_protocol) { |
| // before bothering with the mutex, make sure we have a valid connection. |
| if (!server->protocol_map[sockfd]) { |
| // fprintf(stderr, |
| // "ERROR: Cannot set a protocol for a disconnected socket.\n"); |
| return -1; |
| } |
| // on_close and set_protocol should never conflict. |
| // we should also prevent the same thread from deadlocking |
| // (i.e., when on_close tries to update the protocol) |
| if (pthread_mutex_trylock(&(server->lock))) |
| return -1; |
  // review the connection's validity again (in protected state)
| if (!server->protocol_map[sockfd]) { |
| pthread_mutex_unlock(&(server->lock)); |
| // fprintf(stderr, |
| // "ERROR: Cannot set a protocol for a disconnected socket.\n"); |
| return -1; |
| } |
| // set the new protocol |
| server->protocol_map[sockfd] = new_protocol; |
| // release the lock |
| pthread_mutex_unlock(&(server->lock)); |
| // return 0 (no error) |
| return 0; |
| } |
/** retrieves an opaque pointer set by `set_udata` and associated with the
| connection. |
| |
| since no new connections are expected on fd == 0..2, it's possible to store |
| global data in these locations. */ |
| static void* get_udata(struct Server* server, int sockfd) { |
| return server->udata_map[sockfd]; |
| } |
| /** Sets the opaque pointer to be associated with the connection. returns the |
| old pointer, if any. */ |
| static void* set_udata(struct Server* server, int sockfd, void* udata) { |
| void* old = server->udata_map[sockfd]; |
| // pthread_mutex_lock(&(server->lock)); |
| server->udata_map[sockfd] = udata; |
| // pthread_mutex_unlock(&(server->lock)); |
| return old; |
| } |
/** Sets the timeout limit for the specified connection, in seconds, up to
| 255 seconds (the maximum allowed timeout count). */ |
| static void set_timeout(server_pt server, int fd, unsigned char timeout) { |
| server->tout[fd] = timeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Server actions & Core |
| |
| // helper macros |
| #define _reactor_(server) ((struct Reactor*)(server)) |
| #define _server_(reactor) ((server_pt)(reactor)) |
| #define _protocol_(reactor, fd) (_server_(reactor)->protocol_map[(fd)]) |
| |
| // clears a connection's data |
| static void clear_conn_data(server_pt server, int fd) { |
| server->protocol_map[fd] = 0; |
| server->busy[fd] = 0; |
| server->tout[fd] = 0; |
| server->idle[fd] = 0; |
| server->udata_map[fd] = NULL; |
| server->reading_hooks[fd] = NULL; |
| Buffer.clear(server->buffer_map[fd]); |
| } |
| // on_ready, handles async buffer sends |
| static void on_ready(struct Reactor* reactor, int fd) { |
| if (Buffer.flush(_server_(reactor)->buffer_map[fd], fd) > 0) |
| _server_(reactor)->idle[fd] = 0; |
| if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_ready) |
| _protocol_(reactor, fd)->on_ready(_server_(reactor), fd); |
| } |
| |
| /// called for any open file descriptors when the reactor is shutting down. |
| static void on_shutdown(struct Reactor* reactor, int fd) { |
  // call the on_shutdown callback for any connection that still has a protocol.
| if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_shutdown) |
| _protocol_(reactor, fd)->on_shutdown(_server_(reactor), fd); |
| } |
| |
| // called when a file descriptor was closed (either locally or by the other |
| // party, when it's a socket or a pipe). |
| static void on_close(struct Reactor* reactor, int fd) { |
| if (!_protocol_(reactor, fd)) |
| return; |
  // file descriptors can be added from external threads (i.e., when creating
  // timers); this could cause race conditions and data corruption
  // (i.e., when on_close is releasing memory).
  int lck = 0;
  // this should prevent the same thread from calling on_close recursively
  // (i.e., when using `Server.attach`)
| if ((lck = pthread_mutex_trylock(&(_server_(reactor)->lock)))) { |
| if (lck != EAGAIN) |
| return; |
| pthread_mutex_lock(&(_server_(reactor)->lock)); |
| } |
| if (_protocol_(reactor, fd)) { |
| if (_protocol_(reactor, fd)->on_close) |
| _protocol_(reactor, fd)->on_close(_server_(reactor), fd); |
| clear_conn_data(_server_(reactor), fd); |
| } |
| pthread_mutex_unlock(&(_server_(reactor)->lock)); |
| } |
| |
| // The busy flag is used to make sure that a single connection doesn't perform |
| // two "critical" tasks at the same time. Critical tasks are defined as the |
// `on_data` callback and any user task requested by `each` or `fd_task`.
| static int set_to_busy(struct Server* server, int sockfd) { |
| static pthread_mutex_t locker = PTHREAD_MUTEX_INITIALIZER; |
| |
| if (!server->protocol_map[sockfd]) |
| return 0; |
| pthread_mutex_lock(&locker); |
| if (server->busy[sockfd] == 1) { |
| pthread_mutex_unlock(&locker); |
| return 0; |
| } |
| server->busy[sockfd] = 1; |
| pthread_mutex_unlock(&locker); |
| return 1; |
| } |
| |
| // accepts new connections |
static void accept_async(server_pt server) {
  int client = 1;
  while (1) {
#ifdef SOCK_NONBLOCK
    // the peer's address isn't needed, so both addr and addrlen are NULL.
    client = accept4(server->srvfd, NULL, NULL, SOCK_NONBLOCK);
    if (client <= 0)
      return;
#else
    client = accept(server->srvfd, NULL, NULL);
    if (client <= 0)
      return;
    set_non_blocking_socket(client);
#endif
| // handle server overload |
| if (client >= _reactor_(server)->maxfd) { |
| if (server->settings->busy_msg) |
| if (write(client, server->settings->busy_msg, |
| strlen(server->settings->busy_msg)) < 0) |
| ; |
| close(client); |
| continue; |
| } |
| // attach the new client (performs on_close if needed) |
| srv_attach(server, client, server->settings->protocol); |
| } |
| } |
| // makes sure that the on_data callback isn't overlapping a previous on_data |
| static void async_on_data(server_pt* p_server) { |
| // compute sockfd by comparing the distance between the original pointer and |
| // the passed pointer's address. |
| if (!(*p_server)) |
| return; |
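  // e.g., if p_server == &server->server_map[9], the subtraction yields 9.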
| int sockfd = p_server - (*p_server)->server_map; |
| // if we get the handle, perform the task |
| if (set_to_busy(*p_server, sockfd)) { |
| struct Protocol* protocol = (*p_server)->protocol_map[sockfd]; |
| if (!protocol || !protocol->on_data) |
| return; |
| (*p_server)->idle[sockfd] = 0; |
| protocol->on_data((*p_server), sockfd); |
| // release the handle |
| (*p_server)->busy[sockfd] = 0; |
| return; |
| } |
| // we didn't get the handle, reschedule - but only if the connection is still |
| // open. |
| if ((*p_server)->protocol_map[sockfd]) |
| Async.run((*p_server)->async, (void (*)(void*))async_on_data, p_server); |
| } |
| |
| static void on_data(struct Reactor* reactor, int fd) { |
| // static socklen_t cl_addrlen = 0; |
| if (fd == _server_(reactor)->srvfd) { |
| // listening socket. accept connections. |
| Async.run(_server_(reactor)->async, (void (*)(void*))accept_async, reactor); |
| } else if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_data) { |
| _server_(reactor)->idle[fd] = 0; |
| // clients, forward on. |
| Async.run(_server_(reactor)->async, (void (*)(void*))async_on_data, |
| &(_server_(reactor)->server_map[fd])); |
| } |
| } |
| |
// calls the reactor's core and checks for timeouts.
// schedules its own execution when done.
| static void srv_cycle_core(server_pt server) { |
  int delta;  // reactor review result; later reused as the timeout delta
| // review reactor events |
| delta = reactor_review(_reactor_(server)); |
| if (delta < 0) { |
| srv_stop(server); |
| return; |
| } else if (delta == 0 && server->settings->on_idle) { |
| server->settings->on_idle(server); |
| } |
| // timeout + local close management |
| if (server->last_to != _reactor_(server)->last_tick) { |
    // We use the delta with fuzzy logic (only after the first second)
    delta = _reactor_(server)->last_tick - server->last_to;
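    // e.g., with tout == 5 and one review per second, `idle` grows 0 -> 1 on
    // the first review, then by `delta` each review, until it reaches `tout`
    // and the connection is pinged or closed.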
| for (long i = 3; i <= _reactor_(server)->maxfd; i++) { |
| if (server->protocol_map[i] && fcntl(i, F_GETFL) < 0 && errno == EBADF) { |
| reactor_close(_reactor_(server), i); |
| } |
| if (server->tout[i]) { |
| if (server->tout[i] > server->idle[i]) |
| server->idle[i] += server->idle[i] ? delta : 1; |
| else { |
| if (server->protocol_map[i] && server->protocol_map[i]->ping) |
| server->protocol_map[i]->ping(server, i); |
| else if (!server->busy[i] || server->idle[i] == 255) |
| reactor_close(_reactor_(server), i); |
| } |
| } |
| } |
| // ready for next call. |
| server->last_to = _reactor_(server)->last_tick; |
| // double en = (float)clock()/CLOCKS_PER_SEC; |
| // printf("timeout review took %f milliseconds\n", (en-st)*1000); |
| } |
| if (server->run && |
| Async.run(server->async, (void (*)(void*))srv_cycle_core, server)) { |
    perror(
        "FATAL ERROR: "
        "couldn't schedule the server's reactor in the task queue");
| exit(1); |
| } |
| } |
| |
/** Starts a server with the requested server settings (which MUST include
a default protocol). Blocks until the server is stopped. */
| static int srv_listen(struct ServerSettings settings) { |
| // review the settings, setup defaults when something's missing |
| if (!settings.protocol) { |
| fprintf(stderr, |
| "Server err: (protocol == NULL) Running a server requires " |
| "a protocol for new connections.\n"); |
| return -1; |
| } |
| if (!settings.port) |
| settings.port = "3000"; |
| if (!settings.timeout) |
| settings.timeout = 5; |
  if (settings.threads <= 0)
    settings.threads = 1;
  if (settings.processes <= 0)
    settings.processes = 1;
| |
| // We can use the stack for the server's core memory. |
| long capacity = srv_capacity(); |
| void* udata_map[capacity]; |
| struct Protocol* volatile protocol_map[capacity]; |
| struct Server* server_map[capacity]; |
| void* buffer_map[capacity]; |
| volatile char busy[capacity]; |
| unsigned char tout[capacity]; |
| unsigned char idle[capacity]; |
| ssize_t (*reading_hooks[capacity])(server_pt srv, int fd, void* buffer, |
| size_t size); |
| |
| // populate the Server structure with the data |
| struct Server srv = { |
| // initialize the server object |
| .settings = &settings, // store a pointer to the settings |
| .last_to = 0, // last timeout review |
| .capacity = capacity, // the server's capacity |
| .protocol_map = protocol_map, |
| .udata_map = udata_map, |
| .server_map = server_map, |
| .buffer_map = buffer_map, |
| .reading_hooks = reading_hooks, |
| .busy = busy, |
| .tout = tout, |
| .idle = idle, |
| .reactor.maxfd = capacity - 1, |
| .reactor.on_data = on_data, |
| .reactor.on_ready = on_ready, |
| .reactor.on_shutdown = on_shutdown, |
| .reactor.on_close = on_close, |
| }; |
| // initialize the server data mutex |
| if (pthread_mutex_init(&srv.lock, NULL)) { |
| return -1; |
| } |
  // bind the server's socket - if relevant
  int srvfd = 0;
  if (settings.port) {
| srvfd = bind_server_socket(&srv); |
| // if we didn't get a socket, quit now. |
| if (srvfd < 0) |
| return -1; |
| srv.srvfd = srvfd; |
| } |
| |
| // zero out data... |
| for (int i = 0; i < capacity; i++) { |
| protocol_map[i] = 0; |
| server_map[i] = &srv; |
| busy[i] = 0; |
| tout[i] = 0; |
| idle[i] = 0; |
| udata_map[i] = 0; |
| // buffer_map[i] = 0; |
| buffer_map[i] = Buffer.new(&srv); |
| } |
| |
| // register signals - do this before concurrency, so that they are inherited. |
| struct sigaction old_term, old_int, old_pipe, new_int, new_pipe; |
| sigemptyset(&new_int.sa_mask); |
| sigemptyset(&new_pipe.sa_mask); |
| new_pipe.sa_flags = new_int.sa_flags = 0; |
| new_pipe.sa_handler = SIG_IGN; |
| new_int.sa_handler = on_signal; |
| sigaction(SIGINT, &new_int, &old_int); |
| sigaction(SIGTERM, &new_int, &old_term); |
| sigaction(SIGPIPE, &new_pipe, &old_pipe); |
| |
| // setup concurrency |
| srv.root_pid = getpid(); |
  pid_t pids[settings.processes];  // settings.processes is at least 1 here
| if (settings.processes > 1) { |
| pids[0] = 0; |
| for (int i = 1; i < settings.processes; i++) { |
| if (getpid() == srv.root_pid) |
| pids[i] = fork(); |
| } |
| } |
| // once we forked, we can initiate a thread pool for each process |
  srv.async = Async.new(settings.threads);
  if (srv.async == NULL) {
| if (srvfd) |
| close(srvfd); |
| return -1; |
| } |
| |
| // register server for signals |
| register_server(&srv); |
| |
| // let'm know... |
| if (srvfd) |
    printf(
        "(pid %d x %d threads) Listening on port %s (max sockets: %ld, ~%d "
        "used)\n",
        getpid(), srv.settings->threads, srv.settings->port, srv.capacity,
        srv.srvfd + 2);
  else
    printf(
        "(pid %d x %d threads) Started a non-listening network service "
        "(max sockets: %ld ~ at least 6 are in system use)\n",
        getpid(), srv.settings->threads, srv.capacity);
| |
| // initialize reactor |
| reactor_init(&srv.reactor); |
| // bind server data to reactor loop |
| if (srvfd) |
| reactor_add(&srv.reactor, srv.srvfd); |
| |
| // call the on_init callback |
| if (settings.on_init) { |
| settings.on_init(&srv); |
| } |
| |
| // initiate the core's cycle |
| srv.run = 1; |
| Async.run(srv.async, (void (*)(void*))srv_cycle_core, &srv); |
| Async.wait(srv.async); |
| fprintf(stderr, "server done\n"); |
| // cleanup |
| reactor_stop(&srv.reactor); |
| |
| if (settings.processes > 1 && getpid() == srv.root_pid) { |
| int sts; |
| for (int i = 1; i < settings.processes; i++) { |
| // printf("sending signal to pid %d\n", pids[i]); |
| kill(pids[i], SIGINT); |
| } |
| for (int i = 1; i < settings.processes; i++) { |
| sts = 0; |
| // printf("waiting for pid %d\n", pids[i]); |
      if (waitpid(pids[i], &sts, 0) != pids[i])
        perror("waiting for a child process failed");
| } |
| } |
| |
| if (settings.on_finish) |
| settings.on_finish(&srv); |
| printf("\n(%d) Stopped listening for port %s\n", getpid(), settings.port); |
| if (getpid() != srv.root_pid) { |
| fflush(NULL); |
| exit(0); |
| } |
| // restore signal state |
| sigaction(SIGINT, &old_int, NULL); |
| sigaction(SIGTERM, &old_term, NULL); |
| sigaction(SIGPIPE, &old_pipe, NULL); |
| |
| // destroy the buffers. |
  for (int i = 0; i < capacity; i++) {
| Buffer.destroy(buffer_map[i]); |
| } |
| |
| // destroy the mutex |
| pthread_mutex_destroy(&srv.lock); |
| |
| return 0; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Socket actions |
| |
| /** Attaches an existing connection (fd) to the server's reactor and protocol |
management system, so that the server can also be used to manage connection
based resources asynchronously (e.g., database connections).
| */ |
static int srv_attach(server_pt server, int sockfd, struct Protocol* protocol) {
  // the bounds check must come first - the fd indexes the connection maps.
  if (sockfd >= server->capacity)
    return -1;
  if (server->protocol_map[sockfd]) {
    on_close((struct Reactor*)server, sockfd);
  }
| // setup protocol |
| server->protocol_map[sockfd] = protocol; // set_protocol() would cost more |
| // setup timeouts |
| server->tout[sockfd] = server->settings->timeout; |
| server->idle[sockfd] = 0; |
| // call on_open |
| // register the client - start it off as busy, protocol still initializing |
| // we don't need the mutex, because it's all fresh |
| server->busy[sockfd] = 1; |
| // attach the socket to the reactor |
| if (reactor_add((struct Reactor*)server, sockfd) < 0) { |
| clear_conn_data(server, sockfd); |
| return -1; |
| } |
| if (protocol->on_open) |
| protocol->on_open(server, sockfd); |
| server->busy[sockfd] = 0; |
| return 0; |
| } |
| /** Closes the connection. |
| |
If any data is waiting to be written, close will
return immediately and the connection will only be closed once all the data
has been sent. */
| static void srv_close(struct Server* server, int sockfd) { |
| if (!server->protocol_map[sockfd]) |
| return; |
| if (Buffer.is_empty(server->buffer_map[sockfd])) { |
| reactor_close((struct Reactor*)server, sockfd); |
| } else |
| Buffer.close_when_done(server->buffer_map[sockfd], sockfd); |
| } |
/** Hijacks a socket (file descriptor) from the server, clearing up its
resources. Control of the socket is totally relinquished.
| |
| This method will block until all the data in the buffer is sent before |
| releasing control of the socket. */ |
| static int srv_hijack(struct Server* server, int sockfd) { |
| if (!server->protocol_map[sockfd]) |
| return -1; |
| reactor_remove((struct Reactor*)server, sockfd); |
| while (!Buffer.is_empty(server->buffer_map[sockfd]) && |
| Buffer.flush(server->buffer_map[sockfd], sockfd) >= 0) |
| ; |
| clear_conn_data(server, sockfd); |
| return 0; |
| } |
| /** Counts the number of connections for the specified protocol (NULL = all |
| protocols). */ |
| static long srv_count(struct Server* server, char* service) { |
  long c = 0;
| if (service) { |
| for (int i = 0; i < server->capacity; i++) { |
| if (server->protocol_map[i] && |
| (server->protocol_map[i]->service == service || |
| !strcmp(server->protocol_map[i]->service, service))) |
| c++; |
| } |
| } else { |
| for (int i = 0; i < server->capacity; i++) { |
| if (server->protocol_map[i] && |
| server->protocol_map[i]->service != timer_protocol_name) |
| c++; |
| } |
| } |
| return c; |
| } |
| /// "touches" a socket, reseting it's timeout counter. |
| static void srv_touch(struct Server* server, int sockfd) { |
| server->idle[sockfd] = 0; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Read and Write |
| |
| /** |
Sets up the read/write hooks, allowing for transport layer extensions (e.g.,
SSL/TLS) or monitoring extensions.
| */ |
| void rw_hooks( |
| server_pt srv, |
| int sockfd, |
| ssize_t (*writing_hook)(server_pt srv, int fd, void* data, size_t len), |
| ssize_t (*reading_hook)(server_pt srv, int fd, void* buffer, size_t size)) { |
| srv->reading_hooks[sockfd] = reading_hook; |
| Buffer.set_whook(srv->buffer_map[sockfd], writing_hook); |
| } |
| |
/** Reads up to `max_len` bytes of data from a socket. The data is stored in
`buffer` and the number of bytes received is returned.
| |
| Returns -1 if an error was raised and the connection was closed. |
| |
| Returns the number of bytes written to the buffer. Returns 0 if no data was |
| available. |
| */ |
| static ssize_t srv_read(server_pt srv, int fd, void* buffer, size_t max_len) { |
| if (srv->reading_hooks[fd]) // check for reading hook |
| return srv->reading_hooks[fd](srv, fd, buffer, max_len); |
| |
| ssize_t read = 0; |
| if ((read = recv(fd, buffer, max_len, 0)) > 0) { |
| // return data |
| return read; |
  } else {
    // `read == 0` means the connection was closed; a negative value with
    // EWOULDBLOCK/EAGAIN simply means no data was available yet.
    if (read && (errno == EWOULDBLOCK || errno == EAGAIN))
      return 0;
  }
| return -1; |
| } |
/** Copies & writes data to the socket, managing an asynchronous buffer.

Returns -1 on error, 0 on success (the data was buffered and is waiting to be
sent).
*/
| static ssize_t srv_write(struct Server* server, |
| int sockfd, |
| void* data, |
| size_t len) { |
| // make sure the socket is alive |
| if (!server->protocol_map[sockfd]) |
| return -1; |
| // reset timeout |
| server->idle[sockfd] = 0; |
| // send data |
| Buffer.write(server->buffer_map[sockfd], data, len); |
| if (Buffer.flush(server->buffer_map[sockfd], sockfd) < 0) |
| return -1; |
| return 0; |
| } |
/** Writes data to the socket, moving the data's pointer directly to the buffer.

Once the data was written, `free` will be called to release the data's memory.

Returns -1 on error, 0 on success (the data was buffered and is waiting to be
sent).
*/
| static ssize_t srv_write_move(struct Server* server, |
| int sockfd, |
| void* data, |
| size_t len) { |
| // make sure the socket is alive |
| if (!server->protocol_map[sockfd]) |
| return -1; |
| // reset timeout |
| server->idle[sockfd] = 0; |
| // send data |
| Buffer.write_move(server->buffer_map[sockfd], data, len); |
| if (Buffer.flush(server->buffer_map[sockfd], sockfd) < 0) |
| return -1; |
| return 0; |
| } |
/** Copies & writes data to the socket, managing an asynchronous buffer.

Each call to a `write` function considers its data atomic (a single package).

The `urgent` variant will send the data as soon as possible, without disrupting
any pending data packages (data written using `write` will not be interrupted
in the middle).

Returns -1 on error, 0 on success (the data was buffered and is waiting to be
sent).
*/
| static ssize_t srv_write_urgent(struct Server* server, |
| int sockfd, |
| void* data, |
| size_t len) { |
| // make sure the socket is alive |
| if (!server->protocol_map[sockfd]) |
| return -1; |
| // reset timeout |
| server->idle[sockfd] = 0; |
| // send data |
| Buffer.write_next(server->buffer_map[sockfd], data, len); |
| if (Buffer.flush(server->buffer_map[sockfd], sockfd) < 0) |
| return -1; |
| return 0; |
| } |
/** Writes data to the socket, moving the data's pointer directly to the buffer.

Once the data was written, `free` will be called to release the data's memory.

Each call to a `write` function considers its data atomic (a single package).

The `urgent` variant will send the data as soon as possible, without disrupting
any pending data packages (data written using `write` will not be interrupted
in the middle).

Returns -1 on error, 0 on success (the data was buffered and is waiting to be
sent).
*/
| static ssize_t srv_write_move_urgent(struct Server* server, |
| int sockfd, |
| void* data, |
| size_t len) { |
| // make sure the socket is alive |
| if (!server->protocol_map[sockfd]) |
| return -1; |
| // reset timeout |
| server->idle[sockfd] = 0; |
| // send data |
| Buffer.write_move_next(server->buffer_map[sockfd], data, len); |
| if (Buffer.flush(server->buffer_map[sockfd], sockfd) < 0) |
| return -1; |
| return 0; |
| } |
| /** |
| Sends a whole file as if it were a single atomic packet. |
| |
Once the file is sent, the `FILE *` will be closed using `fclose`.

The file will be buffered to the socket chunk by chunk, so that memory
consumption is capped at roughly 64KB.
| */ |
| static ssize_t srv_sendfile(struct Server* server, int sockfd, FILE* file) { |
| // make sure the socket is alive |
| if (!server->protocol_map[sockfd]) |
| return -1; |
| // reset timeout |
| server->idle[sockfd] = 0; |
| // send data |
| Buffer.sendfile(server->buffer_map[sockfd], file); |
| if (Buffer.flush(server->buffer_map[sockfd], sockfd) < 0) |
| return -1; |
| return 0; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Tasks + Async |
| |
| // the async handler |
| static void perform_each_task(struct ConnTask* task) { |
| // is it okay to perform the task? |
| if (task->server->protocol_map[task->fd] && |
| set_to_busy(task->server, task->fd)) { |
| // perform the task |
| task->task(task->server, task->fd, task->arg); |
| // release the busy flag |
| task->server->busy[task->fd] = 0; |
| // free the memory |
| free(task); |
| return; |
| } |
| // reschedule - but only if the connection is still open |
| if (task->server->protocol_map[task->fd]) |
| Async.run(task->server->async, (void (*)(void*))perform_each_task, task); |
| else if (task->fallback) // check for fallback, call if requested |
| task->fallback(task->server, task->fd, task->arg); |
| return; |
| } |
| |
| // schedules a task for async performance |
| void make_a_task_async(struct Server* server, int fd, void* arg) { |
| if (server->async) { |
| struct ConnTask* task = malloc(sizeof(struct ConnTask)); |
| if (!task) |
| return; |
| memcpy(task, arg, sizeof(struct ConnTask)); |
| task->fd = fd; |
| Async.run(server->async, (void (*)(void*))perform_each_task, task); |
| } else { |
| struct ConnTask* task = arg; |
| task->task(server, fd, task->arg); |
| } |
| } |
| |
/** Schedules a specific task to run asynchronously for each connection.
A NULL service identifier == all connections (all protocols). */
| static int each(struct Server* server, |
| char* service, |
| void (*task)(struct Server* server, int fd, void* arg), |
| void* arg, |
| void (*fallback)(struct Server* server, int fd, void* arg)) { |
| struct ConnTask msg = { |
| .server = server, .task = task, .arg = arg, .fallback = fallback, |
| }; |
| return each_block(server, service, make_a_task_async, &msg); |
| } |
| /** Schedules a specific task to run for each connection. The tasks will be |
 * performed sequentially, in a blocking manner. The method will only return
| * once all the tasks were completed. A NULL service identifier == all |
| * connections (all protocols). |
| */ |
| static int each_block(struct Server* server, |
| char* service, |
| void (*task)(struct Server* server, int fd, void* arg), |
| void* arg) { |
| int c = 0; |
| if (service) { |
| for (int i = 0; i < server->capacity; i++) { |
| if (server->protocol_map[i] && server->protocol_map[i]->service && |
| (server->protocol_map[i]->service == service || |
| !strcmp(server->protocol_map[i]->service, service))) { |
| task(server, i, arg); |
| c++; |
| } |
| } |
| } else { |
| for (int i = 0; i < server->capacity; i++) { |
| if (server->protocol_map[i] && |
| server->protocol_map[i]->service != timer_protocol_name) { |
| task(server, i, arg); |
| c++; |
| } |
| } |
| } |
| return c; |
| } |
/** Schedules a specific task to run asynchronously for a specific connection.
| |
| returns -1 on failure, 0 on success (success being scheduling or performing |
| the task). |
| */ |
| static int fd_task(struct Server* server, |
| int sockfd, |
| void (*task)(struct Server* server, int fd, void* arg), |
| void* arg, |
| void (*fallback)(struct Server* server, int fd, void* arg)) { |
| if (server->protocol_map[sockfd]) { |
| struct ConnTask msg = { |
| .server = server, .task = task, .arg = arg, .fallback = fallback}; |
| make_a_task_async(server, sockfd, &msg); |
| return 0; |
| } |
| return -1; |
| } |
/** Runs an asynchronous task, IF threading is enabled (set the
`threads` setting to 1 (the default) or greater).

If threading is disabled, the current thread will perform the task and return.

Returns -1 on error or 0 on success.
| */ |
| static int run_async(struct Server* self, void task(void*), void* arg) { |
| return Async.run(self->async, task, arg); |
| } |
| /** Creates a system timer (at the cost of 1 file descriptor) and pushes the |
timer to the reactor. The task will NOT repeat. Returns -1 on error or 0
on success.
| |
| NOTICE: Do NOT create timers from within the on_close callback, as this might |
| block resources from being properly freed (if the timer and the on_close |
| object share the same fd number). |
| */ |
| static int run_after(struct Server* self, |
| long milliseconds, |
| void task(void*), |
| void* arg) { |
| return run_every(self, milliseconds, 1, task, arg); |
| } |
| /** Creates a system timer (at the cost of 1 file descriptor) and pushes the |
timer to the reactor. The task will repeat `repetitions` times. If
`repetitions` is set to 0, the task will repeat forever. Returns -1 on error
or 0 on success.
| |
| NOTICE: Do NOT create timers from within the on_close callback, as this might |
| block resources from being properly freed (if the timer and the on_close |
| object share the same fd number). |
| */ |
| static int run_every(struct Server* self, |
| long milliseconds, |
| int repetitions, |
| void task(void*), |
| void* arg) { |
  int tfd = reactor_make_timer();
  if (tfd <= 0)
    return -1;
  struct Protocol* timer_protocol =
      (struct Protocol*)TimerProtocol(task, arg, repetitions);
  if (timer_protocol == NULL) {
    close(tfd);
    return -1;
  }
  if (srv_attach(self, tfd, timer_protocol)) {
    free(timer_protocol);
    close(tfd);
    return -1;
  }
| // remove the default timeout (timers shouldn't timeout) |
| self->tout[tfd] = 0; |
| |
| // srv_attach connected the fd as a regular socket - remove it and reconnect |
| // as a timer |
| reactor_remove((struct Reactor*)self, tfd); |
| if (reactor_add_timer((struct Reactor*)self, tfd, milliseconds) < 0) { |
| perror("Closing timer"); |
| printf("Timer %d closing for server with sock %d, epoll %d, map data: %d\n", |
| tfd, self->srvfd, self->reactor.private.reactor_fd, |
| self->reactor.private.map[tfd]); |
| // close(tfd); |
| // on_close might be called by the server to free the resources - we |
| // shouldn't race... but we are making some changes... |
| reactor_close((struct Reactor*)self, tfd); |
| return -1; |
| }; |
| return 0; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Server Registry |
// (stopping and signal management)
| |
| // types and global data |
| struct ServerSet { |
| struct ServerSet* next; |
| struct Server* server; |
| }; |
| static struct ServerSet* global_servers_set = NULL; |
| static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; |
| |
| // register a server |
static void register_server(server_pt srv) {
  pthread_mutex_lock(&global_lock);
  struct ServerSet* n_reg = malloc(sizeof(struct ServerSet));
  if (n_reg == NULL) {
    pthread_mutex_unlock(&global_lock);
    return;
  }
  n_reg->server = srv;
| n_reg->next = global_servers_set; |
| global_servers_set = n_reg; |
| pthread_mutex_unlock(&global_lock); |
| } |
| // stop a server (+unregister) |
| static void srv_stop(server_pt srv) { |
| pthread_mutex_lock(&global_lock); |
  // remove from registry
| struct ServerSet* set = global_servers_set; |
| struct ServerSet* tmp = global_servers_set; |
| if (global_servers_set && global_servers_set->server == srv) { |
| global_servers_set = global_servers_set->next; |
| free(tmp); |
| goto sig_srv; |
| } else |
| while (set) { |
| if (set->next && set->next->server == srv) { |
| tmp = set->next; |
| set->next = set->next->next; |
| free(tmp); |
| goto sig_srv; |
| } |
| set = set->next; |
| } |
| // the server wasn't in the registry - we shouldn't stop it again... |
| srv = NULL; |
| // send a signal to the server, if it was in the registry |
| sig_srv: |
| if (srv) { |
    // close the listening socket, preventing new connections from coming in.
    reactor_close((struct Reactor*)srv, srv->srvfd);
| // set the stop server flag. |
| srv->run = 0; |
| // signal the async object to finish |
| Async.signal(srv->async); |
| } |
| pthread_mutex_unlock(&global_lock); |
| } |
| // deregisters and stops all servers |
| static void srv_stop_all(void) { |
| while (global_servers_set) |
| srv_stop(global_servers_set->server); |
| } |
| |
| // handles signals |
| static void on_signal(int sig) { |
| if (!global_servers_set) { |
| signal(sig, SIG_DFL); |
| raise(sig); |
| return; |
| } |
| srv_stop_all(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // socket helpers |
| |
| // sets a socket to non blocking state |
| static inline int set_non_blocking_socket(int fd) // Thanks to Bjorn Reese |
| { |
/* If they have O_NONBLOCK, use the POSIX way to do it */
#if defined(O_NONBLOCK)
  /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
  int flags;  // a local (not static) variable keeps this thread-safe
  if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
    flags = 0;
  // printf("flags initial value was %d\n", flags);
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
  /* Otherwise, use the old way of doing it (FIONBIO, from <sys/ioctl.h>) */
  int flags = 1;
  return ioctl(fd, FIONBIO, &flags);
#endif
| } |
| |
| // helper functions used in the context of this file |
| static int bind_server_socket(struct Server* self) { |
| int srvfd; |
| // setup the address |
| struct addrinfo hints; |
| struct addrinfo* servinfo; // will point to the results |
| memset(&hints, 0, sizeof hints); // make sure the struct is empty |
| hints.ai_family = AF_UNSPEC; // don't care IPv4 or IPv6 |
| hints.ai_socktype = SOCK_STREAM; // TCP stream sockets |
| hints.ai_flags = AI_PASSIVE; // fill in my IP for me |
| if (getaddrinfo(self->settings->address, self->settings->port, &hints, |
| &servinfo)) { |
| perror("addr err"); |
| return -1; |
| } |
| // get the file descriptor |
| srvfd = |
| socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); |
| if (srvfd <= 0) { |
| perror("socket err"); |
| freeaddrinfo(servinfo); |
| return -1; |
| } |
| // make sure the socket is non-blocking |
| if (set_non_blocking_socket(srvfd) < 0) { |
| perror("couldn't set socket as non blocking! "); |
| freeaddrinfo(servinfo); |
| close(srvfd); |
| return -1; |
| } |
| // avoid the "address taken" |
| { |
| int optval = 1; |
| setsockopt(srvfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); |
| } |
| // bind the address to the socket |
| { |
    int bound = 0;
    for (struct addrinfo* p = servinfo; p != NULL; p = p->ai_next) {
      if (!bind(srvfd, p->ai_addr, p->ai_addrlen)) {
        bound = 1;
        break;  // stop at the first address that binds successfully
      }
    }
| |
| if (!bound) { |
| perror("bind err"); |
| freeaddrinfo(servinfo); |
| close(srvfd); |
| return -1; |
| } |
| } |
| freeaddrinfo(servinfo); |
| // listen in |
| if (listen(srvfd, SOMAXCONN) < 0) { |
| perror("couldn't start listening"); |
| close(srvfd); |
| return -1; |
| } |
| return srvfd; |
| } |
| |
| //////////////// |
| // file limit helper |
| static long srv_capacity(void) { |
| // get current limits |
| static long flim = 0; |
| if (flim) |
| return flim; |
| #ifdef _SC_OPEN_MAX |
| flim = sysconf(_SC_OPEN_MAX) - 1; |
| #elif defined(OPEN_MAX) |
| flim = OPEN_MAX - 1; |
| #endif |
| // try to maximize limits - collect max and set to max |
| struct rlimit rlim; |
| getrlimit(RLIMIT_NOFILE, &rlim); |
| // printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur, |
| // rlim.rlim_max); |
| rlim.rlim_cur = rlim.rlim_max; |
| setrlimit(RLIMIT_NOFILE, &rlim); |
| getrlimit(RLIMIT_NOFILE, &rlim); |
| // printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur, |
| // rlim.rlim_max); |
| // if the current limit is higher than it was, update |
| if (flim < rlim.rlim_cur) |
| flim = rlim.rlim_cur; |
| // review stack memory limits - don't use more than half...? |
| // use 1 byte (char) for busy_map; |
| // use 1 byte (char) for idle_map; |
| // use 1 byte (char) for tout_map; |
| // use 8 byte (void *) for protocol_map; |
| // use 8 byte (void *) for server_map; |
| // ------ |
| // total of 19 (assume 20) bytes per connection. |
| // |
| // assume a 2Kb stack allocation (per connection) for |
| // network IO buffer and request management...? |
| // (this would be almost wrong to assume, as buffer might be larger) |
| // ------- |
| // 2068 Byte |
| // round up to page size? |
| // 4096 |
| // |
| getrlimit(RLIMIT_STACK, &rlim); |
| if (flim * 4096 > rlim.rlim_cur && flim * 4096 < rlim.rlim_max) { |
| rlim.rlim_cur = flim * 4096; |
| setrlimit(RLIMIT_STACK, &rlim); |
| getrlimit(RLIMIT_STACK, &rlim); |
| } |
  if (flim > rlim.rlim_cur / 4096) {
    // the stack memory limit reduces the maximum file limit.
    flim = rlim.rlim_cur / 4096;
  }
| |
| // how many Kb per connection? assume 8Kb for kernel? x2 (16Kb). |
| // (http://www.metabrew.com/article/a-million-user-comet-application-with-mochiweb-part-3) |
| // 10,000 connections == 16*1024*10000 == +- 160Mb? seems a tight fit... |
| // i.e. the Http request buffer is 8Kb... maybe 24Kb is a better minimum? |
| // Some per connection heap allocated data (i.e. 88 bytes per user-land |
| // buffer) also matters. |
  getrlimit(RLIMIT_DATA, &rlim);
  if (flim > rlim.rlim_cur / (24 * 1024)) {
    printf(
        "The current maximum file limit (%ld) is reduced due to system "
        "memory limits (%ld)\n",
        flim, (long)(rlim.rlim_cur / (24 * 1024)));
    flim = rlim.rlim_cur / (24 * 1024);
  }
| // return what we have |
| return flim; |
| } |