blob: 679a806c09ccdba588318509557c4a6a3638fdc1 [file] [log] [blame] [raw]
/*
copyright: Boaz segev, 2015
license: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#include "libasync.h"
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
#include <execinfo.h>
#include <fcntl.h>
////////////////////
// types used
struct Async {
/// The read only part of the pipe used to push tasks.
int in;
/// The write only part of the pipe used to push tasks.
int out;
/// the number of threads in the array.
int count;
/// a callback used whenever a new thread a spawned.
void (*init_thread)(struct Async*, void*);
/// a pointer for the callback.
void* arg;
/// an object mutex (locker).
pthread_mutex_t locker;
/// an array to `pthread_t` objects `count` long.
pthread_t thread_pool[];
};
// A task structure.
struct Task {
void (*task)(void*);
void* arg;
};
/////////////////////
// the thread loop functions
// manage thread error signals
static void on_signal(int sig) {
void* array[22];
size_t size;
char** strings;
size_t i;
size = backtrace(array, 22);
strings = backtrace_symbols(array, size);
perror("\nERROR");
fprintf(
stderr,
"Async: task in thread pool raised an error (%d-%s). Backtrace (%zd):\n",
errno, sig == SIGSEGV
? "SIGSEGV"
: sig == SIGFPE ? "SIGFPE" : sig == SIGILL
? "SIGILL"
: sig == SIGPIPE ? "SIGPIPE"
: "unknown",
size);
for (i = 2; i < size; i++)
fprintf(stderr, "%s\n", strings[i]);
free(strings);
fprintf(stderr, "\n");
// pthread_exit(0); // for testing
pthread_exit((void*)on_signal);
}
// the main thread loop function
static void* thread_loop(struct Async* async) {
struct Task task = {};
signal(SIGSEGV, on_signal);
signal(SIGFPE, on_signal);
signal(SIGILL, on_signal);
signal(SIGPIPE, SIG_IGN);
#ifdef SIGBUS
signal(SIGBUS, on_signal);
#endif
#ifdef SIGSYS
signal(SIGSYS, on_signal);
#endif
#ifdef SIGXFSZ
signal(SIGXFSZ, on_signal);
#endif
if (async->init_thread)
async->init_thread(async, async->arg);
int in = async->in; // keep a copy of the pipe's address on the stack
int out = async->out; // keep a copy of the pipe's address on the stack
while (read(in, &task, sizeof(struct Task)) > 0) {
if (!task.task) {
close(in);
close(out);
break;
}
task.task(task.arg);
}
return 0;
}
// the thread's "watch-dog" / sentinal
static void* thread_sentinal(struct Async* async) {
// do we need a sentinal that will reinitiate the thread if a task causes it
// to fail?
// signal(int, void (*)(int))
pthread_t active_thread;
void* thread_error = (void*)on_signal;
int in = async->in;
while (thread_error && (fcntl(in, F_GETFL) >= 0)) {
pthread_create(&active_thread, NULL, (void* (*)(void*))thread_loop, async);
pthread_join(active_thread, &thread_error);
if (thread_error) {
perror("Async: thread sentinal reinitiating worker thread");
}
}
return 0;
}
/////////////////////
// a single task performance, for busy waiting
static int perform_single_task(async_p async) {
if (!async || fcntl(async->in, F_GETFL, NULL) == -1)
return -1;
fprintf(stderr,
"Warning: event queue overloaded!\n"
"Perfoming out of band tasks, failure could occure.\n"
"Consider adding process workers of threads for concurrency.\n"
"\n");
struct Task task = {};
if (read(async->in, &task, sizeof(struct Task)) > 0) {
if (!task.task) {
close(async->in);
close(async->out);
return 0;
}
pthread_mutex_unlock(&async->locker);
task.task(task.arg);
pthread_mutex_lock(&async->locker);
return 0;
} else
return -1;
}
/////////////////////
// Queue pipe extension management
struct ExtQueueData {
struct {
int in;
int out;
} io;
async_p async;
};
static void* extended_queue_thread(void* _data) {
struct ExtQueueData* data = _data;
struct Task task;
int i;
// make sure to ignore broken pipes
signal(SIGPIPE, SIG_IGN);
// get the core out pipe flags (blocking state not important)
i = fcntl(data->async->out, F_GETFL, NULL);
if (i < 0)
return (void*)-1;
// change the original queue writer object to a blocking state
fcntl(data->io.out, F_SETFL, i & (~O_NONBLOCK));
// make sure the reader doesn't block
fcntl(data->io.in, F_SETFL, (O_NONBLOCK));
while (1) {
// we're checking the status of our queue, so we don't want stuff to be
// added while we review.
pthread_mutex_lock(&data->async->locker);
i = read(data->io.in, &task, sizeof(struct Task));
if (i <= 0) {
// we're done, return the async object to it's previous status
i = data->async->out;
data->async->out = data->io.out;
data->io.out = i;
// get the core pipe flags (blocking state not important)
i = fcntl(data->io.out, F_GETFL, NULL);
// return the writer to a non-blocking state.
fcntl(data->async->out, F_SETFL, i | O_NONBLOCK);
// unlock the queue
pthread_mutex_unlock(&data->async->locker);
// close the extra pipes
close(data->io.in);
close(data->io.out);
// free the data object
free(data);
return 0;
}
// unlock the queue - let it be filled.
pthread_mutex_unlock(&data->async->locker);
// write to original queue in a blocking manner.
if (write(data->io.out, &task, sizeof(struct Task)) <= 0) {
// there was an error while writing - it could be that we're shutting
// down.
// close the extra pipes (no need to swap, as all pipes are broken anyway)
close(data->io.in);
close(data->io.out);
free(data);
return (void*)-1;
};
}
}
static int extend_queue(async_p async, struct Task* task) {
// create the data carrier
struct ExtQueueData* data = malloc(sizeof(struct ExtQueueData));
if (!data || !async || (fcntl(async->in, F_GETFL) < 0)) // closed queue
return -1;
// create the extra pipes
if (pipe(&data->io.in)) {
free(data);
return -1;
};
// set the data's async pointer
data->async = async;
// get the core out pipe flags (blocking state not important)
int flags = fcntl(async->out, F_GETFL, NULL);
// make the new task writer object non-blocking
fcntl(data->io.out, F_SETFL, flags | O_NONBLOCK);
// swap the two writers
async->out = async->out + data->io.out;
data->io.out = async->out - data->io.out;
async->out = async->out - data->io.out;
// write the task to the new queue - otherwise, our thread might quit before
// it starts.
if (write(async->out, task, sizeof(struct Task)) <= 0) {
// failed to write the new task... can this happen?
// swap the two writers back
async->out = async->out + data->io.out;
data->io.out = async->out - data->io.out;
async->out = async->out - data->io.out;
// close
close(data->io.in);
close(data->io.out);
// free
free(data);
// inform about the error
return -1;
}
// create a thread that listens to the new queue and pushes it to the old
// queue
pthread_t thr;
if (pthread_create(&thr, NULL, extended_queue_thread, data)) {
// // damn, we were so close to finish... but got an error
// swap the two writers back
async->out = async->out + data->io.out;
data->io.out = async->out - data->io.out;
async->out = async->out - data->io.out;
// close
close(data->io.in);
close(data->io.out);
// free
free(data);
// inform about the error
return -1;
};
return 0;
}
/////////////////////
// the functions
/////////////////////
// the functions
// creates a new aync object
static struct Async* async_new(int threads,
void (*on_init)(struct Async* self, void* arg),
void* arg) {
if (threads <= 0)
return NULL;
// create the tasking pipe.
int io[2];
if (pipe(io))
return NULL;
// allocate the memory
size_t memory_required = sizeof(struct Async) + (sizeof(pthread_t) * threads);
struct Async* async = malloc(memory_required);
if (!async) {
close(io[0]);
close(io[1]);
return NULL;
}
// create the mutex
if (pthread_mutex_init(&async->locker, NULL)) {
close(io[0]);
close(io[1]);
free(async);
return NULL;
};
// setup the struct data
async->count = threads;
async->init_thread = on_init;
async->in = io[0];
async->out = io[1];
async->arg = arg;
// make sure write isn't blocking, otherwise we might deadlock.
int flags = fcntl(async->out, F_GETFL, NULL);
fcntl(async->out, F_SETFL, flags | O_NONBLOCK);
// disable SIGPIPE isn't required, as the main thread isn't likely to make
// this mistake... but still... it might...
signal(SIGPIPE, SIG_IGN);
// create the thread pool - CHANGE this to re move sentinal thread
for (int i = 0; i < threads; i++) {
if ((pthread_create(async->thread_pool + i, NULL,
(void* (*)(void*))thread_sentinal, async)) < 0) {
for (int j = 0; j < i; j++) {
pthread_cancel(async->thread_pool[j]);
}
close(io[0]);
close(io[1]);
free(async);
return NULL;
}
}
// return the pointer
return async;
}
static int async_run(struct Async* self, void (*task)(void*), void* arg) {
if (!(task && self))
return -1;
struct Task package = {.task = task, .arg = arg};
int written = 0;
// "busy" wait for the task buffer to complete tasks by performing tasks in
// the buffer
pthread_mutex_lock(&self->locker);
while ((written = write(self->out, &package, sizeof(struct Task))) !=
sizeof(struct Task)) {
if (written > 0) {
// this is fatal to the Async engine, as a partial write will now mess up
// all the task-data! --- This shouldn't be possible because it's all
// powers of 2. (buffer size is power of 2 and struct size is power of 2).
fprintf(
stderr,
"FATAL: Async queue corruption, cannot continue processing data.\n");
exit(2);
}
if (!extend_queue(self, &package))
break;
// closed pipe or other error, return error
if (perform_single_task(self)) {
pthread_mutex_unlock(&self->locker);
return -1;
}
}
pthread_mutex_unlock(&self->locker);
return 0;
}
static void async_signal(struct Async* self) {
struct Task package = {.task = 0, .arg = 0};
pthread_mutex_lock(&self->locker);
while (write(self->out, &package, sizeof(struct Task)) !=
sizeof(struct Task)) {
if (!extend_queue(self, &package))
break;
// closed pipe, return error
if (perform_single_task(self))
break;
}
pthread_mutex_unlock(&self->locker);
}
static void async_wait(struct Async* self) {
for (int i = 0; i < self->count; i++) {
pthread_join(self->thread_pool[i], NULL);
}
close(self->in);
close(self->out);
pthread_mutex_destroy(&self->locker);
free(self);
}
static void async_finish(struct Async* self) {
async_signal(self);
async_wait(self);
}
static void async_kill(struct Async* self) {
struct Task package = {.task = 0, .arg = 0};
if (write(self->out, &package, sizeof(struct Task)))
;
for (int i = 0; i < self->count; i++) {
pthread_cancel(self->thread_pool[i]);
}
close(self->in);
close(self->out);
free(self);
}
////////////
// API gateway
// the API gateway
const struct AsyncAPI Async = {.new = async_new,
.run = async_run,
.signal = async_signal,
.wait = async_wait,
.finish = async_finish,
.kill = async_kill};