#include "gskthreadpool.h"
#include "gskerrno.h"
#include "gskghelpers.h"
#include <unistd.h>
#include <errno.h>

typedef struct _TaskInfo TaskInfo;
typedef struct _ThreadInfo ThreadInfo;

struct _GskThreadPool
{

  /*< private >*/
  GskSource *wakeup_source;
  int wakeup_read_fd;
  int wakeup_write_fd;

  guint num_threads;
  guint max_threads;

  GCond *task_available;
  GMutex *task_available_mutex;

  GMutex *lock;
  GQueue *unstarted_tasks;
  GQueue *done_results;
  GQueue *idle_threads;

  gboolean destroy_pending;
  GDestroyNotify destroy_notify;
  gpointer destroy_data;
};

/* Per task information */
struct _TaskInfo
{
  GskThreadPoolRunFunc     run;
  GskThreadPoolResultFunc  handle_result;
  gpointer                 run_data;
  gpointer                 result_data;
  GskThreadPoolDestroyFunc destroy;
};

/* Per GThread information */
struct _ThreadInfo
{
  GskThreadPool           *pool;
  GThread                 *thread;
  GCond                   *cond;
  TaskInfo                *running_task;
  gboolean                 cancelled;
};

static void
destroy_now (GskThreadPool *pool)
{
  if (pool->wakeup_source != NULL)
    gsk_source_remove (pool->wakeup_source);

  g_mutex_free (pool->lock);
  g_queue_free (pool->done_results);
  g_queue_free (pool->idle_threads);
  if (pool->destroy_notify)
    (*pool->destroy_notify) (pool->destroy_data);
  g_free (pool);
}


static gboolean
handle_wakeup_fd_pinged (int fd, GIOCondition condition, gpointer data)
{
  GskThreadPool *pool = data;
  char buf[4096];
  int rv = read (pool->wakeup_read_fd, buf, sizeof (buf));
  TaskInfo *task_info;
  if (rv == 0)
    {
      /* end-of-file??? */
      g_message ("got eof from pipe");
      return TRUE;
    }
  else if (rv < 0)
    {
      int e = errno;
      if (!gsk_errno_is_ignorable (e))
	{
	  g_warning ("error reading wakeup pipe: %s", g_strerror (e));
	  return TRUE;
	}
    }
  g_mutex_lock (pool->lock);
  while ((task_info = g_queue_pop_head (pool->done_results)) != NULL)
    {
      g_mutex_unlock (pool->lock);
      (*task_info->handle_result) (task_info->run_data, task_info->result_data);
      if (task_info->destroy != NULL)
	(*task_info->destroy) (task_info->run_data, task_info->result_data);
      g_free (task_info);
      g_mutex_lock (pool->lock);
    }
  g_mutex_unlock (pool->lock);

  if (pool->destroy_pending && pool->num_threads == 0)
    return FALSE;

  return TRUE;
}

static void
wakefd_source_destroyed (gpointer data)
{
  GskThreadPool *pool = data;
  pool->wakeup_source = NULL;

  if (pool->destroy_pending && pool->num_threads == 0)
    destroy_now (pool);
}

/**
 * gsk_thread_pool_new:
 * @main_loop: the main loop that will manage the thread pool.
 * @max_threads: maximum number of threads that may be used by
 * this thread pool, or 0 to indicate that there is no limit.
 *
 * Make a new thread pool.  A thread pool is a way of recycling threads
 * to reduce thread construction costs.
 *
 * returns: the newly allocated thread pool.
 */
GskThreadPool *gsk_thread_pool_new     (GskMainLoop             *main_loop,
                                        guint                    max_threads)
{
  GskThreadPool *thread_pool;
  int pipe_fds[2];
  if (pipe (pipe_fds) < 0)
    {
      g_error ("error creating pipe: %s", g_strerror (errno));
    }

  gsk_fd_set_nonblocking (pipe_fds[0]);

  thread_pool = g_new (GskThreadPool, 1);
  thread_pool->wakeup_read_fd = pipe_fds[0];
  thread_pool->wakeup_write_fd = pipe_fds[1];
  thread_pool->wakeup_source = gsk_main_loop_add_io (main_loop,
						     pipe_fds[0],
						     G_IO_IN,
						     handle_wakeup_fd_pinged,
						     thread_pool,
						     wakefd_source_destroyed);
  thread_pool->num_threads = 0;
  thread_pool->max_threads = max_threads;
  thread_pool->destroy_pending = FALSE;
  thread_pool->lock = g_mutex_new ();
  thread_pool->idle_threads = g_queue_new ();
  thread_pool->unstarted_tasks = g_queue_new ();
  thread_pool->done_results = g_queue_new ();
  return thread_pool;
}

static void
write_byte (int fd)
{
  char zero = 0;
  write (fd, &zero, 1);
}

static gpointer
the_thread_func (gpointer data)
{
  ThreadInfo *thread_info = data;
  GskThreadPool *pool = thread_info->pool;
  while (thread_info->running_task
      && !thread_info->cancelled
      && !pool->destroy_pending)
    {
      TaskInfo *task = thread_info->running_task;
      task->result_data = (*task->run) (task->run_data);

      g_mutex_lock (pool->lock);
      g_queue_push_tail (pool->done_results, task);
      write_byte (pool->wakeup_write_fd);
      thread_info->running_task = g_queue_pop_head (pool->unstarted_tasks);
      if (thread_info->running_task == NULL)
	{
	  g_queue_push_tail (pool->idle_threads, thread_info);
	  while (!pool->destroy_pending
             && !thread_info->cancelled
             && thread_info->running_task == NULL)
	    {
	      g_cond_wait (thread_info->cond, pool->lock);
	    }
	}
      g_mutex_unlock (pool->lock);
    }

  g_mutex_lock (pool->lock);
  --pool->num_threads;
  g_mutex_unlock (pool->lock);

  write_byte (pool->wakeup_write_fd);

  g_cond_free (thread_info->cond);
  g_free (thread_info);

  return NULL;
}

/**
 * gsk_thread_pool_push:
 * @pool: the pool to add the new task to.
 * @run: function to invoke in the other thread.
 * @handle_result: function to invoke in the main-loop's thread.
 * It is invoked with both @run_data and the return value from @run.
 * @run_data: data to pass to both @run and @handle_result and @destroy.
 * @destroy: function to be invoked once everything else is done,
 * with both @run_data and the return value from @run.
 *
 * Add a new task for the thread-pool.
 *
 * The @run function should be the slow function that must
 * be run in a background thread.
 *
 * The @handle_result function will be called in the current
 * thread (which must be the same as the thread of the main-loop
 * that was used to construct this pool) with the return
 * value of @run.
 *
 * The @destroy function will be invoked in the main thread,
 * after @run and @handle_result are done.
 */
void
gsk_thread_pool_push   (GskThreadPool           *pool,
			GskThreadPoolRunFunc     run,
			GskThreadPoolResultFunc  handle_result,
			gpointer                 run_data,
			GskThreadPoolDestroyFunc destroy)
{
  TaskInfo *info = g_new (TaskInfo, 1);
  ThreadInfo *thread_info;
  g_return_if_fail (pool->destroy_pending == FALSE);
  info->run = run;
  info->handle_result = handle_result;
  info->run_data = run_data;
  info->destroy = destroy;

  g_mutex_lock (pool->lock);
  thread_info = g_queue_pop_head (pool->idle_threads);

  if (thread_info != NULL)
    {
      thread_info->running_task = info;
      g_cond_signal (thread_info->cond);
    }
  else if (pool->max_threads == 0 || pool->num_threads < pool->max_threads)
    {
      GError *error = NULL;
      thread_info = g_new (ThreadInfo, 1);
      thread_info->pool = pool;
      thread_info->cond = g_cond_new ();
      thread_info->running_task = info;
      thread_info->cancelled = FALSE;
      thread_info->thread = g_thread_create (the_thread_func, thread_info, TRUE, &error);
      if (thread_info->thread == NULL)
	{
	  /* uh, destroy thread_info and print a warning. */
	  g_message ("error creating thread: %s", error->message);
	  g_cond_free (thread_info->cond);
	  g_free (thread_info);
	  thread_info = NULL;
	}
      else
	pool->num_threads++;
    }
  if (thread_info == NULL)
    g_queue_push_tail (pool->unstarted_tasks, info);
  g_mutex_unlock (pool->lock);
}

/**
 * gsk_thread_pool_destroy:
 * @pool: the pool to destroy.
 * @destroy: function to invoke when the thread-pool is really done.
 * @destroy_data: data to pass to @destroy when all the threads in
 * the thread pool as gone.
 *
 * Destroy a thread-pool.
 * This may take some time,
 * so you may register a handler that will be 
 * called from the main thread once the thread-pool
 * is destructed.  (The memory is not yet deallocated though,
 * so that hash-tables keyed off the thread-pool
 * will have no race condition)
 */
void
gsk_thread_pool_destroy (GskThreadPool           *pool,
			 GDestroyNotify           destroy,
			 gpointer                 destroy_data)
{
  ThreadInfo *info;
  gboolean do_destroy;
  g_return_if_fail (pool->destroy_pending == FALSE);
  pool->destroy_pending = TRUE;
  pool->destroy_notify = destroy;
  pool->destroy_data = destroy_data;
  g_mutex_lock (pool->lock);
  while ((info = g_queue_pop_head (pool->idle_threads)) != NULL)
    {
      /* Destroy the idle thread. */
      info->cancelled = TRUE;
      g_cond_signal (info->cond);
    }
  do_destroy = pool->num_threads == 0;
  g_mutex_unlock (pool->lock);

  if (do_destroy)
    destroy_now (pool);
}


syntax highlighted by Code2HTML, v. 0.9.1