blob: 70944cf773cfea4a71ba8cb0d41e29d317d3ab9e [file] [log] [blame] [raw]
/*
copyright: Boaz segev, 2015
license: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#include "lib-server.h"
// sys includes
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netdb.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>
#include <errno.h>
////////////////////////////////////////////////////////////////////////////////
// helper predefinitions
// signal management
static void register_server(struct Server* server);
static void on_signal(int sig);
static void stop_one(struct Server* server);
// socket binding and server limits
static int bind_server_socket(struct Server*);
static int set_non_blocking_socket(int fd);
static long calculate_file_limit(void);
// async data sending buffer and helpers
struct Buffer {
struct Buffer* next;
int fd;
void* data;
int len;
int sent;
unsigned notification : 1;
unsigned moved : 1;
unsigned final : 1;
unsigned urgent : 1;
unsigned destroy : 4;
unsigned rsv : 3;
};
////////////////////////////////////////////////////////////////////////////////
// The server data object container
struct Server {
// inherit the reactor (must be first in the struct, for pointer conformity).
struct ReactorSettings reactor;
// a pointer to the server's settings object.
struct ServerSettings* settings;
// a pointer to the thread pool object (libasync).
struct Async* async;
// maps each connection to it's protocol.
struct Protocol** protocol_map;
// Used to send the server data + sockfd number to any worker threads.
// pointer offset calculations are used to calculate the sockfd.
struct Server** server_map;
// maps each udata with it's associated connection fd.
void** udata_map;
/// maps a connection's "busy" flag, preventing the same connection from
/// running `on_data` on two threads. use busy[fd] to get the status of the
/// flag.
char* busy;
/// maps all connection's timeout values. use tout[fd] to get/set the
/// timeout.
unsigned char* tout;
/// maps all connection's idle cycle count values. use idle[fd] to get/set
/// the count.
unsigned char* idle;
// a buffer map.
void** buffer_map;
// old Signal handlers
void (*old_sigint)(int);
void (*old_sigterm)(int);
// socket capacity
long capacity;
/// the last timeout review
time_t last_to;
/// the server socket
int srvfd;
/// the original process pid
pid_t root_pid;
};
////////////////////////////////////////////////////////////////////////////////
// timer helpers
static char* timer_protocol_name = "timer";
// a timer protocol, will be allocated for each timer.
struct TimerProtocol {
// must be first for pointer compatability
struct Protocol parent;
void (*task)(void*);
void* arg;
// how many repeats?
int repeat;
};
// the timer's on_data callback
static void tp_perform_on_data(struct Server* self, int fd) {
struct TimerProtocol* timer =
(struct TimerProtocol*)Server.get_protocol(self, fd);
if (timer) {
// set local data copy, to avoid race contitions related to `free`.
void (*task)(void*) = (void (*)(void*))timer->task;
void* arg = timer->arg;
// perform the task
if (task)
task(arg);
// on epoll we need to reset the timer
self->reactor.reset_timer(&self->reactor, fd);
// close the file descriptor
if (timer->repeat < 0)
return;
if (timer->repeat == 0)
close(fd);
timer->repeat--;
}
}
// the timer's on_close (cleanup)
static void tp_perform_on_close(struct Server* self, int fd) {
// free the protocol object when it was "aborted" using `close`.
struct TimerProtocol* timer =
(struct TimerProtocol*)Server.get_protocol(self, fd);
if (timer)
free(timer);
}
// creates a new TimeProtocol object.
static struct TimerProtocol* TimerProtocol(void* task,
void* arg,
int repetitions) {
struct TimerProtocol* tp = malloc(sizeof(struct TimerProtocol));
*tp = (struct TimerProtocol){.parent.on_data = tp_perform_on_data,
.parent.on_close = tp_perform_on_close,
.parent.service = timer_protocol_name,
.task = task,
.arg = arg,
.repeat = repetitions - 1};
return tp;
}
////////////////////////////////////////////////////////////////////////////////
// The busy flag is used to make sure that a single connection doesn't perform
// two "critical" tasks at the same time. Critical tasks are defined as the
// `on_message` callback and any user task requested by `each` or `fd_task`.
static int set_to_busy(struct Server* server, int sockfd) {
static pthread_mutex_t locker = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&locker);
if (server->busy[sockfd] == 1) {
pthread_mutex_unlock(&locker);
return 0;
}
server->busy[sockfd] = 1;
pthread_mutex_unlock(&locker);
return 1;
}
////////////////////////////////////////////////////////////////////////////////
// The Server's callbacks
// use pointer inheritance casting to simplify access to the server object.
#define _server_(reactor) ((struct Server*)(reactor))
// a short-cut for grabbing the connection's protocol object.
#define _protocol_(reactor, fd) (_server_(reactor)->protocol_map[(fd)])
static void manage_timeout(struct ReactorSettings* reactor) {
static time_t last_to = 0;
static char working = 0;
if (working)
return;
if (last_to != reactor->last_tick) {
working = 1;
// ?? ??
// // ignore delta, add 1 to timer (even if delta is bigger) and add 1
// second
// // to timeout.
// // timeout isn't exact, but should also avoid killing a new connection
// // when timeout was set to 0
// ?? ??
// // or use delta...
int delta = reactor->last_tick - last_to;
for (long i = 3; i < _server_(reactor)->capacity; i++) {
if (_server_(reactor)->tout[i]) {
if (_server_(reactor)->tout[i] > _server_(reactor)->idle[i])
_server_(reactor)->idle[i] += _server_(reactor)->idle[i] ? delta : 1;
else {
if (_server_(reactor)->protocol_map[i] &&
_server_(reactor)->protocol_map[i]->ping)
_server_(reactor)->protocol_map[i]->ping(_server_(reactor), i);
else if (!_server_(reactor)->busy[i] ||
_server_(reactor)->idle[i] == 255)
close(i);
}
}
}
// ready for next call.
last_to = reactor->last_tick;
working = 0;
// double en = (float)clock()/CLOCKS_PER_SEC;
// printf("timeout review took %f milliseconds\n", (en-st)*1000);
}
}
/// called for any open file descriptors when the reactor is shutting down.
static void on_shutdown(struct ReactorSettings* reactor, int fd) {
// call the callback for the mentioned active(?) connection.
if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_shutdown)
_protocol_(reactor, fd)->on_shutdown(_server_(reactor), fd);
}
// called when a file descriptor was closed (either locally or by the other
// party, when it's a socket or a pipe).
static void on_close(struct ReactorSettings* reactor, int fd) {
// file descriptors can be added from external threads (i.e. creating timers),
// this could cause race conditions and data corruption,
// (i.e., when on_close is releasing memory).
static pthread_mutex_t locker = PTHREAD_MUTEX_INITIALIZER;
int lck = 0;
if ((lck = pthread_mutex_trylock(&locker))) {
if (lck != EAGAIN)
return;
pthread_mutex_lock(&locker);
}
if (_protocol_(reactor, fd)) {
if (_protocol_(reactor, fd)->on_close)
_protocol_(reactor, fd)->on_close(_server_(reactor), fd);
_protocol_(reactor, fd) = 0;
_server_(reactor)->tout[fd] = 0;
// we keep the buffer on standby for the nex connection...
if (_server_(reactor)->buffer_map[fd])
Buffer.clear(_server_(reactor)->buffer_map[fd]);
// _server_(reactor)->buffer_map[fd] = 0;
}
pthread_mutex_unlock(&locker);
}
static void async_on_data(struct Server** p_server) {
// compute sockfd by comparing the distance between the original pointer and
// the passed pointer's address.
if (!(*p_server))
return;
int sockfd = p_server - (*p_server)->server_map;
// printf("threaded on data for %d\n", sockfd);
struct Protocol* protocol = (*p_server)->protocol_map[sockfd];
if (!protocol || !protocol->on_data)
return;
// if we get the handle, perform the task
if (set_to_busy(*p_server, sockfd)) {
protocol->on_data((*p_server), sockfd);
// release the handle
(*p_server)->busy[sockfd] = 0;
return;
}
// we didn't get the handle, reschedule.
Async.run((*p_server)->async, (void (*)(void*))async_on_data, p_server);
}
// The heavy(!) on_data, handles accept and forwards events
static void on_data(struct ReactorSettings* reactor, int fd) {
static socklen_t cl_addrlen = 0;
if (fd == reactor->first) { // listening socket. accept connections.
int client = 1;
while (1) {
#ifdef SOCK_NONBLOCK
client = accept4(fd, NULL, &cl_addrlen, SOCK_NONBLOCK);
if (client <= 0)
return;
// perror("accept 4 called");
#else
client = accept(fd, NULL, &cl_addrlen);
if (client <= 0)
return;
set_non_blocking_socket(client);
// perror("accept reg called");
#endif
// handle server overload
if (client >= reactor->last - 1) {
if (_server_(reactor)->settings->busy_msg)
write(client, _server_(reactor)->settings->busy_msg,
strlen(_server_(reactor)->settings->busy_msg));
close(client);
continue;
}
// close the prior protocol stream, if needed
on_close(reactor, client);
// setup protocol
_protocol_(reactor, client) = _server_(reactor)->settings->protocol;
// setup timeouts
_server_(reactor)->tout[client] = _server_(reactor)->settings->timeout;
_server_(reactor)->idle[client] = 0;
// call on_open
// register the client - start it off as busy, protocol still initializing
// we don't need the mutex, because it's all fresh
_server_(reactor)->busy[client] = 1;
if (_protocol_(reactor, client)->on_open)
_protocol_(reactor, client)->on_open(_server_(reactor), client);
reactor->add(reactor, client);
_server_(reactor)->busy[client] = 0;
}
} else if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_data) {
_server_(reactor)->idle[fd] = 0;
// clients, forward on.
if (_server_(reactor)->async) { // perform multi-thread
Async.run(_server_(reactor)->async, (void (*)(void*))async_on_data,
&(_server_(reactor)->server_map[fd]));
} else { // perform single thread
_protocol_(reactor, fd)->on_data(_server_(reactor), fd);
}
}
}
// on_ready, handles async buffer sends
static void on_ready(struct ReactorSettings* reactor, int fd) {
if (_server_(reactor)->buffer_map[fd]) {
if (Buffer.flush(_server_(reactor)->buffer_map[fd], fd) > 0)
_server_(reactor)->idle[fd] = 0;
}
if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_ready)
_protocol_(reactor, fd)->on_ready(_server_(reactor), fd);
}
// global callbacks
// called after the event loop was created but before any cycling takes
// place, allowing for further initialization (such as adding a server
// socket, setting up timers, etc').
static void on_init(struct ReactorSettings* reactor) {
if (_server_(reactor)->settings->on_init)
_server_(reactor)->settings->on_init(_server_(reactor));
}
// called whenever an event loop had cycled (a "tick").
static void on_tick(struct ReactorSettings* reactor) {
if (_server_(reactor)->settings->on_tick)
_server_(reactor)->settings->on_tick(_server_(reactor));
manage_timeout(reactor);
}
// called if an event loop cycled with no pending events.
static void on_idle(struct ReactorSettings* reactor) {
// // Shrink buffers? - No... if they don't auto-shrink, wtf?
// for (size_t i = 0; i <= reactor->last; i++) {
// if (_server_(reactor)->buffer_map[i])
// Buffer.shrink(_server_(reactor)->buffer_map[i]);
// }
if (_server_(reactor)->settings->on_idle)
_server_(reactor)->settings->on_idle(_server_(reactor));
}
// called when a new thread is spawned
void on_init_thread(async_p async, void* server) {
if (((struct Server*)server)->settings->on_init_thread)
((struct Server*)server)->settings->on_init_thread((struct Server*)server);
}
////////////////////////////////////////////////////////////////////////////////
// The Server's main function
// this is the server's main action...
static int server_listen(struct ServerSettings settings) {
// review the settings, setup defaults when something's missing
if (!settings.protocol) {
fprintf(stderr,
"Server err: (protocol == NULL) Running a server requires "
"a protocol for new connections.\n");
return -1;
}
if (!settings.port)
settings.port = "3000";
if (!settings.timeout)
settings.timeout = 5;
if (!settings.threads)
settings.threads = 1;
if (!settings.processes)
settings.processes = 1;
// We'll use the stack for the server's core memory.
void* udata_map[calculate_file_limit()];
struct Protocol* protocol_map[calculate_file_limit()];
struct Server* server_map[calculate_file_limit()];
void* buffer_map[calculate_file_limit()];
char busy[calculate_file_limit()];
unsigned char tout[calculate_file_limit()];
unsigned char idle[calculate_file_limit()];
// populate the Server structure with the data
struct Server server = {
.settings = &settings,
.protocol_map = protocol_map,
.udata_map = udata_map,
.server_map = server_map,
.buffer_map = buffer_map,
.busy = busy,
.tout = tout,
.idle = idle,
.capacity = calculate_file_limit(),
.reactor.last = calculate_file_limit() - 1,
.reactor.on_tick = on_tick,
.reactor.on_idle = on_idle,
.reactor.on_init = on_init,
.reactor.on_data = on_data,
.reactor.on_ready = on_ready,
.reactor.on_shutdown = on_shutdown,
.reactor.on_close = on_close,
};
server.reactor.udata = &server;
// bind the server's socket
int srvfd = bind_server_socket(&server);
server.srvfd = srvfd;
// tell the reactor about the "first" socket it should react to.
server.reactor.first = srvfd;
// if we didn't get a socket, quit now.
if (srvfd < 0)
return -1;
// zero out data...
for (int i = 0; i < calculate_file_limit(); i++) {
protocol_map[i] = 0;
server_map[i] = &server;
busy[i] = 0;
tout[i] = 0;
idle[i] = 0;
udata_map[i] = 0;
// buffer_map[i] = 0;
buffer_map[i] = Buffer.new(0);
}
// setup concurrency
server.root_pid = getpid();
pid_t pids[settings.processes > 0 ? settings.processes : 0];
if (settings.processes > 1) {
pids[0] = 0;
for (int i = 1; i < settings.processes; i++) {
if (getpid() == server.root_pid)
pids[i] = fork();
}
}
if (settings.threads > 0) {
server.async = Async.new(settings.threads, on_init_thread, &server);
if (server.async < 0) {
close(srvfd);
return -1;
}
}
// register signals
server.old_sigint = signal(SIGINT, on_signal);
server.old_sigterm = signal(SIGTERM, on_signal);
// register server for signals
register_server(&server);
// let'm know...
printf("(%d) Starting to listen on port %s (max sockets: %lu, ~%d used)\n",
getpid(), settings.port, server.capacity,
(server.async ? server.srvfd + 2 : server.srvfd));
// bind server data to reactor loop
reactor_start(&server.reactor);
// cleanup
if (settings.processes > 1 && getpid() == server.root_pid) {
int sts;
for (int i = 1; i < settings.processes; i++) {
// printf("sending signal to pid %d\n", pids[i]);
kill(pids[i], SIGINT);
}
for (int i = 1; i < settings.processes; i++) {
sts = 0;
// printf("waiting for pid %d\n", pids[i]);
if (waitpid(pids[i], &sts, 0) != pids[i])
perror("waiting for child process had failed");
}
}
close(srvfd); // already performed by the reactor...
if (server.async)
Async.finish(server.async);
if (settings.on_finish)
settings.on_finish(&server);
printf("\n(%d) Finished listening on port %s\n", getpid(), settings.port);
if (getpid() != server.root_pid) {
fflush(NULL);
exit(0);
}
signal(SIGINT, server.old_sigint);
signal(SIGTERM, server.old_sigterm);
// remove server from registry, it it's still there...
stop_one(&server);
for (int i = 0; i < calculate_file_limit(); i++) {
Buffer.destroy(buffer_map[i]);
}
return 0;
}
////////////////////////////////////////////////////////////////////////////////
// The Server API (helper methods and API container)
////////////////////////////////////////////////////////////////////////////////
// Connection data and protocol management
static int server_attach(struct Server* server,
int fd,
struct Protocol* protocol) {
if (server->capacity <= fd)
return -1;
on_close(&server->reactor, fd);
server->protocol_map[fd] = protocol;
server->reactor.add(&server->reactor, fd);
return 0;
}
static int server_hijack(struct Server* server, int sockfd) {
if (server->buffer_map[sockfd]) {
// make sure to finish sending all the data, as no more `on_ready` events
// will occur
while (!Buffer.empty(server->buffer_map[sockfd]) &&
Buffer.flush(server->buffer_map[sockfd], sockfd) >= 0)
;
Buffer.clear(server->buffer_map[sockfd]);
}
server->protocol_map[sockfd] = NULL;
server->tout[sockfd] = 0;
return server->reactor.hijack(&server->reactor, sockfd);
}
// get protocol for connection
static struct Protocol* get_protocol(struct Server* server, int sockfd) {
return server->protocol_map[sockfd];
}
// set protocol for connection
static void set_protocol(struct Server* server,
int sockfd,
struct Protocol* new_protocol) {
server->protocol_map[sockfd] = new_protocol;
}
// get protocol for connection
static void* get_udata(struct Server* server, int sockfd) {
return server->udata_map[sockfd];
}
// set protocol for connection
static void* set_udata(struct Server* server, int sockfd, void* udata) {
void* old = server->udata_map[sockfd];
server->udata_map[sockfd] = udata;
return old;
}
// count existing connections for a specific protocol (NULL is all)
static long count(struct Server* server, char* service) {
int c = 0;
for (long i = 0; i < server->capacity; i++) {
if (server->protocol_map[i] // there is a protocol
&& ( // one of the following must apply
!service // there's no service name required
|| // or
(server->protocol_map[i]->service && // there is a name that
!strcmp(service, server->protocol_map[i]->service)) // matches
))
c++;
}
return c;
}
// reset idle counter (timeout monitor) for a specific connection
// This is unprotected, as it seems that data curruption isn't super important
// for this one
static void touch(struct Server* self, int sockfd) {
self->idle[sockfd] = 0;
}
// sets the timeout limit for the specified connectionl, in seconds.
static void set_timeout(struct Server* server,
int sockfd,
unsigned char timeout) {
server->tout[sockfd] = timeout;
}
// returns true if the connection is performing a critical task.
static unsigned char is_busy(struct Server* self, int sockfd) {
return self->busy[sockfd];
}
// return a server's reactor
static struct ReactorSettings* reactor(struct Server* server) {
return &server->reactor;
}
// return the settings used to initiate the server
static struct ServerSettings* server_settings(struct Server* server) {
return server->settings;
}
// connects an existing connection (fd) to the Server's callback system.
static int server_connect(struct Server* self,
int fd,
struct Protocol* protocol) {
// if the connection is already owned by the server - ignore.
if (self->protocol_map[fd] == protocol)
return 0;
// make sure the fd recycled is clean
on_close(&self->reactor, fd);
// set protocol for new fd
self->protocol_map[fd] = protocol;
// add the fd to the reactor
if (self->reactor.add(&self->reactor, fd) < 0)
return -1;
// remember to call on_open
if (protocol->on_open)
protocol->on_open(self, fd);
return 0;
}
////////////////////////////////////////////////////////////////////////////////
// Connection tasks (implementing `each`)
// perform a blocking task on each connection
static long each_block(struct Server* server,
char* service,
void (*task)(struct Server* server, int fd, void* arg),
void* arg) {
int c = 0;
for (long i = 0; i < server->capacity; i++) {
if (server->protocol_map[i] // there is a protocol
&& ( // one of the following must apply
!service // there's no service name required
|| // or
(server->protocol_map[i]->service && // there is a name that
!strcmp(service, server->protocol_map[i]->service)) // matches
)) {
c++;
task(server, i, arg);
}
}
return c;
}
// perform a task on each existing connection for a specific protocol
// the data-type for async messages
struct ConnTask {
struct Server* server;
int fd;
void (*task)(struct Server* server, int fd, void* arg);
void* arg;
};
// the async handler
static void perform_each_task(struct ConnTask* task) {
// is it okay to perform the task?
if (set_to_busy(task->server, task->fd)) {
// perform the task
task->task(task->server, task->fd, task->arg);
// release the busy flag
task->server->busy[task->fd] = 0;
// free the memory
free(task);
return;
}
// reschedule
Async.run(task->server->async, (void (*)(void*))perform_each_task, task);
return;
}
// schedules a task for async performance
void make_a_task_async(struct Server* server, int fd, void* arg) {
if (server->async) {
struct ConnTask* task = malloc(sizeof(struct ConnTask));
if (!task)
return;
memcpy(task, arg, sizeof(struct ConnTask));
task->fd = fd;
Async.run(server->async, (void (*)(void*))perform_each_task, task);
} else {
struct ConnTask* task = arg;
task->task(server, fd, task->arg);
}
}
// the message scheduler (async) / performer (single-thread)
static long each(struct Server* server,
char* service,
void (*task)(struct Server* server, int fd, void* arg),
void* arg) {
struct ConnTask msg = {
.server = server, .task = task, .arg = arg,
};
return each_block(server, service, make_a_task_async, &msg);
}
static int fd_task(struct Server* server,
int sockfd,
void (*task)(struct Server* server, int fd, void* arg),
void* arg) {
if (server->protocol_map[sockfd]) {
struct ConnTask msg = {
.server = server, .task = task, .arg = arg,
};
make_a_task_async(server, sockfd, &msg);
return 0;
}
return -1;
}
////////////////////////////////////////////////////////////////////////////////
// Async tasks and timers helpers
// run a task asynchronously
static int run_async(struct Server* self, void (*task)(void*), void* arg) {
return Async.run(self->async, task, arg);
}
// run a timer, one single time
static int run_every(struct Server* self,
long milliseconds,
int repetitions,
void (*task)(void*),
void* arg) {
int tfd = self->reactor.open_timer_fd();
if (tfd <= 0)
return -1;
// make sure the fd recycled is clean
on_close(&self->reactor, tfd);
// set protocol for new fd (timer protocol)
self->protocol_map[tfd] =
(struct Protocol*)TimerProtocol(task, arg, repetitions);
if (self->reactor.add_as_timer(&self->reactor, tfd, milliseconds) < 0) {
close(tfd);
// on_close will be called by the server to free the resources - don't race
return -1;
};
return 0;
}
// run a timer, one single time
static int run_after(struct Server* self,
long milliseconds,
void (*task)(void*),
void* arg) {
return run_every(self, milliseconds, 1, task, arg);
}
////////////////////////////////////////////////////////////////////////////////
// socket read/write helpers
// reads data and closes socket on error
static ssize_t read_data(int fd, void* buffer, size_t max_len) {
// reads up to `max_len` of data from a socket. the data is storen in the
// `buffer` and the number of bytes received is returned.
// Returns 0 if no data was available. Returns -1 if an error was raised and
// the connection should be closed.
ssize_t read = 0;
if ((read = recv(fd, buffer, max_len, 0)) > 0) {
return read;
} else {
if (read && (errno & (EWOULDBLOCK | EAGAIN)))
return 0;
}
close(fd);
return -1;
}
// sends data to the socket, managing an async-write buffer if needed
static ssize_t buffer_send(struct Server* server,
int sockfd,
void* data,
size_t len,
char move,
char urgent) {
// reset timeout
server->idle[sockfd] = 0;
// // try to avoid the buffer if we can... - is this safe (too many
// assumptions)?
// ssize_t snt = -1;
// if (!server->buffer_map[sockfd]) {
// ssize_t snt = send(sockfd, data, len, 0);
// // sending failed with a socket error
// if (snt < 0 && !(errno & (EWOULDBLOCK | EAGAIN))) {
// if (move && data)
// free(data);
// close(sockfd);
// return -1;
// }
// // no need for a buffer.
// if (snt == len) {
// if (move && data)
// free(data);
// return 0;
// }
// server->buffer_map[sockfd] = Buffer.new(snt > 0 ? snt : 0);
// if (!server->buffer_map[sockfd]) {
// fprintf(stderr,
// "Couldn't initiate a buffer object for conection no. %d\n",
// sockfd);
// if (move && data)
// free(data);
// return snt;
// }
// }
// // creating a buffer now might expose us to race conditions
// if (!server->buffer_map[sockfd]) server->buffer_map[sockfd] =
// Buffer.new(0);
if ((move ? (urgent ? Buffer.write_move_next : Buffer.write_move)
: (urgent ? Buffer.write_next : Buffer.write))(
server->buffer_map[sockfd], data, len) == len) {
Buffer.flush(server->buffer_map[sockfd], sockfd);
return 0;
}
fprintf(stderr, "couldn't write to the buffer on address %p...\n",
server->buffer_map[sockfd]);
return -1;
}
static ssize_t buffer_write(struct Server* server,
int sockfd,
void* data,
size_t len) {
return buffer_send(server, sockfd, data, len, 0, 0);
}
static ssize_t buffer_write_urgent(struct Server* server,
int sockfd,
void* data,
size_t len) {
return buffer_send(server, sockfd, data, len, 0, 1);
}
static ssize_t buffer_move(struct Server* server,
int sockfd,
void* data,
size_t len) {
return buffer_send(server, sockfd, data, len, 1, 0);
}
static ssize_t buffer_write_urgent_move(struct Server* server,
int sockfd,
void* data,
size_t len) {
return buffer_send(server, sockfd, data, len, 1, 1);
}
static int buffer_sendfile(struct Server* server, int sockfd, FILE* file) {
return Buffer.sendfile(server->buffer_map[sockfd], file);
}
static void buffer_close(struct Server* server, int sockfd) {
server->buffer_map[sockfd]
? Buffer.close_when_done(server->buffer_map[sockfd], sockfd)
: close(sockfd);
}
////////////////////////////////////////////////////////////////////////////////
// stopping and signal managment
// types and global data
struct ServerSet {
struct ServerSet* next;
struct Server* server;
};
static struct ServerSet* global_servers_set = NULL;
static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER;
// registers servers
static void register_server(struct Server* server) {
pthread_mutex_lock(&global_lock);
struct ServerSet** set = &global_servers_set;
while (*set)
set = &((*set)->next);
*set = malloc(sizeof(struct ServerSet));
if (!set) {
pthread_mutex_unlock(&global_lock);
raise(SIGSEGV);
}
(*set)->next = 0;
(*set)->server = server;
pthread_mutex_unlock(&global_lock);
}
// handles signals
static void on_signal(int sig) {
struct ServerSet* set = global_servers_set;
if (!set) {
signal(sig, SIG_DFL);
pthread_mutex_unlock(&global_lock);
raise(sig);
return;
}
struct ServerSet* prev = NULL;
while (set) {
set->server->reactor.stop(&(set->server->reactor));
prev = set;
set = set->next;
free(prev);
}
global_servers_set = NULL;
}
// stops a specific server
static void stop_one(struct Server* server) {
server->reactor.stop((struct ReactorSettings*)server);
pthread_mutex_lock(&global_lock);
struct ServerSet* set = global_servers_set;
struct ServerSet* prev = NULL;
while (set && set->server != server) {
prev = set;
set = set->next;
}
if (set) {
if (prev)
prev->next = set->next;
if (set == global_servers_set)
global_servers_set = set->next;
free(set);
}
pthread_mutex_unlock(&global_lock);
}
// stops all the servers.
static void stop_all(void) {
pthread_mutex_lock(&global_lock);
struct ServerSet* set = global_servers_set;
if (!set) {
pthread_mutex_unlock(&global_lock);
return;
}
while (set) {
set->server->reactor.stop(&(set->server->reactor));
global_servers_set = set;
set = set->next;
free(global_servers_set);
}
global_servers_set = NULL;
pthread_mutex_unlock(&global_lock);
}
// returns the server's root pid
pid_t root_pid(struct Server* server) {
return server->root_pid;
}
////////////////////////////////////////////////////////////////////////////////
// The actual Server API access setup
// The following allows access to helper functions and defines a namespace
// for the API in this library.
const struct ServerClass Server = {
.capacity = calculate_file_limit,
.listen = server_listen,
.stop = stop_one,
.stop_all = stop_all,
.touch = touch,
.set_timeout = set_timeout,
.connect = server_connect,
.is_busy = is_busy,
.reactor = reactor,
.settings = server_settings,
.get_protocol = get_protocol,
.set_protocol = set_protocol,
.get_udata = get_udata,
.set_udata = set_udata,
.run_async = run_async,
.run_after = run_after,
.run_every = run_every,
.read = read_data,
.write = buffer_write,
.write_move = buffer_move,
.write_urgent = buffer_write_urgent,
.write_move_urgent = buffer_write_urgent_move,
.sendfile = buffer_sendfile,
.close = buffer_close,
.hijack = server_hijack,
.attach = server_connect,
.count = count,
.each = each,
.each_block = each_block,
.fd_task = fd_task,
.root_pid = root_pid,
};
////////////////////////////////////////////////////////////////////////////////
// socket helpers
// sets a socket to non blocking state
static inline int set_non_blocking_socket(int fd) // Thanks to Bjorn Reese
{
/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
/* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
static int flags;
if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
flags = 0;
// printf("flags initial value was %d\n", flags);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
/* Otherwise, use the old way of doing it */
static int flags = 1;
return ioctl(fd, FIOBIO, &flags);
#endif
}
// helper functions used in the context of this file
static int bind_server_socket(struct Server* self) {
int srvfd;
// setup the address
struct addrinfo hints;
struct addrinfo* servinfo; // will point to the results
memset(&hints, 0, sizeof hints); // make sure the struct is empty
hints.ai_family = AF_UNSPEC; // don't care IPv4 or IPv6
hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
hints.ai_flags = AI_PASSIVE; // fill in my IP for me
if (getaddrinfo(self->settings->address, self->settings->port, &hints,
&servinfo)) {
perror("addr err");
return -1;
}
// get the file descriptor
srvfd =
socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
if (srvfd <= 0) {
perror("socket err");
freeaddrinfo(servinfo);
return -1;
}
// make sure the socket is non-blocking
if (set_non_blocking_socket(srvfd) < 0) {
perror("couldn't set socket as non blocking! ");
freeaddrinfo(servinfo);
close(srvfd);
return -1;
}
// avoid the "address taken"
{
int optval = 1;
setsockopt(srvfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
// bind the address to the socket
{
int bound = 0;
for (struct addrinfo* p = servinfo; p != NULL; p = p->ai_next) {
if (!bind(srvfd, p->ai_addr, p->ai_addrlen))
bound = 1;
}
if (!bound) {
perror("bind err");
freeaddrinfo(servinfo);
close(srvfd);
return -1;
}
}
freeaddrinfo(servinfo);
// listen in
if (listen(srvfd, SOMAXCONN) < 0) {
perror("couldn't start listening");
close(srvfd);
return -1;
}
return srvfd;
}
////////////////
// file limit helper
static long calculate_file_limit(void) {
// get current limits
static long flim = 0;
if (flim)
return flim;
#ifdef _SC_OPEN_MAX
flim = sysconf(_SC_OPEN_MAX) - 1;
#elif defined(OPEN_MAX)
flim = OPEN_MAX - 1;
#endif
// try to maximize limits - collect max and set to max
struct rlimit rlim;
getrlimit(RLIMIT_NOFILE, &rlim);
// printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur,
// rlim.rlim_max);
rlim.rlim_cur = rlim.rlim_max;
setrlimit(RLIMIT_NOFILE, &rlim);
getrlimit(RLIMIT_NOFILE, &rlim);
// printf("Meximum open files are %llu out of %llu\n", rlim.rlim_cur,
// rlim.rlim_max);
// if the current limit is higher than it was, update
if (flim < rlim.rlim_cur)
flim = rlim.rlim_cur;
// review stack memory limits - don't use more than half...?
// use 1 byte (char) for busy_map;
// use 1 byte (char) for idle_map;
// use 1 byte (char) for tout_map;
// use 8 byte (void *) for protocol_map;
// use 8 byte (void *) for server_map;
// ------
// total of 19 (assume 20) bytes per connection.
//
// assume a 2Kb stack allocation (per connection) for
// network IO buffer and request management...?
// (this would be almost wrong to assume, as buffer might be larger)
// -------
// 2068 Byte
// round up to page size?
// 4096
//
getrlimit(RLIMIT_STACK, &rlim);
if (flim * 4096 > rlim.rlim_cur && flim * 4096 < rlim.rlim_max) {
rlim.rlim_cur = flim * 4096;
setrlimit(RLIMIT_STACK, &rlim);
getrlimit(RLIMIT_STACK, &rlim);
}
if (flim > rlim.rlim_cur / 4096) {
// printf("The current maximum file limit (%d) if reduced due to stack
// memory "
// "limits(%d)\n",
// flim, (int)(rlim.rlim_cur / 2068));
flim = rlim.rlim_cur / 4096;
} else {
// printf(
// "The current maximum file limit (%d) is supported by the stack
// memory
// "
// "limits(%d)\n",
// flim, (int)(rlim.rlim_cur / 2068));
}
// how many Kb per connection? assume 8Kb for kernel? x2 (16Kb).
// (http://www.metabrew.com/article/a-million-user-comet-application-with-mochiweb-part-3)
// 10,000 connections == 16*1024*10000 == +- 160Mb? seems a tight fit...
// i.e. the Http request buffer is 8Kb... maybe 24Kb is a better minimum?
// Some per connection heap allocated data (i.e. 88 bytes per user-land
// buffer) also matters.
getrlimit(RLIMIT_DATA, &rlim);
if (flim > rlim.rlim_cur / (24 * 1024)) {
printf(
"The current maximum file limit (%ld) if reduced due to system "
"memory "
"limits(%ld)\n",
flim, (long)(rlim.rlim_cur / (24 * 1024)));
flim = rlim.rlim_cur / (24 * 1024);
} else {
// printf(
// "The current maximum file limit (%d) is supported by the stack
// memory
// "
// "limits(%d)\n",
// flim, (int)(rlim.rlim_cur / 2068));
}
// return what we have
return flim;
}