blob: ee39bc7575ea3d12439c63dfbd1ef5adafcea892 [file] [log] [blame] [raw]
/*
copyright: Boaz segev, 2016
license: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include "libasync.h"
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
#include <execinfo.h>
#include <pthread.h>
#include <fcntl.h>
#include <sched.h>
#include <stdatomic.h>
#include <sys/mman.h>
#include <string.h>
#ifdef __has_include
#if __has_include(<x86intrin.h>)
#include <x86intrin.h>
#define HAVE_X86Intrin
#define sched_yield() _mm_pause()
// see: https://software.intel.com/en-us/node/513411
// and: https://software.intel.com/sites/landingpage/IntrinsicsGuide/
#endif
#endif
/* *****************************************************************************
Performance options.
*/
#ifndef ASYNC_TASK_POOL_SIZE
#define ASYNC_TASK_POOL_SIZE 170
#endif
/* Spinlock vs. Mutex data protection. */
#ifndef ASYNC_USE_SPINLOCK
#define ASYNC_USE_SPINLOCK 1
#endif
/* use pipe for wakeup if == 0 else, use nanosleep when no tasks. */
#ifndef ASYNC_NANO_SLEEP
#define ASYNC_NANO_SLEEP 16777216 // 8388608 // 1048576 // 524288
#endif
/* Sentinal thread to respawn crashed threads - limited crash resistance. */
#ifndef ASYNC_USE_SENTINEL
#define ASYNC_USE_SENTINEL 0
#endif
/* *****************************************************************************
Forward declarations - used for functions that might be needed before they are
defined.
*/
// the actual working thread
static void* worker_thread_cycle(void*);
// A thread sentinal (optional - edit the ASYNC_USE_SENTINEL macro to use or
// disable)
static void* sentinal_thread(void*);
/******************************************************************************
Portability - used to help port this to different frameworks (i.e. Ruby).
*/
#ifndef THREAD_TYPE
#define THREAD_TYPE pthread_t
static void* join_thread(THREAD_TYPE thr) {
void* ret;
pthread_join(thr, &ret);
return ret;
}
static int create_thread(THREAD_TYPE* thr,
void* (*thread_func)(void*),
void* async) {
return pthread_create(thr, NULL, thread_func, async);
}
#endif
/******************************************************************************
Data Types
*/
/**
A task
*/
typedef struct {
void (*task)(void*);
void* arg;
} task_s;
/**
A task node
*/
typedef struct async_task_ns {
task_s task;
struct async_task_ns* next;
} async_task_ns;
/******************************************************************************
Core Data
*/
typedef struct {
#if !defined(ASYNC_USE_SPINLOCK) || ASYNC_USE_SPINLOCK != 1
/* if using mutex */
pthread_mutex_t lock;
#endif
/* task management*/
async_task_ns memory[ASYNC_TASK_POOL_SIZE];
async_task_ns* pool;
async_task_ns* tasks;
async_task_ns** pos;
/* thread management*/
size_t thread_count;
#if ASYNC_NANO_SLEEP == 0
/* when using pipes */
struct {
int in;
int out;
} io;
#endif
#if defined(ASYNC_USE_SPINLOCK) && ASYNC_USE_SPINLOCK == 1 // use spinlock
/* if using spinlock */
atomic_flag lock;
#endif
/* state management*/
struct {
unsigned run : 1;
} flags;
/** the threads array, must be last */
THREAD_TYPE threads[];
} async_data_s;
static async_data_s* async;
/******************************************************************************
Core Data initialization and lock/unlock
*/
#if defined(ASYNC_USE_SPINLOCK) && ASYNC_USE_SPINLOCK == 1 // use spinlock
#define lock_async_init() (atomic_flag_clear(&(async->lock)), 0)
#define lock_async_destroy() ;
#define lock_async() \
while (atomic_flag_test_and_set(&(async->lock))) \
sched_yield();
#define unlock_async() (atomic_flag_clear(&(async->lock)))
#else // Using Mutex
#define lock_async_init() (pthread_mutex_init(&((async)->lock), NULL))
#define lock_async_destroy() (pthread_mutex_destroy(&((async)->lock)))
#define lock_async() (pthread_mutex_lock(&((async)->lock)))
#define unlock_async() (pthread_mutex_unlock(&((async)->lock)))
#endif
static inline void async_free(void) {
#if ASYNC_NANO_SLEEP == 0
if (async->io.in) {
close(async->io.in);
close(async->io.out);
}
#endif
lock_async_destroy();
munmap(async, (sizeof(async_data_s) +
(sizeof(THREAD_TYPE) * (async->thread_count))));
async = NULL;
}
static inline void async_alloc(size_t threads) {
async = mmap(NULL, (sizeof(async_data_s) + (sizeof(THREAD_TYPE) * (threads))),
PROT_READ | PROT_WRITE | PROT_EXEC, MAP_PRIVATE | MAP_ANONYMOUS,
-1, 0);
if (async == MAP_FAILED) {
async = NULL;
}
*async = (async_data_s){.flags.run = 1};
async->pos = &async->tasks;
if (lock_async_init()) {
async_free();
return;
}
#if ASYNC_NANO_SLEEP == 0 // using pipes?
if (pipe(&async->io.in)) {
async_free();
return;
}
fcntl(async->io.out, F_SETFL, O_NONBLOCK);
#endif
// initialize pool
for (size_t i = 0; i < ASYNC_TASK_POOL_SIZE - 1; i++) {
async->memory[i].next = async->memory + i + 1;
}
async->memory[ASYNC_TASK_POOL_SIZE - 1].next = NULL;
async->pool = async->memory;
}
/******************************************************************************
Perfoming tasks
*/
static inline void perform_tasks(void) {
task_s tsk;
async_task_ns* t;
while (async) {
lock_async();
t = async->tasks;
if (t) {
async->tasks = t->next;
if (async->tasks == NULL)
async->pos = &(async->tasks);
tsk = t->task;
if (t >= async->memory &&
(t <= (async->memory + ASYNC_TASK_POOL_SIZE - 1))) {
t->next = async->pool;
async->pool = t;
} else {
free(t);
}
unlock_async();
tsk.task(tsk.arg);
continue;
}
async->pos = &(async->tasks);
unlock_async();
return;
}
}
/******************************************************************************
Pasuing and resuming threads
*/
static inline void pause_thread() {
#if ASYNC_NANO_SLEEP == 0
if (async && async->flags.run) {
uint8_t tmp;
read(async->io.in, &tmp, 1);
}
#else
struct timespec act, tm = {.tv_sec = 0, .tv_nsec = ASYNC_NANO_SLEEP};
nanosleep(&tm, &act);
// sched_yield();
#endif
}
static inline void wake_thread() {
#if ASYNC_NANO_SLEEP == 0
write(async->io.out, async, 1);
#endif
}
static inline void wake_all_threads() {
#if ASYNC_NANO_SLEEP == 0
write(async->io.out, async, async->thread_count + 16);
#endif
}
/******************************************************************************
Worker threads
*/
// on thread failure, a backtrace should be printed (if
// using sentinal)
// manage thread error signals
static void on_err_signal(int sig) {
void* array[22];
size_t size;
char** strings;
size_t i;
size = backtrace(array, 22);
strings = backtrace_symbols(array, size);
perror("\nERROR");
fprintf(stderr,
"Async: Error signal received"
" - %s (errno %d).\nBacktrace (%zd):\n",
strsignal(sig), errno, size);
for (i = 2; i < size; i++)
fprintf(stderr, "%s\n", strings[i]);
free(strings);
fprintf(stderr, "\n");
// pthread_exit(0); // for testing
pthread_exit((void*)on_err_signal);
}
// The worker cycle
static void* worker_thread_cycle(void* _) {
// register error signals when using a sentinal
if (ASYNC_USE_SENTINEL) {
signal(SIGSEGV, on_err_signal);
signal(SIGFPE, on_err_signal);
signal(SIGILL, on_err_signal);
#ifdef SIGBUS
signal(SIGBUS, on_err_signal);
#endif
#ifdef SIGSYS
signal(SIGSYS, on_err_signal);
#endif
#ifdef SIGXFSZ
signal(SIGXFSZ, on_err_signal);
#endif
}
// ignore pipe issues
signal(SIGPIPE, SIG_IGN);
// pause for signal for as long as we're active.
while (async && async->flags.run) {
perform_tasks();
pause_thread();
}
perform_tasks();
return 0;
}
// an optional sentinal
static void* sentinal_thread(void* _) {
THREAD_TYPE thr;
while (async != NULL && async->flags.run == 1 &&
create_thread(&thr, worker_thread_cycle, _) == 0)
join_thread(thr);
return 0;
}
/******************************************************************************
API
*/
/**
Starts running the global thread pool. Use:
async_start(8);
*/
ssize_t async_start(size_t threads) {
async_alloc(threads);
if (async == NULL)
return -1;
// initialize threads
for (size_t i = 0; i < threads; i++) {
if (create_thread(
async->threads + i,
(ASYNC_USE_SENTINEL ? sentinal_thread : worker_thread_cycle),
NULL) < 0) {
async->flags.run = 0;
wake_all_threads();
async_free();
return -1;
}
++async->thread_count;
}
signal(SIGPIPE, SIG_IGN);
return 0;
}
/**
Waits for all the present tasks to complete.
The thread pool will remain active, waiting for new tasts.
This function will wait forever or until a signal is
received and all the tasks in the queue have been processed.
Unline finish (that uses `join`) this is an **active** wait where the waiting
thread acts as a working thread and performs any pending tasks.
Use:
Async.wait(async);
*/
void async_perform() {
perform_tasks();
}
/**
Schedules a task to be performed by the thread pool.
The Task should be a function such as `void task(void
*arg)`.
Use:
void task(void * arg) { printf("%s", arg); }
char arg[] = "Demo Task";
async_run(task, arg);
*/
int async_run(void (*task)(void*), void* arg) {
if (async == NULL)
return -1;
async_task_ns* tsk;
lock_async();
tsk = async->pool;
if (tsk) {
async->pool = tsk->next;
} else {
tsk = malloc(sizeof(*tsk));
if (!tsk)
goto error;
}
*tsk = (async_task_ns){.task.task = task, .task.arg = arg};
*(async->pos) = tsk;
async->pos = &(tsk->next);
unlock_async();
wake_thread();
return 0;
error:
unlock_async();
return -1;
}
/**
Waits for existing tasks to complete and releases the thread
pool and it's
resources.
*/
void async_join() {
if (async == NULL)
return;
for (size_t i = 0; i < async->thread_count; i++) {
join_thread(async->threads[i]);
}
perform_tasks();
async_free();
};
/**
Waits for existing tasks to complete and releases the thread
pool and it's
resources.
*/
void async_signal() {
if (async == NULL)
return;
async->flags.run = 0;
wake_all_threads();
};
/******************************************************************************
Test
*/
#if ASYNC_TEST_INC == 1
#define ASYNC_SPEED_TEST_THREAD_COUNT 120
static size_t _Atomic i_count = 0;
static void sample_task(void* _) {
__asm__ volatile("" ::: "memory");
atomic_fetch_add(&i_count, 1);
}
static void sched_sample_task(void* _) {
for (size_t i = 0; i < 1024; i++) {
async_run(sample_task, async);
}
}
static void text_task_text(void* _) {
__asm__ volatile("" ::: "memory");
fprintf(stderr, "this text should print before async_finish returns\n");
}
static void text_task(void* _) {
sleep(2);
async_run(text_task_text, _);
}
#if ASYNC_USE_SENTINEL == 1
static void evil_task(void* _) {
__asm__ volatile("" ::: "memory");
fprintf(stderr, "EVIL CODE RUNNING!\n");
sprintf(NULL,
"Never write text to a NULL pointer, this is a terrible idea that "
"should segfault.\n");
}
#endif
void async_test_library_speed(void) {
atomic_store(&i_count, 0);
time_t start, end;
fprintf(stderr, "Starting Async testing\n");
if (async_start(ASYNC_SPEED_TEST_THREAD_COUNT) == 0) {
fprintf(stderr, "Thread count test %s %lu/%d\n",
(async->thread_count == ASYNC_SPEED_TEST_THREAD_COUNT ? "PASSED"
: "FAILED"),
async->thread_count, ASYNC_SPEED_TEST_THREAD_COUNT);
start = clock();
for (size_t i = 0; i < 1024; i++) {
async_run(sched_sample_task, async);
}
async_finish();
end = clock();
fprintf(stderr, "Async performance test %lu cycles with i_count = %lu\n",
end - start, atomic_load(&i_count));
} else {
fprintf(stderr, "Async test couldn't be initialized\n");
exit(-1);
}
if (async_start(8)) {
fprintf(stderr, "Couldn't start thread pool!\n");
exit(-1);
}
fprintf(stderr, "calling async_perform.\n");
async_run(text_task, NULL);
sleep(1);
async_perform();
fprintf(stderr, "async_perform returned.\n");
fprintf(stderr, "calling finish.\n");
async_run(text_task, NULL);
sleep(1);
async_finish();
fprintf(stderr, "finish returned.\n");
#if ASYNC_USE_SENTINEL == 1
if (async_start(8)) {
fprintf(stderr, "Couldn't start thread pool!\n");
exit(-1);
}
sleep(1);
fprintf(stderr, "calling evil task.\n");
async_run(evil_task, NULL);
sleep(1);
fprintf(stderr, "calling finish.\n");
async_finish();
#endif
// async_start(8);
// fprintf(stderr,
// "calling a few tasks and sleeping 12 seconds before finishing
// up...\n"
// "check the processor CPU cycles - are we busy?\n");
// async_run(sched_sample_task, NULL);
// sleep(12);
// async_finish();
}
#endif