/*
* scamper_addresslist
*
* $Id: scamper_addresslist.c,v 1.115 2007/05/10 01:18:12 mjl Exp $
*
* this code deals with storing trace objects with their associated lists.
*
* Copyright (C) 2004-2007 The University of Waikato
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 2.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include <sys/param.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#if defined(__APPLE__)
#include <stdint.h>
#endif
#include <assert.h>
#if defined(DMALLOC)
#include <dmalloc.h>
#endif
#include "scamper.h"
#include "scamper_addr.h"
#include "scamper_list.h"
#include "scamper_tlv.h"
#include "scamper_trace.h"
#include "scamper_debug.h"
#include "scamper_file.h"
#include "scamper_outfiles.h"
#include "scamper_target.h"
#include "scamper_task.h"
#include "scamper_addresslist.h"
#include "scamper_fds.h"
#include "scamper_linepoll.h"
#include "scamper_privsep.h"
#include "scamper_addr.h"
#include "scamper_do_trace.h"
#include "scamper_ping.h"
#include "scamper_do_ping.h"
#include "scamper_cyclemon.h"
#include "utils.h"
#include "mjl_list.h"
#include "mjl_splaytree.h"
/*
 * alloctask_t: callback stored with a probe command that allocates a task
 * from the command's data pointer, given the list and cycle it belongs to.
 * the concrete type behind the returned pointer is not visible here --
 * presumably a scamper_task_t; TODO confirm.
 */
typedef void *(*alloctask_t)(void *data, scamper_list_t *, scamper_cycle_t *);
/* freedata_t: callback stored with a probe command that frees its data */
typedef void *(*freedata_t)(void *data);
/*
 * command
 *
 * this structure is used to hold information for each command found in
 * a source's command list.
 *
 * the type field says which member of command_un is in use:
 * COMMAND_PROBE uses command_un.probe, COMMAND_CYCLE uses command_un.cycle.
 * the command_probe_* and command_cycle_* macros defined inside the
 * structure are shorthand for reaching the nested union members.
 */
typedef struct command
{
/* the type of command; one of the COMMAND_* values defined below */
uint8_t type;
#define command_probe_addr command_un.probe.addr
#define command_probe_cyclemon command_un.probe.cyclemon
#define command_probe_data command_un.probe.data
#define command_probe_alloctask command_un.probe.alloctask
#define command_probe_freedata command_un.probe.freedata
union
{
/* hold the parameters for a probe command */
struct command_probe_s
{
/* target address; command_probe takes a reference, command_free drops it */
scamper_addr_t *addr;
/* cycle monitor; command_probe takes a reference, command_free drops it */
scamper_cyclemon_t *cyclemon;
/* data parameter - a preinitialised structure with all parameters set */
void *data;
/* function to allocate a scamper task using the data parameter */
alloctask_t alloctask;
/* function to free the data pointed to above */
freedata_t freedata;
} probe;
#define command_cycle_cycle command_un.cycle.cycle
/* hold the parameters for a cycle command */
struct command_cycle_s
{
/* the new cycle; command_free does not release it */
scamper_cycle_t *cycle;
} cycle;
} command_un;
} command_t;
/* values for command_t.type */
#define COMMAND_PROBE 0x00
#define COMMAND_CYCLE 0x01
/*
 * on_hold
 *
 * structure to keep details on a command that is on hold.  a record is
 * created by addresslist_onhold and is linked both from the source's
 * on_hold list (node) and from the task it waits on (task_cookie).
 */
typedef struct on_hold
{
/* pointer to the task that has the lock on the address */
scamper_task_t *task;
/* pointer to the source list that wants the address to be probed */
scamper_source_t *source;
/* pointer to the command that is waiting on the task to complete */
command_t *command;
/* value returned by scamper_task_onhold; passed to scamper_task_dehold */
void *task_cookie;
/* pointer to the node held in the source's on-hold list */
dlist_node_t *node;
} on_hold_t;
/*
 * alf
 *
 * structure to keep state when reading a file of commands
 * (an address-list-file).  allocated by alf_alloc, released by alf_free.
 */
typedef struct alf
{
/* back pointer to the source struct that receives each command */
scamper_source_t *source;
/* the file descriptor that scamper is monitoring; alf_read is its callback */
scamper_fd_t *fd;
/* structure to break up a chunk of an ascii file into lines */
scamper_linepoll_t *lp;
} alf_t;
/*
 * source_file
 *
 * structure to keep state on a source_file.  this structure is used when
 * a source is defined to cycle over a file.  this structure is not used
 * when a source is adhoc.
 */
typedef struct source_file
{
/* the name of the file */
char *filename;
/*
 * a count of the number of cycles left to take through the list.
 * alf_read treats zero as "cycle indefinitely" and one as the final pass.
 */
int cycles;
/* if set, the list should be reloaded at the next cycle */
int reload;
/* if set, the list should be refreshed each cycle if it has changed */
int autoreload;
/* the modification time of the file when it was last read */
time_t mtime;
} source_file_t;
/*
 * scamper_source
 *
 * struct that keeps a record of the various address sources that scamper
 * is dealing with at present.
 */
struct scamper_source
{
/* reference count to decide when the source is no longer required */
int refcnt;
/* mix priority of this source; zero keeps a blocked source blocked */
int priority;
/* list of commands queued (command_t items) */
slist_t *commands;
/* list of commands that are on hold until some external event completes */
dlist_t *on_hold;
/*
 * the next two variables define the current state of the source based on
 * the queue it is in.  if set, the source is either in the active or
 * blocked state.  the first parameter defines which of the two lists it
 * is in, while the second parameter defines the list node the source has
 * been assigned.
 */
void *l;
void *ln;
/* default command to use where none is otherwise specified */
char *command;
/* length of the default command string, excluding the terminator */
size_t command_len;
/* the details of the list */
scamper_list_t *list;
/* pointer to a structure that monitors when to write a cycle_stop record */
scamper_cycle_t *cycle;
scamper_cyclemon_t *cyclemon;
/* where the data should be written, if the source has a preference */
scamper_outfile_t *sof;
/*
 * how many cycle points are defined.  incremented when a cycle command is
 * queued, decremented in source_cycle_finish.
 */
int cycle_points;
/*
 * if set, this source is adhoc; that is, it does not have a defined
 * command source.  an adhoc source may have commands that originate in
 * files, or are specified on the control socket.  if a source is not
 * adhoc, it may only obtain its commands from a single file, specified
 * below.
 */
int adhoc;
/*
 * the name of the input file that the source gets its commands from.
 * if set, the source does not take commands from other files, or on an
 * adhoc basis.
 */
source_file_t *file;
/* maintain state when a file is being parsed for commands */
alf_t *alf;
};
/*
 * source_observe
 *
 * one registered observer of source events.  records are created by
 * scamper_addresslist_observe and kept in the observers list.
 */
typedef struct source_observe
{
/* function called for each event that is posted */
scamper_source_event_func_t func;
/* opaque parameter handed back to func with each event */
void *param;
/* this record's node in the observers list */
dlist_node_t *node;
} source_observe_t;
/*
 * global variables for managing sources:
 *
 * a source is stored in one of two lists depending on its state.  it is
 * either stored in the active list, a round-robin circular list, or in
 * the blocked list.
 *
 * the source, if any, currently being used (that is, has not used up its
 * priority quantum) is pointed to by source_cur.  the number of tasks that
 * have been read from the current source in this rotation is held in
 * source_cnt.
 *
 * there are two special sources.  the default source is mostly used for
 * handling tasks passed in on the command line.  the preempt source is used
 * for handling high-priority tasks.
 *
 * the sources are stored in a tree that is searchable by name.  the tree
 * does not include special sources.
 */
static clist_t *active = NULL;
static dlist_t *blocked = NULL;
static scamper_source_t *source_cur = NULL;
static int source_cnt = 0;
static scamper_source_t *source_def = NULL;
static splaytree_t *source_tree = NULL;
/*
 * scamper provides the ability for external monitoring of source events.
 * these observers are held in a list.  the list is allocated when the
 * first observer registers and freed when the last one is removed.
 */
static dlist_t *observers = NULL;
/*
 * keep state to ensure that stdin is only ever supplied as a target
 * address source once.
 */
static int stdin_used = 0;
/*
 * global temporary buf for assembling commands.  command_len is the
 * allocated size of command_buf; command_assemble grows it on demand.
 */
static char *command_buf = NULL;
static size_t command_len = 0;
/*
 * source_event_post_cb
 *
 * dlist_foreach callback used by source_event_post.  item is one observer
 * record and param is the event being posted; the observer's function is
 * called with the event and the observer's own parameter.
 * always returns zero.
 */
static int source_event_post_cb(void *item, void *param)
{
  source_observe_t *obs = item;

  obs->func((scamper_source_event_t *)param, obs->param);
  return 0;
}
/*
 * source_event_post
 *
 * deliver an event of the given type about the given source to every
 * registered observer.  if the caller has no event structure of its own
 * (ev is NULL) a zeroed one is used.  the source, event type and current
 * time (seconds) are filled in before the observers are called.
 */
static void source_event_post(scamper_source_t *source, int type,
                              scamper_source_event_t *ev)
{
  scamper_source_event_t blank;
  struct timeval now;

  /* only do the work when somebody is listening */
  if(observers != NULL) {
    if(ev == NULL) {
      memset(&blank, 0, sizeof(blank));
      ev = &blank;
    }

    gettimeofday_wrap(&now);
    ev->source = source;
    ev->event  = type;
    ev->sec    = now.tv_sec;

    dlist_foreach(observers, source_event_post_cb, ev);
  }
}
/*
 * source_cycle_finish
 *
 * callback handed to scamper_cyclemon_alloc by source_cycle.  it stamps
 * the cycle with the current time as its stop time and, if an outfile was
 * supplied, writes the cycle stop record to it.
 *
 * if the source is still attached, its count of outstanding cycle points
 * is reduced; when fewer than two remain and a command file is open,
 * reading from that file is resumed.
 */
static void source_cycle_finish(scamper_cycle_t *cycle,
                                scamper_source_t *source,
                                scamper_outfile_t *outfile)
{
  struct timeval now;

  gettimeofday_wrap(&now);
  cycle->stop_time = (uint32_t)now.tv_sec;

  if(outfile != NULL)
    scamper_file_write_cycle_stop(scamper_outfile_getfile(outfile), cycle);

  /* no source to account against */
  if(source == NULL)
    return;

  source->cycle_points--;
  if(source->cycle_points < 2 && source->alf != NULL)
    scamper_fd_read_unpause(source->alf->fd);
}
/*
 * command_free
 *
 * release a command structure.  for a probe command the references held
 * on the address and the cycle monitor are dropped.  the probe's data
 * pointer and a cycle command's cycle are NOT released here; callers that
 * own them must deal with them separately.
 */
static void command_free(command_t *command)
{
  if(command->type == COMMAND_PROBE) {
    scamper_addr_free(command->command_probe_addr);
    scamper_cyclemon_unuse(command->command_probe_cyclemon);
  }

  free(command);
}
/*
 * command_probe
 *
 * build a probe command for the given target address and append it to the
 * tail of the commands list.  references are taken on the address and on
 * the cycle monitor; data, alloctask and freedata are stored as supplied.
 *
 * returns 0 on success and -1 on failure.  on failure the command and the
 * references it took are released, but data is left untouched and remains
 * the caller's responsibility.
 */
static int command_probe(slist_t *commands, scamper_addr_t *addr,
                         scamper_cyclemon_t *cyclemon, void *data,
                         alloctask_t alloctask, freedata_t freedata)
{
  command_t *cmd = malloc_zero(sizeof(command_t));

  if(cmd == NULL)
    return -1;

  cmd->type                    = COMMAND_PROBE;
  cmd->command_probe_addr      = scamper_addr_use(addr);
  cmd->command_probe_cyclemon  = scamper_cyclemon_use(cyclemon);
  cmd->command_probe_data      = data;
  cmd->command_probe_alloctask = alloctask;
  cmd->command_probe_freedata  = freedata;

  if(slist_tail_push(commands, cmd) != NULL)
    return 0;

  /* could not queue it: undo the allocation and the references */
  command_free(cmd);
  return -1;
}
/*
 * command_cycle
 *
 * append a cycle command carrying the supplied cycle to the tail of the
 * source's commands list, and count one more cycle point against the
 * source.
 *
 * returns 0 on success and -1 on failure.  the cycle itself is never
 * released here.
 */
static int command_cycle(scamper_source_t *source, scamper_cycle_t *cycle)
{
  command_t *cmd = malloc_zero(sizeof(command_t));

  if(cmd == NULL)
    return -1;

  cmd->type = COMMAND_CYCLE;
  cmd->command_cycle_cycle = cycle;

  if(slist_tail_push(source->commands, cmd) == NULL) {
    command_free(cmd);
    return -1;
  }

  source->cycle_points++;
  return 0;
}
/*
 * source_cycle
 *
 * start a new cycle, numbered cycle_id, for the source: allocate the cycle
 * object and a monitor for it (which calls source_cycle_finish), then
 * queue a cycle command on the source's commands list.
 *
 * once all of that has succeeded, the source's references to any previous
 * cycle and cycle monitor are dropped and replaced with the new ones.
 *
 * returns 0 on success; on failure returns -1, releases whatever was
 * allocated here and leaves the source's existing cycle state in place.
 */
static int source_cycle(scamper_source_t *source, uint32_t cycle_id)
{
  scamper_cycle_t *new_cycle;
  scamper_cyclemon_t *new_mon;

  if((new_cycle = scamper_cycle_alloc(source->list)) == NULL)
    return -1;
  new_cycle->id = cycle_id;

  /* the monitor tracks references so the cycle stop can be recorded */
  new_mon = scamper_cyclemon_alloc(new_cycle, source_cycle_finish,
                                   source, source->sof);

  if(new_mon == NULL || command_cycle(source, new_cycle) != 0) {
    if(new_mon != NULL)
      scamper_cyclemon_free(new_mon);
    scamper_cycle_free(new_cycle);
    return -1;
  }

  /* let go of the cycle that is being superseded, if there is one */
  if(source->cycle != NULL)
    scamper_cycle_free(source->cycle);
  if(source->cyclemon != NULL)
    scamper_cyclemon_unuse(source->cyclemon);

  source->cycle    = new_cycle;
  source->cyclemon = new_mon;
  return 0;
}
/*
 * source_file_open
 *
 * open the named file read-only and put the descriptor into non-blocking
 * mode.  the name "-" stands for stdin, which may only be handed out
 * once; a second request for it fails.  any other name is opened
 * directly, or through the privilege separation helper unless
 * WITHOUT_PRIVSEP is defined.
 *
 * returns the descriptor, or -1 on failure.  if the descriptor cannot be
 * made non-blocking it is closed before returning.
 */
static int source_file_open(char *filename)
{
  int fd;

  if(strcmp(filename, "-") != 0) {
#if defined(WITHOUT_PRIVSEP)
    fd = open(filename, O_RDONLY);
#else
    fd = scamper_privsep_open_file(filename, O_RDONLY, 0);
#endif
    if(fd == -1)
      return -1;
  }
  else {
    /* stdin can only be consumed by one source */
    if(stdin_used != 0)
      return -1;
    if((fd = fileno(stdin)) == -1)
      return -1;
    stdin_used = 1;
  }

  /* we don't want reads on this file descriptor to block */
  if(fcntl_set(fd, O_NONBLOCK) == -1) {
    close(fd);
    return -1;
  }

  return fd;
}
/*
* source_cmp
*
* provide a sorting function for storing sources in a splay tree
*/
static int source_cmp(const void *a, const void *b)
{
return strcasecmp(((const scamper_source_t *)b)->list->name,
((const scamper_source_t *)a)->list->name);
}
/*
 * source_next
 *
 * step source_cur on to the following source in the active list (it stays
 * where it is if it is the only entry) and zero source_cnt, the count of
 * items taken from the current source in this rotation.
 *
 * source_cur is dereferenced without a check, so it must not be NULL.
 * returns the new value of source_cur.
 */
static scamper_source_t *source_next(void)
{
  void *here  = source_cur->ln;
  void *after = clist_node_next(here);

  if(after != here)
    source_cur = clist_node_item(after);

  source_cnt = 0;
  return source_cur;
}
/*
 * source_active_detach
 *
 * take the source out of the active list.  source_cur is set to the
 * source that follows the one being removed, or to NULL if it was the
 * only entry, and source_cnt is reset to zero.
 *
 * NOTE(review): source_cur and source_cnt are updated even when the
 * source being removed is not the current one.  that looks unintended
 * (it restarts the rotation from the removed source's successor) --
 * confirm against the callers before changing.
 */
static void source_active_detach(scamper_source_t *source)
{
  void *after;

  assert(source->l == active);

  after = clist_node_next(source->ln);
  source_cur = (after != source->ln) ? clist_node_item(after) : NULL;
  source_cnt = 0;

  clist_node_pop(active, source->ln);
  source->ln = NULL;
  source->l  = NULL;
}
/*
 * source_blocked_detach
 *
 * take the source out of the blocked list and clear its record of which
 * list and node it occupies.  the source must currently be blocked.
 */
static void source_blocked_detach(scamper_source_t *source)
{
  assert(source->l == blocked);

  dlist_node_pop(blocked, source->ln);
  source->l  = NULL;
  source->ln = NULL;
}
/*
 * source_active_attach
 *
 * something has changed that may allow the source back into the active
 * rotation.  the outcome depends on where the source is now:
 *
 *  - already active: nothing to do.
 *  - blocked with a priority of zero: it stays blocked, as a zero
 *    priority means the source is paused.
 *  - otherwise it is removed from the blocked list if necessary and
 *    appended to the active list; if there was no current source it
 *    becomes the current source.
 *
 * callers therefore MUST NOT assume the source is active afterwards.
 * returns 0, or -1 if the source could not be added to the active list
 * (in which case it is on neither list).
 */
static int source_active_attach(scamper_source_t *source)
{
  if(source->l != active) {
    if(source->l == blocked) {
      /* paused sources are not released */
      if(source->priority == 0)
        return 0;
      source_blocked_detach(source);
    }

    source->ln = clist_tail_push(active, source);
    if(source->ln == NULL)
      return -1;
    source->l = active;

    if(source_cur == NULL) {
      source_cur = source;
      source_cnt = 0;
    }
  }

  return 0;
}
/*
 * source_blocked_attach
 *
 * move the source onto the blocked list.  a source that is already
 * blocked is left alone; a source that is on another list is first
 * removed from the active list.
 *
 * returns 0, or -1 if the source could not be added to the blocked list
 * (in which case it is on neither list).
 */
static int source_blocked_attach(scamper_source_t *source)
{
  if(source->l != blocked) {
    if(source->l != NULL)
      source_active_detach(source);

    source->ln = dlist_tail_push(blocked, source);
    if(source->ln == NULL)
      return -1;
    source->l = blocked;
  }

  return 0;
}
/*
* addresslist_unhold
*
* callback function that is used to take an on_hold structure and put
* the command back on the source's command list. finally, it puts the
* source back on the active list (if it is not already).
*/
static void addresslist_unhold(void *cookie)
{
on_hold_t *on_hold = (on_hold_t *)cookie;
scamper_source_t *source = on_hold->source;
command_t *command = on_hold->command;
/* remove the source's copy of the on-hold structure */
dlist_node_pop(source->on_hold, on_hold->node);
free(on_hold);
/* push the command onto the front of the list */
slist_head_push(source->commands, command);
/* put the source on the active list */
source_active_attach(source);
return;
}
/*
 * addresslist_onhold
 *
 * the command cannot run yet because the given task is busy with the same
 * target.  record the command, the task and the originating source in a
 * new on_hold structure, link it into the source's on-hold list, and
 * register it with the task so that addresslist_unhold is called when the
 * task is done.
 *
 * returns 0 on success.  on failure returns -1 with the on_hold structure
 * unlinked and freed; the command itself is not touched.
 */
static int addresslist_onhold(scamper_source_t *source, scamper_task_t *task,
                              command_t *command)
{
  on_hold_t *hold = malloc_zero(sizeof(on_hold_t));

  if(hold == NULL)
    return -1;

  hold->task    = task;
  hold->source  = source;
  hold->command = command;

  /* first the source's list, then the task's */
  hold->node = dlist_tail_push(source->on_hold, hold);
  if(hold->node != NULL) {
    hold->task_cookie = scamper_task_onhold(task, hold, addresslist_unhold);
    if(hold->task_cookie != NULL)
      return 0;

    /* the task would not take it; back out of the source's list */
    dlist_node_pop(source->on_hold, hold->node);
  }

  free(hold);
  return -1;
}
/*
 * source_command
 *
 * parse a command string and queue it as a probe command on the source.
 *
 * the first word of the string selects the measurement ("trace" or
 * "ping", matched without regard to case); the remainder is handed to the
 * matching scamper_do_*_alloc function as its options.  the string is
 * split with string_nextword, which presumably terminates the first word
 * in place, so the caller's buffer is modified.
 *
 * source may be NULL, in which case the default source is used.
 *
 * returns 0 if the command was queued, -1 otherwise.  on any failure after
 * the trace / ping structure has been allocated, that structure is freed
 * here; previously it was leaked when the command could not be queued.
 * the return value of source_active_attach is deliberately not checked:
 * the command is already queued at that point.
 */
static int source_command(scamper_source_t *source, char *command)
{
  char *opts;
  alloctask_t alloctask;
  freedata_t freedata = NULL;
  scamper_trace_t *trace;
  scamper_ping_t *ping;
  scamper_addr_t *addr;
  void *data = NULL;

  opts = string_nextword(command);

  if(strcasecmp(command, "trace") == 0)
    {
      if((trace = scamper_do_trace_alloc(opts)) == NULL)
        {
          return -1;
        }
      alloctask = (alloctask_t)scamper_do_trace_alloctask;
      freedata = (freedata_t)scamper_trace_free;
      data = trace;
      addr = trace->dst;
    }
  else if(strcasecmp(command, "ping") == 0)
    {
      if((ping = scamper_do_ping_alloc(opts)) == NULL)
        {
          return -1;
        }
      alloctask = (alloctask_t)scamper_do_ping_alloctask;
      freedata = (freedata_t)scamper_ping_free;
      data = ping;
      addr = ping->dst;
    }
  else
    {
      /* unknown command; nothing has been allocated yet */
      return -1;
    }

  /* fall back to the default source, which might not exist either */
  if(source == NULL) source = source_def;
  if(source == NULL)
    {
      goto err;
    }

  if(command_probe(source->commands, addr, source->cyclemon, data,
                   alloctask, freedata) != 0)
    {
      goto err;
    }

  source_active_attach(source);
  return 0;

 err:
  /* the command never made it onto the queue, so the data is still ours */
  if(data != NULL) freedata(data);
  return -1;
}
/*
 * command_assemble
 *
 * build "<cmd> <addr>" in the global command_buf.  cmd_len and addr_len
 * are the lengths of the two strings without their terminators; addr_len
 * plus one bytes are copied from addr so that its terminator comes along.
 *
 * command_buf is replaced with a larger allocation when it is too small;
 * its previous contents are not preserved.
 *
 * returns 0 on success, -1 if the buffer could not be allocated.
 */
static int command_assemble(char *cmd, size_t cmd_len,
                            char *addr, size_t addr_len)
{
  size_t need = cmd_len + 1 + addr_len + 1;
  void *grown;
  char *dst;

  if(command_len < need) {
    if((grown = malloc(need)) == NULL)
      return -1;
    if(command_buf != NULL)
      free(command_buf);
    command_buf = grown;
    command_len = need;
  }

  /* command, a single space, then the address and its terminator */
  dst = command_buf;
  memcpy(dst, cmd, cmd_len);
  dst += cmd_len;
  *dst++ = ' ';
  memcpy(dst, addr, addr_len + 1);

  return 0;
}
/*
 * alf_free
 *
 * free up all resources related to an address-list-file: the line
 * splitter, the monitored file descriptor and the structure itself.
 * feedlastline is passed through to scamper_linepoll_free.  if a
 * descriptor wrapper is held, the underlying descriptor is closed once
 * the wrapper has been freed.
 */
static void alf_free(alf_t *alf, int feedlastline)
{
  int fd;

  if(alf->lp != NULL)
    scamper_linepoll_free(alf->lp, feedlastline);

  if(alf->fd != NULL) {
    /* fetch the raw descriptor before its wrapper goes away */
    fd = scamper_fd_fd_get(alf->fd);
    scamper_fd_free(alf->fd);
    if(fd != -1)
      close(fd);
  }

  free(alf);
}
/*
* alf_read_line
*
* this callback receives a single line per call, which should contain an
* address in string form. it combines that address with the source's
* default command and then passes the string to source_command for further
* processing. the line eventually ends up in the commands queue.
*/
static int alf_read_line(void *param, uint8_t *buf, size_t len)
{
alf_t *alf = (alf_t *)param;
scamper_source_t *source = alf->source;
char *str = (char *)buf;
/* make sure the string contains only printable characters */
if(string_isprint(str, len) == 0)
{
return -1;
}
/* null terminate at these characters */
string_nullterm(str, " \r\t#");
/* make sure the line isn't blank or a comment line */
if(str[0] == '\0' || str[0] == '#')
{
return 0;
}
if(command_assemble(source->command, source->command_len, str, len) != 0)
{
return -1;
}
/* resolve the address */
if(source_command(source, command_buf) == -1)
{
return -1;
}
return 0;
}
/*
 * alf_read
 *
 * this callback is called when the fd has something ready to read.
 * param is the alf structure registered with the descriptor.
 *
 * - data read: it is fed to the line splitter, and reading is paused if
 *   the source's command queue has reached scamper_pps_get().
 * - EOF on an adhoc source, or on the final cycle of a managed file: the
 *   alf is freed (feeding any partial last line) and detached from the
 *   source.
 * - EOF on a managed file with cycles remaining: the cycle count is
 *   reduced (zero means cycle forever), the file is either re-opened (if a
 *   reload was requested, or autoreload is set and the mtime changed) or
 *   rewound, reading is paused or resumed depending on the number of
 *   outstanding cycle points, and a new cycle is started.
 * - read error other than EAGAIN: logged, then treated as fatal.
 *
 * on a fatal error the alf is freed and detached from the source.
 *
 * if the freshly opened descriptor cannot be installed with
 * scamper_fd_fd_set it is closed before bailing out; alf_free only closes
 * the descriptor still held by alf->fd, so it would otherwise be leaked.
 */
static void alf_read(const int fd, void *param)
{
  alf_t *alf = (alf_t *)param;
  scamper_source_t *source = alf->source;
  uint8_t buf[1024];
  ssize_t rc;
  time_t mtime = 0;
  int reload = 0;
  int newfd;

  if((rc = read(fd, buf, sizeof(buf))) > 0)
    {
      /* got data to read. parse the buffer for addresses, one per line. */
      scamper_linepoll_handle(alf->lp, buf, (size_t)rc);

      /*
       * if probe queue for this source is sufficiently large, then
       * don't read any more for the time being
       */
      if(slist_count(source->commands) >= scamper_pps_get())
        {
          scamper_fd_read_pause(alf->fd);
        }
    }
  else if(rc == 0 && (source->adhoc == 1 || source->file->cycles == 1))
    {
      /*
       * when we get EOF on an adhoc source, or we complete the last cycle
       * over an input file, the input file is closed
       */
      alf_free(alf, 1);
      source->alf = NULL;
    }
  else if(rc == 0 && source->adhoc == 0)
    {
      /* a cycle value of zero means cycle indefinitely */
      if(source->file->cycles != 0)
        {
          source->file->cycles--;
        }

      /* decide if we should reload the file at this point */
      if(source->file->reload == 1)
        {
          /* stat the file so we have an mtime value for later */
          if(stat_mtime(source->file->filename, &mtime) == 0)
            {
              reload = 1;
            }
        }
      else if(source->file->autoreload == 1)
        {
          /*
           * reload is conditional on being able to stat the file, and the
           * mtime being different to whatever our record of the mtime is
           */
          if(stat_mtime(source->file->filename, &mtime) == 0 &&
             source->file->mtime != mtime)
            {
              reload = 1;
            }
        }

      /* we have to reload the file (if we can open it) */
      if(reload == 1 && (newfd=source_file_open(source->file->filename)) != -1)
        {
          /* use the new file descriptor */
          if(scamper_fd_fd_set(source->alf->fd, newfd) == -1)
            {
              /* the new descriptor was never adopted; close it here */
              close(newfd);
              goto err;
            }

          /* close the existing file */
          close(fd);

          /* update file details; ensure reload is reset to zero */
          source->file->mtime = mtime;
          source->file->reload = 0;
        }
      else
        {
          /* rewind the current file position */
          if(lseek(fd, 0, SEEK_SET) == -1)
            {
              goto err;
            }
        }

      /* check to see if we should pause, or allow reading to continue */
      if(source->cycle_points < 1)
        {
          scamper_fd_read_unpause(source->alf->fd);
        }
      else
        {
          scamper_fd_read_pause(source->alf->fd);
        }

      /* create a new cycle record, etc */
      if(source_cycle(source, source->cycle->id + 1) == -1)
        {
          goto err;
        }
    }
  else if(rc == -1 && errno != EAGAIN)
    {
      printerror(errno, strerror, __func__, "read failed");
      goto err;
    }

  return;

 err:
  alf_free(alf, 0);
  source->alf = NULL;
  return;
}
/*
 * alf_alloc
 *
 * create the state needed to read commands for the source from fd:
 * a descriptor wrapper whose read callback is alf_read, and a line
 * splitter whose callback is alf_read_line.
 *
 * returns the new structure, or NULL on failure.  on failure everything
 * allocated here is released, but fd itself is not closed.
 */
static alf_t *alf_alloc(int fd, scamper_source_t *source)
{
  alf_t *alf = malloc_zero(sizeof(alf_t));

  if(alf == NULL)
    return NULL;

  alf->fd = scamper_fd_private(fd, alf_read, alf, NULL, NULL);
  if(alf->fd == NULL) {
    free(alf);
    return NULL;
  }

  alf->lp = scamper_linepoll_alloc(alf_read_line, alf);
  if(alf->lp == NULL) {
    scamper_fd_free(alf->fd);
    free(alf);
    return NULL;
  }

  alf->source = source;
  return alf;
}
/*
 * source_free
 *
 * tear down a source and everything it holds, in this order: detach and
 * release the cycle monitor; remove the source from the active or blocked
 * list; cancel and free every on-hold record; free every queued command
 * (and the data of probe commands); release the outfile; remove the
 * source from the source tree; then free the file details, any open
 * command file state, the default command, the list and the cycle.
 *
 * NOTE(review): the command referenced by each on-hold record is not
 * released here.  whether that leaks depends on who owns a command while
 * it is on hold -- confirm.
 * NOTE(review): the tree removal goes through source_cmp, which reads
 * source->list->name; presumably the list is always set by the time a
 * source is freed -- confirm.
 */
static void source_free(scamper_source_t *source)
{
  command_t *cmd;
  on_hold_t *held;

  if(source->cyclemon != NULL) {
    scamper_cyclemon_source_detach(source->cyclemon);
    scamper_cyclemon_unuse(source->cyclemon);
    source->cyclemon = NULL;
  }

  /* detach the source from whatever list it is in */
  if(source->l == active)
    source_active_detach(source);
  else if(source->l == blocked)
    source_blocked_detach(source);

  /* on-hold records go first, ahead of the command list */
  if(source->on_hold != NULL) {
    for(held = dlist_head_pop(source->on_hold); held != NULL;
        held = dlist_head_pop(source->on_hold)) {
      scamper_task_dehold(held->task, held->task_cookie);
      free(held);
    }
    dlist_free(source->on_hold);
  }

  /* now the queued commands */
  if(source->commands != NULL) {
    for(cmd = slist_head_pop(source->commands); cmd != NULL;
        cmd = slist_head_pop(source->commands)) {
      if(cmd->type == COMMAND_PROBE)
        cmd->command_probe_freedata(cmd->command_probe_data);
      command_free(cmd);
    }
    slist_free(source->commands);
  }

  /* release this structure's hold on the scamper_outfile */
  if(source->sof != NULL)
    scamper_outfile_free(source->sof);

  /* remove the source from the source tree */
  splaytree_remove_item(source_tree, source);

  /* free up details of the source file */
  if(source->file != NULL) {
    if(source->file->filename != NULL)
      free(source->file->filename);
    free(source->file);
  }

  if(source->alf != NULL)
    alf_free(source->alf, 0);
  if(source->command != NULL)
    free(source->command);
  if(source->list != NULL)
    scamper_list_free(source->list);
  if(source->cycle != NULL)
    scamper_cycle_free(source->cycle);

  free(source);
}
/*
 * scamper_source_use
 *
 * take a reference on the source.  returns the same pointer so the call
 * can be used in an assignment.
 */
scamper_source_t *scamper_source_use(scamper_source_t *source)
{
  source->refcnt += 1;
  return source;
}
/*
 * scamper_source_unuse
 *
 * drop a reference on the source and free the source if nothing needs it
 * any longer.  the source is kept when any of the following holds:
 *
 *  - the reference count is still above one after the decrement;
 *  - the source is adhoc (further commands may be supplied later);
 *  - the file's cycle count is not one (more cycles are to come);
 *  - commands are queued, a command file is still open, or commands are
 *    on hold.
 *
 * otherwise a FINISH event is posted to observers and the source is freed.
 */
void scamper_source_unuse(scamper_source_t *source)
{
  source->refcnt--;

  /* still referenced, never removed, or not yet on its last cycle */
  if(source->refcnt > 1 || source->adhoc == 1 || source->file->cycles != 1)
    return;

  /* work is queued, or may yet be queued */
  if(slist_count(source->commands) > 0 || source->alf != NULL ||
     dlist_count(source->on_hold) > 0)
    return;

  source_event_post(source, SCAMPER_SOURCE_EVENT_FINISH, NULL);
  source_free(source);
}
/*
 * scamper_addresslist_observe
 *
 * register func to be called, with p as its parameter, for every source
 * event that is posted.  the observers list is created on first use.
 *
 * returns an opaque handle to pass to scamper_addresslist_unobserve, or
 * NULL on failure.
 */
void *scamper_addresslist_observe(scamper_source_event_func_t func, void *p)
{
  source_observe_t *obs;

  if(observers == NULL && (observers = dlist_alloc()) == NULL)
    return NULL;

  if((obs = malloc_zero(sizeof(source_observe_t))) == NULL)
    return NULL;

  obs->func  = func;
  obs->param = p;

  obs->node = dlist_tail_push(observers, obs);
  if(obs->node == NULL) {
    free(obs);
    return NULL;
  }

  return obs;
}
void scamper_addresslist_unobserve(void *handle)
{
source_observe_t *observe = (source_observe_t *)handle;
if(observers == NULL)
{
return;
}
/* free the node */
dlist_node_pop(observers, observe->node);
free(observe);
/* if there are no other observations, free the list as well */
if(dlist_count(observers) == 0)
{
dlist_free(observers);
observers = NULL;
}
return;
}
uint32_t scamper_source_getlistid(const scamper_source_t *source)
{
return source->list->id;
}
uint32_t scamper_source_getcycleid(const scamper_source_t *source)
{
return source->cycle->id;
}
const char *scamper_source_getname(const scamper_source_t *source)
{
return source->list->name;
}
const char *scamper_source_getdescr(const scamper_source_t *source)
{
return source->list->descr;
}
/* return the source's mix priority */
int scamper_source_getpriority(const scamper_source_t *source)
{
  int priority = source->priority;
  return priority;
}
/* return the source's adhoc flag */
int scamper_source_getadhoc(const scamper_source_t *source)
{
  int adhoc = source->adhoc;
  return adhoc;
}
/*
 * return the outfile the source writes to.  a source without an outfile
 * of its own gets whatever scamper_outfiles_get(NULL) returns, which is
 * the default outfile.
 */
scamper_outfile_t *scamper_source_getoutfile(const scamper_source_t *s)
{
  return (s->sof != NULL) ? s->sof : scamper_outfiles_get(NULL);
}
/* return the name of the source's file, or NULL if it has no file state */
const char *scamper_source_getfilename(const scamper_source_t *source)
{
  return (source->file != NULL) ? source->file->filename : NULL;
}
/* return the file's cycle count; a source without file state reports 1 */
int scamper_source_getcycles(const scamper_source_t *source)
{
  return (source->file != NULL) ? source->file->cycles : 1;
}
/* return the file's autoreload flag; a source without file state reports 0 */
int scamper_source_getautoreload(const scamper_source_t *source)
{
  return (source->file != NULL) ? source->file->autoreload : 0;
}
/*
 * scamper_source_update
 *
 * change one or more settings of a source.  each of autoreload, cycles
 * and priority is either NULL (leave alone) or points at the new value.
 *
 * returns 0 straight away if all three are NULL.  returns -1, changing
 * nothing, if autoreload or cycles is supplied for a source without file
 * state, or if the new priority is negative.
 *
 * otherwise the supplied values are applied.  a priority that drops from
 * positive to zero moves the source to the blocked list; one that rises
 * from zero offers the source back to the active list.  the results of
 * those list moves are not checked.  finally an UPDATE event recording
 * what was changed (flag bits 0x01 autoreload, 0x02 cycles, 0x04
 * priority) is posted, and 0 is returned.
 */
int scamper_source_update(scamper_source_t *source,
                          const int *autoreload, const int *cycles,
                          const int *priority)
{
  scamper_source_event_t sse;
  int previous;

  /* nothing asked for */
  if(autoreload == NULL && cycles == NULL && priority == NULL)
    return 0;

  /* file settings need file state; priorities cannot be negative */
  if((autoreload != NULL || cycles != NULL) && source->file == NULL)
    return -1;
  if(priority != NULL && *priority < 0)
    return -1;

  memset(&sse, 0, sizeof(sse));

  if(autoreload != NULL) {
    sse.sse_update_flags |= 0x01;
    sse.sse_update_autoreload = *autoreload;
    source->file->autoreload = *autoreload;
  }

  if(cycles != NULL) {
    sse.sse_update_flags |= 0x02;
    sse.sse_update_cycles = *cycles;
    source->file->cycles = *cycles;
  }

  if(priority != NULL) {
    sse.sse_update_flags |= 0x04;
    sse.sse_update_priority = *priority;

    previous = source->priority;
    source->priority = *priority;

    if(previous > 0 && *priority == 0)
      source_blocked_attach(source);      /* now paused */
    else if(previous == 0 && *priority > 0)
      source_active_attach(source);       /* no longer paused */
  }

  source_event_post(source, SCAMPER_SOURCE_EVENT_UPDATE, &sse);
  return 0;
}
/*
 * scamper_source_do
 *
 * public entry point for queueing a single command string on a source.
 * all of the work is done by source_command, whose result is returned.
 */
int scamper_source_do(scamper_source_t *source, char *command)
{
  int rc = source_command(source, command);
  return rc;
}
/*
 * scamper_source_do_array
 *
 * queue one command per entry of iparray on the source.  each entry is
 * joined to the command string with command_assemble and the result is
 * passed to source_command.
 *
 * command may be NULL, in which case the source's default command is used.
 * source may be NULL, in which case the default source is used, matching
 * scamper_source_do and scamper_source_do_file; previously a NULL source
 * together with a NULL command dereferenced the NULL source.
 *
 * returns 0 if every entry was queued.  returns -1 if no command can be
 * determined, if an entry of the array is NULL, or as soon as an entry
 * fails; entries queued before the failure stay queued.
 */
int scamper_source_do_array(scamper_source_t *source, char *command,
                            char **iparray, int ipcount)
{
  size_t cmd_len;
  int i;

  /* need a command to go with the array */
  if(command == NULL)
    {
      if(source == NULL) source = source_def;
      if(source == NULL || source->command == NULL)
        {
          return -1;
        }
      command = source->command;
    }

  /* an array must be supplied if it is claimed to have entries */
  if(ipcount > 0 && iparray == NULL)
    {
      return -1;
    }

  cmd_len = strlen(command);

  for(i=0; i<ipcount; i++)
    {
      if(iparray[i] == NULL)
        {
          return -1;
        }

      if(command_assemble(command,cmd_len,iparray[i],strlen(iparray[i])) != 0)
        {
          return -1;
        }

      /* source_command substitutes the default source for NULL itself */
      if(source_command(source, command_buf) != 0)
        {
          return -1;
        }
    }

  return 0;
}
/*
 * scamper_source_do_file
 *
 * start reading commands for an adhoc source from the named file.
 * source may be NULL, in which case the default source is used.
 *
 * the request is refused if the source is not adhoc or already has a
 * file being read.  returns 0 once the file is open and being monitored,
 * -1 otherwise; a descriptor opened here is closed again on failure.
 */
int scamper_source_do_file(scamper_source_t *source, char *filename)
{
  int fd;

  if(source == NULL)
    source = source_def;

  /* only adhoc sources may have files queued, and only one file at a time */
  if(source->adhoc != 1 || source->alf != NULL)
    return -1;

  if((fd = source_file_open(filename)) == -1)
    return -1;

  source->alf = alf_alloc(fd, source);
  if(source->alf == NULL) {
    close(fd);
    return -1;
  }

  return 0;
}
/*
 * scamper_addresslist_addsource
 *
 * build a new source from the supplied parameters and link it into
 * rotation.
 *
 * the source is considered to be the default source if its name matches
 * (case-insensitively) the configured list name, or "default" when no
 * list name is configured.  only one default source may exist.
 *
 * a source that is not ad-hoc must name a file; that file is opened
 * and its mtime, filename, autoreload and cycles settings are recorded.
 *
 * on success the source has been inserted into source_tree, has been
 * attached to the blocked list unless it is the default source, and an
 * ADD event has been posted.
 *
 * returns the new source, or NULL on failure (including when ssp or
 * ssp->name is NULL).
 */
scamper_source_t *scamper_addresslist_addsource(scamper_source_params_t *ssp)
{
  scamper_source_t *source = NULL;
  const char *def;
  const char *cmd;
  int fd = -1;
  int is_default = 0;
  int cycleid;

  /* the name is compared below, so parameters without one are unusable */
  if(ssp == NULL || ssp->name == NULL)
    {
      return NULL;
    }

  if((def = scamper_option_listname()) == NULL)
    {
      def = "default";
    }

  if(strcasecmp(ssp->name, def) == 0)
    {
      is_default = 1;
    }

  /*
   * don't let a source named default get into the tree if there already
   * is a default source
   */
  if(is_default && source_def != NULL)
    {
      goto err;
    }

  /* if a source is not 'ad-hoc', then it has to specify a file */
  if(ssp->adhoc == 0 && ssp->filename == NULL)
    {
      goto err;
    }

  if((source = malloc_zero(sizeof(scamper_source_t))) == NULL)
    {
      goto err;
    }
  source->refcnt   = 1;
  source->priority = ssp->priority;
  source->adhoc    = ssp->adhoc;

  /*
   * set the default command to use with the source.  the fallback is
   * checked for NULL so that strdup is never handed a NULL pointer.
   */
  cmd = ssp->command != NULL ? ssp->command : scamper_command_get();
  if(cmd == NULL || (source->command = strdup(cmd)) == NULL)
    {
      goto err;
    }
  source->command_len = strlen(source->command);

  /* if we have been passed an output file to target, then use it */
  if(ssp->sof != NULL)
    {
      source->sof = scamper_outfile_use(ssp->sof);
    }

  /* the commands list is an ordered list of commands for this source */
  if((source->commands = slist_alloc()) == NULL)
    {
      goto err;
    }

  /* the on-hold list is a collection of on_hold records for this source */
  if((source->on_hold = dlist_alloc()) == NULL)
    {
      goto err;
    }

  /*
   * this structure defines the source where a series of traces came
   * from.  it is shared by the source and by the traces that reference
   * it.
   */
  if((source->list = scamper_list_alloc(ssp->list_id, ssp->name, ssp->descr,
                                        scamper_monitorname_get())) == NULL)
    {
      goto err;
    }

  /*
   * initialise the cycle record.  the default source only gets one when
   * a non-negative cycle id has been configured; every other source
   * uses the cycle id from its parameters.
   */
  if(is_default == 1)
    {
      if((cycleid = scamper_option_cycleid()) >= 0 &&
         source_cycle(source, cycleid) == -1)
        {
          goto err;
        }
    }
  else
    {
      if(source_cycle(source, ssp->cycle_id) == -1)
        {
          goto err;
        }
    }

  /* if there is a file to initialise the source with, then open it now */
  if(ssp->filename != NULL)
    {
      if((fd = source_file_open(ssp->filename)) == -1)
        {
          goto err;
        }

      if((source->alf = alf_alloc(fd, source)) == NULL)
        {
          goto err;
        }
    }

  /*
   * if the source is a managed list, then we need to keep state on the
   * destination list file.  a managed source always has a filename (see
   * the check above), so fd is valid here.
   */
  if(source->adhoc == 0)
    {
      if((source->file = malloc_zero(sizeof(source_file_t))) == NULL)
        {
          goto err;
        }

      if(fstat_mtime(fd, &source->file->mtime) == -1)
        {
          goto err;
        }

      if((source->file->filename = strdup(ssp->filename)) == NULL)
        {
          goto err;
        }

      source->file->autoreload = ssp->autoreload;
      source->file->cycles     = ssp->cycles;
    }

  /*
   * put the source into the splaytree, where it can be found quickly
   * when needed.
   */
  if(splaytree_insert(source_tree, source) == NULL)
    {
      goto err;
    }

  /*
   * put the source in the blocked list, as there currently is no
   * addresses ready to be probed at this time.  the default source is
   * deliberately left out of the blocked list.
   */
  if(is_default == 0 && source_blocked_attach(source) == -1)
    {
      goto err;
    }

  source_event_post(source, SCAMPER_SOURCE_EVENT_ADD, NULL);

  return source;

 err:
  /*
   * NOTE(review): fd is closed here even after alf_alloc has succeeded.
   * if source_free also releases the descriptor held by source->alf this
   * is a double close -- confirm against alf_alloc / source_free.
   */
  if(fd != -1) close(fd);

  /* several failure paths are taken before the source is allocated */
  if(source != NULL) source_free(source);

  return NULL;
}
/*
 * scamper_addresslist_getsource
 *
 * look up a source in source_tree by list name.
 *
 * a temporary key is built on the stack with only list->name filled in;
 * presumably that is the only field the tree's comparison function
 * inspects.  returns whatever splaytree_find returns for that key.
 */
scamper_source_t *scamper_addresslist_getsource(char *name)
{
  scamper_list_t   key_list;
  scamper_source_t key;

  key_list.name = name;
  key.list      = &key_list;

  return (scamper_source_t *)splaytree_find(source_tree, &key);
}
/*
 * scamper_addresslist_delsource
 *
 * remove the source from the set of available address sources, if that
 * is permitted.
 *
 * removal is refused (-1) when the source's list name matches the
 * default list name, or when the source's reference count is above one.
 * otherwise a DELETE event is posted, the source is freed, and 0 is
 * returned.
 */
int scamper_addresslist_delsource(scamper_source_t *source)
{
  const char *def = scamper_option_listname();

  if(def == NULL)
    {
      def = "default";
    }

  /*
   * the default source cannot be removed, and neither can a source that
   * still has external references
   */
  if(strcasecmp(source->list->name, def) == 0 || source->refcnt > 1)
    {
      return -1;
    }

  source_event_post(source, SCAMPER_SOURCE_EVENT_DELETE, NULL);
  source_free(source);

  return 0;
}
/*
 * scamper_addresslist_cyclesource
 *
 * for an ad-hoc source that is not the default source, request a new
 * cycle whose id is one greater than the current cycle's id, and then
 * attach the source to the active list.
 *
 * returns 0 on success; -1 if the source is managed, is the default
 * source, or if source_cycle fails.
 */
int scamper_addresslist_cyclesource(scamper_source_t *source)
{
  /* only sources that are ad-hoc and are not the default list qualify */
  if(source->adhoc != 0 && source != source_def)
    {
      /* append the cycle command */
      if(source_cycle(source, source->cycle->id + 1) != -1)
        {
          source_active_attach(source);
          return 0;
        }
    }

  return -1;
}
/*
 * scamper_addresslist_empty
 *
 * drain the blocked list and then the active list, freeing every source
 * popped from either.  each source has its list and list-node
 * back-pointers cleared before it is freed.
 *
 * XXX: fix up to use source reference counting properly
 */
void scamper_addresslist_empty()
{
  scamper_source_t *src;

  for(src = dlist_tail_pop(blocked); src != NULL;
      src = dlist_tail_pop(blocked))
    {
      src->l  = NULL;
      src->ln = NULL;
      source_free(src);
    }

  for(src = clist_tail_pop(active); src != NULL;
      src = clist_tail_pop(active))
    {
      src->l  = NULL;
      src->ln = NULL;
      source_free(src);
    }
}
/*
 * scamper_addresslist_foreach
 *
 * walk source_tree with splaytree_inorder, passing it func (cast to
 * splaytree_inorder_t) as the callback and param as the callback's
 * parameter.
 */
void scamper_addresslist_foreach(void *param,
                                 int (*func)(void *p, scamper_source_t *src))
{
  splaytree_inorder_t visit = (splaytree_inorder_t)func;

  splaytree_inorder(source_tree, visit, param);
}
/*
 * command_probe_handle
 *
 * turn a probe command into a task, if possible.
 *
 * if scamper_target_find returns a target for the command's address,
 * the command is put on hold against that target's task, *task_out is
 * set to NULL, and 0 is returned.
 *
 * otherwise a task is allocated through the command's alloctask
 * callback.  on success the task is given references to the command's
 * cyclemon and to the source, the command is freed, *task_out is set
 * to the task, and 0 is returned.  if the allocation fails the command
 * is freed and -1 is returned; *task_out is not written in that case.
 */
static int command_probe_handle(scamper_source_t *source, command_t *command,
                                scamper_task_t **task_out)
{
  scamper_target_t *busy;
  scamper_cycle_t  *cyc;
  scamper_task_t   *new_task;

  /*
   * if this command is blocked, then we can't allocate the caller a task.
   * put the command on hold, and return NULL in task_out.
   */
  busy = scamper_target_find(command->command_probe_addr);
  if(busy != NULL)
    {
      addresslist_onhold(source, busy->task, command);
      *task_out = NULL;
      return 0;
    }

  /* get a pointer to the cycle for *this* task */
  cyc = scamper_cyclemon_cycle(command->command_probe_cyclemon);

  /* allocate the task structure to keep everything together */
  new_task = command->command_probe_alloctask(command->command_probe_data,
                                              source->list, cyc);
  if(new_task == NULL)
    {
      command_free(command);
      return -1;
    }

  /* take the references before the command, which holds them, is freed */
  new_task->cyclemon = scamper_cyclemon_use(command->command_probe_cyclemon);
  new_task->source   = scamper_source_use(source);

  /* don't need the command any more */
  command_free(command);

  /* return to the caller the task we allocated */
  *task_out = new_task;
  return 0;
}
/*
 * command_cycle_handle
 *
 * process a cycle command: stamp the cycle with this system's hostname
 * (when it can be obtained) and the current time, write a cycle-start
 * record if the source has an output file, post a CYCLE event carrying
 * the cycle id, and free the command.
 *
 * always returns 0.
 */
static int command_cycle_handle(scamper_source_t *source, command_t *command)
{
  scamper_source_event_t sse;
  scamper_cycle_t *cycle = command->command_un.cycle.cycle;
  scamper_file_t *file;
  struct timeval tv;
  char hostname[MAXHOSTNAMELEN];

  /* get the hostname of the system for the cycle point */
  if(gethostname(hostname, sizeof(hostname)) == 0)
    {
      /*
       * POSIX leaves it unspecified whether a truncated name is
       * null-terminated, so terminate the buffer before it is copied.
       */
      hostname[sizeof(hostname)-1] = '\0';
      cycle->hostname = strdup(hostname);
    }

  /* get a timestamp for the cycle start point */
  gettimeofday_wrap(&tv);
  cycle->start_time = (uint32_t)tv.tv_sec;

  /* write a cycle start point to disk if there is a file to do so */
  if(source->sof != NULL &&
     (file = scamper_outfile_getfile(source->sof)) != NULL)
    {
      scamper_file_write_cycle_start(file, cycle);
    }

  /*
   * post an event saying the cycle point just rolled around.  the event
   * record is zeroed first so no indeterminate bytes are passed along.
   */
  memset(&sse, 0, sizeof(sse));
  sse.sse_cycle_cycle_id = cycle->id;
  source_event_post(source, SCAMPER_SOURCE_EVENT_CYCLE, &sse);

  command_free(command);

  return 0;
}
/*
* scamper_addresslist_get
*
* return the next address to trace.
*
* note that if this function returns null, it does not necessarily infer
* that there are no more addresses to trace; if there are addresses
* currently on-hold that are blocked on other traces in the active window
* and no other traces to enqueue then this function will return null.
*
* XXX: replace head_pop with head_node, and then head_pop when the item
* is successfully returned.
*/
scamper_task_t *scamper_addresslist_get(void)
{
scamper_source_t *source;
scamper_task_t *task;
command_t *command;
/*
* if the priority of the source was changed in between calls to this
* function, then make sure the source's priority hasn't been lowered to
* below how many tasks it has had allocated in this cycle
*/
if(source_cur != NULL && source_cnt >= source_cur->priority)
{
source_next();
}
if((source = source_cur) == NULL)
{
return NULL;
}
/*
* if the number of available tasks is less than the packets-per-second
* rate, and we're not too far ahead, then queue the source to read more
* tasks.
*/
if(source->alf != NULL && source->cycle_points < 2 &&
slist_count(source->commands) < scamper_pps_get())
{
scamper_fd_read_unpause(source->alf->fd);
}
while((source = source_cur) != NULL)
{
assert(source->priority != 0);
/* fetch commands off until we have a task to return */
while((command = slist_head_pop(source->commands)) != NULL)
{
if(command->type == COMMAND_PROBE)
{
if(command_probe_handle(source, command, &task) == -1)
{
return NULL;
}
else if(task == NULL)
{
continue;
}
source_cnt++;
return task;
}
else if(command->type == COMMAND_CYCLE)
{
command_cycle_handle(source, command);
}
}
/* the previous source could not supply a command */
assert(slist_count(source->commands) == 0);
/*
* if there is still the possibility of getting an address
* out of the source then put the list in the blocked list.
*/
if(source->alf != NULL || dlist_count(source->on_hold) > 0)
{
source_blocked_attach(source);
}
/*
* if the source is the default list, then we merely detach it
* from the active list and leave it 'dangling' in that it is not
* in either of the active or blocked lists.
*/
else if(source == source_def)
{
source_active_detach(source);
}
/*
* put the source onto the blocked list for now.
*/
else
{
source_blocked_attach(source);
}
}
/* there shouldn't be any active address list sources available ... */
assert(clist_count(active) == 0);
return NULL;
}
/*
 * scamper_addresslist_isempty
 *
 * report whether the address list is empty.
 *
 * returns 0 if there is at least one source in the active or blocked
 * lists, or if the default source exists and still has a file attached
 * to it.  returns 1 otherwise.
 */
int scamper_addresslist_isempty()
{
  /*
   * if there are either active or blocked address list sources, the list
   * can't be empty
   */
  if(clist_count(active) > 0 || dlist_count(blocked) > 0)
    {
      return 0;
    }

  /*
   * if the default source is waiting for more addresses then the list
   * can't be empty.  source_def is NULL once the cleanup routine has
   * run, so it is checked before being dereferenced.
   */
  if(source_def != NULL && source_def->alf != NULL)
    {
      return 0;
    }

  return 1;
}
/*
 * scamper_addresslist_isready
 *
 * report whether the address list appears able to supply an address for
 * probing: 1 when there is a current source, 0 when there is not.
 */
int scamper_addresslist_isready()
{
  return (source_cur != NULL) ? 1 : 0;
}
/*
 * scamper_addresslist_init
 *
 * allocate the active list, the blocked list and the source tree, and
 * then create the default source: an ad-hoc source with priority 1
 * whose list id and name come from the configured options (falling back
 * to 0 and "default" respectively).
 *
 * returns 0 on success, -1 if any allocation or the creation of the
 * default source fails.  nothing allocated earlier is released here on
 * failure.
 */
int scamper_addresslist_init()
{
  scamper_source_params_t ssp;
  const char *list_name;
  int list_id;

  /* allocation stops at the first container that cannot be created */
  if((active = clist_alloc()) == NULL ||
     (blocked = dlist_alloc()) == NULL ||
     (source_tree = splaytree_alloc(source_cmp)) == NULL)
    {
      return -1;
    }

  list_id = scamper_option_listid();
  if(list_id < 0)
    {
      list_id = 0;
    }

  list_name = scamper_option_listname();
  if(list_name == NULL)
    {
      list_name = "default";
    }

  /* create the default address list source and put it into rotation */
  memset(&ssp, 0, sizeof(ssp));
  ssp.list_id  = list_id;
  ssp.name     = (char *)list_name;
  ssp.descr    = "default";
  ssp.priority = 1;
  ssp.adhoc    = 1;

  source_def = scamper_addresslist_addsource(&ssp);

  return (source_def != NULL) ? 0 : -1;
}
/*
 * scamper_addresslist_cleanup
 *
 * release the module's global state.  if the default source has a cycle
 * record, the cycle is given a stop time and a cycle-stop record is
 * written to the file obtained from scamper_outfiles_get(NULL) before
 * the default source is freed.  the source tree, the observers, blocked
 * and active lists, and the command buffer are then released, and each
 * global is reset to NULL.
 */
void scamper_addresslist_cleanup()
{
  struct timeval tv;

  if(source_def != NULL)
    {
      if(source_def->cycle != NULL)
        {
          /* timestamp when the cycle ends */
          gettimeofday_wrap(&tv);
          source_def->cycle->stop_time = (uint32_t)tv.tv_sec;

          scamper_file_write_cycle_stop(
                      scamper_outfile_getfile(scamper_outfiles_get(NULL)),
                      source_def->cycle);
        }

      source_free(source_def);
      source_def = NULL;
    }

  if(source_tree != NULL)
    {
      splaytree_free(source_tree, NULL);
      source_tree = NULL;
    }

  if(observers != NULL)
    {
      dlist_free(observers);
      observers = NULL;
    }

  if(blocked != NULL)
    {
      dlist_free(blocked);
      blocked = NULL;
    }

  if(active != NULL)
    {
      clist_free(active);
      active = NULL;
    }

  /* free accepts a NULL pointer, so no test is needed first */
  free(command_buf);
  command_buf = NULL;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */