/*
copyright: Boaz segev, 2016
license: MIT

Feel free to copy, use and enjoy according to the license provided.
*/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include "libasync.h"

#include <errno.h>
#include <execinfo.h>
#include <fcntl.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <time.h>
#include <unistd.h>

/* *****************************************************************************
Performance options.
*/

/* Number of task nodes preallocated inside the pool's data; once they are all
   in use, async_run() falls back to malloc() for additional nodes. */
#ifndef ASYNC_TASK_POOL_SIZE
#define ASYNC_TASK_POOL_SIZE 170
#endif

/* Task queue protection: 1 uses the spinlock from "spnlock.h", any other value
   uses a pthread mutex. */
#ifndef ASYNC_USE_SPINLOCK
#define ASYNC_USE_SPINLOCK 1
#endif

/* How idle worker threads wait for new tasks:
   0     - block reading a wake-up pipe that async_run() / async_signal() write
           to;
   other - sleep this many nanoseconds (it is used as the timespec's tv_nsec,
           so it must stay below 1000000000) and then poll the queue again. */
#ifndef ASYNC_NANO_SLEEP
#define ASYNC_NANO_SLEEP 16777216 // ~16.8ms; other values: 8388608, 1048576, 524288
#endif

/* Sentinel threads (set to 1 to enable): each pool thread supervises a worker
   thread and starts a new one whenever it exits (e.g. after a crash signal) -
   limited crash resistance. */
#ifndef ASYNC_USE_SENTINEL
#define ASYNC_USE_SENTINEL 0
#endif

/* *****************************************************************************
Forward declarations - used for functions that might be needed before they are
defined.
*/

// The worker thread's main loop: performs queued tasks until the pool stops.
static void *worker_thread_cycle(void *);

// A sentinel thread that restarts its worker thread whenever the worker exits,
// while the pool is running (enable by setting the ASYNC_USE_SENTINEL macro
// to 1).
static void *sentinal_thread(void *);

/******************************************************************************
Portability - used to help port this to different frameworks (e.g. Ruby).
*/

#ifndef THREAD_TYPE
#define THREAD_TYPE pthread_t

/* Waits for `thr` to finish and hands back the value it returned (or passed to
   pthread_exit()). The pthread_join() status code is not inspected. */
static void *join_thread(THREAD_TYPE thr) {
  void *result;
  pthread_join(thr, &result);
  return result;
}

/* Starts `thread_func(async)` on a new thread with default attributes and
   stores its handle in `*thr`. Returns pthread_create()'s result: 0 on
   success, an error number otherwise. */
static int create_thread(THREAD_TYPE *thr, void *(*thread_func)(void *),
                         void *async) {
  const pthread_attr_t *default_attributes = NULL;
  return pthread_create(thr, default_attributes, thread_func, async);
}

#endif
/******************************************************************************
Data Types
*/

/**
A task: a function pointer and the argument it will be called with.
*/
typedef struct {
  void (*task)(void *); // the function to run
  void *arg;            // passed as the function's only argument
} task_s;

/**
A task node: a task linked into either the pending task queue or the free
node pool.
*/
typedef struct async_task_ns {
  task_s task;
  struct async_task_ns *next; // next node in the same list, or NULL
} async_task_ns;

/* *****************************************************************************
Use spinlocks "spnlock.h".

For portability, it's possible to copy "spnlock.h" directly after this line.
*/
#include "spnlock.h"

/******************************************************************************
Core Data
*/

/* The thread pool's global state. A single instance is mmap-ed by
   async_alloc(), with room for the requested number of thread handles in the
   trailing `threads` array. */
typedef struct {
#if !defined(ASYNC_USE_SPINLOCK) || ASYNC_USE_SPINLOCK != 1
  /* if using mutex */
  pthread_mutex_t lock;
#endif

  /* task management*/
  async_task_ns memory[ASYNC_TASK_POOL_SIZE]; // preallocated task nodes
  async_task_ns *pool;  // free list of unused nodes taken from `memory`
  async_task_ns *tasks; // head of the pending task queue (FIFO)
  async_task_ns **pos;  // where the next task gets linked (&tasks when empty)

  /* thread management*/
  /* number of threads started by async_start(); async_free() also uses it to
     compute the length it unmaps */
  size_t thread_count;

#if ASYNC_NANO_SLEEP == 0
  /* when using pipes: `in` is the read end pause_thread() blocks on, `out`
     the write end the wake-up helpers write to */
  struct {
    int in;
    int out;
  } io;
#endif

#if defined(ASYNC_USE_SPINLOCK) && ASYNC_USE_SPINLOCK == 1 // use spinlock
  /* if using spinlock */
  spn_lock_i lock;
#endif

  /* state management*/
  struct {
    unsigned run : 1; // workers keep cycling while set; cleared to stop them
  } flags;

  /** the threads array, must be last (flexible array member) */
  THREAD_TYPE threads[];
} async_data_s;

/* The global pool instance: set by async_alloc(), NULL after async_free() or
   when no pool was started. */
static async_data_s *async;

/******************************************************************************
Core Data initialization and lock/unlock
*/

/* Lock helpers guarding the pool's data (`async->lock`):
   - lock_async_init()    evaluates to 0 on success, non-zero on failure;
   - lock_async_destroy() releases the lock's resources;
   - lock_async() / unlock_async() acquire and release the lock. */
#if defined(ASYNC_USE_SPINLOCK) && ASYNC_USE_SPINLOCK == 1 // use spinlock
/* A spinlock needs no setup: "initializing" just leaves it unlocked, and
   destroying it is a no-op. The no-op is an expression, not a bare `;`, so
   `if (...) lock_async_destroy(); else ...` still compiles. */
#define lock_async_init() (spn_unlock(&(async->lock)), 0)
#define lock_async_destroy() ((void)0)
#define lock_async() spn_lock(&(async->lock))
#define unlock_async() spn_unlock(&(async->lock))

#else // Using Mutex
#define lock_async_init() (pthread_mutex_init(&((async)->lock), NULL))
#define lock_async_destroy() (pthread_mutex_destroy(&((async)->lock)))
#define lock_async() (pthread_mutex_lock(&((async)->lock)))
#define unlock_async() (pthread_mutex_unlock(&((async)->lock)))
#endif

/* Releases the pool: closes the wake-up pipe (pipe mode only), destroys the
   lock, unmaps the shared data and clears the global `async` pointer.
   The unmapped length is derived from `async->thread_count`, so it should
   equal the thread count the data was mapped with. */
static inline void async_free(void) {
  const size_t mapped_length =
      sizeof(async_data_s) + sizeof(THREAD_TYPE) * async->thread_count;
#if ASYNC_NANO_SLEEP == 0
  if (async->io.in != 0) {
    close(async->io.in);
    close(async->io.out);
  }
#endif
  lock_async_destroy();
  munmap(async, mapped_length);
  async = NULL;
}

/* Maps and initializes the global pool data (`async`) with room for `threads`
   thread handles after the struct. On failure `async` is left NULL.

   On success the run flag is set, the task queue is empty, every node of
   `memory` is linked into the free `pool` list and, when
   ASYNC_NANO_SLEEP == 0, the wake-up pipe is open with a non-blocking write
   end. `thread_count` starts at 0 (async_start() counts the threads up). */
static inline void async_alloc(size_t threads) {
  const size_t map_size =
      sizeof(async_data_s) + (sizeof(THREAD_TYPE) * (threads));
  /* Plain data: no PROT_EXEC. Writable+executable mappings are refused under
     W^X policies and are never needed here. */
  async = mmap(NULL, map_size, PROT_READ | PROT_WRITE,
               MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if (async == MAP_FAILED) {
    async = NULL;
    return; // nothing was mapped - there is nothing to initialize
  }
  *async = (async_data_s){.flags.run = 1};
  async->pos = &async->tasks;

  if (lock_async_init()) {
    /* The lock isn't initialized, so async_free() (which destroys it) must
       not be used; unmap the whole region directly. */
    munmap(async, map_size);
    async = NULL;
    return;
  }

#if ASYNC_NANO_SLEEP == 0 // using pipes?
  {
    int fds[2];
    if (pipe(fds)) {
      /* async_free() derives the unmapped length from thread_count */
      async->thread_count = threads;
      async_free();
      return;
    }
    async->io.in = fds[0];  // read end: pause_thread() blocks on it
    async->io.out = fds[1]; // write end: the wake-up helpers write to it
  }
  fcntl(async->io.out, F_SETFL, O_NONBLOCK);
#endif

  // initialize pool: chain every preallocated node into the free list
  for (size_t i = 0; i < ASYNC_TASK_POOL_SIZE - 1; i++) {
    async->memory[i].next = async->memory + i + 1;
  }
  async->memory[ASYNC_TASK_POOL_SIZE - 1].next = NULL;
  async->pool = async->memory;
}

/******************************************************************************
Performing tasks
*/

/* Pops and runs queued tasks on the calling thread until the queue is empty
   (returns at once when no pool exists).

   The lock is held only while a task is unlinked and its node recycled. Nodes
   from the preallocated `memory` array go back onto the free `pool` list.
   Heap nodes (allocated by async_run() once the pool ran dry) are freed. The
   task itself runs with the lock released, so it may schedule further tasks. */
static inline void perform_tasks(void) {
  task_s tsk;
  async_task_ns *t;
  while (async) {
    lock_async();
    t = async->tasks;
    if (t) {
      async->tasks = t->next;
      if (async->tasks == NULL)
        async->pos = &(async->tasks); // queue emptied: append at the head
      tsk = t->task;
      /* Relational comparison of pointers into different objects (a malloc-ed
         node vs. the `memory` array) is undefined behavior, so pool
         membership is checked on the integer addresses instead. */
      const uintptr_t addr = (uintptr_t)t;
      const uintptr_t pool_first = (uintptr_t)async->memory;
      const uintptr_t pool_last =
          (uintptr_t)(async->memory + ASYNC_TASK_POOL_SIZE - 1);
      if (addr >= pool_first && addr <= pool_last) {
        t->next = async->pool;
        async->pool = t;
      } else {
        free(t);
      }
      unlock_async();
      tsk.task(tsk.arg);
      continue;
    }
    async->pos = &(async->tasks);
    unlock_async();
    return;
  }
}

/******************************************************************************
Pausing and resuming threads
*/

/* Idles the calling worker while the queue is empty.
   Pipe mode (ASYNC_NANO_SLEEP == 0): blocks reading one wake-up byte, unless
   the pool is gone or no longer running.
   Otherwise: sleeps ASYNC_NANO_SLEEP nanoseconds. */
static inline void pause_thread() {
#if ASYNC_NANO_SLEEP == 0
  if (!async || !async->flags.run)
    return;
  uint8_t wake_byte;
  read(async->io.in, &wake_byte, 1);
#else
  struct timespec remaining;
  const struct timespec nap = {.tv_sec = 0, .tv_nsec = ASYNC_NANO_SLEEP};
  nanosleep(&nap, &remaining);
#endif
}

/* Releases one worker blocked in pause_thread() by writing a single byte to
   the wake-up pipe (the reader discards the byte's value). No-op in
   nanosleep mode. */
static inline void wake_thread() {
#if ASYNC_NANO_SLEEP == 0
  const size_t wake_bytes = 1;
  write(async->io.out, async, wake_bytes);
#endif
}

/* Writes thread_count + 16 wake-up bytes (copied from the start of `async`;
   their values are discarded by the reader) so that every worker blocked in
   pause_thread() is released. No-op in nanosleep mode. */
static inline void wake_all_threads() {
#if ASYNC_NANO_SLEEP == 0
  const size_t wake_bytes = async->thread_count + 16;
  write(async->io.out, async, wake_bytes);
#endif
}

/******************************************************************************
Worker threads
*/

// Error-signal handler installed by worker_thread_cycle() when the sentinel is
// enabled. It prints the signal, errno and a backtrace to stderr, then exits
// only the faulting thread, so the sentinel supervising it can start a new
// worker.
// NOTE(review): stdio and backtrace_symbols() are not async-signal-safe; this
// is a best-effort diagnostic on a thread that is already crashing.
static void on_err_signal(int sig) {
  const int saved_errno = errno; // backtrace*() may overwrite errno
  void *array[22];
  const int size = backtrace(array, 22);
  char **strings = backtrace_symbols(array, size); // NULL on failure
  errno = saved_errno;                             // report the original error
  perror("\nERROR");
  fprintf(stderr, "Async: Error signal received"
                  " - %s (errno %d).\nBacktrace (%d):\n",
          strsignal(sig), saved_errno, size);
  if (strings) {
    // the first two frames are skipped (presumably the handler itself and
    // the signal trampoline)
    for (int i = 2; i < size; i++)
      fprintf(stderr, "%s\n", strings[i]);
    free(strings);
  }
  fprintf(stderr, "\n");
  pthread_exit((void *)on_err_signal);
}

// Body of every worker thread. In sentinel builds it first installs the
// crash-reporting handlers, and it always ignores SIGPIPE. It then alternates
// between draining the queue and idling until the pool stops running. A final
// perform_tasks() pass runs whatever was still queued at that point.
static void *worker_thread_cycle(void *unused) {
  if (ASYNC_USE_SENTINEL) {
    // report the crash and exit only this thread; its sentinel replaces it
    signal(SIGSEGV, on_err_signal);
    signal(SIGFPE, on_err_signal);
    signal(SIGILL, on_err_signal);
#ifdef SIGBUS
    signal(SIGBUS, on_err_signal);
#endif
#ifdef SIGSYS
    signal(SIGSYS, on_err_signal);
#endif
#ifdef SIGXFSZ
    signal(SIGXFSZ, on_err_signal);
#endif
  }

  // a write to a closed pipe/socket fails with EPIPE instead of killing us
  signal(SIGPIPE, SIG_IGN);

  for (;;) {
    if (async == NULL || !async->flags.run)
      break;
    perform_tasks();
    pause_thread();
  }
  perform_tasks();
  return NULL;
}

// An optional sentinel, used instead of a plain worker when ASYNC_USE_SENTINEL
// is enabled. It keeps one worker thread alive by starting a new one each time
// the current worker exits. It stops once the pool stops running or a worker
// can't be created.
static void *sentinal_thread(void *arg) {
  THREAD_TYPE worker;
  for (;;) {
    if (async == NULL || async->flags.run != 1)
      break;
    if (create_thread(&worker, worker_thread_cycle, arg) != 0)
      break;
    join_thread(worker);
  }
  return NULL;
}

/******************************************************************************
API
*/

/**
Starts running the global thread pool with `threads` threads. Each thread is a
worker, or a sentinel supervising a worker when ASYNC_USE_SENTINEL is enabled.

Returns 0 on success. Returns -1 if the pool's data couldn't be allocated or a
thread couldn't be created. On a thread creation failure, the threads already
started are stopped and joined, and the pool's resources are released.

Use:

  async_start(8);

*/
ssize_t async_start(size_t threads) {
  async_alloc(threads);
  if (async == NULL)
    return -1;
  // initialize threads
  for (size_t i = 0; i < threads; i++) {
    /* create_thread() forwards pthread_create()'s result, which is a positive
       error number on failure (never negative), so test for non-zero. */
    if (create_thread(
            async->threads + i,
            (ASYNC_USE_SENTINEL ? sentinal_thread : worker_thread_cycle),
            NULL) != 0) {
      async->flags.run = 0;
      wake_all_threads();
      /* The threads started so far still use `async`: wait for them to exit
         before the shared data is unmapped. */
      for (size_t j = 0; j < async->thread_count; j++)
        join_thread(async->threads[j]);
      /* async_free() derives the unmapped length from thread_count; make it
         match the length async_alloc() mapped for `threads` threads. */
      async->thread_count = threads;
      async_free();
      return -1;
    }
    ++async->thread_count;
  }
  signal(SIGPIPE, SIG_IGN);
  return 0;
}

/**
Performs pending tasks on the calling thread until the task queue is empty,
then returns. The thread pool keeps running and accepting new tasks.

This is an **active** call: the calling thread acts as one more worker. It does
not wait for tasks that worker threads are still executing. It returns at once
if no pool was started. Unlike `async_join`, no threads are joined and nothing
is released.

Use:

  async_perform();

*/
void async_perform() {
  // borrow the calling thread as an extra worker until the queue drains
  perform_tasks();
}

/**
Schedules a task to be performed by the thread pool.

The Task should be a function such as `void task(void
*arg)`.

Use:

  void task(void * arg) { printf("%s", arg); }

  char arg[] = "Demo Task";

  async_run(task, arg);

*/
int async_run(void (*task)(void *), void *arg) {
  if (async == NULL)
    return -1;
  async_task_ns *tsk;
  lock_async();
  tsk = async->pool;
  if (tsk) {
    async->pool = tsk->next;
  } else {
    tsk = malloc(sizeof(*tsk));
    if (!tsk)
      goto error;
  }
  *tsk = (async_task_ns){.task.task = task, .task.arg = arg};
  *(async->pos) = tsk;
  async->pos = &(tsk->next);
  unlock_async();
  wake_thread();
  return 0;
error:
  unlock_async();
  return -1;
}

/**
Waits for the worker threads to exit, performs any tasks still left in the
queue on the calling thread, and then releases the thread pool and its
resources.

Worker threads keep cycling while the pool's run flag is set. Stop the pool
first (see `async_signal`); otherwise this waits for threads that never exit.
*/
void async_join() {
  if (async == NULL)
    return;
  for (size_t i = 0; i < async->thread_count; i++) {
    join_thread(async->threads[i]);
  }
  perform_tasks();
  async_free();
}

/**
Signals the thread pool to stop. It clears the run flag and writes wake-up
bytes so workers blocked on the wake-up pipe resume (in nanosleep mode, they
notice on their next poll). Each worker then performs the tasks still queued
and exits.

This does not wait for the workers or release anything; call `async_join`
for that.
*/
void async_signal() {
  if (async == NULL)
    return;
  async->flags.run = 0;
  wake_all_threads();
}

/******************************************************************************
Test
*/

#ifdef DEBUG

// Number of threads in the pool used by the speed test.
#define ASYNC_SPEED_TEST_THREAD_COUNT 120

// i_count counts sample_task() runs and is guarded by i_lock. text_task_text()
// also holds i_lock while printing.
static spn_lock_i i_lock = SPN_LOCK_INIT;
static size_t i_count = 0;

/* Test task: increments the shared counter while holding i_lock. */
static void sample_task(void *unused) {
  spn_lock(&i_lock);
  ++i_count;
  spn_unlock(&i_lock);
}

/* Test task: schedules 1024 runs of sample_task(). */
static void sched_sample_task(void *unused) {
  size_t remaining = 1024;
  while (remaining--)
    async_run(sample_task, async);
}

/* Test task: prints a marker line, holding i_lock while doing so. */
static void text_task_text(void *unused) {
  spn_lock(&i_lock);
  fputs("this text should print before async_finish returns\n", stderr);
  spn_unlock(&i_lock);
}

/* Test task: waits two seconds, then schedules text_task_text() with the same
   argument. */
static void text_task(void *arg) {
  const unsigned int delay_seconds = 2;
  sleep(delay_seconds);
  async_run(text_task_text, arg);
}

#if ASYNC_USE_SENTINEL == 1
// Deliberately crashing test task (sentinel builds only). It writes through a
// NULL pointer, which is expected to raise SIGSEGV inside a worker. The test
// can then show on_err_signal() reporting the crash and the sentinel starting a
// new worker. The empty asm with a "memory" clobber is a compiler barrier,
// presumably meant to keep the compiler from optimizing around the crash.
static void evil_task(void *_) {
  __asm__ volatile("" ::: "memory");
  fprintf(stderr, "EVIL CODE RUNNING!\n");
  sprintf(NULL,
          "Never write text to a NULL pointer, this is a terrible idea that "
          "should segfault.\n");
}
#endif

/* Debug self-test, in three stages:
   1. Measures scheduling throughput on a pool of ASYNC_SPEED_TEST_THREAD_COUNT
      threads (1024 tasks that each schedule 1024 more).
   2. Exercises async_perform() and async_finish() on an 8 thread pool.
   3. When the sentinel is enabled, runs a crashing task to show the sentinel
      replacing the crashed worker.
   Exits the process if a pool can't be started. */
void async_test_library_speed(void) {
  spn_lock(&i_lock);
  i_count = 0;
  spn_unlock(&i_lock);
  // clock() returns processor time as a clock_t (not a time_t)
  clock_t start, end;
  fprintf(stderr, "Starting Async testing\n");
  if (async_start(ASYNC_SPEED_TEST_THREAD_COUNT) == 0) {
    // thread_count and i_count are size_t: print them with %zu
    fprintf(stderr, "Thread count test %s %zu/%d\n",
            (async->thread_count == ASYNC_SPEED_TEST_THREAD_COUNT ? "PASSED"
                                                                  : "FAILED"),
            async->thread_count, ASYNC_SPEED_TEST_THREAD_COUNT);
    start = clock();
    for (size_t i = 0; i < 1024; i++) {
      async_run(sched_sample_task, async);
    }
    async_finish();
    end = clock();
    fprintf(stderr, "Async performance test %ld cycles with i_count = %zu\n",
            (long)(end - start), i_count);
  } else {
    fprintf(stderr, "Async test couldn't be initialized\n");
    exit(-1);
  }
  if (async_start(8)) {
    fprintf(stderr, "Couldn't start thread pool!\n");
    exit(-1);
  }
  fprintf(stderr, "calling async_perform.\n");
  async_run(text_task, NULL);
  sleep(1);
  async_perform();
  fprintf(stderr, "async_perform returned.\n");
  fprintf(stderr, "calling finish.\n");
  async_run(text_task, NULL);
  sleep(1);
  async_finish();
  fprintf(stderr, "finish returned.\n");

#if ASYNC_USE_SENTINEL == 1
  if (async_start(8)) {
    fprintf(stderr, "Couldn't start thread pool!\n");
    exit(-1);
  }
  sleep(1);
  fprintf(stderr, "calling evil task.\n");
  async_run(evil_task, NULL);
  sleep(1);
  fprintf(stderr, "calling finish.\n");
  async_finish();
#endif

  // async_start(8);
  // fprintf(stderr,
  //         "calling a few tasks and sleeping 12 seconds before finishing
  //         up...\n"
  //         "check the processor CPU cycles - are we busy?\n");
  // async_run(sched_sample_task, NULL);
  // sleep(12);
  // async_finish();
}

#endif
