/*
* scamper_queue.c
*
* $Id: scamper_queue.c,v 1.16 2007/05/10 02:45:33 mjl Exp $
*
* Copyright (C) 2005-2007 The University of Waikato
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 2.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#if defined(__APPLE__)
#include <stdint.h>
#endif
#include <string.h>
#include <stdlib.h>
#if defined(DMALLOC)
#include <dmalloc.h>
#endif
#include "scamper.h"
#include "scamper_task.h"
#include "scamper_queue.h"
#include "utils.h"
#include "mjl_list.h"
#include "mjl_heap.h"
struct scamper_queue
{
/* the task that is queued */
struct scamper_task *task;
/* when the scamper task should timeout from whatever queue it is on */
struct timeval timeout;
/* the current queue the task is in */
void *queue;
/* the node for the queue */
void *node;
};
static dlist_t *probe_queue = NULL;
static heap_t *wait_queue = NULL;
static heap_t *done_queue = NULL;
static int count = 0;
/*
* queue_cmp
*
*
*/
static int queue_cmp(const void *va, const void *vb)
{
const scamper_queue_t *a = (const scamper_queue_t *)va;
const scamper_queue_t *b = (const scamper_queue_t *)vb;
int cmp = timeval_cmp(&a->timeout, &b->timeout);
if(cmp < 0) return 1;
if(cmp > 0) return -1;
return 0;
}
/*
* queue_unlink
*
* detach a task from whichever queue it is in
*/
static void queue_unlink(scamper_queue_t *sq)
{
if(sq->queue == NULL)
{
return;
}
else if(sq->queue == probe_queue)
{
dlist_node_pop(sq->queue, sq->node);
}
else if(sq->queue == wait_queue || sq->queue == done_queue)
{
heap_delete(sq->queue, sq->node);
}
sq->queue = NULL;
sq->node = NULL;
count--;
return;
}
/*
* queue_link
*
* given a task and a queue to insert it in, put the task into the queue.
* to is a timeout value of when the task should be removed from the queue.
*/
static int queue_link(scamper_queue_t *sq, void *queue, const int *msec)
{
void *node;
/* remove the task from the old queue */
queue_unlink(sq);
/* if there's a timeout to include, then copy that into the task struct */
if(msec != NULL)
{
gettimeofday_wrap(&sq->timeout);
timeval_add_msec(&sq->timeout, *msec);
}
/* now, put it in the correct queue */
if(queue == probe_queue)
{
node = dlist_tail_push(queue, sq);
}
else if(queue == wait_queue || queue == done_queue)
{
node = heap_insert(queue, sq);
}
else
{
node = NULL;
}
/* ensure we've got a node */
if(node == NULL)
{
return -1;
}
sq->queue = queue;
sq->node = node;
count++;
return 0;
}
int scamper_queue_probe(scamper_queue_t *sq)
{
return queue_link(sq, probe_queue, NULL);
}
int scamper_queue_wait(scamper_queue_t *sq, int msec)
{
return queue_link(sq, wait_queue, &msec);
}
int scamper_queue_done(scamper_queue_t *sq, int msec)
{
return queue_link(sq, done_queue, &msec);
}
void scamper_queue_detach(scamper_queue_t *sq)
{
queue_unlink(sq);
return;
}
/*
* scamper_queue_select
*
* return the next task in the probe queue to deal with
*/
struct scamper_task *scamper_queue_select()
{
scamper_queue_t *sq;
if((sq = dlist_head_pop(probe_queue)) != NULL)
{
sq->queue = NULL;
sq->node = NULL;
count--;
return sq->task;
}
return NULL;
}
/*
* scamper_queue_getdone
*
*/
struct scamper_task *scamper_queue_getdone()
{
scamper_queue_t *sq;
struct timeval tv;
if((sq = (scamper_queue_t *)heap_head_item(done_queue)) == NULL)
{
return NULL;
}
gettimeofday_wrap(&tv);
if(timeval_cmp(&tv, &sq->timeout) > 0)
{
queue_unlink(sq);
return sq->task;
}
return NULL;
}
/*
* scamper_queue_waittime
*
* tell the caller how long it should wait in select before it makes a
* pass through the queues. note that this function does not check the
* probe queue where something is immediately ready to be probed.
*
* if there is nothing in any of the queues, we return 0. otherwise we
* return the number of active queues and the tv parameter contains the
* time that the first queue will have something to deal with.
*/
int scamper_queue_waittime(struct timeval *tv)
{
scamper_queue_t *sq;
heap_t *queues[2];
int i, set = 0;
queues[0] = wait_queue;
queues[1] = done_queue;
for(i=(sizeof(queues)/sizeof(heap_t *))-1; i >= 0; i--)
{
if((sq = (scamper_queue_t *)heap_head_item(queues[i])) != NULL)
{
if(set == 0 || timeval_cmp(tv, &sq->timeout) > 0)
{
timeval_cpy(tv, &sq->timeout);
set++;
}
}
}
return set;
}
/*
* scamper_queue_readycount
*
* this function causes the wait queue to be checked to see if any of the
* members should be punted onto the probe queue for
* action.
*
* we then return the count of ready tasks, which is the count of items on
* the probe queue.
*/
int scamper_queue_readycount()
{
scamper_queue_t *sq;
struct timeval tv;
gettimeofday_wrap(&tv);
/* timeout any tasks on the wait queue that are due to be probed again */
while((sq = heap_head_item(wait_queue)) != NULL)
{
if(timeval_cmp(&tv, &sq->timeout) > 0)
{
queue_unlink(sq);
if(sq->task->funcs->handle_timeout != NULL)
{
sq->task->funcs->handle_timeout(sq->task);
}
if(sq->queue == NULL)
{
queue_link(sq, probe_queue, NULL);
}
}
else break;
}
return dlist_count(probe_queue);
}
/*
* scamper_queue_empty
*
* for whatever reason, the queue of 'active' tasks must be flushed.
* drop all active tasks by removing them from the probe and wait queues.
*/
void scamper_queue_empty()
{
scamper_queue_t *sq;
while((sq = (scamper_queue_t *)heap_remove(wait_queue)) != NULL)
{
sq->queue = NULL;
sq->node = NULL;
count--;
}
while((sq = (scamper_queue_t *)dlist_head_pop(probe_queue)) != NULL)
{
sq->queue = NULL;
sq->node = NULL;
count--;
}
return;
}
int scamper_queue_count()
{
return count;
}
/*
* scamper_queue_alloc
*
* allocate a queue object to use with a task. set the task's queue
* pointer to the freshly allocated object
*/
int scamper_queue_alloc(struct scamper_task *task)
{
scamper_queue_t *sq;
/* refuse to allocate a queue node if there is no task associated */
if(task == NULL)
{
return -1;
}
/* allocate the queue object and set the queue pointers appropriately */
if((sq = malloc_zero(sizeof(scamper_queue_t))) != NULL)
{
sq->task = task;
task->queue = sq;
return 0;
}
return -1;
}
/*
* scamper_queue_free
*
* free the queue object. make sure that the task's queue pointer is
* set to null.
*/
void scamper_queue_free(scamper_queue_t *sq)
{
if(sq != NULL)
{
if(sq->task != NULL) sq->task->queue = NULL;
queue_unlink(sq);
free(sq);
}
return;
}
int scamper_queue_init()
{
if((probe_queue = dlist_alloc()) == NULL)
{
return -1;
}
if((wait_queue = heap_alloc(queue_cmp)) == NULL)
{
return -1;
}
if((done_queue = heap_alloc(queue_cmp)) == NULL)
{
return -1;
}
return 0;
}
void scamper_queue_cleanup()
{
if(done_queue != NULL)
{
heap_free(done_queue, NULL);
done_queue = NULL;
}
if(wait_queue != NULL)
{
heap_free(wait_queue, NULL);
wait_queue = NULL;
}
if(probe_queue != NULL)
{
dlist_free(probe_queue);
probe_queue = NULL;
}
return;
}
syntax highlighted by Code2HTML, v. 0.9.1