blob: 59220972ba90574c54b6bcbfad92291d36093956 [file] [log] [blame] [raw]
/*
Copyright: Boaz Segev, 2016-2017
License: MIT
Feel free to copy, use and enjoy according to the license provided.
*/
#include "spnlock.inc"
#include "defer.h"
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
/* *****************************************************************************
Compile time settings
***************************************************************************** */
/* per-thread idle throttle value handed to throttle_thread() by pooled
 * worker threads (scaled by the pool's thread count) */
#ifndef DEFER_THROTTLE
#define DEFER_THROTTLE 524287UL
#endif
/* upper bound for the computed per-thread throttle */
#ifndef DEFER_THROTTLE_LIMIT
#define DEFER_THROTTLE_LIMIT 1572864UL
#endif
/* number of tasks per queue block - sized so one block is close to a page */
#ifndef DEFER_QUEUE_BLOCK_COUNT
#if UINTPTR_MAX <= 0xFFFFFFFF
/* Almost a page of memory on most 32 bit machines: ((4096/4)-4)/3 */
#define DEFER_QUEUE_BLOCK_COUNT 340
#else
/* Almost a page of memory on most 64 bit machines: ((4096/8)-4)/3 */
#define DEFER_QUEUE_BLOCK_COUNT 168
#endif
#endif
/* *****************************************************************************
Data Structures
***************************************************************************** */
/* task node data: a single deferred call - function pointer + two opaque
 * arguments */
typedef struct {
  void (*func)(void *, void *);
  void *arg1;
  void *arg2;
} task_s;
/* task queue block: a fixed-size ring of tasks, chained into a linked list
 * when the queue outgrows a single block */
typedef struct queue_block_s {
  task_s tasks[DEFER_QUEUE_BLOCK_COUNT];
  struct queue_block_s *next;
  size_t write; /* next slot to write to */
  size_t read;  /* next slot to read from */
  /* 0 == plain; 1 == `write` wrapped around (block may be full);
   * 2 == static block only: drained and free for reuse (see pop_task) */
  unsigned char state;
} queue_block_s;
/* a statically allocated first block, so a small queue needs no malloc */
static queue_block_s static_queue;
/* the state machine - this holds all the data about the task queue and pool */
static struct {
  /* a lock for the state machine, used for multi-threading support */
  spn_lock_i lock;
  /* current active block to pop tasks */
  queue_block_s *reader;
  /* current active block to push tasks */
  queue_block_s *writer;
} deferred = {.reader = &static_queue, .writer = &static_queue};
/* *****************************************************************************
Internal Data API
***************************************************************************** */
#if DEBUG
/* DEBUG-only allocation counters, used by the test code to verify that every
 * allocated queue block is eventually freed */
static size_t count_alloc, count_dealloc;
#define COUNT_ALLOC spn_add(&count_alloc, 1)
#define COUNT_DEALLOC spn_add(&count_dealloc, 1)
#define COUNT_RESET                                                            \
  do {                                                                         \
    count_alloc = count_dealloc = 0;                                           \
  } while (0)
#else
/* no-ops in release builds */
#define COUNT_ALLOC
#define COUNT_DEALLOC
#define COUNT_RESET
#endif
/* Appends a task (taken by value) to the queue's tail block, growing the
 * queue by one block when the tail is full. Runs under the state machine's
 * spinlock. */
static inline void push_task(task_s task) {
  spn_lock(&deferred.lock);
  /* test if full: a block is full once `write` wrapped around (`state` set)
   * and caught up with the read position */
  if (deferred.writer->state &&
      deferred.writer->write == deferred.writer->read) {
    /* return to static buffer or allocate new buffer */
    if (static_queue.state == 2) {
      /* state == 2 marks the static block as drained and reusable
       * (set by pop_task after the reader moved past it) */
      deferred.writer->next = &static_queue;
    } else {
      deferred.writer->next = malloc(sizeof(*deferred.writer->next));
      COUNT_ALLOC;
      if (!deferred.writer->next)
        goto critical_error;
    }
    deferred.writer = deferred.writer->next;
    deferred.writer->write = 0;
    deferred.writer->read = 0;
    deferred.writer->state = 0;
    deferred.writer->next = NULL;
  }
  /* place task and finish */
  deferred.writer->tasks[deferred.writer->write++] = task;
  /* cycle buffer: mark the block as wrapped-around (possibly full) */
  if (deferred.writer->write == DEFER_QUEUE_BLOCK_COUNT) {
    deferred.writer->write = 0;
    deferred.writer->state = 1;
  }
  spn_unlock(&deferred.lock);
  return;
critical_error:
  /* allocation failed - the `defer` contract can't be honored, so signal the
   * whole process group and exit */
  spn_unlock(&deferred.lock);
  perror("ERROR CRITICAL: defer can't allocate task");
  kill(0, SIGINT);
  exit(errno);
}
/* Pops and returns the task at the queue's head; returns a task whose `func`
 * is NULL when the queue is empty. Recycles or frees fully drained blocks. */
static inline task_s pop_task(void) {
  task_s ret = (task_s){NULL};
  queue_block_s *to_free = NULL;
  /* lock the state machine, to grab/create a task and place it at the tail */
  spn_lock(&deferred.lock);
  /* empty? (a block with `state` set still holds tasks when read == write) */
  if (deferred.reader->write == deferred.reader->read &&
      !deferred.reader->state)
    goto finish;
  /* collect task */
  ret = deferred.reader->tasks[deferred.reader->read++];
  /* cycle: the read position wrapped - clear the writer's wrap mark */
  if (deferred.reader->read == DEFER_QUEUE_BLOCK_COUNT) {
    deferred.reader->read = 0;
    deferred.reader->state = 0;
  }
  /* did we finish the queue in the buffer? */
  if (deferred.reader->write == deferred.reader->read) {
    if (deferred.reader->next) {
      /* another block follows: advance and release the drained block */
      to_free = deferred.reader;
      deferred.reader = deferred.reader->next;
    } else {
      /* last block in the chain: reset it in place */
      deferred.reader->write = deferred.reader->read = deferred.reader->state =
          0;
    }
    goto finish;
  }
finish:
  if (to_free == &static_queue) {
    /* the static block is never free()d - mark it (2) for reuse by push_task */
    static_queue.state = 2;
  }
  spn_unlock(&deferred.lock);
  if (to_free && to_free != &static_queue) {
    free(to_free);
    COUNT_DEALLOC;
  }
  return ret;
}
static inline void clear_tasks(void) {
spn_lock(&deferred.lock);
while (deferred.reader) {
queue_block_s *tmp = deferred.reader;
deferred.reader = deferred.reader->next;
if (tmp != &static_queue) {
COUNT_DEALLOC;
free(tmp);
}
}
static_queue = (queue_block_s){.next = NULL};
deferred.reader = deferred.writer = &static_queue;
spn_unlock(&deferred.lock);
}
/* convenience wrapper: lets callers use designated-initializer syntax, e.g.
 * `push_task(.func = f, .arg1 = a)`, for the function defined above */
#define push_task(...) push_task((task_s){__VA_ARGS__})
/* *****************************************************************************
API
***************************************************************************** */
/** Defer an execution of a function for later. */
int defer(void (*func)(void *, void *), void *arg1, void *arg2) {
/* must have a task to defer */
if (!func)
goto call_error;
push_task(.func = func, .arg1 = arg1, .arg2 = arg2);
return 0;
call_error:
return -1;
}
/** Performs all deferred functions until the queue had been depleted. */
void defer_perform(void) {
task_s task = pop_task();
while (task.func) {
task.func(task.arg1, task.arg2);
task = pop_task();
}
}
/** Returns true if there are deferred functions waiting for execution.
 * Mirrors pop_task's emptiness test: a block whose `write` wrapped around
 * (`state` set) still holds tasks even when read == write, so the original
 * `read != write` test wrongly reported a completely full block as empty. */
int defer_has_queue(void) {
  return deferred.reader->read != deferred.reader->write ||
         deferred.reader->state;
}
/** Clears the queue: frees all dynamically allocated queue blocks and resets
 * the queue to the static block (see clear_tasks above). */
void defer_clear_queue(void) { clear_tasks(); }
/* *****************************************************************************
Thread Pool Support
***************************************************************************** */
#if defined(__unix__) || defined(__APPLE__) || defined(__linux__) || \
defined(DEBUG)
#include <pthread.h>
/* `weak` functions can be overloaded to change the thread implementation. */
#pragma weak defer_new_thread
/* Spawns a pthread running `thread_func(arg)`. The returned handle is heap
 * allocated so it can outlive this call; defer_join_thread releases it.
 * Returns NULL on allocation or thread-creation failure. */
void *defer_new_thread(void *(*thread_func)(void *), pool_pt arg) {
  pthread_t *handle = malloc(sizeof(*handle));
  if (!handle)
    return NULL;
  if (pthread_create(handle, NULL, thread_func, (void *)arg) != 0) {
    free(handle);
    return NULL;
  }
  return handle;
}
#pragma weak defer_join_thread
/* Joins (then releases) a handle created by defer_new_thread.
 * Returns 0 on success, -1 when the handle is NULL. */
int defer_join_thread(void *p_thr) {
  if (p_thr == NULL)
    return -1;
  pthread_t *handle = p_thr;
  pthread_join(*handle, NULL);
  free(handle);
  return 0;
}
#pragma weak defer_thread_throttle
/* Public (weak) wrapper around the internal throttle_thread() helper;
 * `microsec` is forwarded as-is. */
void defer_thread_throttle(unsigned long microsec) {
  throttle_thread(microsec);
}
#else /* No pthreads... BYO thread implementation. This one simply fails. */
#pragma weak defer_new_thread
/* Fallback stub: no thread implementation available - always fails. */
void *defer_new_thread(void *(*thread_func)(void *), void *arg) {
  (void)thread_func;
  (void)arg;
  return NULL;
}
#pragma weak defer_join_thread
/* Fallback stub: nothing to join - always reports failure. */
int defer_join_thread(void *p_thr) {
  (void)p_thr;
  return -1;
}
#pragma weak defer_thread_throttle
/* Fallback stub: throttling is a no-op without a thread implementation.
 * The parameter is explicitly discarded to silence the unused-parameter
 * warning, matching the sibling stubs above. */
void defer_thread_throttle(unsigned long microsec) { (void)microsec; }
#endif /* DEBUG || pthread default */
/* thread pool data container */
struct defer_pool {
  unsigned int flag;  /* 1 == active; 0 == signaled to stop */
  unsigned int count; /* number of handles stored in `threads` */
  void *threads[];    /* flexible array of defer_new_thread() handles */
};
/* a thread's cycle. This is what a worker thread does... repeatedly:
 * drain the queue, idle for the computed throttle period, and repeat until
 * the pool's `flag` is cleared by defer_pool_stop(). */
static void *defer_worker_thread(void *pool_) {
  volatile pool_pt pool = pool_;
  /* worker threads must not die on a broken pipe / closed socket */
  signal(SIGPIPE, SIG_IGN);
  /* the throttle replaces conditional variables for better performance */
  size_t throttle = (pool->count) * DEFER_THROTTLE;
  if (!throttle || throttle > DEFER_THROTTLE_LIMIT)
    throttle = DEFER_THROTTLE_LIMIT;
  /* perform any available tasks */
  defer_perform();
  /* as long as the flag is true, wait for and perform tasks. */
  do {
    throttle_thread(throttle);
    defer_perform();
  } while (pool->flag);
  return NULL;
}
/** Signals a running thread pool to stop. Returns immediately; worker threads
 * notice the cleared flag after their next throttle cycle. */
void defer_pool_stop(pool_pt pool) { pool->flag = 0; }
/** Returns TRUE (1) if the pool hasn't been signaled to finish up. */
int defer_pool_is_active(pool_pt pool) { return pool->flag; }
/** Waits for a running thread pool, joining threads and finishing all tasks. */
void defer_pool_wait(pool_pt pool) {
  while (pool->count) {
    defer_join_thread(pool->threads[--pool->count]);
  }
}
/** The logic behind `defer_pool_start`: spawns `thread_count` worker threads
 * into `pool`. Returns `pool` on success. On partial failure it now stops and
 * joins any threads that did start, frees the pool and returns NULL - the
 * original code returned NULL while leaking both the running threads' handles
 * and the pool allocation. */
static inline pool_pt defer_pool_initialize(unsigned int thread_count,
                                            pool_pt pool) {
  pool->flag = 1;
  pool->count = 0;
  while (pool->count < thread_count &&
         (pool->threads[pool->count] =
              defer_new_thread(defer_worker_thread, pool)))
    pool->count++;
  if (pool->count == thread_count) {
    return pool;
  }
  /* partial failure: signal the started threads, join them, release memory */
  defer_pool_stop(pool);
  defer_pool_wait(pool);
  free(pool);
  return NULL;
}
/** Starts a thread pool that will run deferred tasks in the background.
 * Returns NULL when `thread_count` is 0 or on allocation failure. */
pool_pt defer_pool_start(unsigned int thread_count) {
  if (!thread_count)
    return NULL;
  pool_pt pool = malloc(sizeof(*pool) + (thread_count * sizeof(void *)));
  return pool ? defer_pool_initialize(thread_count, pool) : NULL;
}
/* *****************************************************************************
Child Process support (`fork`)
***************************************************************************** */
/**
OVERRIDE THIS to replace the default `fork` implementation or to inject hooks
into the forking function.
Behaves like the system's `fork`.
*/
#pragma weak defer_new_child
int defer_new_child(void) { return (int)fork(); }
/* forked `defer` workers use a global thread pool object
 * (NULL == no active fork) */
static pool_pt forked_pool;
/* handles the SIGINT and SIGTERM signals by shutting down workers */
static void sig_int_handler(int sig) {
  if ((sig != SIGINT && sig != SIGTERM) || !forked_pool)
    return;
  defer_pool_stop(forked_pool);
}
/*
Zombie Reaping
With thanks to Dr Graham D Shaw.
http://www.microhowto.info/howto/reap_zombie_processes_using_a_sigchld_handler.html
*/
/* SIGCHLD handler: reaps every finished child process without blocking,
 * preserving `errno` for the code the signal interrupted. */
void reap_child_handler(int sig) {
  (void)sig;
  int saved_errno = errno;
  while (waitpid(-1, NULL, WNOHANG) > 0)
    ;
  errno = saved_errno;
}
/* initializes zombie reaping for the process: installs reap_child_handler as
 * the SIGCHLD handler. On failure it signals the whole process group and
 * exits. */
inline static void reap_children(void) {
  struct sigaction sa;
  sa.sa_handler = reap_child_handler;
  sigemptyset(&sa.sa_mask);
  sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
  if (sigaction(SIGCHLD, &sa, 0) == -1) {
    perror("Child reaping initialization failed");
    kill(0, SIGINT), exit(errno);
  }
}
/* a global process identifier (0 == root / parent process; children are
 * assigned `pids_count + 1` inside defer_perform_in_fork) */
static int defer_fork_pid_id = 0;
/**
 * Forks the process, starts up a thread pool and waits for all tasks to run.
 * All existing tasks will run in all processes (multiple times).
 *
 * Returns 0 on success, -1 on error and a positive number if this is a child
 * process that was forked.
 *
 * Fixes vs. the previous revision:
 *  - calloc failure now returns -1 (was: returned 0, a silent "success");
 *  - `defer_pool_wait` is no longer called on a NULL pool when
 *    `defer_pool_start` fails;
 *  - the original SIGPIPE disposition is restored on exit (was: `old_pipe`
 *    was restored to SIGTERM, so SIGTERM was restored twice and SIGPIPE
 *    stayed ignored).
 */
int defer_perform_in_fork(unsigned int process_count,
                          unsigned int thread_count) {
  if (forked_pool)
    return -1; /* we're already running inside an active `fork` */
  /* we use a placeholder while initializing the forked thread pool, so calls to
   * `defer_fork_is_active` don't fail.
   */
  static struct defer_pool pool_placeholder = {.count = 1, .flag = 1};
  /* setup signal handling */
  struct sigaction act, old, old_term, old_pipe;
  pid_t *pids = NULL;
  int ret = 0;
  unsigned int pids_count = 0; /* initialized: `finish` may run pre-loop */
  act.sa_handler = sig_int_handler;
  sigemptyset(&act.sa_mask);
  act.sa_flags = SA_RESTART | SA_NOCLDSTOP;
  if (sigaction(SIGINT, &act, &old)) {
    perror("couldn't set signal handler");
    goto finish;
  };
  if (sigaction(SIGTERM, &act, &old_term)) {
    perror("couldn't set signal handler");
    goto finish;
  };
  act.sa_handler = SIG_IGN;
  if (sigaction(SIGPIPE, &act, &old_pipe)) {
    perror("couldn't set signal handler");
    goto finish;
  };
  /* setup zombie reaping */
#if !defined(NO_CHILD_REAPER) || NO_CHILD_REAPER == 0
  reap_children();
#endif
  if (!process_count)
    process_count = 1;
  --process_count;
  /* for `process_count == 0` nothing happens */
  pids = calloc(process_count, sizeof(*pids));
  if (process_count && !pids) {
    /* allocation failure is an error, not a silent success */
    ret = -1;
    goto finish;
  }
  for (pids_count = 0; pids_count < process_count; pids_count++) {
    if (!(pids[pids_count] = (pid_t)defer_new_child())) {
      /* child process: run a private pool until signaled, then drain twice */
      defer_fork_pid_id = pids_count + 1;
      forked_pool = &pool_placeholder;
      forked_pool = defer_pool_start(thread_count);
      if (forked_pool)
        defer_pool_wait(forked_pool);
      defer_perform();
      defer_perform();
      return 1;
    }
    if (pids[pids_count] == -1) {
      ret = -1;
      goto finish;
    }
  }
  /* parent process: act as one of the workers, then shut down */
  forked_pool = &pool_placeholder;
  forked_pool = defer_pool_start(thread_count);
  if (forked_pool)
    defer_pool_wait(forked_pool);
  forked_pool = NULL;
  defer_perform();
finish:
  if (pids) {
    /* signal all children, then wait for them to exit */
    for (size_t j = 0; j < pids_count; j++) {
      kill(pids[j], SIGINT);
    }
    for (size_t j = 0; j < pids_count; j++) {
      waitpid(pids[j], NULL, 0);
    }
    free(pids);
  }
  /* restore the original signal dispositions */
  sigaction(SIGINT, &old, &act);
  sigaction(SIGTERM, &old_term, &act);
  sigaction(SIGPIPE, &old_pipe, &act);
  return ret;
}
/** Returns TRUE (1) if the forked thread pool hasn't been signaled to finish
 * up. */
int defer_fork_is_active(void) { return forked_pool && forked_pool->flag; }
/** Returns the process number for the current working process. 0 == parent. */
int defer_fork_pid(void) { return defer_fork_pid_id; }
/* *****************************************************************************
Test
***************************************************************************** */
#ifdef DEBUG
#include <stdio.h>
#include <pthread.h>
#define DEFER_TEST_THREAD_COUNT 128
/* protects the shared test counter below */
static spn_lock_i i_lock = 0;
/* number of test tasks that actually ran */
static size_t i_count = 0;
/* test task: increments the shared counter under i_lock */
static void sample_task(void *unused, void *unused2) {
  (void)(unused);
  (void)(unused2);
  spn_lock(&i_lock);
  i_count++;
  spn_unlock(&i_lock);
}
static void single_counter_task(void *unused, void *unused2) {
(void)(unused);
(void)(unused2);
spn_lock(&i_lock);
i_count++;
spn_unlock(&i_lock);
if (i_count < (1024 * 1024))
defer(single_counter_task, NULL, NULL);
}
/* test task: queues 1024 counter tasks */
static void sched_sample_task(void *unused, void *unused2) {
  (void)(unused);
  (void)(unused2);
  size_t remaining = 1024;
  while (remaining--) {
    defer(sample_task, NULL, NULL);
  }
}
/* per-thread scheduler: splits the 1024 scheduling tasks evenly across the
 * test's threads (arguments are forwarded to sched_sample_task) */
static void thrd_sched(void *unused, void *unused2) {
  for (size_t i = 0; i < (1024 / DEFER_TEST_THREAD_COUNT); i++) {
    sched_sample_task(unused, unused2);
  }
}
/* test task: prints a marker line (under i_lock, keeping stderr coherent) */
static void text_task_text(void *unused, void *unused2) {
  (void)(unused);
  (void)(unused2);
  spn_lock(&i_lock);
  fprintf(stderr, "this text should print before defer_perform returns\n");
  spn_unlock(&i_lock);
}
/* test task: sleeps two seconds, then queues the text-printing task - used to
 * verify that defer_perform waits for follow-up tasks */
static void text_task(void *a1, void *a2) {
  static const struct timespec tm = {.tv_sec = 2};
  nanosleep(&tm, NULL);
  defer(text_task_text, a1, a2);
}
/* test task: prints the current process id and the label passed via `arg` */
static void pid_task(void *arg, void *unused2) {
  (void)(unused2);
  fprintf(stderr, "* %d pid is going to sleep... (%s)\n", getpid(),
          arg ? (char *)arg : "unknown");
}
/* Runs the DEBUG test-suite: direct calls vs. deferred calls, single and
 * multi-threaded pools, and the fork()-based worker processes. */
void defer_test(void) {
  time_t start, end;
  fprintf(stderr, "Starting defer testing\n");
  /* baseline: 1M direct function calls, no queue involved */
  spn_lock(&i_lock);
  i_count = 0;
  spn_unlock(&i_lock);
  start = clock();
  for (size_t i = 0; i < (1024 * 1024); i++) {
    sample_task(NULL, NULL);
  }
  end = clock();
  fprintf(stderr,
          "Deferless (direct call) counter: %lu cycles with i_count = %lu, "
          "%lu/%lu free/malloc\n",
          end - start, i_count, count_dealloc, count_alloc);
  /* single thread, self-rescheduling task chain */
  spn_lock(&i_lock);
  COUNT_RESET;
  i_count = 0;
  spn_unlock(&i_lock);
  start = clock();
  defer(single_counter_task, NULL, NULL);
  defer(single_counter_task, NULL, NULL);
  defer_perform();
  end = clock();
  fprintf(stderr,
          "Defer single thread, two tasks: "
          "%lu cycles with i_count = %lu, %lu/%lu "
          "free/malloc\n",
          end - start, i_count, count_dealloc, count_alloc);
  /* single thread: 1024 scheduler tasks, each queuing 1024 counter tasks */
  spn_lock(&i_lock);
  COUNT_RESET;
  i_count = 0;
  spn_unlock(&i_lock);
  start = clock();
  for (size_t i = 0; i < 1024; i++) {
    defer(sched_sample_task, NULL, NULL);
  }
  defer_perform();
  end = clock();
  fprintf(stderr,
          "Defer single thread: %lu cycles with i_count = %lu, %lu/%lu "
          "free/malloc\n",
          end - start, i_count, count_dealloc, count_alloc);
  /* multi-threaded pool run */
  spn_lock(&i_lock);
  COUNT_RESET;
  i_count = 0;
  spn_unlock(&i_lock);
  start = clock();
  pool_pt pool = defer_pool_start(DEFER_TEST_THREAD_COUNT);
  if (pool) {
    for (size_t i = 0; i < DEFER_TEST_THREAD_COUNT; i++) {
      defer(thrd_sched, NULL, NULL);
    }
    // defer((void (*)(void *))defer_pool_stop, pool);
    defer_pool_stop(pool);
    defer_pool_wait(pool);
    end = clock();
    fprintf(stderr,
            "Defer multi-thread (%d threads): %lu cycles with i_count = %lu, "
            "%lu/%lu free/malloc\n",
            DEFER_TEST_THREAD_COUNT, end - start, i_count, count_dealloc,
            count_alloc);
  } else
    fprintf(stderr, "Defer multi-thread: FAILED!\n");
  /* single thread again, after the pool exercised block recycling */
  spn_lock(&i_lock);
  COUNT_RESET;
  i_count = 0;
  spn_unlock(&i_lock);
  start = clock();
  for (size_t i = 0; i < 1024; i++) {
    defer(sched_sample_task, NULL, NULL);
  }
  defer_perform();
  end = clock();
  fprintf(stderr,
          "Defer single thread (2): %lu cycles with i_count = %lu, %lu/%lu "
          "free/malloc\n",
          end - start, i_count, count_dealloc, count_alloc);
  /* defer_perform must not return before a slow task's follow-up task ran */
  fprintf(stderr, "calling defer_perform.\n");
  defer(text_task, NULL, NULL);
  defer_perform();
  fprintf(stderr,
          "defer_perform returned. i_count = %lu, %lu/%lu free/malloc\n",
          i_count, count_dealloc, count_alloc);
  /* forked worker processes; each child prints its PID (^C stops the pools) */
  fprintf(stderr, "press ^C to finish PID test\n");
  defer(pid_task, "pid test", NULL);
  if (defer_perform_in_fork(4, 64) > 0) {
    fprintf(stderr, "* %d finished\n", getpid());
    exit(0);
  };
  fprintf(stderr, "* Defer queue %lu/%lu free/malloc\n", count_dealloc,
          count_alloc);
  defer_clear_queue();
  fprintf(stderr, "* Defer queue %lu/%lu free/malloc\n", count_dealloc,
          count_alloc);
}
#endif