/*-
* $Id: rr-workpool.c,v 1.10 2002/10/06 18:17:03 jonas Exp $
*
* See the file LICENSE for redistribution information.
* If you have not received a copy of the license, please contact CodeFactory
* by email at info@codefactory.se, or on the web at http://www.codefactory.se/
* You may also write to: CodeFactory AB, SE-903 47, Umeå, Sweden.
*
* Copyright (c) 2002 Jonas Borgström <jonas@codefactory.se>
* Copyright (c) 2002 Daniel Lundin <daniel@codefactory.se>
* Copyright (c) 2002 CodeFactory AB. All rights reserved.
*/
#include <librr/rr-workpool.h>
typedef struct {
GFunc func;
RRWPGroup gid;
gpointer data;
gpointer user_data;
RRWorkPool *pool;
} WorkItem;
/**
* rr_work_pool_new:
* @max_threads: The maximum number of threads this pool should use.
*
* Create a new work pool. The #RRWorkPool works like an ordinary thread pool
* with the exception that each "work item" is assigned a gid (group id).
* But the work pool guarantees that two items with the same gid value
* cant be executed in parallel.
*
* Return value: a newly allocated #RRWorkPool.
**/
RRWorkPool *
rr_work_pool_new (gint max_threads)
{
RRWorkPool *pool;
g_return_val_if_fail (max_threads > 0, NULL);
pool = g_new (RRWorkPool, 1);
pool->num_threads = 0;
pool->max_threads = max_threads;
pool->mutex = g_mutex_new ();
pool->add_cond = g_cond_new ();
pool->remove_cond = g_cond_new ();
pool->work = NULL;
pool->active = NULL;
pool->shutdown = FALSE;
return pool;
}
/**
* rr_work_pool_free:
* @pool: A #RRWorkPool.
*
* Waits until all work items have been executed then all resourced used by
* the work pool are returned to the operating system.
**/
void
rr_work_pool_free (RRWorkPool *pool)
{
g_return_if_fail (pool);
g_mutex_lock (pool->mutex);
pool->shutdown = TRUE;
g_cond_broadcast (pool->add_cond);
while (pool->work || pool->active || pool->num_threads)
g_cond_wait (pool->remove_cond, pool->mutex);
g_mutex_unlock (pool->mutex);
g_mutex_free (pool->mutex);
g_cond_free (pool->add_cond);
g_cond_free (pool->remove_cond);
g_free (pool);
}
static gboolean
is_active (RRWorkPool *pool, RRWPGroup gid)
{
GSList *list = pool->active;
while (list) {
if (gid == ((WorkItem *)list->data)->gid)
return TRUE;
list = list->next;
}
return FALSE;
}
static gboolean
is_pending (RRWorkPool *pool, RRWPGroup gid)
{
GSList *list = pool->work;
if (is_active (pool, gid))
return TRUE;
while (list) {
if (gid == ((WorkItem *)list->data)->gid)
return TRUE;
list = list->next;
}
return FALSE;
}
static WorkItem *
get_next_item (RRWorkPool *pool)
{
GSList *list;
g_return_val_if_fail (pool, NULL);
list = pool->work;
while (list) {
WorkItem *item = (WorkItem *)list->data;
g_assert (item != NULL);
if (!is_active (pool, item->gid)) {
pool->work = g_slist_remove_link (pool->work,
list);
list->next = pool->active;
pool->active = list;
return item;
}
list = list->next;
}
return NULL;
}
static gpointer
work_proxy (gpointer data)
{
WorkItem *item = (WorkItem *)data;
RRWorkPool *pool = item->pool;
GTimeVal tv = {0, 0};
while (item) {
item->func (item->data, item->user_data);
g_free (item);
g_mutex_lock (pool->mutex);
pool->active = g_slist_remove (pool->active, item);
g_cond_broadcast (pool->remove_cond);
while ((item = get_next_item (pool)) == NULL) {
g_get_current_time (&tv);
g_time_val_add (&tv, 1000000);
if (!g_cond_timed_wait (pool->add_cond,
pool->mutex, &tv)) {
item = NULL;
break;
}
if (pool->shutdown && pool->work == NULL)
break;
}
if (item == NULL) {
pool->num_threads--;
g_cond_broadcast (pool->remove_cond);
}
g_mutex_unlock (pool->mutex);
}
return NULL;
}
static void
process_item (RRWorkPool *pool)
{
GError *error = NULL;
WorkItem *item;
g_return_if_fail (pool);
if ((item = get_next_item (pool))) {
pool->num_threads++;
if (g_thread_create (work_proxy, item, FALSE, &error) == NULL)
g_error ("g_thread_create failed: %s\n",
error->message);
}
}
/**
* rr_work_pool_push:
* @pool: a #RRWorkPool
* @gid: a thread group id.
* @func: a function to execute by one thread in the work pool.
* @data: first argument to @func
* @user_data: second argument to @func
*
* Inserts @func to the list of task to be executed by the @pool.
* The work pool guarantees that two or more tasks with the same
* @gid will never be executed in parallel.
**/
void
rr_work_pool_push (RRWorkPool *pool, RRWPGroup gid, GFunc func,
gpointer data, gpointer user_data)
{
WorkItem *item;
g_return_if_fail (pool);
g_mutex_lock (pool->mutex);
item = g_new (WorkItem, 1);
item->func = func;
item->gid = gid;
item->data = data;
item->user_data = user_data;
item->pool = pool;
pool->work = g_slist_append (pool->work, item);
g_cond_broadcast (pool->add_cond);
if (pool->num_threads < pool->max_threads)
process_item (pool);
g_mutex_unlock (pool->mutex);
}
/**
* rr_work_pool_join:
* @pool: a #RRWork pool.
* @gid: a thread group id
*
* blocks until all tasks of the given @gid are executed.
**/
void
rr_work_pool_join (RRWorkPool *pool, RRWPGroup gid)
{
g_return_if_fail (pool);
g_mutex_lock (pool->mutex);
while (is_pending (pool, gid))
g_cond_wait (pool->remove_cond, pool->mutex);
g_mutex_unlock (pool->mutex);
}
#ifdef TESTCASE
static void
my_func (gpointer data, gpointer user_data)
{
g_print ("my_func %s start\n", (gchar *)data);
g_usleep (1000000);
g_print ("my_func %s end\n", (gchar *)data);
}
int
main () {
RRWorkPool *pool;
g_thread_init (NULL);
pool = rr_work_pool_new (2);
rr_work_pool_push (pool, 1, my_func, "1", NULL);
rr_work_pool_push (pool, 1, my_func, "1", NULL);
rr_work_pool_push (pool, 1, my_func, "1", NULL);
rr_work_pool_push (pool, 2, my_func, "2", NULL);
rr_work_pool_push (pool, 3, my_func, "3", NULL);
g_print ("Calling rr_work_pool_free\n");
rr_work_pool_free (pool);
g_print ("Calling rr_work_pool_free DONE\n");
}
#endif
syntax highlighted by Code2HTML, v. 0.9.1