/* ** Copyright (C) 2006-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. 
** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* * rwsplit takes a sequence of input files and generates a set * of sample files from them. Each file is a single sample. * * The sampling criteria currently have the following parameters: * * --basename: the basename of the output files (written as * BASENAME.NNNNNNNN.rwf) * --ip-limit: how many addresses to contain in a sample * --flow-limit: how many flows to contain in a sample * --packet-limit: how many packets to contain in a sample * --byte-limit: how many bytes to contain in a sample * --sample-ratio: specifies that 1/n flows should be taken for the * sample file. * --file-ratio: specifies that 1/n possible sample files will be used. * --max-outputs: the maximum number of sample files to write. * * Sampling progresses through the data linearly, so if you are going * to split on time, make sure the input is sorted on time. 
*/ #include "silk.h" RCSIDENT("$SiLK: rwsplit.c 6074 2007-01-22 18:55:41Z mthomas $"); #include "rwpack.h" #include "utils.h" #include "iptree.h" #include "sksite.h" /* LOCAL DEFINES AND TYPEDEFS */ /* where to write --help output */ #define USAGE_FH stdout /* maximum number of output files; the file suffix is generated by * "%08u", so we can only have eight 9's worth of files */ #define MAX_OUTPUT_FILES 99999999 /* keep this in sync with the appOptionsEnum! */ typedef enum _aggmode_t { AGGMODE_IPS, AGGMODE_FLOWS, AGGMODE_PKTS, AGGMODE_BYTES, /* none must be last */ AGGMODE_NONE } aggmode_t; /* LOCAL FUNCTIONS */ static void appUsageLong(void); static void appTeardown(void); static void appSetup(int argc, char **argv); static int appOptionsHandler(clientData cData, int opt_index, char *opt_arg); static int processRec(const rwRec *input_rec); static void newOutput(void); static void closeOutput(void); /* LOCAL VARIABLES */ /* index of files in argv */ static int arg_index; /* * The state variable maintains all the information required for * maintaining the state of the partitioning application, including * sampling information and aggregation strategies. 
Actually * sample buffers are maintained in a seperate variable * created below; */ static struct _state { /* basename of output files */ char *basename; /* current output file */ rwIOStruct *rwios_out; /* iptree in which to store unique IPs */ skIPTree_t *ips; /* the index of the output file are we writing */ uint32_t output_ctr; /* max number of output files */ uint32_t max_outputs; /* max ip/flow/packet/byte per file */ uint32_t tag_limit; /* current count of ip/flow/packet/byte */ uint32_t tag_current; /* how many records we need to read before we write one */ uint32_t sample_ratio; /* how many records we've read on the way to reading * 'sample_ratio' records */ uint32_t current_sample_count; /* instead of writing each file, write each 'file_ratio' file */ uint32_t file_ratio; /* the thing we are aggregating */ aggmode_t aggmode; } state; /* OPTIONS SETUP */ typedef enum { /* the aggregate list--keep this set in sync with aggmode_t */ OPT_IP_LIMIT, OPT_FLOW_LIMIT, OPT_PACKET_LIMIT, OPT_BYTE_LIMIT, OPT_BASENAME, OPT_SAMPLE_RATIO, OPT_FILE_RATIO, OPT_MAX_OUTPUTS } appOptionsEnum; /* value to subtract from appOptionsEnum to get a aggmode_t */ static int opt2agg_offset = OPT_IP_LIMIT; static struct option appOptions[] = { {"ip-limit", REQUIRED_ARG, 0, OPT_IP_LIMIT}, {"flow-limit", REQUIRED_ARG, 0, OPT_FLOW_LIMIT}, {"packet-limit", REQUIRED_ARG, 0, OPT_PACKET_LIMIT}, {"byte-limit", REQUIRED_ARG, 0, OPT_BYTE_LIMIT}, {"basename", REQUIRED_ARG, 0, OPT_BASENAME}, {"sample-ratio", REQUIRED_ARG, 0, OPT_SAMPLE_RATIO}, {"file-ratio", REQUIRED_ARG, 0, OPT_FILE_RATIO}, {"max-outputs", REQUIRED_ARG, 0, OPT_MAX_OUTPUTS}, {0,0,0,0} /* sentinel entry */ }; static const char *appHelp[] = { "IP address count at which to begin a new sample file", "Flow count at which to begin a new sample file", "Packet count at which to begin a new sample file", "Bytes count at which to begin a new sample file", "Basename to use for output sample files", ("Ratio of records read to number written in 
sample\n" "\tfile (e.g., 100 means to write 1 out of 100 records). Def. 1"), ("Ratio of sample file names generated to total number\n" "\twritten (e.g., 10 means 1 of every 10 files will be saved). Def. 1"), ("Maximum number of files to write to disk. Def. 999999999"), (char *)NULL }; /* FUNCTION DEFINITIONS */ /* * appUsageLong(); * * Print complete usage information to USAGE_FH. Pass this * function to skOptionsSetUsageCallback(); optionsParse() will * call this funciton and then exit the program when the --help * option is given. */ static void appUsageLong(void) { #define USAGE_MSG \ ("--basename=F --{ip|flow|packet|byte}-limit=N [SWITCHES] [FILES]\n" \ "\tSplit a stream of SiLK Flow records into a set of flow files that\n" \ "\teach contain a subset of the records.\n") FILE *fh = USAGE_FH; int i; fprintf(fh, "%s %s", skAppName(), USAGE_MSG); fprintf(fh, "\nSWITCHES:\n"); skOptionsDefaultUsage(fh); sksiteOptionsUsage(fh); for (i = 0; appOptions[i].name; ++i) { fprintf(fh, "--%s %s. %s\n", appOptions[i].name, SK_OPTION_HAS_ARG(appOptions[i]), appHelp[i]); } fprintf(fh, ("\nNote: The --basename and one of the --*-limit" " switches are required.\n")); } /* * appTeardown() * * Teardown all modules, close all files, and tidy up all * application state. * * This function is idempotent. */ static void appTeardown(void) { static int teardownFlag = 0; if (teardownFlag) { return; } teardownFlag = 1; closeOutput(); if (state.ips) { skIPTreeDelete(&state.ips); } skAppUnregister(); } /* * appSetup(argc, argv); * * Perform all the setup for this application include setting up * required modules, parsing options, etc. This function should be * passed the same arguments that were passed into main(). * * Returns to the caller if all setup succeeds. If anything fails, * this function will cause the application to exit with a FAILURE * exit status. 
*/
static void appSetup(int argc, char **argv)
{
    /* verify same number of options and help strings */
    assert((sizeof(appHelp)/sizeof(char *)) ==
           (sizeof(appOptions)/sizeof(struct option)));

    /* register the application */
    skAppRegister(argv[0]);
    skOptionsSetUsageCallback(&appUsageLong);

    /* initialize globals */
    memset(&state, 0, sizeof(state));
    state.aggmode = AGGMODE_NONE;
    state.sample_ratio = 1;
    state.file_ratio = 1;
    state.max_outputs = MAX_OUTPUT_FILES;

    /* register the options */
    if (optionsRegister(appOptions, (optHandler)appOptionsHandler, NULL)
        || sksiteOptionsRegister(SK_SITE_FLAG_CONFIG_FILE))
    {
        skAppPrintErr("unable to register options");
        exit(EXIT_FAILURE);
    }

    /* parse the options */
    arg_index = optionsParse(argc, argv);
    if (arg_index < 0) {
        /* options parsing should print error */
        skAppUsage();             /* never returns */
    }

    /* try to load site config file; if it fails, we will not be able
     * to resolve flowtype and sensor from input file names */
    sksiteConfigure(0);

    /* arg_index is looking at first file name to process.  With no
     * file names, records are read from stdin, which must not be a
     * terminal. */
    if (arg_index == argc) {
        if (FILEIsATty(stdin)) {
            skAppPrintErr("No input files on command line and"
                          " stdin is connected to a terminal");
            skAppUsage();         /* never returns */
        }
    }

    /*
     * We now check for correctness.  This implies:
     *   An aggregation mode has been chosen.
     *   An output stub name has been specified.
     */
    if (state.aggmode == AGGMODE_NONE) {
        skAppPrintErr("No aggregation mode chosen; you must specify one");
        exit(EXIT_FAILURE);
    }
    if (state.basename == NULL) {
        skAppPrintErr("You must specify the output files' basename");
        exit(EXIT_FAILURE);
    }

    /* atexit() reports failure with any nonzero value; it is not
     * required to be negative, so test for != 0 */
    if (atexit(appTeardown) != 0) {
        skAppPrintErr("unable to register appTeardown() with atexit()");
        appTeardown();
        exit(EXIT_FAILURE);
    }

    /* setting the count equal to the ratio makes processRec() pick a
     * fresh grab index when it sees the first record */
    state.current_sample_count = state.sample_ratio;

    /* create IP tree if required; a failure here (e.g., out of
     * memory) would otherwise leave state.ips unusable */
    if (state.aggmode == AGGMODE_IPS) {
        if (skIPTreeCreate(&state.ips)) {
            skAppPrintErr("Unable to create IP tree");
            exit(EXIT_FAILURE);
        }
    }

    return;                       /* OK */
}


/*
 *  status = appOptionsHandler(cData, opt_index, opt_arg);
 *
 *    This function is passed to optionsRegister(); it will be called
 *    by optionsParse() for each user-specified switch that the
 *    application has registered; it should handle the switch as
 *    required---typically by setting global variables---and return 1
 *    if the switch processing failed or 0 if it succeeded.  Returning
 *    a non-zero from the handler causes optionsParse() to return
 *    a negative value.
 *
 *    The clientData in 'cData' is typically ignored; 'opt_index' is
 *    the index number that was specified as the last value for each
 *    struct option in appOptions[]; 'opt_arg' is the user's argument
 *    to the switch for options that have a REQUIRED_ARG or an
 *    OPTIONAL_ARG.
*/ static int appOptionsHandler( clientData UNUSED(cData), int opt_index, char *opt_arg) { aggmode_t new_aggmode; int rv; switch ((appOptionsEnum)opt_index) { case OPT_IP_LIMIT: case OPT_BYTE_LIMIT: case OPT_PACKET_LIMIT: case OPT_FLOW_LIMIT: new_aggmode = (aggmode_t)(opt_index - opt2agg_offset); if (state.aggmode != AGGMODE_NONE) { if (state.aggmode == new_aggmode) { skAppPrintErr("The --%s switch was given multiple times", appOptions[opt_index].name); } else { skAppPrintErr(("Can only give one aggregation strategy\n" "\tBoth %s and %s specified"), appOptions[state.aggmode+opt2agg_offset].name, appOptions[opt_index].name); } return 1; } state.aggmode = new_aggmode; if (skStringParseUint32(&state.tag_limit, opt_arg, 1, 0)) { skAppPrintErr("Error parsing %s '%s'", appOptions[opt_index].name, opt_arg); exit(EXIT_FAILURE); } break; case OPT_BASENAME: if (state.basename) { skAppPrintErr("The --%s switch was given multiple times", appOptions[opt_index].name); return 1; } state.basename = opt_arg; break; case OPT_SAMPLE_RATIO: rv = skStringParseUint32(&state.sample_ratio, opt_arg, 1, (UINT32_MAX / sizeof(rwRec))); if (rv) { if (rv < -10) { skAppPrintErr("%s value %s is out of range: %u-%lu", appOptions[opt_index].name, opt_arg, 1, (long unsigned int)(UINT32_MAX / sizeof(rwRec))); } else { skAppPrintErr("Error parsing %s '%s'", appOptions[opt_index].name, opt_arg); } exit(EXIT_FAILURE); } break; case OPT_FILE_RATIO: if (skStringParseUint32(&state.file_ratio, opt_arg, 1, 0)) { skAppPrintErr("Error parsing %s '%s'", appOptions[opt_index].name, opt_arg); exit(EXIT_FAILURE); } break; case OPT_MAX_OUTPUTS: rv = skStringParseUint32(&state.max_outputs, opt_arg, 1, MAX_OUTPUT_FILES); if (rv) { if (rv < -10) { skAppPrintErr("%s value %s is out of range: %u-%u", appOptions[opt_index].name, opt_arg, 1, MAX_OUTPUT_FILES); } else { skAppPrintErr("Error parsing %s '%s'", appOptions[opt_index].name, opt_arg); } exit(EXIT_FAILURE); } } return 0; /* OK */ } static void closeOutput(void) { 
int rv; if (state.rwios_out) { rv = rwioClose(state.rwios_out); if (rv) { rwioPrintLastErr(state.rwios_out, rv, &skAppPrintErr); } rwioDestroy(&state.rwios_out); } } /* * newOutput(); * * Create a new data file using the basename and allocates a handle * to it as the current file. */ static void newOutput(void) { static uint32_t sample_die_roll = 0; char datafn[PATH_MAX]; int rv; if (state.file_ratio != 1) { if (0 == (state.output_ctr % state.file_ratio)) { sample_die_roll = (uint32_t)(random() % state.file_ratio); } if ((state.output_ctr % state.file_ratio) != sample_die_roll) { ++state.output_ctr; return; } } /* have we written the maximum number of output files? */ if (state.max_outputs == 0) { exit(EXIT_SUCCESS); } --state.max_outputs; /* create new file name and open it */ snprintf(datafn, sizeof(datafn), "%s.%08d.rwf", state.basename, state.output_ctr); if (rwioCreate(&state.rwios_out, datafn, SK_RWIO_WRITE)) { skAppPrintErr("Error opening stub file %s\n", datafn); exit(EXIT_FAILURE); } rv = rwioOpen(state.rwios_out); if (rv) { rwioPrintLastErr(state.rwios_out, rv, &skAppPrintErr); rwioDestroy(&state.rwios_out); exit(EXIT_FAILURE); } ++state.output_ctr; } /* * int processRec(rwRec *active_rec) * * Given a single record, it updates its count and states and * determines whether or not it is time to move onto the next value * in the dataset. 
*/
static int processRec(const rwRec *active_rec)
{
    /* 1-based position, within the current group of 'sample_ratio'
     * records, of the record to keep */
    static uint32_t grab_index = 0;
    /* amount to add to state.tag_current for this record; 64 bits so
     * that the sum below can be checked for 32-bit overflow */
    uint64_t increment = 0;

    /* if we are not processing every record, keep only one randomly
     * chosen record out of each group of 'sample_ratio' records */
    if (state.sample_ratio != 1) {
        if (state.current_sample_count == state.sample_ratio) {
            state.current_sample_count = 0;
            /* figure out which record of the next sample_ratio
             * records to process */
            grab_index = (uint32_t)(1 + random() % state.sample_ratio);
        }
        ++state.current_sample_count;
        if (grab_index != state.current_sample_count) {
            return 0;
        }
    }

    /* no output is open while a file is skipped by --file-ratio; the
     * record still counts toward the limit below */
    if (state.rwios_out) {
        rwWrite(state.rwios_out, active_rec);
    }

    /*
     * Decide whether the current element of the partition is
     * complete.  The amount a record contributes depends on the
     * aggregation mode; once the running count (state.tag_current)
     * reaches the per-partition limit (state.tag_limit), close the
     * file and move on to the next one.
     */
    switch (state.aggmode) {
    case AGGMODE_IPS:
        if (!skIPTreeCheckAddress(active_rec->sIP.ipnum, state.ips)) {
            skIPTreeAddAddress(active_rec->sIP.ipnum, state.ips);
            ++increment;
        }
        if (!skIPTreeCheckAddress(active_rec->dIP.ipnum, state.ips)) {
            skIPTreeAddAddress(active_rec->dIP.ipnum, state.ips);
            ++increment;
        }
        break;
    case AGGMODE_FLOWS:
        increment = 1;
        break;
    case AGGMODE_PKTS:
        increment = active_rec->pkts;
        break;
    case AGGMODE_BYTES:
        increment = active_rec->bytes;
        break;
    case AGGMODE_NONE:
        assert(0);
        abort();
    }

    /* saturating add: a plain 32-bit add could wrap around past
     * UINT32_MAX (e.g., large byte counts) and then never reach
     * tag_limit; saturating guarantees the limit check fires */
    if (increment > (uint64_t)(UINT32_MAX - state.tag_current)) {
        state.tag_current = UINT32_MAX;
    } else {
        state.tag_current += (uint32_t)increment;
    }

    if (state.tag_current >= state.tag_limit) {
        if (state.aggmode == AGGMODE_IPS) {
            /* unique-IP counting restarts with each new file */
            skIPTreeRemoveAll(state.ips);
        }
        closeOutput();
        newOutput();
        state.tag_current = 0;
    }

    return 0;
}


/*
 *  rwios = appNextInput(argc, argv);
 *
 *    Open and return the next input file from the command line or the
 *    standard input if no files were given on the command line.
 *    Returns NULL when there is no more input.
*/ static rwIOStruct *appNextInput(int argc, char **argv) { static int initialized = 0; rwIOStruct *rwios = NULL; const char *fname = NULL; int rv; if (!initialized) { initialized = 1; } else if (arg_index == argc) { /* no more input */ return NULL; } if (arg_index == argc) { fname = "stdin"; } else { /* get current file and prepare to get next */ fname = argv[arg_index]; ++arg_index; } /* create rwios and open file */ if ((rv = rwioCreate(&rwios, fname, SK_RWIO_READ)) || (rv = rwioOpen(rwios))) { rwioPrintLastErr(rwios, rv, NULL); if (rwios) { rwioDestroy(&rwios); } } return rwios; } int main(int argc, char **argv) { struct timeval tv; rwRec input_rec; rwIOStruct *in_rwios; appSetup(argc, argv); /* never returns on error */ gettimeofday(&tv, NULL); srandom((unsigned long)(tv.tv_sec + tv.tv_usec)); newOutput(); /* for all inputs, read all records */ while (NULL != (in_rwios = appNextInput(argc, argv))) { while (rwRead(in_rwios, &input_rec)) { processRec(&input_rec); } rwioDestroy(&in_rwios); } closeOutput(); return 0; } /* ** Local variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */