/* * scamper_addresslist * * $Id: scamper_addresslist.c,v 1.115 2007/05/10 01:18:12 mjl Exp $ * * this code deals with storing trace objects with their associated lists. * * Copyright (C) 2004-2007 The University of Waikato * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, version 2. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(__APPLE__) #include #endif #include #if defined(DMALLOC) #include #endif #include "scamper.h" #include "scamper_addr.h" #include "scamper_list.h" #include "scamper_tlv.h" #include "scamper_trace.h" #include "scamper_debug.h" #include "scamper_file.h" #include "scamper_outfiles.h" #include "scamper_target.h" #include "scamper_task.h" #include "scamper_addresslist.h" #include "scamper_fds.h" #include "scamper_linepoll.h" #include "scamper_privsep.h" #include "scamper_addr.h" #include "scamper_do_trace.h" #include "scamper_ping.h" #include "scamper_do_ping.h" #include "scamper_cyclemon.h" #include "utils.h" #include "mjl_list.h" #include "mjl_splaytree.h" typedef void *(*alloctask_t)(void *data, scamper_list_t *, scamper_cycle_t *); typedef void *(*freedata_t)(void *data); /* * command * * this structure is used to hold information for each command found in * a source's command list. 
*/
typedef struct command
{
  /* the type of command: COMMAND_PROBE or COMMAND_CYCLE */
  uint8_t type;

  /* shorthand accessors for the members of the probe variant */
#define command_probe_addr      command_un.probe.addr
#define command_probe_cyclemon  command_un.probe.cyclemon
#define command_probe_data      command_un.probe.data
#define command_probe_alloctask command_un.probe.alloctask
#define command_probe_freedata  command_un.probe.freedata

  union
  {
    /* hold the parameters for a probe command */
    struct command_probe_s
    {
      /* target address */
      scamper_addr_t     *addr;

      /* cycle */
      scamper_cyclemon_t *cyclemon;

      /* data parameter - a preinitialised structure with all parameters set */
      void               *data;

      /* function to allocate a scamper task using the data parameter */
      alloctask_t         alloctask;

      /* function to free the data pointed to above */
      freedata_t          freedata;
    } probe;

    /* shorthand accessor for the member of the cycle variant */
#define command_cycle_cycle command_un.cycle.cycle

    /* hold the parameters for a cycle command */
    struct command_cycle_s
    {
      /* the new cycle */
      scamper_cycle_t *cycle;
    } cycle;

  } command_un;

} command_t;

/* values held in the type member of a command_t */
#define COMMAND_PROBE 0x00
#define COMMAND_CYCLE 0x01

/*
 * on_hold
 *
 * structure to keep details on the command on hold.
 */
typedef struct on_hold
{
  /* pointer to the task that has the lock on the address */
  scamper_task_t   *task;

  /* pointer to the source list that wants the address to be probed */
  scamper_source_t *source;

  /* pointer to the command that is waiting on the task to complete */
  command_t        *command;

  /* pointer to the node held in the task's on-hold list */
  void             *task_cookie;

  /* pointer to the node held in the source's on-hold list */
  dlist_node_t     *node;

} on_hold_t;

/*
 * alf
 *
 * structure to keep state when reading a file of commands.
 */
typedef struct alf
{
  /* back pointer to the source struct that receives each command */
  scamper_source_t   *source;

  /* the file descriptor that scamper is monitoring */
  scamper_fd_t       *fd;

  /* structure to break up a chunk of an ascii file into lines */
  scamper_linepoll_t *lp;

} alf_t;

/*
 * source_file
 *
 * structure to keep state on a source_file.
this structure is used when
 * a source is defined to cycle over a file.  this structure is not used when
 * a source is adhoc.
 */
typedef struct source_file
{
  /* the name of the file */
  char   *filename;

  /* a count of the number of cycles left to take through the list */
  int     cycles;

  /* if set, the list should be reloaded at the next cycle */
  int     reload;

  /* if set, the list should be refreshed each cycle if it has changed */
  int     autoreload;

  /* the modification time of the file when it was last read */
  time_t  mtime;

} source_file_t;

/*
 * scamper_source
 *
 * struct that keeps a record of the various address sources that scamper
 * is dealing with at present.
 *
 */
struct scamper_source
{
  /* reference count to decide when the source is no longer required */
  int                 refcnt;

  /* mix priority of this source */
  int                 priority;

  /* list of commands queued */
  slist_t            *commands;

  /* list of commands that are on hold until some external event completes */
  dlist_t            *on_hold;

  /*
   * the next two variables define the current state of the source based on
   * the queue it is in.  if set, the source is either in the active or
   * blocked state.  the first parameter defines which of the two lists it
   * is in, while the second parameter defines the list node the source has
   * been assigned.
   */
  void               *l;
  void               *ln;

  /* default command to use where none is otherwise specified */
  char               *command;
  size_t              command_len;

  /* the details of the list */
  scamper_list_t     *list;

  /* pointer to a structure that monitors when to write a cycle_stop record */
  scamper_cycle_t    *cycle;
  scamper_cyclemon_t *cyclemon;

  /* where the data should be written, if the source has a preference */
  scamper_outfile_t  *sof;

  /* how many cycle points are defined */
  int                 cycle_points;

  /*
   * if set, this source is adhoc; that is, it does not have a defined
   * command source.  an adhoc source may have commands that originate in
   * files, or are specified on the control socket.  if a source is not
   * adhoc, it may only obtain its commands from a single file, specified
   * below.
   */
  int                 adhoc;

  /*
   * the name of the input file that the source gets its commands from.
   * if set, the source does not take commands from other files, or on an
   * adhoc basis.
   */
  source_file_t      *file;

  /* maintain state when a file is being parsed for commands */
  alf_t              *alf;
};

/*
 * source_observe
 *
 * record of one registered observer: the callback to invoke, the parameter
 * to hand back to it, and the node it occupies in the observers list.
 */
typedef struct source_observe
{
  scamper_source_event_func_t  func;
  void                        *param;
  dlist_node_t                *node;
} source_observe_t;

/*
 * global variables for managing sources:
 *
 * a source is stored in one of two lists depending on its state.  it is
 * either stored in the active list, a round-robin circular list, or in
 * the blocked list.
 *
 * the source, if any, currently being used (that is, has not used up its
 * priority quantum) is pointed to by source_cur.  the number of tasks that
 * have been read from the current source in this rotation is held in
 * source_cnt.
 *
 * there are two special sources.  the default source is mostly used for
 * handling tasks passed in on the command line.  the preempt source is used
 * for handling high-priority tasks.
 *
 * the sources are stored in a tree that is searchable by name.  the tree
 * does not include special sources.
 */
static clist_t          *active      = NULL;
static dlist_t          *blocked     = NULL;
static scamper_source_t *source_cur  = NULL;
static int               source_cnt  = 0;
static scamper_source_t *source_def  = NULL;
static splaytree_t      *source_tree = NULL;

/*
 * scamper provides the ability for external monitoring of source events.
 * these observers are held in a list.
 */
static dlist_t *observers = NULL;

/*
 * keep state to ensure that stdin is only ever supplied as a target
 * address source once.
*/ static int stdin_used = 0; /* global temporary buf for assembling commands */ static char *command_buf = NULL; static size_t command_len = 0; static int source_event_post_cb(void *item, void *param) { scamper_source_event_t *event = (scamper_source_event_t *)param; source_observe_t *observe = (source_observe_t *)item; observe->func(event, observe->param); return 0; } static void source_event_post(scamper_source_t *source, int type, scamper_source_event_t *ev) { scamper_source_event_t sse; struct timeval tv; /* check if there is actually anyone observing */ if(observers == NULL) { return; } /* if null event, then create one from scratch */ if(ev == NULL) { memset(&sse, 0, sizeof(sse)); ev = &sse; } gettimeofday_wrap(&tv); ev->source = source; ev->event = type; ev->sec = tv.tv_sec; dlist_foreach(observers, source_event_post_cb, ev); return; } /* * source_cycle_finish * * when the last cycle is written to disk, we can start on the next cycle. */ static void source_cycle_finish(scamper_cycle_t *cycle, scamper_source_t *source, scamper_outfile_t *outfile) { scamper_file_t *sf; struct timeval tv; /* timestamp when the cycle ends */ gettimeofday_wrap(&tv); cycle->stop_time = (uint32_t)tv.tv_sec; /* write the cycle stop record out */ if(outfile != NULL) { sf = scamper_outfile_getfile(outfile); scamper_file_write_cycle_stop(sf, cycle); } /* last task for this cycle, reduce count of cycle points */ if(source != NULL) { source->cycle_points--; if(source->alf != NULL && source->cycle_points < 2) { scamper_fd_read_unpause(source->alf->fd); } } return; } static void command_free(command_t *command) { switch(command->type) { case COMMAND_PROBE: scamper_addr_free(command->command_probe_addr); scamper_cyclemon_unuse(command->command_probe_cyclemon); break; case COMMAND_CYCLE: break; } free(command); return; } /* * command_probe * * given the commands list and a target address, create a probe command * and put it on the head / tail of the list depending on the value of * the 
pushhead parameter; 0 means tail, 1 means head. */ static int command_probe(slist_t *commands, scamper_addr_t *addr, scamper_cyclemon_t *cyclemon, void *data, alloctask_t alloctask, freedata_t freedata) { command_t *command = NULL; if((command = malloc_zero(sizeof(command_t))) == NULL) { goto err; } command->type = COMMAND_PROBE; command->command_probe_addr = scamper_addr_use(addr); command->command_probe_cyclemon = scamper_cyclemon_use(cyclemon); command->command_probe_data = data; command->command_probe_alloctask = alloctask; command->command_probe_freedata = freedata; if(slist_tail_push(commands, command) == NULL) { goto err; } return 0; err: if(command != NULL) command_free(command); return -1; } /* * command_cycle * * given the commands list, append a cycle command to it. */ static int command_cycle(scamper_source_t *source, scamper_cycle_t *cycle) { command_t *command = NULL; if((command = malloc_zero(sizeof(command_t))) == NULL) { goto err; } command->type = COMMAND_CYCLE; command->command_cycle_cycle = cycle; if(slist_tail_push(source->commands, command) == NULL) { goto err; } source->cycle_points++; return 0; err: if(command != NULL) command_free(command); return -1; } /* * source_cycle * * allocate and initialise a cycle start object for the source. * write the cycle start to disk. */ static int source_cycle(scamper_source_t *source, uint32_t cycle_id) { scamper_cyclemon_t *cyclemon = NULL; scamper_cycle_t *cycle = NULL; /* allocate the new cycle object */ if((cycle = scamper_cycle_alloc(source->list)) == NULL) { goto err; } /* assign the cycle id */ cycle->id = cycle_id; /* allocate structure to monitor references to the new cycle */ if((cyclemon = scamper_cyclemon_alloc(cycle, source_cycle_finish, source, source->sof)) == NULL) { goto err; } /* append the cycle record to the source's commands list */ if(command_cycle(source, cycle) != 0) { goto err; } /* * if there is a previous cycle object associated with the source, then * free that. 
also free the cyclemon. */ if(source->cycle != NULL) { scamper_cycle_free(source->cycle); } if(source->cyclemon != NULL) { scamper_cyclemon_unuse(source->cyclemon); } /* store the cycle and we're done */ source->cycle = cycle; source->cyclemon = cyclemon; return 0; err: if(cyclemon != NULL) scamper_cyclemon_free(cyclemon); if(cycle != NULL) scamper_cycle_free(cycle); return -1; } /* * source_file_open * * open the filename specified. set it to non-blocking mode. * return the fd to the caller. */ static int source_file_open(char *filename) { int fd = -1; if(strcmp(filename, "-") == 0) { if(stdin_used == 0) { if((fd = fileno(stdin)) == -1) { goto err; } stdin_used = 1; } else { goto err; } } else { #if defined(WITHOUT_PRIVSEP) fd = open(filename, O_RDONLY); #else fd = scamper_privsep_open_file(filename, O_RDONLY, 0); #endif /* make sure the fd is valid, otherwise bail */ if(fd == -1) { goto err; } } /* we don't want reads on this file descriptor to block */ if(fcntl_set(fd, O_NONBLOCK) == -1) { goto err; } return fd; err: if(fd != -1) close(fd); return -1; } /* * source_cmp * * provide a sorting function for storing sources in a splay tree */ static int source_cmp(const void *a, const void *b) { return strcasecmp(((const scamper_source_t *)b)->list->name, ((const scamper_source_t *)a)->list->name); } /* * source_next * * advance to the next source to read addresses from, and reset the * current count of how many addresses have been returned off the list * for this source-cycle */ static scamper_source_t *source_next(void) { void *node; if((node = clist_node_next(source_cur->ln)) != source_cur->ln) { source_cur = clist_node_item(node); } source_cnt = 0; return source_cur; } /* * source_active_detach * * detach the source out of the active list. move to the next source * if it is the current source that is being read from. 
*/ static void source_active_detach(scamper_source_t *source) { void *node; assert(source->l == active); if((node = clist_node_next(source->ln)) != source->ln) { source_cur = clist_node_item(node); } else { source_cur = NULL; } source_cnt = 0; clist_node_pop(active, source->ln); source->l = NULL; source->ln = NULL; return; } /* * source_blocked_detach * * detach the source out of the blocked list. */ static void source_blocked_detach(scamper_source_t *source) { assert(source->l == blocked); dlist_node_pop(blocked, source->ln); source->ln = NULL; source->l = NULL; return; } /* * source_active_attach * * some condition has changed, which may mean the source can go back onto * the active list for use by the probing process. * * a caller MUST NOT assume that the source will necessarily end up on the * active list after calling this function. for example, source_active_attach * may be called when new tasks are added to the command list. however, the * source may have a zero priority, which means probing this source is * currently paused. */ static int source_active_attach(scamper_source_t *source) { if(source->l == active) { return 0; } if(source->l == blocked) { /* if the source has a zero priority, it must remain blocked */ if(source->priority == 0) { return 0; } source_blocked_detach(source); } if((source->ln = clist_tail_push(active, source)) == NULL) { return -1; } source->l = active; if(source_cur == NULL) { source_cur = source; source_cnt = 0; } return 0; } /* * source_blocked_attach * * put the specified source onto the blocked list. */ static int source_blocked_attach(scamper_source_t *source) { if(source->l == blocked) { return 0; } if(source->l != NULL) { source_active_detach(source); } if((source->ln = dlist_tail_push(blocked, source)) == NULL) { return -1; } source->l = blocked; return 0; } /* * addresslist_unhold * * callback function that is used to take an on_hold structure and put * the command back on the source's command list. 
finally, it puts the * source back on the active list (if it is not already). */ static void addresslist_unhold(void *cookie) { on_hold_t *on_hold = (on_hold_t *)cookie; scamper_source_t *source = on_hold->source; command_t *command = on_hold->command; /* remove the source's copy of the on-hold structure */ dlist_node_pop(source->on_hold, on_hold->node); free(on_hold); /* push the command onto the front of the list */ slist_head_push(source->commands, command); /* put the source on the active list */ source_active_attach(source); return; } /* * addresslist_onhold * * the command (pointed to by command) cannot run right now because another * task (pointed to by task) is busy probing the same target. put it on hold, * and note which source (pointed to by source) the command originates from. */ static int addresslist_onhold(scamper_source_t *source, scamper_task_t *task, command_t *command) { on_hold_t *on_hold; /* allocate the on-hold structure, and set up the basic parameters */ if((on_hold = malloc_zero(sizeof(on_hold_t))) == NULL) { return -1; } on_hold->task = task; on_hold->source = source; on_hold->command = command; /* put the structure on the the source's onhold list */ if((on_hold->node = dlist_tail_push(source->on_hold, on_hold)) == NULL) { goto cleanup; } /* * register the on-hold structure with the task we're blocked on, so that * when the task completes it can wake up the source (by using the * addresslist_unhold callback function). 
*/ if((on_hold->task_cookie = scamper_task_onhold(task, on_hold, addresslist_unhold)) == NULL) { goto cleanup; } return 0; cleanup: if(on_hold->node != NULL) dlist_node_pop(source->on_hold, on_hold->node); free(on_hold); return -1; } /* * source_command * */ static int source_command(scamper_source_t *source, char *command) { char *opts; alloctask_t alloctask; freedata_t freedata; scamper_trace_t *trace; scamper_ping_t *ping; scamper_addr_t *addr; void *data; opts = string_nextword(command); if(strcasecmp(command, "trace") == 0) { if((trace = scamper_do_trace_alloc(opts)) == NULL) { return -1; } alloctask = (alloctask_t)scamper_do_trace_alloctask; freedata = (freedata_t)scamper_trace_free; data = trace; addr = trace->dst; } else if(strcasecmp(command, "ping") == 0) { if((ping = scamper_do_ping_alloc(opts)) == NULL) { return -1; } alloctask = (alloctask_t)scamper_do_ping_alloctask; freedata = (freedata_t)scamper_ping_free; data = ping; addr = ping->dst; } else goto err; if(source == NULL) source = source_def; if(command_probe(source->commands, addr, source->cyclemon, data, alloctask, freedata) != 0) { goto err; } source_active_attach(source); return 0; err: return -1; } static int command_assemble(char *cmd, size_t cmd_len, char *addr, size_t addr_len) { size_t reqd_len; void *tmp; /* allocate buffer large enough, if required */ if(command_len < (reqd_len = cmd_len + 1 + addr_len + 1)) { if((tmp = malloc(reqd_len)) == NULL) { return -1; } if(command_buf != NULL) { free(command_buf); } command_buf = tmp; command_len = reqd_len; } /* assemble the command string */ memcpy(command_buf, cmd, cmd_len); command_buf[cmd_len] = ' '; memcpy(&command_buf[cmd_len+1], addr, addr_len+1); return 0; } /* * alf_free * * free up all resources related to an address-list-file. * if feedlastline is set, then any partial line is parsed. * if closefd is set, then the file descriptor is closed. 
*/ static void alf_free(alf_t *alf, int feedlastline) { int fd = -1; if(alf->lp != NULL) { scamper_linepoll_free(alf->lp, feedlastline); } if(alf->fd != NULL) { fd = scamper_fd_fd_get(alf->fd); scamper_fd_free(alf->fd); } if(fd != -1) { close(fd); } free(alf); return; } /* * alf_read_line * * this callback receives a single line per call, which should contain an * address in string form. it combines that address with the source's * default command and then passes the string to source_command for further * processing. the line eventually ends up in the commands queue. */ static int alf_read_line(void *param, uint8_t *buf, size_t len) { alf_t *alf = (alf_t *)param; scamper_source_t *source = alf->source; char *str = (char *)buf; /* make sure the string contains only printable characters */ if(string_isprint(str, len) == 0) { return -1; } /* null terminate at these characters */ string_nullterm(str, " \r\t#"); /* make sure the line isn't blank or a comment line */ if(str[0] == '\0' || str[0] == '#') { return 0; } if(command_assemble(source->command, source->command_len, str, len) != 0) { return -1; } /* resolve the address */ if(source_command(source, command_buf) == -1) { return -1; } return 0; } /* * alf_read * * this callback is called when the fd has something ready to read. */ static void alf_read(const int fd, void *param) { alf_t *alf = (alf_t *)param; scamper_source_t *source = alf->source; uint8_t buf[1024]; ssize_t rc; time_t mtime; int reload = 0; int newfd; if((rc = read(fd, buf, sizeof(buf))) > 0) { /* got data to read. parse the buffer for addresses, one per line. 
*/ scamper_linepoll_handle(alf->lp, buf, (size_t)rc); /* * if probe queue for this source is sufficiently large, then * don't read any more for the time being */ if(slist_count(source->commands) >= scamper_pps_get()) { scamper_fd_read_pause(alf->fd); } } else if(rc == 0 && (source->adhoc == 1 || source->file->cycles == 1)) { /* * when we get EOF on an adhoc source, or we complete the last cycle * over an input file, the input file is closed */ alf_free(alf, 1); source->alf = NULL; } else if(rc == 0 && source->adhoc == 0) { /* a cycle value of zero means cycle indefinitely */ if(source->file->cycles != 0) { source->file->cycles--; } /* decide if we should reload the file at this point */ if(source->file->reload == 1) { /* stat the file so we have an mtime value for later */ if(stat_mtime(source->file->filename, &mtime) == 0) { reload = 1; } } else if(source->file->autoreload == 1) { /* * reload is conditional on being able to stat the file, and the * mtime being different to whatever our record of the mtime is */ if(stat_mtime(source->file->filename, &mtime) == 0 && source->file->mtime != mtime) { reload = 1; } } /* we have to reload the file (if we can open it) */ if(reload == 1 && (newfd=source_file_open(source->file->filename)) != -1) { /* use the new file descriptor */ if(scamper_fd_fd_set(source->alf->fd, newfd) == -1) { goto err; } /* close the existing file */ close(fd); /* update file details; ensure reload is reset to zero */ source->file->mtime = mtime; source->file->reload = 0; } else { /* rewind the current file position */ if(lseek(fd, 0, SEEK_SET) == -1) { goto err; } } /* check to see if we should pause, or allow reading to continue */ if(source->cycle_points < 1) { scamper_fd_read_unpause(source->alf->fd); } else { scamper_fd_read_pause(source->alf->fd); } /* create a new cycle record, etc */ if(source_cycle(source, source->cycle->id + 1) == -1) { goto err; } } else if(rc == -1 && errno != EAGAIN) { printerror(errno, strerror, __func__, "read 
failed"); goto err; } return; err: alf_free(alf, 0); source->alf = NULL; return; } static alf_t *alf_alloc(int fd, scamper_source_t *source) { alf_t *alf = NULL; if((alf = malloc_zero(sizeof(alf_t))) == NULL) { goto err; } if((alf->fd = scamper_fd_private(fd, alf_read, alf, NULL, NULL)) == NULL) { goto err; } if((alf->lp = scamper_linepoll_alloc(alf_read_line, alf)) == NULL) { goto err; } alf->source = source; return alf; err: if(alf != NULL) { if(alf->fd != NULL) scamper_fd_free(alf->fd); if(alf->lp != NULL) scamper_linepoll_free(alf->lp, 0); free(alf); } return NULL; } /* * source_free * * clean up the source */ static void source_free(scamper_source_t *source) { command_t *command; on_hold_t *onhold; if(source->cyclemon != NULL) { scamper_cyclemon_source_detach(source->cyclemon); scamper_cyclemon_unuse(source->cyclemon); source->cyclemon = NULL; } /* detach the source from whatever list it is in */ if(source->l == active) { source_active_detach(source); } else if(source->l == blocked) { source_blocked_detach(source); } /* * empty the source of any addresses that are currently on hold. * this occurs before we flush the commands list as it is possible that * an on-hold address might end up in the command list for this source. 
*/ if(source->on_hold != NULL) { while((onhold = dlist_head_pop(source->on_hold)) != NULL) { scamper_task_dehold(onhold->task, onhold->task_cookie); free(onhold); } dlist_free(source->on_hold); } /* now empty the list of commands */ if(source->commands != NULL) { while((command = slist_head_pop(source->commands)) != NULL) { if(command->type == COMMAND_PROBE) { command->command_probe_freedata(command->command_probe_data); } command_free(command); } slist_free(source->commands); } /* release this structure's hold on the scamper_outfile */ if(source->sof != NULL) scamper_outfile_free(source->sof); /* remove the source from the source tree */ splaytree_remove_item(source_tree, source); /* free up details of the source file */ if(source->file != NULL) { if(source->file->filename != NULL) free(source->file->filename); free(source->file); } if(source->alf != NULL) alf_free(source->alf, 0); if(source->command != NULL) free(source->command); if(source->list != NULL) scamper_list_free(source->list); if(source->cycle != NULL) scamper_cycle_free(source->cycle); free(source); return; } scamper_source_t *scamper_source_use(scamper_source_t *source) { source->refcnt++; return source; } void scamper_source_unuse(scamper_source_t *source) { /* * decrement the reference count. check to see if the reference count * shows there are active tasks probing. if there are active tasks, then * return now. 
*/ if(--source->refcnt > 1) { return; } /* * if the source is an adhoc source, then don't remove it; other commands * may be supplied in the future */ if(source->adhoc == 1) { return; } /* if there are more cycles to come, then don't remove the source */ if(source->file->cycles != 1) { return; } /* * do not remove a source that has targets queued, or has the possibility * of having targets queued */ if(slist_count(source->commands) > 0 || source->alf != NULL || dlist_count(source->on_hold) > 0) { return; } source_event_post(source, SCAMPER_SOURCE_EVENT_FINISH, NULL); source_free(source); return; } void *scamper_addresslist_observe(scamper_source_event_func_t func, void *p) { source_observe_t *observe = NULL; if(observers == NULL && (observers = dlist_alloc()) == NULL) { goto err; } if((observe = malloc_zero(sizeof(source_observe_t))) == NULL) { goto err; } observe->func = func; observe->param = p; if((observe->node = dlist_tail_push(observers, observe)) == NULL) { goto err; } return observe; err: if(observe != NULL) { if(observe->node != NULL) dlist_node_pop(observers, observe->node); free(observe); } return NULL; } void scamper_addresslist_unobserve(void *handle) { source_observe_t *observe = (source_observe_t *)handle; if(observers == NULL) { return; } /* free the node */ dlist_node_pop(observers, observe->node); free(observe); /* if there are no other observations, free the list as well */ if(dlist_count(observers) == 0) { dlist_free(observers); observers = NULL; } return; } uint32_t scamper_source_getlistid(const scamper_source_t *source) { return source->list->id; } uint32_t scamper_source_getcycleid(const scamper_source_t *source) { return source->cycle->id; } const char *scamper_source_getname(const scamper_source_t *source) { return source->list->name; } const char *scamper_source_getdescr(const scamper_source_t *source) { return source->list->descr; } int scamper_source_getpriority(const scamper_source_t *source) { return source->priority; } int 
scamper_source_getadhoc(const scamper_source_t *source) { return source->adhoc; } scamper_outfile_t *scamper_source_getoutfile(const scamper_source_t *s) { /* if there is no defined outfile, then the default outfile will be used */ if(s->sof == NULL) { /* passing NULL means the default outfile */ return scamper_outfiles_get(NULL); } return s->sof; } const char *scamper_source_getfilename(const scamper_source_t *source) { if(source->file == NULL) return NULL; return source->file->filename; } int scamper_source_getcycles(const scamper_source_t *source) { if(source->file == NULL) return 1; return source->file->cycles; } int scamper_source_getautoreload(const scamper_source_t *source) { if(source->file == NULL) return 0; return source->file->autoreload; } int scamper_source_update(scamper_source_t *source, const int *autoreload, const int *cycles, const int *priority) { scamper_source_event_t sse; int old_priority; /* first, ensure there is actually some parameter to update */ if(autoreload == NULL && cycles == NULL && priority == NULL) { return 0; } /* * second, sanity check the parameters; * * if there is no file, then can't update autoreload. * if there is no file, then can't update cycle count. 
* negative priority forbidden */ if((autoreload != NULL && source->file == NULL) || (cycles != NULL && source->file == NULL) || (priority != NULL && *priority < 0)) { return -1; } /* build up an event structure to post */ memset(&sse, 0, sizeof(sse)); if(autoreload != NULL) { sse.sse_update_flags |= 0x01; sse.sse_update_autoreload = *autoreload; source->file->autoreload = *autoreload; } if(cycles != NULL) { sse.sse_update_flags |= 0x02; sse.sse_update_cycles = *cycles; source->file->cycles = *cycles; } if(priority != NULL) { sse.sse_update_flags |= 0x04; sse.sse_update_priority = *priority; /* swap the priorities around */ old_priority = source->priority; source->priority = *priority; /* if priority is set to zero */ if(*priority == 0 && old_priority > 0) { source_blocked_attach(source); } /* else if the priority is raised from zero */ else if(*priority > 0 && old_priority == 0) { source_active_attach(source); } } source_event_post(source, SCAMPER_SOURCE_EVENT_UPDATE, &sse); return 0; } /* * scamper_source_do */ int scamper_source_do(scamper_source_t *source, char *command) { return source_command(source, command); } int scamper_source_do_array(scamper_source_t *source, char *command, char **iparray, int ipcount) { size_t cmd_len; int i; /* need a command to go with the array */ if(command == NULL && (command = source->command) == NULL) { return -1; } cmd_len = strlen(command); for(i=0; iadhoc != 1 || source->alf != NULL) { goto err; } if((fd = source_file_open(filename)) == -1) { goto err; } if((source->alf = alf_alloc(fd, source)) == NULL) { goto err; } return 0; err: if(fd != -1) close(fd); return -1; } /* * scamper_addresslist_addsource * * link an address list into rotation. 
*/ scamper_source_t *scamper_addresslist_addsource(scamper_source_params_t *ssp) { scamper_source_t *source = NULL; int fd = -1; int is_default = 0; const char *def; int cycleid; if((def = scamper_option_listname()) == NULL) { def = "default"; } if(strcasecmp(ssp->name, def) == 0) { is_default = 1; } /* * don't let a source named default get into the tree if there already * is a default source */ if(is_default && source_def != NULL) { goto err; } /* if a source is not 'ad-hoc', then it has to specify a file */ if(ssp->adhoc == 0 && ssp->filename == NULL) { goto err; } if((source = malloc_zero(sizeof(scamper_source_t))) == NULL) { goto err; } source->refcnt = 1; source->priority = ssp->priority; source->adhoc = ssp->adhoc; /* set the default command to use with the source */ if((source->command = strdup(ssp->command != NULL ? ssp->command : scamper_command_get())) == NULL) { goto err; } source->command_len = strlen(source->command); /* if we have been passed an output file to target, then use it */ if(ssp->sof != NULL) { source->sof = scamper_outfile_use(ssp->sof); } /* the targets list is an ordered list of addresses to probe */ if((source->commands = slist_alloc()) == NULL) { goto err; } /* the on-hold list is a collection of on_hold records for this source */ if((source->on_hold = dlist_alloc()) == NULL) { goto err; } /* * this structure defines the source where a series of traces came * from. it is shared by the source and by the traces that reference * it. 
*/ if((source->list = scamper_list_alloc(ssp->list_id, ssp->name, ssp->descr, scamper_monitorname_get())) == NULL) { goto err; } /* initialise the cycle record */ if(is_default == 1) { if((cycleid = scamper_option_cycleid()) >= 0 && source_cycle(source, cycleid) == -1) { goto err; } } else { if(source_cycle(source, ssp->cycle_id) == -1) { goto err; } } /* if there is a file to initialise the source with, then open it now */ if(ssp->filename != NULL) { if((fd = source_file_open(ssp->filename)) == -1) { goto err; } if((source->alf = alf_alloc(fd, source)) == NULL) { goto err; } } /* * if the source is a managed list, then we need to keep state on the * destination list file. */ if(source->adhoc == 0) { if((source->file = malloc_zero(sizeof(source_file_t))) == NULL) { goto err; } if(fstat_mtime(fd, &source->file->mtime) == -1) { goto err; } if((source->file->filename = strdup(ssp->filename)) == NULL) { goto err; } source->file->autoreload = ssp->autoreload; source->file->cycles = ssp->cycles; } /* * put the source into the splaytree, where it can be found quickly * when needed. */ if(splaytree_insert(source_tree, source) == NULL) { goto err; } /* * put the source in the blocked list, as there currently is no * addresses ready to be probed at this time. 
*/ if(is_default == 0 && source_blocked_attach(source) == -1) { goto err; } source_event_post(source, SCAMPER_SOURCE_EVENT_ADD, NULL); return source; err: if(fd != -1) close(fd); source_free(source); return NULL; } /* * scamper_addresslist_getsource * * look for the address list specified by the name passed */ scamper_source_t *scamper_addresslist_getsource(char *name) { scamper_source_t findme; scamper_list_t list; list.name = name; findme.list = &list; return (scamper_source_t *)splaytree_find(source_tree, &findme); } /* * scamper_addressslist_delsource * * remove the source from the list of available address sources if * possible */ int scamper_addresslist_delsource(scamper_source_t *source) { const char *def; if((def = scamper_option_listname()) == NULL) { def = "default"; } /* the default source cannot be removed */ if(strcasecmp(source->list->name, def) == 0) { return -1; } /* if there are external references to the source, then don't free it */ if(source->refcnt > 1) { return -1; } source_event_post(source, SCAMPER_SOURCE_EVENT_DELETE, NULL); source_free(source); return 0; } /* * scamper_addresslist_cyclesource * * if the source is an adhoc source [and it isn't the default source] * then append a cycle command to the source. */ int scamper_addresslist_cyclesource(scamper_source_t *source) { /* check that the source is not managed / is not the default list */ if(source->adhoc == 0 || source == source_def) { return -1; } /* append the cycle command */ if(source_cycle(source, source->cycle->id + 1) == -1) { return -1; } source_active_attach(source); return 0; } /* * scamper_addresslist_empty * * remove all addresses and sources in the address list * XXX: fix up to use source reference counting properly */ void scamper_addresslist_empty() { scamper_source_t *source; /* * for each source, go through and empty the lists, close the files, and * leave the list of sources available to read from empty. 
*/ while((source = dlist_tail_pop(blocked)) != NULL) { source->l = NULL; source->ln = NULL; source_free(source); } while((source = clist_tail_pop(active)) != NULL) { source->l = NULL; source->ln = NULL; source_free(source); } return; } void scamper_addresslist_foreach(void *param, int (*func)(void *p, scamper_source_t *src)) { splaytree_inorder(source_tree, (splaytree_inorder_t)func, param); return; } /* * command_probe_handle * * */ static int command_probe_handle(scamper_source_t *source, command_t *command, scamper_task_t **task_out) { scamper_target_t *target; scamper_cycle_t *cycle; scamper_task_t *task; /* * if this command is blocked, then we can't allocate the caller a task. * put the command on hold, and return NULL in task_out. */ if((target = scamper_target_find(command->command_probe_addr)) != NULL) { addresslist_onhold(source, target->task, command); *task_out = NULL; return 0; } /* get a pointer to the cycle for *this* task */ cycle = scamper_cyclemon_cycle(command->command_probe_cyclemon); /* allocate the task structure to keep everything together */ if((task = command->command_probe_alloctask(command->command_probe_data, source->list, cycle)) == NULL) { command_free(command); return -1; } /* pass the cyclemon structure to the task */ task->cyclemon = scamper_cyclemon_use(command->command_probe_cyclemon); /* pass reference to the file to write the task to */ task->source = scamper_source_use(source); /* don't need the command any more */ command_free(command); /* return to the caller the task we allocated */ *task_out = task; return 0; } static int command_cycle_handle(scamper_source_t *source, command_t *command) { scamper_source_event_t sse; scamper_cycle_t *cycle = command->command_un.cycle.cycle; scamper_file_t *file; struct timeval tv; char hostname[MAXHOSTNAMELEN]; /* get the hostname of the system for the cycle point */ if(gethostname(hostname, sizeof(hostname)) == 0) { cycle->hostname = strdup(hostname); } /* get a timestamp for the cycle 
start point */ gettimeofday_wrap(&tv); cycle->start_time = (uint32_t)tv.tv_sec; /* write a cycle start point to disk if there is a file to do so */ if(source->sof != NULL && (file = scamper_outfile_getfile(source->sof)) != NULL) { scamper_file_write_cycle_start(file, cycle); } /* post an event saying the cycle point just rolled around */ sse.sse_cycle_cycle_id = cycle->id; source_event_post(source, SCAMPER_SOURCE_EVENT_CYCLE, &sse); command_free(command); return 0; } /* * scamper_addresslist_get * * return the next address to trace. * * note that if this function returns null, it does not necessarily infer * that there are no more addresses to trace; if there are addresses * currently on-hold that are blocked on other traces in the active window * and no other traces to enqueue then this function will return null. * * XXX: replace head_pop with head_node, and then head_pop when the item * is successfully returned. */ scamper_task_t *scamper_addresslist_get(void) { scamper_source_t *source; scamper_task_t *task; command_t *command; /* * if the priority of the source was changed in between calls to this * function, then make sure the source's priority hasn't been lowered to * below how many tasks it has had allocated in this cycle */ if(source_cur != NULL && source_cnt >= source_cur->priority) { source_next(); } if((source = source_cur) == NULL) { return NULL; } /* * if the number of available tasks is less than the packets-per-second * rate, and we're not too far ahead, then queue the source to read more * tasks. 
*/ if(source->alf != NULL && source->cycle_points < 2 && slist_count(source->commands) < scamper_pps_get()) { scamper_fd_read_unpause(source->alf->fd); } while((source = source_cur) != NULL) { assert(source->priority != 0); /* fetch commands off until we have a task to return */ while((command = slist_head_pop(source->commands)) != NULL) { if(command->type == COMMAND_PROBE) { if(command_probe_handle(source, command, &task) == -1) { return NULL; } else if(task == NULL) { continue; } source_cnt++; return task; } else if(command->type == COMMAND_CYCLE) { command_cycle_handle(source, command); } } /* the previous source could not supply a command */ assert(slist_count(source->commands) == 0); /* * if there is still the possibility of getting an address * out of the source then put the list in the blocked list. */ if(source->alf != NULL || dlist_count(source->on_hold) > 0) { source_blocked_attach(source); } /* * if the source is the default list, then we merely detach it * from the active list and leave it 'dangling' in that it is not * in either of the active or blocked lists. */ else if(source == source_def) { source_active_detach(source); } /* * put the source onto the blocked list for now. */ else { source_blocked_attach(source); } } /* there shouldn't be any active address list sources available ... */ assert(clist_count(active) == 0); return NULL; } /* * scamper_addresslist_isempty * * return to the caller if the address list is empty or not. */ int scamper_addresslist_isempty() { /* * if there are either active or blocked address list sources, the list * can't be empty */ if(clist_count(active) > 0 || dlist_count(blocked) > 0) { return 0; } /* * if the default source is waiting for more addresses then the list * can't be empty */ if(source_def->alf != NULL) { return 0; } return 1; } /* * scamper_addresslist_isready * * return to the caller if the address list appears to be able to supply * an address for probing. 
*/ int scamper_addresslist_isready() { if(source_cur != NULL) { return 1; } return 0; } int scamper_addresslist_init() { scamper_source_params_t ssp; const char *list_name; int list_id; if((active = clist_alloc()) == NULL) { return -1; } if((blocked = dlist_alloc()) == NULL) { return -1; } if((source_tree = splaytree_alloc(source_cmp)) == NULL) { return -1; } if((list_id = scamper_option_listid()) < 0) { list_id = 0; } if((list_name = scamper_option_listname()) == NULL) { list_name = "default"; } /* create the default address list source and put it into rotation */ memset(&ssp, 0, sizeof(ssp)); ssp.list_id = list_id; ssp.name = (char *)list_name; ssp.descr = "default"; ssp.priority = 1; ssp.adhoc = 1; if((source_def = scamper_addresslist_addsource(&ssp)) == NULL) { return -1; } return 0; } void scamper_addresslist_cleanup() { struct timeval tv; if(source_def != NULL) { if(source_def->cycle != NULL) { /* timestamp when the cycle ends */ gettimeofday_wrap(&tv); source_def->cycle->stop_time = (uint32_t)tv.tv_sec; scamper_file_write_cycle_stop(scamper_outfile_getfile( scamper_outfiles_get(NULL)), source_def->cycle); } source_free(source_def); source_def = NULL; } if(source_tree != NULL) { splaytree_free(source_tree, NULL); source_tree = NULL; } if(observers != NULL) { dlist_free(observers); observers = NULL; } if(blocked != NULL) { dlist_free(blocked); blocked = NULL; } if(active != NULL) { clist_free(active); active = NULL; } if(command_buf != NULL) { free(command_buf); command_buf = NULL; } return; }