/*-
 * $Id: rr-workpool.c,v 1.10 2002/10/06 18:17:03 jonas Exp $
 *
 * See the file LICENSE for redistribution information. 
 * If you have not received a copy of the license, please contact CodeFactory
 * by email at info@codefactory.se, or on the web at http://www.codefactory.se/
 * You may also write to: CodeFactory AB, SE-903 47, Umeå, Sweden.
 *
 * Copyright (c) 2002 Jonas Borgström <jonas@codefactory.se>
 * Copyright (c) 2002 Daniel Lundin   <daniel@codefactory.se>
 * Copyright (c) 2002 CodeFactory AB.  All rights reserved.
 */

#include <librr/rr-workpool.h>

typedef struct {
	GFunc func;
	RRWPGroup gid;
	gpointer data;
	gpointer user_data;
	RRWorkPool *pool;
} WorkItem;

/**
 * rr_work_pool_new:
 * @max_threads: The maximum number of threads this pool should use.
 * 
 * Create a new work pool. The #RRWorkPool works like an ordinary thread pool
 * with the exception that each "work item" is assigned a gid (group id). 
 * But the work pool guarantees that two items with the same gid value
 * cant be executed in parallel.
 * 
 * Return value: a newly allocated #RRWorkPool.
 **/
RRWorkPool *
rr_work_pool_new (gint max_threads)
{
	RRWorkPool *pool;
	
	g_return_val_if_fail (max_threads > 0, NULL);
	
	pool = g_new (RRWorkPool, 1);
	pool->num_threads = 0;
	pool->max_threads = max_threads;
	pool->mutex  = g_mutex_new ();
	pool->add_cond    = g_cond_new ();
	pool->remove_cond = g_cond_new ();
	pool->work   = NULL;
	pool->active = NULL;
	pool->shutdown = FALSE;

	return pool;
}

/**
 * rr_work_pool_free:
 * @pool: A #RRWorkPool.
 * 
 * Waits until all work items have been executed then all resourced used by
 * the work pool are returned to the operating system.
 **/
void
rr_work_pool_free (RRWorkPool *pool)
{
	g_return_if_fail (pool);

	g_mutex_lock   (pool->mutex);
	pool->shutdown = TRUE;
	g_cond_broadcast (pool->add_cond);
	while (pool->work || pool->active || pool->num_threads)
		g_cond_wait (pool->remove_cond, pool->mutex);
	g_mutex_unlock   (pool->mutex);

	g_mutex_free   (pool->mutex);
	g_cond_free    (pool->add_cond);
	g_cond_free    (pool->remove_cond);

	g_free (pool);
}

static gboolean
is_active (RRWorkPool *pool, RRWPGroup gid)
{
	GSList *list = pool->active;

	while (list) {
		if (gid == ((WorkItem *)list->data)->gid)
			return TRUE;
		list = list->next;
	}
	return FALSE;
}

static gboolean
is_pending (RRWorkPool *pool, RRWPGroup gid)
{
	GSList *list = pool->work;

	if (is_active (pool, gid))
		return TRUE;

	while (list) {
		if (gid == ((WorkItem *)list->data)->gid)
			return TRUE;
		list = list->next;
	}
	return FALSE;
}

static WorkItem *
get_next_item (RRWorkPool *pool)
{
	GSList *list;

	g_return_val_if_fail (pool, NULL);

	list = pool->work;
	while (list) {
		WorkItem *item = (WorkItem *)list->data;
		g_assert (item != NULL);

		if (!is_active (pool, item->gid)) {
			
			pool->work = g_slist_remove_link (pool->work,
							  list);
			list->next = pool->active;
			pool->active = list;
			return item;
		}
		list = list->next;
	}
	return NULL;
}

static gpointer
work_proxy (gpointer data)
{
	WorkItem *item = (WorkItem *)data;
	RRWorkPool *pool = item->pool;
	GTimeVal tv = {0, 0};

	while (item) {
		item->func (item->data, item->user_data);
		g_free (item);

		g_mutex_lock (pool->mutex);
		pool->active = g_slist_remove (pool->active, item);
		g_cond_broadcast (pool->remove_cond);
		while ((item = get_next_item (pool)) == NULL) {
			g_get_current_time (&tv);
			g_time_val_add (&tv, 1000000);
			if (!g_cond_timed_wait (pool->add_cond, 
						pool->mutex, &tv)) {
				item = NULL;
				break;
			}
			if (pool->shutdown && pool->work == NULL)
				break;
		}
		if (item == NULL) {
			pool->num_threads--;
			g_cond_broadcast (pool->remove_cond);
		}
		g_mutex_unlock (pool->mutex);
	}
	return NULL;
}

static void
process_item (RRWorkPool *pool)
{
	GError *error = NULL;
	WorkItem *item;

	g_return_if_fail (pool);

	if ((item = get_next_item (pool))) {
		pool->num_threads++;
		if (g_thread_create (work_proxy, item, FALSE, &error) == NULL)
			g_error ("g_thread_create failed: %s\n", 
				 error->message);
	}
}

/**
 * rr_work_pool_push:
 * @pool: a #RRWorkPool
 * @gid: a thread group id.
 * @func: a function to execute by one thread in the work pool.
 * @data: first argument to @func
 * @user_data: second argument to @func
 * 
 * Inserts @func to the list of task to be executed by the @pool.
 * The work pool guarantees that two or more tasks with the same
 * @gid will never be executed in parallel.
 **/
void
rr_work_pool_push (RRWorkPool *pool, RRWPGroup gid, GFunc func, 
		   gpointer data, gpointer user_data)
{
	WorkItem *item;
	g_return_if_fail (pool);

	g_mutex_lock (pool->mutex);

	item = g_new (WorkItem, 1);
	item->func = func;
	item->gid  = gid;
	item->data = data;
	item->user_data = user_data;
	item->pool = pool;

	pool->work = g_slist_append (pool->work, item);
	g_cond_broadcast (pool->add_cond);

	if (pool->num_threads < pool->max_threads)
		process_item (pool);

	g_mutex_unlock (pool->mutex);
}

/**
 * rr_work_pool_join:
 * @pool: a #RRWork pool.
 * @gid: a thread group id
 * 
 * blocks until all tasks of the given @gid are executed.
 **/
void
rr_work_pool_join (RRWorkPool *pool, RRWPGroup gid)
{
	g_return_if_fail (pool);

	g_mutex_lock   (pool->mutex);
	while (is_pending (pool, gid))
		g_cond_wait (pool->remove_cond, pool->mutex);
	g_mutex_unlock (pool->mutex);
}


#ifdef TESTCASE

static void
my_func (gpointer data, gpointer user_data)
{
	g_print ("my_func %s start\n", (gchar *)data);
	g_usleep (1000000);
	g_print ("my_func %s end\n", (gchar *)data);
}

int
main () {
	RRWorkPool *pool;
	g_thread_init (NULL);

	pool = rr_work_pool_new (2);

	rr_work_pool_push (pool, 1, my_func, "1", NULL);
	rr_work_pool_push (pool, 1, my_func, "1", NULL);
	rr_work_pool_push (pool, 1, my_func, "1", NULL);
	rr_work_pool_push (pool, 2, my_func, "2", NULL);
	rr_work_pool_push (pool, 3, my_func, "3", NULL);

	g_print ("Calling rr_work_pool_free\n");
	rr_work_pool_free (pool);
	g_print ("Calling rr_work_pool_free DONE\n");
}

#endif


syntax highlighted by Code2HTML, v. 0.9.1