blob: 218abf856176f406ff241e5b5229734a245e0558 [file] [log] [blame] [raw]
/*
copyright: Boaz segev, 2016
license: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
// lib server is based off and requires the following libraries:
#include "lib-server.h"
#include "libbuffer.h"
// #include <ssl.h>
// sys includes
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netdb.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>
#include <errno.h>
////////////////////////////////////////////////////////////////////////////////
// socket binding and server limits helpers
static int bind_server_socket(server_pt);
static int set_non_blocking_socket(int fd);
static long srv_capacity(void);
////////////////////////////////////////////////////////////////////////////////
// The data associated with each connection
struct ConnectionData {
server_pt srv;
struct Protocol* protocol;
ssize_t (*reading_hook)(server_pt srv, int fd, void* buffer, size_t size);
void* buffer;
void* udata;
int fd;
volatile uint32_t counter;
unsigned tout : 8;
unsigned idle : 8;
volatile unsigned busy : 1;
};
////////////////////////////////////////////////////////////////////////////////
// Task object containers
// the data-type for async messages
struct FDTask {
union fd_id conn_id;
struct FDTask* next;
server_pt server;
void (*task)(server_pt server, uint64_t conn_id, void* arg);
void* arg;
void (*fallback)(server_pt server, uint64_t origin, void* arg);
};
// A self handling task structure
struct GroupTask {
uint64_t conn_origin;
struct GroupTask* next;
server_pt server;
char* service;
void (*task)(server_pt server, uint64_t conn, void* arg);
void* arg;
void (*on_finished)(server_pt server, uint64_t origin, void* arg);
uint32_t pos;
};
////////////////////////////////////////////////////////////////////////////////
// The server data object container
struct Server {
// inherit the reactor (must be first in the struct, for pointer conformity).
struct Reactor reactor;
// a pointer to the server's settings object.
struct ServerSettings* settings;
// a pointer to the thread pool object (libasync).
struct Async* async;
// a mutex for server data integrity
pthread_mutex_t lock;
// maps each connection to it's protocol.
struct ConnectionData* connections;
/// a mutex for server data integrity
pthread_mutex_t task_lock;
/// fd task pool.
struct FDTask* fd_task_pool;
/// group task pool.
struct GroupTask* group_task_pool;
/// task(s) pool size
size_t fd_task_pool_size;
size_t group_task_pool_size;
// socket capacity
long capacity;
/// the last timeout review
time_t last_to;
/// the server socket
int srvfd;
/// the original process pid
pid_t root_pid;
/// the flag that tells the server to stop
volatile char run;
};
////////////////////////////////////////////////////////////////////////////////
// Macros for checking a connection ID is valid or retriving it's data
/** returns the connection's data */
#define connection_data(srv, conn) ((srv)->connections[(conn).data.fd])
/** valuates as FALSE (0) if the connection is valid and true if outdated */
#define validate_connection(srv, conn) \
((connection_data((srv), (conn)).counter != (conn).data.counter) && \
connection_data((srv), (conn)).protocol)
/** returns a value if the connection isn't valid */
#define validate_connection_or_return(srv, conn, ret) \
if (validate_connection((srv), (conn))) \
return (ret);
// object accessing helper macros
/** casts the server to the reactor pointer. */
#define _reactor_(server) ((struct Reactor*)(server))
/** casts the reactor pointer to a server pointer. */
#define _server_(reactor) ((server_pt)(reactor))
/** gets a specific connection data object from the server (reactor). */
#define _connection_(reactor, fd) (_server_(reactor)->connections[(fd)])
/** gets a specific protocol from a server's connection. */
#define _protocol_(reactor, fd) (_connection_((reactor), (fd))).protocol
/** gets a server connection's UUID. */
#define _fd_uuid_(reactor, sfd) \
(((union fd_id){.data.fd = (sfd), \
.data.counter = _connection_((reactor), (sfd)).counter}) \
.uuid)
/** creates a UUID from an fd and a counter object. */
#define _fd_counter_uuid_(sfd, count) \
(((union fd_id){.data.fd = (sfd), .data.counter = count}).uuid)
////////////////////////////////////////////////////////////////////////////////
// Server API gateway
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// Server API declerations
////////////////////////////////////////////////////////////////////////////////
// Server settings and objects
/** returns the originating process pid */
static pid_t root_pid(server_pt server);
/// allows direct access to the reactor object. use with care.
static struct Reactor* srv_reactor(server_pt server);
/// allows direct access to the server's original settings. use with care.
static struct ServerSettings* srv_settings(server_pt server);
/// returns the computed capacity for any server instance on the system.
static long srv_capacity(void);
/**
Returns the file descriptor belonging to the connection's UUID, if
available. Returns -1 if the connection is closed (we cannot use 0 since 0
is potentially a valid file descriptor). */
int to_fd(server_pt server, uint64_t connection_id);
////////////////////////////////////////////////////////////////////////////////
// Server actions
/** listens to a server with the following server settings (which MUST include
a default protocol). */
static int srv_listen(struct ServerSettings);
/// stops a specific server, closing any open connections.
static void srv_stop(server_pt server);
/// stops any and all server instances, closing any open connections.
static void srv_stop_all(void);
// helpers
static void srv_cycle_core(server_pt server);
static int set_to_busy(server_pt server, int fd);
static void async_on_data(struct ConnectionData* conn);
static void on_ready(struct Reactor* reactor, int fd);
static void on_shutdown(struct Reactor* reactor, int fd);
static void on_close(struct Reactor* reactor, int fd);
static void clear_conn_data(server_pt server, int fd);
static void accept_async(server_pt server);
// signal management
static void register_server(server_pt server);
static void on_signal(int sig);
////////////////////////////////////////////////////////////////////////////////
// Socket settings and data
/** Returns true if a specific connection's protected callback is running.
Protected callbacks include only the `on_message` callback and tasks forwarded
to the connection using the `td_task` or `each` functions.
*/
static uint8_t is_busy(server_pt server, union fd_id cuuid);
/** Returns true if the connection's UUID points to a valid connection (valid
* meanning `on_close` wasn't called and processed just yet).
*/
static uint8_t is_open(server_pt server, union fd_id cuuid);
/// retrives the active protocol object for the requested file descriptor.
static struct Protocol* get_protocol(server_pt server, union fd_id cuuid);
/// sets the active protocol object for the requested file descriptor.
static int set_protocol(server_pt server,
union fd_id cuuid,
struct Protocol* new_protocol);
/** retrives an opaque pointer set by `set_udata` and associated with the
connection.
since no new connections are expected on fd == 0..2, it's possible to store
global data in these locations. */
static void* get_udata(server_pt server, union fd_id cuuid);
/** Sets the opaque pointer to be associated with the connection. returns the
old pointer, if any. */
static void* set_udata(server_pt server, union fd_id cuuid, void* udata);
/** Sets the timeout limit for the specified connectionl, in seconds, up to
255 seconds (the maximum allowed timeout count). */
static void set_timeout(server_pt server, union fd_id cuuid, uint8_t timeout);
////////////////////////////////////////////////////////////////////////////////
// Socket actions
/** Attaches an existing connection (fd) to the server's reactor and protocol
management system, so that the server can be used also to manage connection
based resources asynchronously (i.e. database resources etc').
*/
static int srv_attach(server_pt server, int sockfd, struct Protocol* protocol);
/** Closes the connection.
If any data is waiting to be written, close will
return immediately and the connection will only be closed once all the data
was sent. */
static void srv_close(server_pt server, union fd_id cuuid);
/** Hijack a socket (file descriptor) from the server, clearing up it's
resources. The control of hte socket is totally relinquished.
This method will block until all the data in the buffer is sent before
releasing control of the socket. */
static int srv_hijack(server_pt server, union fd_id cuuid);
/** Counts the number of connections for the specified protocol (NULL = all
protocols). */
static long srv_count(server_pt server, char* service);
/// "touches" a socket, reseting it's timeout counter.
static void srv_touch(server_pt server, union fd_id cuuid);
////////////////////////////////////////////////////////////////////////////////
// Read and Write
/**
Sets up the read/write hooks, allowing for transport layer extensions (i.e.
SSL/TLS) or monitoring extensions.
*/
void rw_hooks(
server_pt srv,
union fd_id cuuid,
ssize_t (*reading_hook)(server_pt srv, int fd, void* buffer, size_t size),
ssize_t (*writing_hook)(server_pt srv, int fd, void* data, size_t len));
/** Reads up to `max_len` of data from a socket. the data is stored in the
`buffer` and the number of bytes received is returned.
Returns -1 if an error was raised and the connection was closed.
Returns the number of bytes written to the buffer. Returns 0 if no data was
available.
*/
static ssize_t srv_read(server_pt srv,
union fd_id cuuid,
void* buffer,
size_t max_len);
/** Copies & writes data to the socket, managing an asyncronous buffer.
returns 0 on success. success means that the data is in a buffer waiting to
be written. If the socket is forced to close at this point, the buffer will be
destroyed (never sent).
On error, returns -1 otherwise returns the number of bytes in `len`.
*/
static ssize_t srv_write(server_pt server,
union fd_id cuuid,
void* data,
size_t len);
/** Writes data to the socket, moving the data's pointer directly to the buffer.
Once the data was written, `free` will be called to free the data's memory.
returns 0 on success. success means that the data is in a buffer waiting to
be written. If the socket is forced to close at this point, the buffer will be
destroyed (never sent).
On error, returns -1 otherwise returns the number of bytes in `len`.
*/
static ssize_t srv_write_move(server_pt server,
union fd_id cuuid,
void* data,
size_t len);
/** Copies & writes data to the socket, managing an asyncronous buffer.
Each call to a `write` function considers it's data atomic (a single package).
The `urgent` varient will send the data as soon as possible, without distrupting
any data packages (data written using `write` will not be interrupted in the
middle).
returns 0 on success. success means that the data is in a buffer waiting to
be written. If the socket is forced to close at this point, the buffer will be
destroyed (never sent).
On error, returns -1 otherwise returns the number of bytes in `len`.
*/
static ssize_t srv_write_urgent(server_pt server,
union fd_id cuuid,
void* data,
size_t len);
/** Writes data to the socket, moving the data's pointer directly to the buffer.
Once the data was written, `free` will be called to free the data's memory.
Each call to a `write` function considers it's data atomic (a single package).
The `urgent` varient will send the data as soon as possible, without distrupting
any data packages (data written using `write` will not be interrupted in the
middle).
returns 0 on success. success means that the data is in a buffer waiting to
be written. If the socket is forced to close at this point, the buffer will be
destroyed (never sent).
On error, returns -1 otherwise returns the number of bytes in `len`.
*/
static ssize_t srv_write_move_urgent(server_pt server,
union fd_id cuuid,
void* data,
size_t len);
/**
Sends a whole file as if it were a single atomic packet.
Once the file was sent, the `FILE *` will be closed using `fclose`.
The file will be buffered to the socket chunk by chunk, so that memory
consumption is capped at ~ 64Kb.
*/
static ssize_t srv_sendfile(server_pt server, union fd_id cuuid, FILE* file);
////////////////////////////////////////////////////////////////////////////////
// Tasks + Async
/** Schedules a specific task to run asyncronously for each connection.
a NULL service identifier == all connections (all protocols). */
static int each(server_pt server,
union fd_id origin,
char* service,
void (*task)(server_pt server, uint64_t target_fd, void* arg),
void* arg,
void (*on_finish)(server_pt server,
uint64_t origin,
void* arg));
/** Schedules a specific task to run for each connection. The tasks will be
* performed sequencially, in a blocking manner. The method will only return
* once all the tasks were completed. A NULL service identifier == all
* connections (all protocols).
*/
static int each_block(server_pt server,
union fd_id origin,
char* service,
void (*task)(server_pt server,
uint64_t target_fd,
void* arg),
void* arg);
/** Schedules a specific task to run asyncronously for a specific connection.
returns -1 on failure, 0 on success (success being scheduling or performing
the task).
*/
static int fd_task(server_pt server,
union fd_id target,
void (*task)(server_pt server, uint64_t fd, void* arg),
void* arg,
void (*fallback)(server_pt server, uint64_t fd, void* arg));
/** Runs an asynchronous task, IF threading is enabled (set the
`threads` to 1 (the default) or greater).
If threading is disabled, the current thread will perform the task and return.
Returns -1 on error or 0
on succeess.
*/
static int run_async(server_pt self, void task(void*), void* arg);
/** Creates a system timer (at the cost of 1 file descriptor) and pushes the
timer to the reactor. The task will NOT repeat. Returns -1 on error or the
new file descriptor on succeess.
NOTICE: Do NOT create timers from within the on_close callback, as this might
block resources from being properly freed (if the timer and the on_close
object share the same fd number).
*/
static int run_after(server_pt self,
long milliseconds,
void task(void*),
void* arg);
/** Creates a system timer (at the cost of 1 file descriptor) and pushes the
timer to the reactor. The task will repeat `repetitions` times. if
`repetitions` is set to 0, task will repeat forever. Returns -1 on error
or the new file descriptor on succeess.
NOTICE: Do NOT create timers from within the on_close callback, as this might
block resources from being properly freed (if the timer and the on_close
object share the same fd number).
*/
static int run_every(server_pt self,
long milliseconds,
int repetitions,
void task(void*),
void* arg);
// Global Task helpers
static inline int perform_single_task(server_pt srv,
uint64_t connection_id,
void (*task)(server_pt server,
uint64_t connection_id,
void* arg),
void* arg);
// FDTask helpers
struct FDTask* new_fd_task(server_pt srv);
void destroy_fd_task(server_pt srv, struct FDTask* task);
static void perform_fd_task(struct FDTask* task);
// GroupTask helpers
struct GroupTask* new_group_task(server_pt srv);
void destroy_group_task(server_pt srv, struct GroupTask* task);
static void perform_group_task(struct GroupTask* task);
void add_to_group_task(server_pt server, int fd, void* arg);
////////////////////////////////////////////////////////////////////////////////
// The actual Server API access setup
// The following allows access to helper functions and defines a namespace
// for the API in this library.
const struct Server__API___ Server = {
/* accessor and server functions */
.reactor = srv_reactor,
.settings = srv_settings,
.capacity = srv_capacity,
.listen = srv_listen,
.stop = srv_stop,
.stop_all = srv_stop_all,
.root_pid = root_pid,
/* connection data functions */
.is_busy = (uint8_t (*)(server_pt, uint64_t))is_busy,
.is_open = (uint8_t (*)(server_pt, uint64_t))is_open,
.get_protocol = (struct Protocol * (*)(server_pt, uint64_t))get_protocol,
.set_protocol =
(int (*)(server_pt, uint64_t, struct Protocol*))set_protocol,
.get_udata = (void* (*)(server_pt, uint64_t))get_udata,
.set_udata = (void* (*)(server_pt, uint64_t, void*))set_udata,
.set_timeout = (void (*)(server_pt, uint64_t, uint8_t))set_timeout,
/* connection managment functions */
.attach = srv_attach,
.close = (void (*)(server_pt, uint64_t))srv_close,
.hijack = (int (*)(server_pt, uint64_t))srv_hijack,
.count = srv_count,
/* connection activity and read/write functions */
.touch = (void (*)(server_pt, uint64_t))srv_touch,
.rw_hooks = (void (*)(server_pt,
uint64_t,
ssize_t (*)(server_pt, int, void*, size_t),
ssize_t (*)(server_pt, int, void*, size_t)))rw_hooks,
.read = (ssize_t (*)(server_pt, uint64_t, void*, size_t))srv_read,
.write = (ssize_t (*)(server_pt, uint64_t, void*, size_t))srv_write,
.write_move =
(ssize_t (*)(server_pt, uint64_t, void*, size_t))srv_write_move,
.write_urgent =
(ssize_t (*)(server_pt, uint64_t, void*, size_t))srv_write_urgent,
.write_move_urgent = (ssize_t (*)(server_pt,
uint64_t,
void*,
size_t))srv_write_move_urgent, //
.sendfile = (ssize_t (*)(server_pt, uint64_t, FILE*))srv_sendfile,
/* connection tasks functions */
.each = (int (*)(server_pt,
uint64_t,
char*,
void (*)(server_pt, uint64_t, void*),
void*,
void (*)(server_pt, uint64_t, void*)))each,
.each_block = (int (*)(server_pt,
uint64_t,
char*,
void (*)(server_pt, uint64_t, void*),
void*))each_block,
.fd_task = (int (*)(server_pt,
uint64_t,
void (*)(server_pt, uint64_t, void*),
void*,
void (*)(server_pt, uint64_t, void*)))fd_task,
/* global task functions */
.run_async = run_async,
.run_after = run_after,
.run_every = run_every,
};
////////////////////////////////////////////////////////////////////////////////
// timer protocol
// service name
static char* timer_protocol_name = "timer";
// a timer protocol, will be allocated for each timer.
struct TimerProtocol {
// must be first for pointer compatability
struct Protocol parent;
void (*task)(void*);
void* arg;
// how many repeats?
int repeat;
};
// the timer's on_data callback
static void tp_perform_on_data(server_pt self, uint64_t conn) {
struct TimerProtocol* timer =
(struct TimerProtocol*)Server.get_protocol(self, conn);
if (timer) {
// set local data copy, to avoid race contitions related to `free`.
void (*task)(void*) = (void (*)(void*))timer->task;
void* arg = timer->arg;
// perform the task
if (task)
task(arg);
// reset the timer
reactor_reset_timer(server_uuid_to_fd(conn));
// close the file descriptor
if (timer->repeat < 0)
return;
if (timer->repeat == 0) {
reactor_close((struct Reactor*)self, server_uuid_to_fd(conn));
return;
}
timer->repeat--;
}
}
// the timer's on_close (cleanup)
static void tp_perform_on_close(server_pt self, uint64_t conn) {
// free the protocol object when it was "aborted" using `close`.
struct TimerProtocol* timer =
(struct TimerProtocol*)Server.get_protocol(self, conn);
if (timer)
free(timer);
}
// creates a new TimeProtocol object.
// use: TimerProtocol(task, arg, rep)
static struct TimerProtocol* TimerProtocol(void* task,
void* arg,
int repetitions) {
struct TimerProtocol* tp = malloc(sizeof(struct TimerProtocol));
*tp = (struct TimerProtocol){.parent.on_data = tp_perform_on_data,
.parent.on_close = tp_perform_on_close,
.parent.service = timer_protocol_name,
.task = task,
.arg = arg,
.repeat = repetitions - 1};
return tp;
}
////////////////////////////////////////////////////////////////////////////////
// Server settings and objects
/** returns the originating process pid */
static pid_t root_pid(server_pt server) {
return server->root_pid;
}
/// allows direct access to the reactor object. use with care.
static struct Reactor* srv_reactor(server_pt server) {
return (struct Reactor*)server;
}
/// allows direct access to the server's original settings. use with care.
static struct ServerSettings* srv_settings(server_pt server) {
return server->settings;
}
////////////////////////////////////////////////////////////////////////////////
// Socket settings and data
/** Returns true if a specific connection's protected callback is running AND
the connection is still active
Protected callbacks include only the `on_message` callback and tasks forwarded
to the connection using the `td_task` or `each` functions.
*/
static uint8_t is_busy(server_pt server, union fd_id cuuid) {
return server->connections[cuuid.data.fd].counter == cuuid.data.counter &&
server->connections[cuuid.data.fd].busy;
}
/** Returns true if a specific connection is still valid (on_close hadn't
completed) */
static uint8_t is_open(server_pt server, union fd_id cuuid) {
return validate_connection(server, cuuid);
}
/// retrives the active protocol object for the requested file descriptor.
static struct Protocol* get_protocol(server_pt server, union fd_id conn) {
validate_connection_or_return(server, conn, NULL);
return server->connections[conn.data.fd].protocol;
}
/// sets the active protocol object for the requested file descriptor.
static int set_protocol(server_pt server,
union fd_id conn,
struct Protocol* new_protocol) {
validate_connection_or_return(server, conn, -1);
// on_close and set_protocol should never conflict.
// we should also prevent the same thread from deadlocking
// (i.e., when on_close tries to update the protocol)
if (pthread_mutex_trylock(&(server->lock)))
return -1;
// review the connection's validity again (in proteceted state)
if (validate_connection(server, conn) ||
!server->connections[conn.data.fd].protocol) {
pthread_mutex_unlock(&(server->lock));
// fprintf(stderr,
// "ERROR: Cannot set a protocol for a disconnected socket.\n");
return -1;
}
// set the new protocol
server->connections[conn.data.fd].protocol = new_protocol;
// release the lock
pthread_mutex_unlock(&(server->lock));
// return 0 (no error)
return 0;
}
/** retrives an opaque pointer set by `set_udata` and associated with the
connection.
since no new connections are expected on fd == 0..2, it's possible to store
global data in these locations. */
static void* get_udata(server_pt server, union fd_id conn) {
validate_connection_or_return(server, conn, NULL);
return server->connections[conn.data.fd].udata;
}
/** Sets the opaque pointer to be associated with the connection. returns the
old pointer, if any.
Returns NULL both on error (i.e. closed connection or old connection ID) and
sucess (no previous value). Check that the value was set using `get_udata`.
*/
static void* set_udata(server_pt server, union fd_id conn, void* udata) {
validate_connection_or_return(server, conn, NULL);
pthread_mutex_lock(&(server->lock));
if (validate_connection(server, conn)) {
pthread_mutex_unlock(&(server->lock));
return NULL;
}
void* old = connection_data(server, conn).udata;
connection_data(server, conn).udata = udata;
pthread_mutex_unlock(&(server->lock));
return old;
}
/** Sets the timeout limit for the specified connectionl, in seconds, up to
255 seconds (the maximum allowed timeout count). */
static void set_timeout(server_pt server, union fd_id conn, uint8_t timeout) {
connection_data(server, conn).tout = timeout;
}
////////////////////////////////////////////////////////////////////////////////
// Server actions & Core
// clears a connection's data
static void clear_conn_data(server_pt server, int fd) {
Buffer.clear(server->connections[fd].buffer);
server->connections[fd].counter++;
server->connections[fd].idle = 0;
server->connections[fd].protocol = NULL;
server->connections[fd].reading_hook = NULL;
server->connections[fd].tout = 0;
server->connections[fd].udata = NULL;
server->connections[fd].busy = 0;
}
// on_ready, handles async buffer sends
static void on_ready(struct Reactor* reactor, int fd) {
if (Buffer.flush(_server_(reactor)->connections[fd].buffer,
_fd_uuid_(reactor, fd)) > 0)
_server_(reactor)->connections[fd].idle = 0;
if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_ready)
_protocol_(reactor, fd)
->on_ready(_server_(reactor), _fd_uuid_(reactor, fd));
}
/// called for any open file descriptors when the reactor is shutting down.
static void on_shutdown(struct Reactor* reactor, int fd) {
// call the callback for the mentioned active(?) connection.
if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_shutdown)
_protocol_(reactor, fd)
->on_shutdown(_server_(reactor), _fd_uuid_(reactor, fd));
}
// called when a file descriptor was closed (either locally or by the other
// party, when it's a socket or a pipe).
static void on_close(struct Reactor* reactor, int fd) {
if (!_protocol_(reactor, fd))
return;
// file descriptors can be added from external threads (i.e. creating timers),
// this could cause race conditions and data corruption,
// (i.e., when on_close is releasing memory).
int lck = 0;
// this should preven the same thread from calling on_close recursively
// (i.e., when using the Server.attach)
if ((lck = pthread_mutex_trylock(&(_server_(reactor)->lock)))) {
if (lck != EAGAIN)
return;
pthread_mutex_lock(&(_server_(reactor)->lock));
}
if (_protocol_(reactor, fd)) {
if (_protocol_(reactor, fd)->on_close)
_protocol_(reactor, fd)
->on_close(_server_(reactor), _fd_uuid_(reactor, fd));
clear_conn_data(_server_(reactor), fd);
}
pthread_mutex_unlock(&(_server_(reactor)->lock));
}
// The busy flag is used to make sure that a single connection doesn't perform
// two "critical" tasks at the same time. Critical tasks are defined as the
// `on_message` callback and any user task requested by `each` or `fd_task`.
static int set_to_busy(server_pt server, int sockfd) {
static pthread_mutex_t locker = PTHREAD_MUTEX_INITIALIZER;
if (!_protocol_(server, sockfd))
return 0;
pthread_mutex_lock(&locker);
if (_connection_(server, sockfd).busy == 1) {
pthread_mutex_unlock(&locker);
return 0;
}
_connection_(server, sockfd).busy = 1;
pthread_mutex_unlock(&locker);
return 1;
}
// accepts new connections
static void accept_async(server_pt server) {
static socklen_t cl_addrlen = 0;
int client = 1;
while (1) {
#ifdef SOCK_NONBLOCK
client = accept4(server->srvfd, NULL, &cl_addrlen, SOCK_NONBLOCK);
if (client <= 0)
return;
#else
client = accept(server->srvfd, NULL, &cl_addrlen);
if (client <= 0)
return;
set_non_blocking_socket(client);
#endif
// handle server overload
if (client >= _reactor_(server)->maxfd) {
if (server->settings->busy_msg)
if (write(client, server->settings->busy_msg,
strlen(server->settings->busy_msg)) < 0)
;
close(client);
continue;
}
// attach the new client (performs on_close if needed)
srv_attach(server, client, server->settings->protocol);
}
}
// makes sure that the on_data callback isn't overlapping a previous on_data
static void async_on_data(struct ConnectionData* conn) {
// compute sockfd by comparing the distance between the original pointer and
// the passed pointer's address.
if (!conn || !conn->protocol || !conn->protocol->on_data)
return;
// if we get the handle, perform the task
if (set_to_busy(conn->srv, conn->fd)) {
conn->idle = 0;
conn->protocol->on_data(conn->srv,
_fd_counter_uuid_(conn->fd, conn->counter));
// release the handle
conn->busy = 0;
return;
}
// we didn't get the handle, reschedule - but only if the connection is still
// open.
if (conn->protocol)
Async.run(conn->srv->async, (void (*)(void*))async_on_data, conn);
}
static void on_data(struct Reactor* reactor, int fd) {
// static socklen_t cl_addrlen = 0;
if (fd == _server_(reactor)->srvfd) {
// listening socket. accept connections.
Async.run(_server_(reactor)->async, (void (*)(void*))accept_async, reactor);
} else if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_data) {
_connection_(reactor, fd).idle = 0;
// clients, forward on.
Async.run(_server_(reactor)->async, (void (*)(void*))async_on_data,
&_connection_(reactor, fd));
}
}
// calls the reactor's core and checks for timeouts.
// schedules it's own execution when done.
// shouldn't be called by more then a single thread at a time (NOT thread safe)
static void srv_cycle_core(server_pt server) {
static size_t idle_performed = 0;
int delta; // we also use this for other things, but it's the TOut delta
// review reactor events
delta = reactor_review(_reactor_(server));
if (delta < 0) {
srv_stop(server);
return;
} else if (delta == 0) {
if (server->settings->on_idle && idle_performed == 0)
server->settings->on_idle(server);
idle_performed = 1;
} else
idle_performed = 0;
// timeout + local close management
if (server->last_to != _reactor_(server)->last_tick) {
// We use the delta with fuzzy logic (only after the first second)
int delta = _reactor_(server)->last_tick - server->last_to;
for (long i = 3; i <= _reactor_(server)->maxfd; i++) {
if (_protocol_(server, i) && fcntl(i, F_GETFL) < 0 && errno == EBADF) {
reactor_close(_reactor_(server), i);
}
if (_connection_(server, i).tout) {
if (_connection_(server, i).tout > _connection_(server, i).idle)
_connection_(server, i).idle +=
_connection_(server, i).idle ? delta : 1;
else {
if (_protocol_(server, i) && _protocol_(server, i)->ping)
_protocol_(server, i)->ping(server, i);
else if (!_connection_(server, i).busy ||
_connection_(server, i).idle == 255)
reactor_close(_reactor_(server), i);
}
}
}
// ready for next call.
server->last_to = _reactor_(server)->last_tick;
// double en = (float)clock()/CLOCKS_PER_SEC;
// printf("timeout review took %f milliseconds\n", (en-st)*1000);
}
if (server->run &&
Async.run(server->async, (void (*)(void*))srv_cycle_core, server)) {
perror(
"FATAL ERROR:"
"couldn't schedule the server's reactor in the task queue");
exit(1);
}
}
/** listens to a server with the following server settings (which MUST include
a default protocol). */
static int srv_listen(struct ServerSettings settings) {
// review the settings, setup defaults when something's missing
if (!settings.protocol) {
fprintf(stderr,
"Server err: (protocol == NULL) Running a server requires "
"a protocol for new connections.\n");
return -1;
}
if (!settings.port)
settings.port = "3000";
if (!settings.timeout)
settings.timeout = 5;
if (!settings.threads || settings.threads <= 0)
settings.threads = 1;
if (!settings.processes || settings.processes <= 0)
settings.processes = 1;
// V.3 Avoids using the Stack, allowing userspace to use the memory
long capacity = srv_capacity();
struct ConnectionData* connections = calloc(sizeof(*connections), capacity);
if (!connections)
return -1;
// populate the Server structure with the data
struct Server srv = {
// initialize the server object
.settings = &settings, // store a pointer to the settings
.last_to = 0, // last timeout review
.capacity = capacity, // the server's capacity
.connections = connections,
.fd_task_pool = NULL,
.fd_task_pool_size = 0,
.group_task_pool = NULL,
.group_task_pool_size = 0,
.reactor.maxfd = capacity - 1,
.reactor.on_data = on_data,
.reactor.on_ready = on_ready,
.reactor.on_shutdown = on_shutdown,
.reactor.on_close = on_close,
};
// initialize the server data mutex
if (pthread_mutex_init(&srv.lock, NULL)) {
free(connections);
return -1;
}
// initialize the server task pool mutex
if (pthread_mutex_init(&srv.task_lock, NULL)) {
pthread_mutex_destroy(&srv.lock);
free(connections);
return -1;
}
// bind the server's socket - if relevent
int srvfd = 0;
if (settings.port > 0) {
srvfd = bind_server_socket(&srv);
// if we didn't get a socket, quit now.
if (srvfd < 0) {
pthread_mutex_destroy(&srv.task_lock);
pthread_mutex_destroy(&srv.lock);
free(connections);
return -1;
}
srv.srvfd = srvfd;
}
// initialize connection data...
for (int i = 0; i < capacity; i++) {
connections[i].srv = &srv;
connections[i].fd = i;
connections[i].buffer = Buffer.create(&srv);
}
// register signals - do this before concurrency, so that they are inherited.
struct sigaction old_term, old_int, old_pipe, new_int, new_pipe;
sigemptyset(&new_int.sa_mask);
sigemptyset(&new_pipe.sa_mask);
new_pipe.sa_flags = new_int.sa_flags = 0;
new_pipe.sa_handler = SIG_IGN;
new_int.sa_handler = on_signal;
sigaction(SIGINT, &new_int, &old_int);
sigaction(SIGTERM, &new_int, &old_term);
sigaction(SIGPIPE, &new_pipe, &old_pipe);
// setup concurrency
srv.root_pid = getpid();
pid_t pids[settings.processes > 0 ? settings.processes : 0];
if (settings.processes > 1) {
pids[0] = 0;
for (int i = 1; i < settings.processes; i++) {
if (getpid() == srv.root_pid)
pids[i] = fork();
}
}
// once we forked, we can initiate a thread pool for each process
srv.async = Async.create(settings.threads);
if (srv.async <= 0) {
if (srvfd)
close(srvfd);
pthread_mutex_destroy(&srv.task_lock);
pthread_mutex_destroy(&srv.lock);
free(connections);
return -1;
}
// register server for signals
register_server(&srv);
// let'm know...
if (srvfd)
printf(
"(pid %d x %d threads) Listening on port %s (max sockets: %lu, ~%d "
"used)\n",
getpid(), srv.settings->threads, srv.settings->port, srv.capacity,
srv.srvfd + 2);
else
printf(
"(pid %d x %d threads) Started a non-listening network service"
"(max sockets: %lu ~ at least 6 are in system use)\n",
getpid(), srv.settings->threads, srv.capacity);
// initialize reactor
reactor_init(&srv.reactor);
// bind server data to reactor loop
if (srvfd)
reactor_add(&srv.reactor, srv.srvfd);
// call the on_init callback
if (settings.on_init) {
settings.on_init(&srv);
}
// initiate the core's cycle
srv.run = 1;
Async.run(srv.async, (void (*)(void*))srv_cycle_core, &srv);
Async.wait(srv.async);
// cleanup
reactor_stop(&srv.reactor);
if (settings.processes > 1 && getpid() == srv.root_pid) {
int sts;
for (int i = 1; i < settings.processes; i++) {
// printf("sending signal to pid %d\n", pids[i]);
kill(pids[i], SIGINT);
}
for (int i = 1; i < settings.processes; i++) {
sts = 0;
// printf("waiting for pid %d\n", pids[i]);
if (waitpid(pids[i], &sts, 0) != pids[i])
perror("waiting for child process had failed");
}
}
if (settings.on_finish)
settings.on_finish(&srv);
printf("(pid %d) Stopped listening on port %s\n", getpid(), settings.port);
if (getpid() != srv.root_pid) {
fflush(NULL);
exit(0);
}
// restore signal state
sigaction(SIGINT, &old_int, NULL);
sigaction(SIGTERM, &old_term, NULL);
sigaction(SIGPIPE, &old_pipe, NULL);
// destroy the buffers.
for (int i = 0; i < capacity; i++) {
Buffer.destroy(connections[i].buffer);
connections[i].buffer = NULL;
}
// destroy the task pools
destroy_fd_task(&srv, NULL);
destroy_group_task(&srv, NULL);
// destroy the mutexes
pthread_mutex_destroy(&srv.lock);
pthread_mutex_destroy(&srv.task_lock);
// free the connection data array
free(connections);
return 0;
}
////////////////////////////////////////////////////////////////////////////////
// Socket actions
/** Attaches an existing connection (fd) to the server's reactor and protocol
management system, so that the server can be used also to manage connection
based resources asynchronously (i.e. database resources etc').
*/
static int srv_attach_core(server_pt server,
int sockfd,
struct Protocol* protocol,
long milliseconds) {
if (sockfd >= server->capacity)
return -1;
if (_protocol_(server, sockfd)) {
on_close((struct Reactor*)server, sockfd);
}
// setup protocol
_protocol_(server, sockfd) = protocol; // set_protocol() would cost more
// setup timeouts
_connection_(server, sockfd).tout = server->settings->timeout;
_connection_(server, sockfd).idle = 0;
// register the client - start it off as busy, protocol still initializing
// we don't need the mutex, because it's all fresh
_connection_(server, sockfd).busy = 1;
// attach the socket to the reactor
if (((milliseconds > 0) && (reactor_add_timer((struct Reactor*)server, sockfd,
milliseconds) < 0)) ||
((milliseconds <= 0) &&
(reactor_add((struct Reactor*)server, sockfd) < 0))) {
clear_conn_data(server, sockfd);
return -1;
}
// call on_open
if (protocol->on_open)
protocol->on_open(server, _fd_uuid_(server, sockfd));
_connection_(server, sockfd).busy = 0;
return 0;
}
static int srv_attach(server_pt server, int sockfd, struct Protocol* protocol) {
return srv_attach_core(server, sockfd, protocol, 0);
}
/** Closes the connection.
If any data is waiting to be written, close will
return immediately and the connection will only be closed once all the data
was sent. */
static void srv_close(server_pt server, union fd_id conn) {
if (validate_connection(server, conn) || !_protocol_(server, conn.data.fd))
return;
if (Buffer.is_empty(_connection_(server, conn.data.fd).buffer)) {
reactor_close((struct Reactor*)server, conn.data.fd);
} else
Buffer.close_when_done(_connection_(server, conn.data.fd).buffer,
conn.data.fd);
}
/** Hijack a socket (file descriptor) from the server, clearing up it's
resources. The control of the socket is totally relinquished.
This method will block until all the data in the buffer is sent before
releasing control of the socket. */
static int srv_hijack(server_pt server, union fd_id conn) {
if (validate_connection(server, conn) || !_protocol_(server, conn.data.fd))
return -1;
reactor_remove((struct Reactor*)server, conn.data.fd);
while (!Buffer.is_empty(_connection_(server, conn.data.fd).buffer) &&
Buffer.flush(_connection_(server, conn.data.fd).buffer,
conn.data.fd) >= 0)
;
clear_conn_data(server, conn.data.fd);
return 0;
}
/** Counts the number of connections for the specified protocol (NULL = all
protocols). */
static long srv_count(server_pt server, char* service) {
int c = 0;
if (service) {
for (int i = 0; i < server->capacity; i++) {
if (_protocol_(server, i) &&
(_protocol_(server, i)->service == service ||
!strcmp((_protocol_(server, i)->service), service)))
c++;
}
} else {
for (int i = 0; i < server->capacity; i++) {
if (_protocol_(server, i) &&
_protocol_(server, i)->service != timer_protocol_name)
c++;
}
}
return c;
}
/// "touches" a socket, reseting it's timeout counter.
static void srv_touch(server_pt server, union fd_id conn) {
if (validate_connection(server, conn))
return;
_connection_(server, conn.data.fd).idle = 0;
}
////////////////////////////////////////////////////////////////////////////////
// Read and Write
/**
Sets up the read/write hooks, allowing for transport layer extensions (i.e.
SSL/TLS) or monitoring extensions.
*/
void rw_hooks(
server_pt srv,
union fd_id conn,
ssize_t (*reading_hook)(server_pt srv, int fd, void* buffer, size_t size),
ssize_t (*writing_hook)(server_pt srv, int fd, void* data, size_t len)) {
if (validate_connection(srv, conn))
return;
_connection_(srv, conn.data.fd).reading_hook = reading_hook;
Buffer.set_whook(_connection_(srv, conn.data.fd).buffer, writing_hook);
}
/** Reads up to `max_len` of data from a socket. the data is stored in the
`buffer` and the number of bytes received is returned.
Returns -1 if an error was raised and the connection was closed.
Returns the number of bytes written to the buffer. Returns 0 if no data was
available.
*/
static ssize_t srv_read(server_pt srv,
union fd_id conn,
void* buffer,
size_t max_len) {
ssize_t read = 0;
if (_connection_(srv, conn.data.fd).reading_hook == NULL) {
// no reading hook
if ((read = recv(conn.data.fd, buffer, max_len, 0)) > 0) {
// reset timeout
_connection_(srv, conn.data.fd).idle = 0;
// return read count
return read;
} else {
if (read && (errno & (EWOULDBLOCK | EAGAIN)))
return 0;
}
return -1;
} else { // existing reading hook
read = _connection_(srv, conn.data.fd)
.reading_hook(srv, conn.data.fd, buffer, max_len);
if (read > 0)
_connection_(srv, conn.data.fd).idle = 0;
return read;
}
}
/** Copies & writes data to the socket, managing an asyncronous buffer.
On error, returns -1 otherwise returns the number of bytes already sent.
*/
static ssize_t srv_write(server_pt server,
union fd_id conn,
void* data,
size_t len) {
validate_connection_or_return(server, conn, -1);
// reset timeout
_connection_(server, conn.data.fd).idle = 0;
// send data
Buffer.write(_connection_(server, conn.data.fd).buffer, data, len);
if (Buffer.flush(_connection_(server, conn.data.fd).buffer, conn.uuid) < 0)
return -1;
return 0;
}
/** Writes data to the socket, moving the data's pointer directly to the buffer.
Once the data was written, `free` will be called to free the data's memory.
On error, returns -1 otherwise returns the number of bytes already sent.
*/
static ssize_t srv_write_move(server_pt server,
union fd_id conn,
void* data,
size_t len) {
validate_connection_or_return(server, conn, -1);
// reset timeout
_connection_(server, conn.data.fd).idle = 0;
// send data
Buffer.write_move(_connection_(server, conn.data.fd).buffer, data, len);
if (Buffer.flush(_connection_(server, conn.data.fd).buffer, conn.uuid) < 0)
return -1;
return 0;
}
/** Copies & writes data to the socket, managing an asyncronous buffer.
Each call to a `write` function considers it's data atomic (a single package).
The `urgent` varient will send the data as soon as possible, without distrupting
any data packages (data written using `write` will not be interrupted in the
middle).
On error, returns -1 otherwise returns the number of bytes already sent.
*/
static ssize_t srv_write_urgent(server_pt server,
union fd_id conn,
void* data,
size_t len) {
validate_connection_or_return(server, conn, -1);
// reset timeout
_connection_(server, conn.data.fd).idle = 0;
// send data
Buffer.write_next(_connection_(server, conn.data.fd).buffer, data, len);
if (Buffer.flush(_connection_(server, conn.data.fd).buffer, conn.uuid) < 0)
return -1;
return 0;
}
/** Writes data to the socket, moving the data's pointer directly to the buffer.
Once the data was written, `free` will be called to free the data's memory.
Each call to a `write` function considers it's data atomic (a single package).
The `urgent` varient will send the data as soon as possible, without distrupting
any data packages (data written using `write` will not be interrupted in the
middle).
On error, returns -1 otherwise returns the number of bytes already sent.
*/
static ssize_t srv_write_move_urgent(server_pt server,
union fd_id conn,
void* data,
size_t len) {
validate_connection_or_return(server, conn, -1);
// reset timeout
_connection_(server, conn.data.fd).idle = 0;
// send data
Buffer.write_move_next(_connection_(server, conn.data.fd).buffer, data, len);
if (Buffer.flush(_connection_(server, conn.data.fd).buffer, conn.uuid) < 0)
return -1;
return 0;
}
/**
Sends a whole file as if it were a single atomic packet.
Once the file was sent, the `FILE *` will be closed using `fclose`.
The file will be buffered to the socket chunk by chunk, so that memory
consumption is capped at ~ 64Kb.
*/
static ssize_t srv_sendfile(server_pt server, union fd_id conn, FILE* file) {
validate_connection_or_return(server, conn, -1);
// reset timeout
_connection_(server, conn.data.fd).idle = 0;
// send data
if (Buffer.sendfile(_connection_(server, conn.data.fd).buffer, file))
return -1;
if (Buffer.flush(_connection_(server, conn.data.fd).buffer, conn.uuid) < 0)
return -1;
return 0;
}
////////////////////////////////////////////////////////////////////////////////
// Tasks + Async
// macro for each task review
#define check_if_connection_fits_service(server, i, service_name) \
((service_name && server->connections[i].protocol && \
server->connections[i].protocol->service && \
(server->connections[i].protocol->service == service_name || \
!strcmp(server->connections[i].protocol->service, service_name))) || \
(service_name == NULL && server->connections[i].protocol && \
server->connections[i].protocol->service != timer_protocol_name))
// Global Task helpers
static inline int perform_single_task(server_pt srv,
uint64_t connection_id,
void (*task)(server_pt server,
uint64_t connection_id,
void* arg),
void* arg) {
validate_connection_or_return(srv, ((union fd_id)connection_id), 0);
if (set_to_busy(srv, ((union fd_id)connection_id).data.fd)) {
// perform the task
task(srv, connection_id, arg);
// release the busy flag
_connection_(srv, connection_id).busy = 0;
// return completion flag;
return 1;
}
return 0;
}
// FDTask helpers
struct FDTask* new_fd_task(server_pt srv) {
struct FDTask* ret = NULL;
pthread_mutex_lock(&srv->task_lock);
if (srv->fd_task_pool) {
ret = srv->fd_task_pool;
srv->fd_task_pool = srv->fd_task_pool->next;
--srv->fd_task_pool_size;
pthread_mutex_unlock(&srv->task_lock);
return ret;
}
pthread_mutex_unlock(&srv->task_lock);
ret = malloc(sizeof(*ret));
return ret;
}
void destroy_fd_task(server_pt srv, struct FDTask* task) {
if (task == NULL) {
pthread_mutex_lock(&srv->task_lock);
struct FDTask* tmp;
srv->fd_task_pool_size = 0;
while ((tmp = srv->fd_task_pool)) {
srv->fd_task_pool = srv->fd_task_pool->next;
free(tmp);
}
pthread_mutex_unlock(&srv->task_lock);
return;
}
pthread_mutex_lock(&srv->task_lock);
if (srv->fd_task_pool_size >= 128) {
pthread_mutex_unlock(&srv->task_lock);
free(task);
return;
}
task->next = srv->fd_task_pool;
srv->fd_task_pool = task;
++srv->fd_task_pool_size;
pthread_mutex_unlock(&srv->task_lock);
}
static void perform_fd_task(struct FDTask* task) {
// is it okay to perform the task?
if (validate_connection(task->server, task->conn_id)) {
if (task->fallback) // check for fallback, call if requested
task->fallback(task->server, task->conn_id.uuid, task->arg);
destroy_fd_task(task->server, task);
return;
}
if (perform_single_task(task->server, task->conn_id.uuid, task->task,
task->arg)) {
// free the memory
destroy_fd_task(task->server, task);
} else
Async.run(task->server->async, (void (*)(void*))perform_fd_task, task);
}
// GroupTask helpers
struct GroupTask* new_group_task(server_pt srv) {
struct GroupTask* ret = NULL;
pthread_mutex_lock(&srv->task_lock);
if (srv->group_task_pool) {
ret = srv->group_task_pool;
srv->group_task_pool = srv->group_task_pool->next;
--srv->group_task_pool_size;
pthread_mutex_unlock(&srv->task_lock);
return ret;
}
pthread_mutex_unlock(&srv->task_lock);
ret = malloc(sizeof(*ret));
memset(ret, 0, sizeof(*ret));
return ret;
}
void destroy_group_task(server_pt srv, struct GroupTask* task) {
if (task == NULL) {
pthread_mutex_lock(&srv->task_lock);
struct GroupTask* tmp;
srv->group_task_pool_size = 0;
while ((tmp = srv->group_task_pool)) {
srv->group_task_pool = srv->group_task_pool->next;
free(tmp);
}
pthread_mutex_unlock(&srv->task_lock);
return;
}
pthread_mutex_lock(&srv->task_lock);
if (srv->group_task_pool_size >= 64) {
pthread_mutex_unlock(&srv->task_lock);
free(task);
return;
}
task->next = srv->group_task_pool;
srv->group_task_pool = task;
++srv->group_task_pool_size;
pthread_mutex_unlock(&srv->task_lock);
}
static void perform_group_task(struct GroupTask* task) {
// the maximum number of bytes to review (each bit == 1 fd);
long fd_max = task->server->capacity;
// continue cycle
while (fd_max < task->pos) {
// the byte contains at least one bit marker for a task related fd
// is it okay to perform the task?
if (task->pos != server_uuid_to_fd(task->conn_origin) &&
check_if_connection_fits_service(task->server, task->pos,
task->service)) {
if (perform_single_task(task->server, task->pos, task->task, task->arg))
task->pos++;
goto rescedule;
} else // closed connection, ignore it and clear it's flag.
task->pos++;
}
// clear the task away...
if (task->on_finished) {
if (fd_task(task->server, (union fd_id)task->conn_origin, task->on_finished,
task->arg, task->on_finished))
task->on_finished(task->server, task->conn_origin, task->arg);
}
destroy_group_task(task->server, task);
return;
rescedule:
Async.run(task->server->async, (void (*)(void*)) & perform_group_task, task);
return;
}
/** Schedules a specific task to run asyncronously for each connection.
a NULL service identifier == all connections (all protocols). */
static int each(server_pt server,
union fd_id origin_fd,
char* service,
void (*task)(server_pt server, uint64_t target_fd, void* arg),
void* arg,
void (*on_finish)(server_pt server,
uint64_t origin_fd,
void* arg)) {
struct GroupTask* gtask = new_group_task(server);
if (!gtask || !task)
return -1;
gtask->arg = arg;
gtask->task = task;
gtask->server = server;
gtask->on_finished = on_finish;
gtask->conn_origin = origin_fd.uuid;
gtask->pos = 0;
gtask->service = service;
Async.run(server->async, (void (*)(void*)) & perform_group_task, gtask);
return 0;
}
/** Schedules a specific task to run for each connection. The tasks will be
* performed sequencially, in a blocking manner. The method will only return
* once all the tasks were completed. A NULL service identifier == all
* connections (all protocols).
*/
static int each_block(server_pt server,
union fd_id origin_fd,
char* service,
void (*task)(server_pt server, uint64_t fd, void* arg),
void* arg) {
int c = 0;
for (int i = 0; i < server->capacity; i++) {
if (i != origin_fd.data.fd &&
check_if_connection_fits_service(server, i, service)) {
task(server, i, arg);
++c;
}
}
return c;
}
/** Schedules a specific task to run asyncronously for a specific connection.
returns -1 on failure, 0 on success (success being scheduling or performing
the task).
*/
static int fd_task(server_pt server,
union fd_id conn,
void (*task)(server_pt server, uint64_t conn, void* arg),
void* arg,
void (*fallback)(server_pt server,
uint64_t conn,
void* arg)) {
if (validate_connection(server, conn)) {
struct FDTask* msg = new_fd_task(server);
if (!msg)
return -1;
*msg = (struct FDTask){.conn_id = conn,
.server = server,
.task = task,
.arg = arg,
.fallback = fallback};
Async.run(server->async, (void (*)(void*)) & perform_fd_task, msg);
return 0;
}
return -1;
}
/** Runs an asynchronous task, IF threading is enabled (set the
`threads` to 1 (the default) or greater).
If threading is disabled, the current thread will perform the task and return.
Returns -1 on error or 0
on succeess.
*/
static int run_async(server_pt self, void task(void*), void* arg) {
return Async.run(self->async, task, arg);
}
/** Creates a system timer (at the cost of 1 file descriptor) and pushes the
timer to the reactor. The task will NOT repeat. Returns -1 on error or the
new file descriptor on succeess.
NOTICE: Do NOT create timers from within the on_close callback, as this might
block resources from being properly freed (if the timer and the on_close
object share the same fd number).
*/
static int run_after(server_pt self,
long milliseconds,
void task(void*),
void* arg) {
return run_every(self, milliseconds, 1, task, arg);
}
/** Creates a system timer (at the cost of 1 file descriptor) and pushes the
timer to the reactor. The task will repeat `repetitions` times. if
`repetitions` is set to 0, task will repeat forever. Returns -1 on error
or the new file descriptor on succeess.
NOTICE: Do NOT create timers from within the on_close callback, as this might
block resources from being properly freed (if the timer and the on_close
object share the same fd number).
*/
static int run_every(server_pt self,
long milliseconds,
int repetitions,
void task(void*),
void* arg) {
int tfd = reactor_make_timer();
if (tfd <= 0)
return -1;
struct Protocol* timer_protocol =
(struct Protocol*)TimerProtocol(task, arg, repetitions);
if (srv_attach_core(self, tfd, timer_protocol, milliseconds)) {
free(timer_protocol);
return -1;
}
// remove the default timeout (timers shouldn't timeout)
_connection_(self, tfd).tout = 0;
return 0;
}
////////////////////////////////////////////////////////////////////////////////
// Server Registry
// (stopping and signal managment)
// types and global data
struct ServerSet {
struct ServerSet* next;
server_pt server;
};
static struct ServerSet* global_servers_set = NULL;
static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER;
// register a server
static void register_server(server_pt srv) {
pthread_mutex_lock(&global_lock);
struct ServerSet* n_reg = malloc(sizeof(struct ServerSet));
n_reg->server = srv;
n_reg->next = global_servers_set;
global_servers_set = n_reg;
pthread_mutex_unlock(&global_lock);
}
// stop a server (+unregister)
static void srv_stop(server_pt srv) {
pthread_mutex_lock(&global_lock);
// remove from regisrty
struct ServerSet* set = global_servers_set;
struct ServerSet* tmp = global_servers_set;
if (global_servers_set && global_servers_set->server == srv) {
global_servers_set = global_servers_set->next;
free(tmp);
goto sig_srv;
} else
while (set) {
if (set->next && set->next->server == srv) {
tmp = set->next;
set->next = set->next->next;
free(tmp);
goto sig_srv;
}
set = set->next;
}
// the server wasn't in the registry - we shouldn't stop it again...
srv = NULL;
// send a signal to the server, if it was in the registry
sig_srv:
if (srv) {
// close the listening socket, preventing new connections from coming in.
reactor_add((struct Reactor*)srv, srv->srvfd);
// set the stop server flag.
srv->run = 0;
// signal the async object to finish
Async.signal(srv->async);
}
pthread_mutex_unlock(&global_lock);
}
// deregisters and stops all servers
static void srv_stop_all(void) {
while (global_servers_set)
srv_stop(global_servers_set->server);
}
// handles signals
static void on_signal(int sig) {
if (!global_servers_set) {
signal(sig, SIG_DFL);
raise(sig);
return;
}
fprintf(stderr, "(pid %d) Exit signal received.\n", getpid());
srv_stop_all();
}
////////////////////////////////////////////////////////////////////////////////
// socket helpers
// sets a socket to non blocking state
static inline int set_non_blocking_socket(int fd) // Thanks to Bjorn Reese
{
/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
static int flags;
if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
flags = 0;
// printf("flags initial value was %d\n", flags);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
/* Otherwise, use the old way of doing it */
static int flags = 1;
return ioctl(fd, FIOBIO, &flags);
#endif
}
// helper functions used in the context of this file
static int bind_server_socket(server_pt self) {
int srvfd;
// setup the address
struct addrinfo hints;
struct addrinfo* servinfo; // will point to the results
memset(&hints, 0, sizeof hints); // make sure the struct is empty
hints.ai_family = AF_UNSPEC; // don't care IPv4 or IPv6
hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
hints.ai_flags = AI_PASSIVE; // fill in my IP for me
if (getaddrinfo(self->settings->address, self->settings->port, &hints,
&servinfo)) {
perror("addr err");
return -1;
}
// get the file descriptor
srvfd =
socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
if (srvfd <= 0) {
perror("socket err");
freeaddrinfo(servinfo);
return -1;
}
// make sure the socket is non-blocking
if (set_non_blocking_socket(srvfd) < 0) {
perror("couldn't set socket as non blocking! ");
freeaddrinfo(servinfo);
close(srvfd);
return -1;
}
// avoid the "address taken"
{
int optval = 1;
setsockopt(srvfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
// bind the address to the socket
{
int bound = 0;
for (struct addrinfo* p = servinfo; p != NULL; p = p->ai_next) {
if (!bind(srvfd, p->ai_addr, p->ai_addrlen))
bound = 1;
}
if (!bound) {
perror("bind err");
freeaddrinfo(servinfo);
close(srvfd);
return -1;
}
}
freeaddrinfo(servinfo);
// listen in
if (listen(srvfd, SOMAXCONN) < 0) {
perror("couldn't start listening");
close(srvfd);
return -1;
}
return srvfd;
}
////////////////
// file limit helper
static long srv_capacity(void) {
// get current limits
static long flim = 0;
if (flim)
return flim;
#ifdef _SC_OPEN_MAX
flim = sysconf(_SC_OPEN_MAX) - 1;
#elif defined(OPEN_MAX)
flim = OPEN_MAX - 1;
#endif
// try to maximize limits - collect max and set to max
struct rlimit rlim;
getrlimit(RLIMIT_NOFILE, &rlim);
// printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur,
// rlim.rlim_max);
rlim.rlim_cur = rlim.rlim_max;
setrlimit(RLIMIT_NOFILE, &rlim);
getrlimit(RLIMIT_NOFILE, &rlim);
// printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur,
// rlim.rlim_max);
// if the current limit is higher than it was, update
if (flim < rlim.rlim_cur)
flim = rlim.rlim_cur;
// review stack memory limits - don't use more than half...?
// use 1 byte (char) for busy_map;
// use 1 byte (char) for idle_map;
// use 1 byte (char) for tout_map;
// use 8 byte (void *) for protocol_map;
// use 8 byte (void *) for server_map;
// ------
// total of 19 (assume 20) bytes per connection.
//
// assume a 2Kb stack allocation (per connection) for
// network IO buffer and request management...?
// (this would be almost wrong to assume, as buffer might be larger)
// -------
// 2068 Byte
// round up to page size?
// 4096
//
getrlimit(RLIMIT_STACK, &rlim);
if (flim * 4096 > rlim.rlim_cur && flim * 4096 < rlim.rlim_max) {
rlim.rlim_cur = flim * 4096;
setrlimit(RLIMIT_STACK, &rlim);
getrlimit(RLIMIT_STACK, &rlim);
}
if (flim > rlim.rlim_cur / 4096) {
// printf("The current maximum file limit (%d) if reduced due to stack
// memory "
// "limits(%d)\n",
// flim, (int)(rlim.rlim_cur / 2068));
flim = rlim.rlim_cur / 4096;
} else {
// printf(
// "The current maximum file limit (%d) is supported by the stack
// memory
// "
// "limits(%d)\n",
// flim, (int)(rlim.rlim_cur / 2068));
}
// how many Kb per connection? assume 8Kb for kernel? x2 (16Kb).
// (http://www.metabrew.com/article/a-million-user-comet-application-with-mochiweb-part-3)
// 10,000 connections == 16*1024*10000 == +- 160Mb? seems a tight fit...
// i.e. the Http request buffer is 8Kb... maybe 24Kb is a better minimum?
// Some per connection heap allocated data (i.e. 88 bytes per user-land
// buffer) also matters.
getrlimit(RLIMIT_DATA, &rlim);
if (flim > rlim.rlim_cur / (24 * 1024)) {
printf(
"The current maximum file limit (%ld) if reduced due to system "
"memory "
"limits(%ld)\n",
flim, (long)(rlim.rlim_cur / (24 * 1024)));
flim = rlim.rlim_cur / (24 * 1024);
} else {
// printf(
// "The current maximum file limit (%d) is supported by the stack
// memory
// "
// "limits(%d)\n",
// flim, (int)(rlim.rlim_cur / 2068));
}
// return what we have
return flim;
}