| /* |
| copyright: Boaz segev, 2015 |
| license: MIT |
| |
| Feel free to copy, use and enjoy according to the license provided. |
| */ |
| #include "libasync.h" |
| |
| #include <stdlib.h> |
| #include <stdio.h> |
| #include <pthread.h> |
| #include <errno.h> |
| #include <signal.h> |
| #include <unistd.h> |
| #include <execinfo.h> |
| #include <fcntl.h> |
| |
| //////////////////// |
| // types used |
| |
| struct Async { |
| /// the number of threads in the array. |
| int count; |
| /// The read only part of the pipe used to push tasks. |
| int in; |
| /// The write only part of the pipe used to push tasks. |
| int out; |
| /// a callback used whenever a new thread a spawned. |
| void (*init_thread)(struct Async*, void*); |
| /// a pointer for the callback. |
| void* arg; |
| /// an array to `pthread_t` objects `count` long. |
| pthread_t thread_pool[]; |
| }; |
| |
| // A task structure. |
| struct Task { |
| void (*task)(void*); |
| void* arg; |
| }; |
| |
| ///////////////////// |
| // the thread loop functions |
| |
| // manage thread error signals |
| static void on_signal(int sig) { |
| void* array[22]; |
| size_t size; |
| char** strings; |
| size_t i; |
| size = backtrace(array, 22); |
| strings = backtrace_symbols(array, size); |
| perror("\nERROR"); |
| fprintf( |
| stderr, |
| "Async: task in thread pool raised an error (%d-%s). Backtrace (%zd):\n", |
| errno, sig == SIGSEGV |
| ? "SIGSEGV" |
| : sig == SIGFPE ? "SIGFPE" : sig == SIGILL |
| ? "SIGILL" |
| : sig == SIGPIPE ? "SIGPIPE" |
| : "unknown", |
| size); |
| for (i = 2; i < size; i++) |
| fprintf(stderr, "%s\n", strings[i]); |
| free(strings); |
| fprintf(stderr, "\n"); |
| // pthread_exit(0); // for testing |
| pthread_exit((void*)on_signal); |
| } |
| |
| // the main thread loop function |
| static void* thread_loop(struct Async* async) { |
| struct Task task = {}; |
| signal(SIGSEGV, on_signal); |
| signal(SIGFPE, on_signal); |
| signal(SIGILL, on_signal); |
| signal(SIGPIPE, SIG_IGN); |
| #ifdef SIGBUS |
| signal(SIGBUS, on_signal); |
| #endif |
| #ifdef SIGSYS |
| signal(SIGSYS, on_signal); |
| #endif |
| #ifdef SIGXFSZ |
| signal(SIGXFSZ, on_signal); |
| #endif |
| if (async->init_thread) |
| async->init_thread(async, async->arg); |
| int in = async->in; // keep a copy of the pipe's address on the stack |
| while (read(in, &task, sizeof(struct Task)) > 0) { |
| if (!task.task) { |
| close(in); |
| break; |
| } |
| task.task(task.arg); |
| } |
| return 0; |
| } |
| |
| // the thread's "watch-dog" / sentinal |
| static void* thread_sentinal(struct Async* async) { |
| // do we need a sentinal that will reinitiate the thread if a task causes it |
| // to fail? |
| // signal(int, void (*)(int)) |
| pthread_t active_thread; |
| void* thread_error = (void*)on_signal; |
| int in = async->in; |
| while (thread_error && (fcntl(in, F_GETFL) >= 0)) { |
| pthread_create(&active_thread, NULL, (void* (*)(void*))thread_loop, async); |
| pthread_join(active_thread, &thread_error); |
| if (thread_error) { |
| perror("Async: thread sentinal reinitiating worker thread"); |
| } |
| } |
| return 0; |
| } |
| |
| ///////////////////// |
| // a single task performance, for busy waiting |
| static int perform_single_task(async_p async) { |
| struct Task task = {}; |
| if (read(async->in, &task, sizeof(struct Task)) > 0) { |
| if (!task.task) { |
| close(async->in); |
| return 0; |
| } |
| task.task(task.arg); |
| return 0; |
| } else |
| return -1; |
| } |
| ///////////////////// |
| // the functions |
| |
| // creates a new aync object |
| static struct Async* async_new(int threads, |
| void (*on_init)(struct Async* self, void* arg), |
| void* arg) { |
| if (threads <= 0) |
| return NULL; |
| // create the tasking pipe. |
| int io[2]; |
| if (pipe(io)) |
| return NULL; |
| |
| // allocate the memory |
| size_t memory_required = sizeof(struct Async) + (sizeof(pthread_t) * threads); |
| struct Async* async = malloc(memory_required); |
| if (!async) { |
| close(io[0]); |
| close(io[1]); |
| return NULL; |
| } |
| |
| // setup the struct data |
| async->count = threads; |
| async->init_thread = on_init; |
| async->in = io[0]; |
| async->out = io[1]; |
| async->arg = arg; |
| |
| // make sure write isn't blocking, otherwise we might deadlock. |
| fcntl(async->out, F_SETFL, O_NONBLOCK); |
| // disable SIGPIPE isn't required, as the main thread isn't likely to make |
| // this mistake... but still... |
| signal(SIGPIPE, SIG_IGN); |
| |
| // create the thread pool |
| for (int i = 0; i < threads; i++) { |
| if ((pthread_create(async->thread_pool + i, NULL, |
| (void* (*)(void*))thread_sentinal, async)) < 0) { |
| for (int j = 0; j < i; j++) { |
| pthread_cancel(async->thread_pool[j]); |
| } |
| close(io[0]); |
| close(io[1]); |
| free(async); |
| return NULL; |
| } |
| } |
| // return the pointer |
| return async; |
| } |
| static int async_run(struct Async* self, void (*task)(void*), void* arg) { |
| if (!(task && self)) |
| return -1; |
| struct Task package = {.task = task, .arg = arg}; |
| int written; |
| // "busy" wait for the task buffer to complete tasks by performing tasks in |
| // the buffer. |
| while ((written = write(self->out, &package, sizeof(struct Task))) != |
| sizeof(struct Task)) { |
| if (written > 0) { |
| // this is fatal to the Async engine, as a partial write will now mess up |
| // all the task-data! --- This shouldn't be possible because it's all |
| // powers of 2. (buffer size is power of 2 and struct size is power of 2). |
| fprintf( |
| stderr, |
| "FATAL: Async queue corruption, cannot continue processing data.\n"); |
| exit(2); |
| } |
| // closed pipe or other error, return error |
| if (perform_single_task(self)) |
| return -1; |
| } |
| return 0; |
| } |
| |
| static void async_signal(struct Async* self) { |
| struct Task package = {.task = 0, .arg = 0}; |
| while (write(self->out, &package, sizeof(struct Task)) != |
| sizeof(struct Task)) { |
| // closed pipe, return error |
| if (perform_single_task(self)) |
| return; |
| } |
| } |
| static void async_wait(struct Async* self) { |
| for (int i = 0; i < self->count; i++) { |
| pthread_join(self->thread_pool[i], NULL); |
| } |
| close(self->in); |
| close(self->out); |
| free(self); |
| } |
| |
| static void async_finish(struct Async* self) { |
| async_signal(self); |
| async_wait(self); |
| } |
| static void async_kill(struct Async* self) { |
| struct Task package = {.task = 0, .arg = 0}; |
| if (write(self->out, &package, sizeof(struct Task))) |
| ; |
| for (int i = 0; i < self->count; i++) { |
| pthread_cancel(self->thread_pool[i]); |
| } |
| close(self->in); |
| close(self->out); |
| free(self); |
| } |
| |
| //////////// |
| // API gateway |
| |
| // the API gateway |
| const struct AsyncAPI Async = {.new = async_new, |
| .run = async_run, |
| .signal = async_signal, |
| .wait = async_wait, |
| .finish = async_finish, |
| .kill = async_kill}; |