/*- * $Id: rr-workpool.c,v 1.10 2002/10/06 18:17:03 jonas Exp $ * * See the file LICENSE for redistribution information. * If you have not received a copy of the license, please contact CodeFactory * by email at info@codefactory.se, or on the web at http://www.codefactory.se/ * You may also write to: CodeFactory AB, SE-903 47, Umeå, Sweden. * * Copyright (c) 2002 Jonas Borgström * Copyright (c) 2002 Daniel Lundin * Copyright (c) 2002 CodeFactory AB. All rights reserved. */ #include typedef struct { GFunc func; RRWPGroup gid; gpointer data; gpointer user_data; RRWorkPool *pool; } WorkItem; /** * rr_work_pool_new: * @max_threads: The maximum number of threads this pool should use. * * Create a new work pool. The #RRWorkPool works like an ordinary thread pool * with the exception that each "work item" is assigned a gid (group id). * But the work pool guarantees that two items with the same gid value * cant be executed in parallel. * * Return value: a newly allocated #RRWorkPool. **/ RRWorkPool * rr_work_pool_new (gint max_threads) { RRWorkPool *pool; g_return_val_if_fail (max_threads > 0, NULL); pool = g_new (RRWorkPool, 1); pool->num_threads = 0; pool->max_threads = max_threads; pool->mutex = g_mutex_new (); pool->add_cond = g_cond_new (); pool->remove_cond = g_cond_new (); pool->work = NULL; pool->active = NULL; pool->shutdown = FALSE; return pool; } /** * rr_work_pool_free: * @pool: A #RRWorkPool. * * Waits until all work items have been executed then all resourced used by * the work pool are returned to the operating system. **/ void rr_work_pool_free (RRWorkPool *pool) { g_return_if_fail (pool); g_mutex_lock (pool->mutex); pool->shutdown = TRUE; g_cond_broadcast (pool->add_cond); while (pool->work || pool->active || pool->num_threads) g_cond_wait (pool->remove_cond, pool->mutex); g_mutex_unlock (pool->mutex); g_mutex_free (pool->mutex); g_cond_free (pool->add_cond); g_cond_free (pool->remove_cond); g_free (pool); } static gboolean is_active (RRWorkPool *pool, RRWPGroup gid) { GSList *list = pool->active; while (list) { if (gid == ((WorkItem *)list->data)->gid) return TRUE; list = list->next; } return FALSE; } static gboolean is_pending (RRWorkPool *pool, RRWPGroup gid) { GSList *list = pool->work; if (is_active (pool, gid)) return TRUE; while (list) { if (gid == ((WorkItem *)list->data)->gid) return TRUE; list = list->next; } return FALSE; } static WorkItem * get_next_item (RRWorkPool *pool) { GSList *list; g_return_val_if_fail (pool, NULL); list = pool->work; while (list) { WorkItem *item = (WorkItem *)list->data; g_assert (item != NULL); if (!is_active (pool, item->gid)) { pool->work = g_slist_remove_link (pool->work, list); list->next = pool->active; pool->active = list; return item; } list = list->next; } return NULL; } static gpointer work_proxy (gpointer data) { WorkItem *item = (WorkItem *)data; RRWorkPool *pool = item->pool; GTimeVal tv = {0, 0}; while (item) { item->func (item->data, item->user_data); g_free (item); g_mutex_lock (pool->mutex); pool->active = g_slist_remove (pool->active, item); g_cond_broadcast (pool->remove_cond); while ((item = get_next_item (pool)) == NULL) { g_get_current_time (&tv); g_time_val_add (&tv, 1000000); if (!g_cond_timed_wait (pool->add_cond, pool->mutex, &tv)) { item = NULL; break; } if (pool->shutdown && pool->work == NULL) break; } if (item == NULL) { pool->num_threads--; g_cond_broadcast (pool->remove_cond); } g_mutex_unlock (pool->mutex); } return NULL; } static void process_item (RRWorkPool *pool) { GError *error = NULL; WorkItem *item; g_return_if_fail (pool); if ((item = get_next_item (pool))) { pool->num_threads++; if (g_thread_create (work_proxy, item, FALSE, &error) == NULL) g_error ("g_thread_create failed: %s\n", error->message); } } /** * rr_work_pool_push: * @pool: a #RRWorkPool * @gid: a thread group id. * @func: a function to execute by one thread in the work pool. * @data: first argument to @func * @user_data: second argument to @func * * Inserts @func to the list of task to be executed by the @pool. * The work pool guarantees that two or more tasks with the same * @gid will never be executed in parallel. **/ void rr_work_pool_push (RRWorkPool *pool, RRWPGroup gid, GFunc func, gpointer data, gpointer user_data) { WorkItem *item; g_return_if_fail (pool); g_mutex_lock (pool->mutex); item = g_new (WorkItem, 1); item->func = func; item->gid = gid; item->data = data; item->user_data = user_data; item->pool = pool; pool->work = g_slist_append (pool->work, item); g_cond_broadcast (pool->add_cond); if (pool->num_threads < pool->max_threads) process_item (pool); g_mutex_unlock (pool->mutex); } /** * rr_work_pool_join: * @pool: a #RRWork pool. * @gid: a thread group id * * blocks until all tasks of the given @gid are executed. **/ void rr_work_pool_join (RRWorkPool *pool, RRWPGroup gid) { g_return_if_fail (pool); g_mutex_lock (pool->mutex); while (is_pending (pool, gid)) g_cond_wait (pool->remove_cond, pool->mutex); g_mutex_unlock (pool->mutex); } #ifdef TESTCASE static void my_func (gpointer data, gpointer user_data) { g_print ("my_func %s start\n", (gchar *)data); g_usleep (1000000); g_print ("my_func %s end\n", (gchar *)data); } int main () { RRWorkPool *pool; g_thread_init (NULL); pool = rr_work_pool_new (2); rr_work_pool_push (pool, 1, my_func, "1", NULL); rr_work_pool_push (pool, 1, my_func, "1", NULL); rr_work_pool_push (pool, 1, my_func, "1", NULL); rr_work_pool_push (pool, 2, my_func, "2", NULL); rr_work_pool_push (pool, 3, my_func, "3", NULL); g_print ("Calling rr_work_pool_free\n"); rr_work_pool_free (pool); g_print ("Calling rr_work_pool_free DONE\n"); } #endif