/*
Copyright: Boaz Segev, 2016-2017
License: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#include "spnlock.h"
#include "defer.h"
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
/* *****************************************************************************
Compile time settings
***************************************************************************** */
#ifndef DEFER_THROTTLE
#define DEFER_THROTTLE 1048574UL
#endif
#ifndef DEFER_THROTTLE_LIMIT
#define DEFER_THROTTLE_LIMIT 2097148UL
#endif
/**
 * The progressive throttling model makes concurrency and parallelism more
 * likely: an idle thread increases its own sleep time gradually and resets it
 * as soon as tasks are queued, so more than one thread tends to stay active.
 *
 * Otherwise, threads are treated as a "fallback" for slow user code: a single
 * thread should be active most of the time, and the other threads are
 * activated only when that single thread is slow to perform.
 */
#ifndef DEFER_THROTTLE_PROGRESSIVE
#define DEFER_THROTTLE_PROGRESSIVE 1
#endif
#ifndef DEFER_QUEUE_BLOCK_COUNT
#if UINTPTR_MAX <= 0xFFFFFFFF
/* Almost a page of memory on most 32 bit machines: ((4096/4)-5)/3 */
#define DEFER_QUEUE_BLOCK_COUNT 339
#else
/* Almost a page of memory on most 64 bit machines: ((4096/8)-5)/3 */
#define DEFER_QUEUE_BLOCK_COUNT 168
#endif
#endif
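/*
 * Any of the settings above can be overridden at compile time. A hypothetical
 * build line (the flag values are only examples):
 *
 *     cc -c defer.c -DDEFER_THROTTLE_PROGRESSIVE=0 -DDEFER_QUEUE_BLOCK_COUNT=64
 *
 * Keeping DEFER_THROTTLE_PROGRESSIVE enabled (the default) favors keeping the
 * whole thread pool responsive; disabling it favors a single active thread
 * with the rest acting as a fallback.
 */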
/* *****************************************************************************
Data Structures
***************************************************************************** */
/* task node data */
typedef struct {
void (*func)(void *, void *);
void *arg1;
void *arg2;
} task_s;
/* task queue block - a fixed size ring buffer of tasks; blocks are chained
 * into a list when a single block overflows. The first block (`static_queue`)
 * is statically allocated. */
typedef struct queue_block_s {
  task_s tasks[DEFER_QUEUE_BLOCK_COUNT];
  struct queue_block_s *next;
  size_t write;
  size_t read;
  /* 0 == linear, 1 == the write position wrapped around the buffer's end,
   * 2 == (for `static_queue` only) detached from the chain, free for reuse */
  unsigned char state;
} queue_block_s;
static queue_block_s static_queue;
/* the state machine - this holds all the data about the task queue and pool */
static struct {
/* a lock for the state machine, used for multi-threading support */
spn_lock_i lock;
/* current active block to pop tasks */
queue_block_s *reader;
/* current active block to push tasks */
queue_block_s *writer;
} deferred = {.reader = &static_queue, .writer = &static_queue};
/* *****************************************************************************
Internal Data API
***************************************************************************** */
#ifdef DEBUG
static size_t count_alloc, count_dealloc;
#define COUNT_ALLOC spn_add(&count_alloc, 1)
#define COUNT_DEALLOC spn_add(&count_dealloc, 1)
#define COUNT_RESET \
do { \
count_alloc = count_dealloc = 0; \
} while (0)
#else
#define COUNT_ALLOC
#define COUNT_DEALLOC
#define COUNT_RESET
#endif
static inline void push_task(task_s task) {
spn_lock(&deferred.lock);
/* test if full */
if (deferred.writer->state &&
deferred.writer->write == deferred.writer->read) {
/* return to static buffer or allocate new buffer */
if (static_queue.state == 2) {
deferred.writer->next = &static_queue;
} else {
deferred.writer->next = malloc(sizeof(*deferred.writer->next));
COUNT_ALLOC;
if (!deferred.writer->next)
goto critical_error;
}
deferred.writer = deferred.writer->next;
deferred.writer->write = 0;
deferred.writer->read = 0;
deferred.writer->state = 0;
deferred.writer->next = NULL;
}
/* place task and finish */
deferred.writer->tasks[deferred.writer->write++] = task;
/* cycle buffer */
if (deferred.writer->write == DEFER_QUEUE_BLOCK_COUNT) {
deferred.writer->write = 0;
deferred.writer->state = 1;
}
spn_unlock(&deferred.lock);
return;
critical_error:
spn_unlock(&deferred.lock);
perror("ERROR CRITICAL: defer can't allocate task");
kill(0, SIGINT);
exit(errno);
}
static inline task_s pop_task(void) {
task_s ret = (task_s){.func = NULL};
queue_block_s *to_free = NULL;
/* lock the state machine and collect a task from the head of the queue */
spn_lock(&deferred.lock);
/* empty? */
if (deferred.reader->write == deferred.reader->read &&
!deferred.reader->state)
goto finish;
/* collect task */
ret = deferred.reader->tasks[deferred.reader->read++];
/* cycle */
if (deferred.reader->read == DEFER_QUEUE_BLOCK_COUNT) {
deferred.reader->read = 0;
deferred.reader->state = 0;
}
/* did we finish the queue in the buffer? */
if (deferred.reader->write == deferred.reader->read) {
if (deferred.reader->next) {
to_free = deferred.reader;
deferred.reader = deferred.reader->next;
} else {
if (deferred.reader != &static_queue && static_queue.state == 2) {
to_free = deferred.reader;
deferred.writer = &static_queue;
deferred.reader = &static_queue;
}
deferred.reader->write = deferred.reader->read = deferred.reader->state =
0;
}
goto finish;
}
finish:
if (to_free == &static_queue) {
static_queue.state = 2;
static_queue.next = NULL;
}
spn_unlock(&deferred.lock);
if (to_free && to_free != &static_queue) {
free(to_free);
COUNT_DEALLOC;
}
return ret;
}
static inline void clear_tasks(void) {
spn_lock(&deferred.lock);
while (deferred.reader) {
queue_block_s *tmp = deferred.reader;
deferred.reader = deferred.reader->next;
if (tmp != &static_queue) {
COUNT_DEALLOC;
free(tmp);
}
}
static_queue = (queue_block_s){.next = NULL};
deferred.reader = deferred.writer = &static_queue;
spn_unlock(&deferred.lock);
}
/** Resets the state machine's lock; should be called by a child process after
 * a fork, in case the lock was held at the moment of the fork. */
void defer_on_fork(void) { deferred.lock = SPN_LOCK_INIT; }
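/*
 * A usage sketch for a forking process (hypothetical): the child resets the
 * lock before scheduling or performing any tasks.
 *
 *     pid_t pid = fork();
 *     if (pid == 0) {
 *       defer_on_fork();
 *       // ... the child can now safely call defer() / defer_perform() ...
 *     }
 */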
#define push_task(...) push_task((task_s){__VA_ARGS__})
/* *****************************************************************************
API
***************************************************************************** */
/** Defer an execution of a function for later. */
int defer(void (*func)(void *, void *), void *arg1, void *arg2) {
/* must have a task to defer */
if (!func)
goto call_error;
push_task(.func = func, .arg1 = arg1, .arg2 = arg2);
defer_thread_signal();
return 0;
call_error:
return -1;
}
/** Performs all deferred functions until the queue has been depleted. */
void defer_perform(void) {
task_s task = pop_task();
while (task.func) {
task.func(task.arg1, task.arg2);
task = pop_task();
}
}
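/*
 * A minimal usage sketch (`log_event` and its argument are hypothetical):
 *
 *     void log_event(void *name, void *unused) {
 *       (void)unused;
 *       fprintf(stderr, "event: %s\n", (char *)name);
 *     }
 *
 *     defer(log_event, (void *)"connected", NULL);
 *     // ... later, on the same thread or a worker thread:
 *     defer_perform(); // runs queued tasks, in FIFO order, until none remain
 */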
/** Returns true if there are deferred functions waiting for execution. */
int defer_has_queue(void) {
return deferred.reader->read != deferred.reader->write;
}
/** Clears the queue. */
void defer_clear_queue(void) { clear_tasks(); }
/* *****************************************************************************
Thread Pool Support
***************************************************************************** */
/* thread pool data container */
struct defer_pool {
volatile unsigned int flag;
unsigned int count;
struct thread_msg_s {
pool_pt pool;
void *thrd;
} threads[];
};
#if defined(__unix__) || defined(__APPLE__) || defined(__linux__) || \
defined(DEBUG)
#include <pthread.h>
/* `weak` functions can be overridden to change the thread implementation; an
 * override sketch follows this #if/#else block. */
#pragma weak defer_new_thread
void *defer_new_thread(void *(*thread_func)(void *), void *arg) {
pthread_t *thread = malloc(sizeof(*thread));
if (thread == NULL || pthread_create(thread, NULL, thread_func, arg))
goto error;
return thread;
error:
free(thread);
return NULL;
}
/**
 * OVERRIDE THIS to replace the default pthread implementation.
 *
 * Frees the memory associated with a thread identifier (the thread is allowed
 * to run its course; only the identifier is freed).
 */
#pragma weak defer_free_thread
void defer_free_thread(void *p_thr) {
if (*((pthread_t *)p_thr)) {
pthread_detach(*((pthread_t *)p_thr));
}
free(p_thr);
}
#pragma weak defer_join_thread
int defer_join_thread(void *p_thr) {
if (!p_thr || !(*((pthread_t *)p_thr)))
return -1;
pthread_join(*((pthread_t *)p_thr), NULL);
*((pthread_t *)p_thr) = (pthread_t)NULL;
defer_free_thread(p_thr);
return 0;
}
#pragma weak defer_thread_throttle
void defer_thread_throttle(unsigned long microsec) {
throttle_thread(microsec);
}
#else /* No pthreads... BYO thread implementation. This one simply fails. */
#pragma weak defer_new_thread
void *defer_new_thread(void *(*thread_func)(void *), void *arg) {
(void)thread_func;
(void)arg;
return NULL;
}
#pragma weak defer_free_thread
void defer_free_thread(void *p_thr) { (void)p_thr; }
#pragma weak defer_join_thread
int defer_join_thread(void *p_thr) {
(void)p_thr;
return -1;
}
#pragma weak defer_thread_throttle
void defer_thread_throttle(unsigned long microsec) { (void)microsec; }
#endif /* DEBUG || pthread default */
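/*
 * Since the functions above are weak symbols, an embedding project can supply
 * its own thread layer by defining non-weak replacements. A sketch that keeps
 * pthreads but sets an explicit stack size (the 1MiB value is only an
 * example):
 *
 *     void *defer_new_thread(void *(*thread_func)(void *), void *arg) {
 *       pthread_attr_t attr;
 *       pthread_attr_init(&attr);
 *       pthread_attr_setstacksize(&attr, 1UL << 20);
 *       pthread_t *thread = malloc(sizeof(*thread));
 *       if (!thread || pthread_create(thread, &attr, thread_func, arg)) {
 *         free(thread);
 *         thread = NULL;
 *       }
 *       pthread_attr_destroy(&attr);
 *       return thread;
 *     }
 *
 * defer_free_thread and defer_join_thread could be replaced the same way.
 */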
/**
 * A thread entering this function should wait for new events.
 */
#pragma weak defer_thread_wait
void defer_thread_wait(pool_pt pool, void *p_thr) {
if (DEFER_THROTTLE_PROGRESSIVE) {
/* keeps threads active (concurrent), but reduces performance */
static __thread size_t static_throttle = 1;
if (static_throttle < DEFER_THROTTLE_LIMIT)
static_throttle = (static_throttle << 1);
throttle_thread(static_throttle);
if (defer_has_queue())
static_throttle = 1;
(void)p_thr;
(void)pool;
} else {
    /* Protects against slow user code, but mostly keeps a single thread active */
size_t throttle =
pool ? ((pool->count) * DEFER_THROTTLE) : DEFER_THROTTLE_LIMIT;
if (!throttle || throttle > DEFER_THROTTLE_LIMIT)
throttle = DEFER_THROTTLE_LIMIT;
if (throttle == DEFER_THROTTLE)
throttle <<= 1;
throttle_thread(throttle);
(void)p_thr;
}
}
/**
* This should signal a single waiting thread to wake up (a new task entered the
* queue).
*/
#pragma weak defer_thread_signal
void defer_thread_signal(void) { (void)0; }
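/*
 * A sketch of a condition-variable based override (hypothetical; the defaults
 * above rely on throttling/timeouts instead). A production override would also
 * have to wake waiting threads on pool shutdown, e.g. by using a timed wait or
 * broadcasting from defer_pool_stop.
 *
 *     static pthread_mutex_t sig_mtx = PTHREAD_MUTEX_INITIALIZER;
 *     static pthread_cond_t sig_cond = PTHREAD_COND_INITIALIZER;
 *
 *     void defer_thread_wait(pool_pt pool, void *p_thr) {
 *       (void)pool;
 *       (void)p_thr;
 *       pthread_mutex_lock(&sig_mtx);
 *       if (!defer_has_queue())
 *         pthread_cond_wait(&sig_cond, &sig_mtx);
 *       pthread_mutex_unlock(&sig_mtx);
 *     }
 *
 *     void defer_thread_signal(void) {
 *       pthread_mutex_lock(&sig_mtx);
 *       pthread_cond_signal(&sig_cond);
 *       pthread_mutex_unlock(&sig_mtx);
 *     }
 */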
/* a thread's cycle. This is what a worker thread does... repeatedly. */
static void *defer_worker_thread(void *pool_) {
struct thread_msg_s volatile *data = pool_;
signal(SIGPIPE, SIG_IGN);
/* perform any available tasks */
defer_perform();
/* as long as the flag is true, wait for and perform tasks. */
do {
defer_thread_wait(data->pool, data->thrd);
defer_perform();
} while (data->pool->flag);
return NULL;
}
/** Signals a running thread pool to stop. Returns immediately. */
void defer_pool_stop(pool_pt pool) {
if (!pool)
return;
pool->flag = 0;
for (size_t i = 0; i < pool->count; ++i) {
defer_thread_signal();
}
}
/** Returns TRUE (1) if the pool hasn't been signaled to finish up. */
int defer_pool_is_active(pool_pt pool) { return (int)pool->flag; }
/**
* Waits for a running thread pool, joining threads and finishing all tasks.
*
* This function MUST be called in order to free the pool's data (the
* `pool_pt`).
*/
void defer_pool_wait(pool_pt pool) {
while (pool->count) {
pool->count--;
defer_join_thread(pool->threads[pool->count].thrd);
}
free(pool);
}
/** The logic behind `defer_pool_start`. */
static inline pool_pt defer_pool_initialize(unsigned int thread_count,
pool_pt pool) {
pool->flag = 1;
pool->count = 0;
while (pool->count < thread_count &&
(pool->threads[pool->count].pool = pool) &&
(pool->threads[pool->count].thrd = defer_new_thread(
defer_worker_thread, (void *)(pool->threads + pool->count))))
pool->count++;
if (pool->count == thread_count) {
return pool;
}
defer_pool_stop(pool);
return NULL;
}
/** Starts a thread pool that will run deferred tasks in the background. */
pool_pt defer_pool_start(unsigned int thread_count) {
if (thread_count == 0)
return NULL;
pool_pt pool =
malloc(sizeof(*pool) + (thread_count * sizeof(*pool->threads)));
if (!pool)
return NULL;
return defer_pool_initialize(thread_count, pool);
}
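/*
 * A minimal thread pool usage sketch (the thread count and `some_task` are
 * placeholders):
 *
 *     pool_pt pool = defer_pool_start(4); // start 4 worker threads
 *     if (pool) {
 *       defer(some_task, NULL, NULL);     // tasks now run in the background
 *       // ...
 *       defer_pool_stop(pool);            // signal shutdown (returns at once)
 *       defer_pool_wait(pool);            // join the threads and free the pool
 *     }
 */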
/* *****************************************************************************
Test
***************************************************************************** */
#ifdef DEBUG
#include <pthread.h>
#include <stdio.h>
#include <sys/stat.h>
#include <time.h>
static size_t i_count = 0;
#define TOTAL_COUNT (512 * 1024)
static void sample_task(void *unused, void *unused2) {
(void)(unused);
(void)(unused2);
spn_add(&i_count, 1);
}
static void sched_sample_task(void *count, void *unused2) {
(void)(unused2);
for (size_t i = 0; i < (uintptr_t)count; i++) {
defer(sample_task, NULL, NULL);
}
}
static void text_task_text(void *unused, void *unused2) {
(void)(unused);
(void)(unused2);
fprintf(stderr, "this text should print before defer_perform returns\n");
}
static void text_task(void *a1, void *a2) {
static const struct timespec tm = {.tv_sec = 2};
nanosleep(&tm, NULL);
defer(text_task_text, a1, a2);
}
void defer_test(void) {
#define TEST_ASSERT(cond, ...) \
if (!(cond)) { \
fprintf(stderr, "* " __VA_ARGS__); \
fprintf(stderr, "Testing failed.\n"); \
exit(-1); \
}
clock_t start, end;
fprintf(stderr, "Starting defer testing\n");
i_count = 0;
start = clock();
for (size_t i = 0; i < TOTAL_COUNT; i++) {
sample_task(NULL, NULL);
}
end = clock();
fprintf(stderr,
"Deferless (direct call) counter: %lu cycles with i_count = %lu, "
"%lu/%lu free/malloc\n",
(unsigned long)(end - start), (unsigned long)i_count,
(unsigned long)count_dealloc, (unsigned long)count_alloc);
size_t i_count_should_be = i_count;
fprintf(stderr, "\n");
for (int i = 1; TOTAL_COUNT >> i; ++i) {
COUNT_RESET;
i_count = 0;
const size_t per_task = TOTAL_COUNT >> i;
const size_t tasks = 1 << i;
start = clock();
for (size_t j = 0; j < tasks; ++j) {
defer(sched_sample_task, (void *)per_task, NULL);
}
defer_perform();
end = clock();
fprintf(stderr,
"- Defer single thread, %zu scheduling loops (%zu each):\n"
" %lu cycles with i_count = %lu, %lu/%lu "
"free/malloc\n",
tasks, per_task, (unsigned long)(end - start),
(unsigned long)i_count, (unsigned long)count_dealloc,
(unsigned long)count_alloc);
TEST_ASSERT(i_count == i_count_should_be, "ERROR: defer count invalid\n");
}
ssize_t cpu_count = 8;
#ifdef _SC_NPROCESSORS_ONLN
cpu_count = (sysconf(_SC_NPROCESSORS_ONLN) >> 1) | 1;
#endif
fprintf(stderr, "\n");
for (int i = 1; TOTAL_COUNT >> i; ++i) {
COUNT_RESET;
i_count = 0;
const size_t per_task = TOTAL_COUNT >> i;
const size_t tasks = 1 << i;
pool_pt pool = defer_pool_start(cpu_count);
start = clock();
for (size_t j = 0; j < tasks; ++j) {
defer(sched_sample_task, (void *)per_task, NULL);
}
defer_pool_stop(pool);
defer_pool_wait(pool);
end = clock();
fprintf(stderr,
"- Defer %zu threads, %zu scheduling loops (%zu each):\n"
" %lu cycles with i_count = %lu, %lu/%lu "
"free/malloc\n",
(size_t)cpu_count, tasks, per_task, (unsigned long)(end - start),
(unsigned long)i_count, (unsigned long)count_dealloc,
(unsigned long)count_alloc);
TEST_ASSERT(i_count == i_count_should_be, "ERROR: defer count invalid\n");
}
COUNT_RESET;
i_count = 0;
for (size_t i = 0; i < 1024; i++) {
defer(sched_sample_task, NULL, NULL);
}
defer_perform();
defer(text_task, NULL, NULL);
fprintf(stderr, "calling defer_perform.\n");
defer_perform();
fprintf(stderr,
"defer_perform returned. i_count = %lu, %lu/%lu free/malloc\n",
(unsigned long)i_count, (unsigned long)count_dealloc,
(unsigned long)count_alloc);
COUNT_RESET;
i_count = 0;
defer_clear_queue();
fprintf(stderr, "* Defer cleared queue: %lu/%lu free/malloc\n\n",
(unsigned long)count_dealloc, (unsigned long)count_alloc);
}
#endif