/* ** Copyright (C) 2004-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** rwbag ** ** Build binary Bag files from flow records. ** ** For IPs, use the standard Bag API to create the bags. For Bags ** of ports or protocols, store the data in an in-core array, then ** convert it to a bag when the time comes to store it on disk. ** ** Add provision for bagging on nhIP, sensor, input/output ** interface. ** John McHugh ** */ #include "silk.h" RCSIDENT("$SiLK: rwbag.c 7314 2007-05-29 13:50:04Z mthomas $"); #include "utils.h" #include "rwpack.h" #include "sksite.h" #include "bagtree.h" /* LOCAL DEFINES AND TYPEDEFS */ /* where to write --help output */ #define USAGE_FH stdout /* Where to write filenames if --print-file specified */ #define PRINT_FILENAMES_FH stderr /* number of potential IP bags: * {sIP,dIP, nhIP} * {flows,pkts,bytes} */ #define NUM_OUTPUTS_IP 9 /* number of potential arrays: * {sPort,dPort,proto,sensor,input,output} * {flows,pkts,bytes} */ #define NUM_OUTPUTS_ARRAY 18 typedef struct { skBag_header_t *bag; skstream_t *stream; } bagfile_t; typedef struct { skBag_counter_t *array; skBag_key_t size; skstream_t *stream; } arrayfile_t; /* LOCAL VARIABLES */ /* The potential IP-based bag files to create */ static bagfile_t bag_io[NUM_OUTPUTS_IP]; /* The potential array-based bag files to create */ static arrayfile_t array_io[NUM_OUTPUTS_ARRAY]; /* The output stream to use when --copy-input is given */ static rwIOStruct_t *copy_input = NULL; /* the compression method to use when writing the files. * sksiteCompmethodOptionsRegister() will set this to the default or * to the value the user specifies. */ static sk_compmethod_t comp_method; /* First file index in argv[] after handling all options */ static int arg_index; /* * stdout_used is set to 1 by prepareBagFile and prepareArrayBagFile * when a file is to be written to stdout. */ static int stdout_used = 0; /* whether to print input file names as they are opened */ static int print_filenames = 0; /* whether legacy help was requested */ static int legacy_help_requested = 0; /* True if an overflow warning has been printed for given stream */ static int overflow_warning[NUM_OUTPUTS_IP]; static int array_overflow_warning[NUM_OUTPUTS_ARRAY]; /* OPTIONS SETUP */ typedef enum _appOptionsEnum { /* These MUST be kept in order with the options */ SIP_FLOWS=0, SIP_PKTS, SIP_BYTES, DIP_FLOWS, DIP_PKTS, DIP_BYTES, NHIP_FLOWS, NHIP_PKTS, NHIP_BYTES, SPORT_FLOWS, SPORT_PKTS, SPORT_BYTES, DPORT_FLOWS, DPORT_PKTS, DPORT_BYTES, PROTO_FLOWS, PROTO_PKTS, PROTO_BYTES, SID_FLOWS, SID_PKTS, SID_BYTES, INPUT_FLOWS, INPUT_PKTS, INPUT_BYTES, OUTPUT_FLOWS, OUTPUT_PKTS, OUTPUT_BYTES, OPT_PRINT_FILENAMES, OPT_COPY_INPUT, OPT_LEGACY_HELP } appOptionsEnum; /* This #define is used to adjust the indices for the array-based * bags. It must be the first enum item that represents an array bag * instead of a tree bag. */ #define AB_OFFSET SPORT_FLOWS static struct option appOptions[] = { {"sip-flows", REQUIRED_ARG, 0, SIP_FLOWS}, {"sip-packets", REQUIRED_ARG, 0, SIP_PKTS}, {"sip-bytes", REQUIRED_ARG, 0, SIP_BYTES}, {"dip-flows", REQUIRED_ARG, 0, DIP_FLOWS}, {"dip-packets", REQUIRED_ARG, 0, DIP_PKTS}, {"dip-bytes", REQUIRED_ARG, 0, DIP_BYTES}, {"nhip-flows", REQUIRED_ARG, 0, NHIP_FLOWS}, {"nhip-packets", REQUIRED_ARG, 0, NHIP_PKTS}, {"nhip-bytes", REQUIRED_ARG, 0, NHIP_BYTES}, {"sport-flows", REQUIRED_ARG, 0, SPORT_FLOWS}, {"sport-packets", REQUIRED_ARG, 0, SPORT_PKTS}, {"sport-bytes", REQUIRED_ARG, 0, SPORT_BYTES}, {"dport-flows", REQUIRED_ARG, 0, DPORT_FLOWS}, {"dport-packets", REQUIRED_ARG, 0, DPORT_PKTS}, {"dport-bytes", REQUIRED_ARG, 0, DPORT_BYTES}, {"proto-flows", REQUIRED_ARG, 0, PROTO_FLOWS}, {"proto-packets", REQUIRED_ARG, 0, PROTO_PKTS}, {"proto-bytes", REQUIRED_ARG, 0, PROTO_BYTES}, {"sensor-flows", REQUIRED_ARG, 0, SID_FLOWS}, {"sensor-packets", REQUIRED_ARG, 0, SID_PKTS}, {"sensor-bytes", REQUIRED_ARG, 0, SID_BYTES}, {"input-flows", REQUIRED_ARG, 0, INPUT_FLOWS}, {"input-packets", REQUIRED_ARG, 0, INPUT_PKTS}, {"input-bytes", REQUIRED_ARG, 0, INPUT_BYTES}, {"output-flows", REQUIRED_ARG, 0, OUTPUT_FLOWS}, {"output-packets", REQUIRED_ARG, 0, OUTPUT_PKTS}, {"output-bytes", REQUIRED_ARG, 0, OUTPUT_BYTES}, /* additional switches not part of bag creation */ {"print-filenames", NO_ARG, 0, OPT_PRINT_FILENAMES}, {"copy-input", REQUIRED_ARG, 0, OPT_COPY_INPUT}, {"legacy-help", NO_ARG, 0, OPT_LEGACY_HELP}, {0,0,0,0} /* sentinel entry */ }; /* allow these older options */ static struct option oldOptions[] = { {"sf-file", REQUIRED_ARG, 0, SIP_FLOWS}, {"sp-file", REQUIRED_ARG, 0, SIP_PKTS}, {"sb-file", REQUIRED_ARG, 0, SIP_BYTES}, {"df-file", REQUIRED_ARG, 0, DIP_FLOWS}, {"dp-file", REQUIRED_ARG, 0, DIP_PKTS}, {"db-file", REQUIRED_ARG, 0, DIP_BYTES}, {"port-sf-file", REQUIRED_ARG, 0, SPORT_FLOWS}, {"port-sp-file", REQUIRED_ARG, 0, SPORT_PKTS}, {"port-sb-file", REQUIRED_ARG, 0, SPORT_BYTES}, {"port-df-file", REQUIRED_ARG, 0, DPORT_FLOWS}, {"port-dp-file", REQUIRED_ARG, 0, DPORT_PKTS}, {"port-db-file", REQUIRED_ARG, 0, DPORT_BYTES}, {"proto-f-file", REQUIRED_ARG, 0, PROTO_FLOWS}, {"proto-p-file", REQUIRED_ARG, 0, PROTO_PKTS}, {"proto-b-file", REQUIRED_ARG, 0, PROTO_BYTES}, /* allow proto-X to work as a shortcut since it is unique*/ {"proto-f", REQUIRED_ARG, 0, PROTO_FLOWS}, {"proto-p", REQUIRED_ARG, 0, PROTO_PKTS}, {"proto-b", REQUIRED_ARG, 0, PROTO_BYTES}, {0,0,0,0} /* sentinel */ }; static const char *appHelp[] = { "Write bag of flow counts by unique source IP", "Write bag of packet counts by unique source IP", "Write bag of byte counts by unique source IP", "Write bag of flow counts by unique destination IP", "Write bag of packet counts by unique destination IP", "Write bag of byte counts by unique destination IP", "Write bag of flow counts by unique next hop IP", "Write bag of packet counts by unique next hop IP", "Write bag of byte counts by unique next hop IP", "Write bag of flow counts by unique source port", "Write bag of packet counts by unique source port", "Write bag of byte counts by unique source port", "Write bag of flow counts by unique destination port", "Write bag of packet counts by unique destination port", "Write bag of byte counts by unique destination port", "Write bag of flow counts by unique protocol", "Write bag of packet counts by unique protocol", "Write bag of byte counts by unique protocol", "Write bag of flow counts by unique sensor ID", "Write bag of packet counts by unique sensor ID", "Write bag of byte counts by unique sensor ID", "Write bag of flow counts by unique input interface", "Write bag of packet counts by unique input interface", "Write bag of byte counts by unique input interface", "Write bag of flow counts by unique output interface", "Write bag of packet counts by unique output interface", "Write bag of byte counts by unique output interface", "Print names of input files as they are opened. Def. No", "Copy all input SiLK Flows to given pipe or file. Def. No", "Print usage including deprecated switches", (char *)NULL }; /* LOCAL FUNCTION DECLARATIONS */ static void appUsageLong(void); /* never returns */ static void appTeardown(void); static void appSetup(int argc, char **argv); /* never returns when error */ static int appOptionsHandler(clientData, int, const char *); static int prepareBagFile(bagfile_t*, const char *); static int prepareArrayBagFile(arrayfile_t*, const char *); /* FUNCTION DEFINITIONS */ /* * appUsageLong(); * * Print complete usage information to USAGE_FH. Pass this * function to skOptionsSetUsageCallback(); optionsParse() will * call this funciton and then exit the program when the --help * option is given. */ static void appUsageLong(void) { #define USAGE_MSG \ ("[SWITCHES] [FILES]\n" \ "\tRead SiLK Flow records and builds binary Bag(s) containing\n" \ "\tkey-count pairs. Key is one of source or destination address or\n" \ "\tport, protocol, sensor, input or output interface, or next hop IP.\n" \ "\tCounter is sum of flows, packets, or bytes. Reads SiLK Flows\n" \ "\tfrom named files or from the standard input.\n") FILE *fh = USAGE_FH; int i; fprintf(fh, "%s %s", skAppName(), USAGE_MSG); fprintf(fh, "\nSWITCHES:\n"); skOptionsDefaultUsage(fh); sksiteOptionsUsage(fh); sksiteCompmethodOptionsUsage(fh); /* Loop over the standard options */ for (i = 0; appOptions[i].name; ++i) { fprintf(fh, "--%s %s. %s\n", appOptions[i].name, SK_OPTION_HAS_ARG(appOptions[i]), appHelp[i]); } if (legacy_help_requested) { fprintf(fh, "\nLEGACY SWITCHES:\n"); /* Now, print the help for the deprecated options */ for (i = 0; appOptions[i].name && oldOptions[i].name; ++i) { fprintf(USAGE_FH, "--%s %s. Deprecated alias for --%s\n", oldOptions[i].name, SK_OPTION_HAS_ARG(appOptions[i]), appOptions[i].name); } } } /* * appTeardown() * * Teardown all modules, close all files, and tidy up all * application state. * * This function is idempotent. */ static void appTeardown(void) { static int teardownFlag = 0; int i; int rv; if (teardownFlag) { return; } teardownFlag = 1; /* close the copy stream */ if (copy_input) { rv = rwioClose(copy_input); if (rv) { rwioPrintLastErr(copy_input, rv, &skAppPrintErr); } rwioDestroy(©_input); } /* close all bag files */ for (i = 0; i < NUM_OUTPUTS_IP; ++i) { if (bag_io[i].bag) { skBag_free(bag_io[i].bag); } if (bag_io[i].stream) { rv = skStreamClose(bag_io[i].stream); if (rv) { skStreamPrintLastErr(bag_io[i].stream, rv, &skAppPrintErr); } rv = skStreamDestroy(&bag_io[i].stream); } memset(&bag_io[i], 0, sizeof(bagfile_t)); } /* close all array bag files */ for (i = 0; i < NUM_OUTPUTS_ARRAY; ++i) { if (array_io[i].array) { free(array_io[i].array); } if (array_io[i].stream) { rv = skStreamClose(array_io[i].stream); if (rv) { skStreamPrintLastErr(array_io[i].stream, rv, &skAppPrintErr); } rv = skStreamDestroy(&array_io[i].stream); } memset(&array_io[i], 0, sizeof(arrayfile_t)); } skAppUnregister(); } /* * appSetup(argc, argv); * * Perform all the setup for this application include setting up * required modules, parsing options, etc. This function should be * passed the same arguments that were passed into main(). * * Returns to the caller if all setup succeeds. If anything fails, * this function will cause the application to exit with a FAILURE * exit status. */ static void appSetup(int argc, char **argv) { int requested_output; int i; int rv; /* verify same number of options and help strings */ assert((sizeof(appHelp)/sizeof(char *)) == (sizeof(appOptions)/sizeof(struct option))); /* register the application */ skAppRegister(argv[0]); skOptionsSetUsageCallback(&appUsageLong); /* initialize variables */ memset(bag_io, 0, sizeof(bag_io)); memset(array_io, 0, sizeof(array_io)); memset(overflow_warning, 0, sizeof(overflow_warning)); memset(array_overflow_warning, 0, sizeof(array_overflow_warning)); /* register the options */ if (optionsRegister(appOptions, (optHandler)appOptionsHandler, NULL) || optionsRegister(oldOptions, (optHandler)appOptionsHandler, NULL) || sksiteCompmethodOptionsRegister(&comp_method) || sksiteOptionsRegister(SK_SITE_FLAG_CONFIG_FILE)) { skAppPrintErr("unable to register options"); exit(EXIT_FAILURE); } /* parse options */ arg_index = optionsParse(argc, argv); assert(arg_index <= argc); if (arg_index < 0) { /* options parsing should print error */ skAppUsage(); /* never returns */ } /* try to load site config file; if it fails, we will not be able * to resolve flowtype and sensor from input file names */ sksiteConfigure(0); /* arg_index is looking at first file name to process */ if (arg_index == argc) { if (FILEIsATty(stdin)) { skAppPrintErr("No input files on command line and" " stdin is connected to a terminal"); skAppUsage(); } } /* verify that the user requested output */ requested_output = 0; for (i = 0; i < NUM_OUTPUTS_IP; ++i) { if (bag_io[i].stream) { requested_output = 1; break; } } if (requested_output == 0) { for (i = 0; i < NUM_OUTPUTS_ARRAY; ++i) { if (array_io[i].stream) { requested_output = 1; break; } } } if (requested_output == 0) { skAppPrintErr("Must specify type of output(s) to generate."); skAppUsage(); } if (atexit(appTeardown) < 0) { skAppPrintErr("unable to register appTeardown() with atexit()"); appTeardown(); exit(EXIT_FAILURE); } /* open the output files */ for (i = 0; i < NUM_OUTPUTS_IP; ++i) { if (bag_io[i].stream) { rv = skStreamOpen(bag_io[i].stream); if (rv) { skStreamPrintLastErr(bag_io[i].stream, rv, &skAppPrintErr); exit(EXIT_FAILURE); } } } for (i = 0; i < NUM_OUTPUTS_ARRAY; ++i) { if (array_io[i].stream) { rv = skStreamOpen(array_io[i].stream); if (rv) { skStreamPrintLastErr(array_io[i].stream, rv, &skAppPrintErr); exit(EXIT_FAILURE); } } } return; /* OK */ } /* * status = appOptionsHandler(cData, opt_index, opt_arg); * * This function is passed to optionsRegister(); it will be called * by optionsParse() for each user-specified switch that the * application has registered; it should handle the switch as * required---typically by setting global variables---and return 1 * if the switch processing failed or 0 if it succeeded. Returning * a non-zero from from the handler causes optionsParse() to return * a negative value. * * The clientData in 'cData' is typically ignored; 'opt_index' is * the index number that was specified as the last value for each * struct option in appOptions[]; 'opt_arg' is the user's argument * to the switch for options that have a REQUIRED_ARG or an * OPTIONAL_ARG. */ static int appOptionsHandler( clientData UNUSED(cData), int opt_index, const char *opt_arg) { int rv; switch ((appOptionsEnum)opt_index) { case SIP_FLOWS: case SIP_PKTS: case SIP_BYTES: case DIP_FLOWS: case DIP_PKTS: case DIP_BYTES: case NHIP_FLOWS: case NHIP_PKTS: case NHIP_BYTES: if (!opt_arg || !opt_arg[0]) { skAppPrintErr("Missing file name for --%s option", appOptions[opt_index].name); return 1; } if (bag_io[opt_index].bag) { skAppPrintErr("Option %s given multiple times", appOptions[opt_index].name); return 1; } if (prepareBagFile(&bag_io[opt_index], opt_arg)) { return 1; } break; case SPORT_FLOWS: case SPORT_PKTS: case SPORT_BYTES: case DPORT_FLOWS: case DPORT_PKTS: case DPORT_BYTES: case PROTO_FLOWS: case PROTO_PKTS: case PROTO_BYTES: case SID_FLOWS: case SID_PKTS: case SID_BYTES: case INPUT_FLOWS: case INPUT_PKTS: case INPUT_BYTES: case OUTPUT_FLOWS: case OUTPUT_PKTS: case OUTPUT_BYTES: if (!opt_arg || 0 == strlen(opt_arg)) { skAppPrintErr("Missing file name for --%s option", appOptions[opt_index].name); return 1; } if (array_io[opt_index - AB_OFFSET].array) { skAppPrintErr("Option %s given multiple times", appOptions[opt_index].name); return 1; } if ((opt_index == PROTO_FLOWS) || (opt_index == PROTO_PKTS) || (opt_index == PROTO_BYTES)) { array_io[opt_index - AB_OFFSET].size = 0x100; } else { array_io[opt_index - AB_OFFSET].size = 0x10000; } if (prepareArrayBagFile(&array_io[opt_index - AB_OFFSET], opt_arg)) { return 1; } break; case OPT_COPY_INPUT: if (!opt_arg || 0 == strlen(opt_arg)) { skAppPrintErr("Missing file name for --%s option", appOptions[opt_index].name); return 1; } if (copy_input) { skAppPrintErr("Option %s given multiple times", appOptions[opt_index].name); return 1; } if (0 == strcmp(opt_arg, "stdout")) { if (stdout_used) { skAppPrintErr("Only one output may use stdout"); return 1; } stdout_used = 1; } if ((rv = rwioCreate(©_input, opt_arg, SK_RWIO_WRITE)) || (rv = rwioOpen(copy_input))) { rwioPrintLastErr(copy_input, rv, &skAppPrintErr); rwioDestroy(©_input); return 1; } break; case OPT_PRINT_FILENAMES: print_filenames = 1; break; case OPT_LEGACY_HELP: legacy_help_requested = 1; appUsageLong(); exit(EXIT_SUCCESS); break; } return 0; /* OK */ } /* * ok = prepareBagFile(bagf, pathname); * * Prepare the output bag file 'bagf' to write a bag to 'pathname'. * * This function creates an skstream_t to 'pathname' and allocates * a bag that will be writing to that file. The function makes * sure a file with that name does not currently exist. If * pathname is "stdout" and no other bag files are writing to * stdout, then stdout will be used. * * Returns 0 on success. Returns non-zero if allocation files, if * we attempt to open an existing file, if more than one bag use * stdout. */ static int prepareBagFile(bagfile_t *bagf, const char *pathname) { int rv; if (0 == strcmp("stdout", pathname)) { if (stdout_used) { skAppPrintErr("Only one output may use stdout"); return 1; } stdout_used = 1; } else if (fileExists(pathname)) { skAppPrintErr("Will not overwrite existing file %s", pathname); return 1; } if (skBag_create(&bagf->bag) != SKBAG_OK) { skAppPrintErr("Error allocating bag tree"); return 1; } if ((rv = skStreamCreate(&(bagf->stream), SK_IO_WRITE, SK_CONTENT_SILK)) || (rv = skStreamBind(bagf->stream, pathname)) || (rv = skStreamSetCompressionMethod(bagf->stream, comp_method))) { skStreamPrintLastErr(bagf->stream, rv, &skAppPrintErr); skStreamDestroy(&bagf->stream); return 1; } return 0; /* OK */ } /* * ok = prepareArrayBagFile(arraybag, pathname); * * Prepare the output bag file 'arraybag', which is an in-core * array---of ports or protocols. The array size should already be * filled in. * * This function creates an skstream_t to 'pathname' and allocates * an array that will be written to that file. The function makes * sure a file with that name does not currently exist. if * pathname is "stdout" and no other files are writing to stdout, * then stdout will be used. * * Returns 0 on success. Returns non-zero if allocation fails, if * we attempt to open an existing file, if more than one bag uses * stdout. */ static int prepareArrayBagFile(arrayfile_t *arrayf, const char *pathname) { int rv; assert(arrayf->size); if (0 == strcmp("stdout", pathname)) { if (stdout_used) { skAppPrintErr("Only one output may use stdout"); return 1; } stdout_used = 1; } else if (fileExists(pathname)) { skAppPrintErr("Will not overwrite existing file %s", pathname); return 1; } arrayf->array = calloc(arrayf->size, sizeof(skBag_counter_t)); if (NULL == arrayf->array) { skAppPrintErr("Error allocating bag array"); return 1; } if ((rv = skStreamCreate(&(arrayf->stream), SK_IO_WRITE, SK_CONTENT_SILK)) || (rv = skStreamBind(arrayf->stream, pathname)) || (rv = skStreamSetCompressionMethod(arrayf->stream, comp_method))) { skStreamPrintLastErr(arrayf->stream, rv, &skAppPrintErr); skStreamDestroy(&arrayf->stream); return 1; } return 0; /* OK */ } /* * ok = processFile(path); * * Read the SiLK Flow records from the file at 'path'---path can be * "stdin"---and potentially create bag files for * {sIP,dIP,sPort,dPort,proto} x {flows,pkts,bytes}. * * Return 0 if successful; non-zero otherwise. */ static int processFile(const char *path) { const skBag_counter_t max_val = (skBag_counter_t)(~0); rwIOStruct_t *rwIOS; skBag_err_t err = SKBAG_OK; rwRec rwrec; int i; int rv = -1; /* return value: assume error */ skBag_counter_t key; skBag_counter_t count; rwIOS = rwOpenFile(path, copy_input); if (!rwIOS) { /* rwOpenFile should print error */ return rv; } if (print_filenames) { fprintf(PRINT_FILENAMES_FH, "%s\n", rwGetFileName(rwIOS)); } while (rwRead(rwIOS, &rwrec)) { for (i = 0; i < NUM_OUTPUTS_IP; ++i) { if (bag_io[i].bag == NULL || overflow_warning[i]) { continue; } switch (i) { case SIP_FLOWS: err = skBag_incrCounter(bag_io[i].bag, (skBag_key_t*)&rwrec.sIP.ipnum); break; case DIP_FLOWS: err = skBag_incrCounter(bag_io[i].bag, (skBag_key_t*)&rwrec.dIP.ipnum); break; case NHIP_FLOWS: err = skBag_incrCounter(bag_io[i].bag, (skBag_key_t*)&rwrec.nhIP.ipnum); break; case SIP_PKTS: count = (skBag_counter_t)rwrec.pkts; err = skBag_addToCounter(bag_io[i].bag, (skBag_key_t*)&rwrec.sIP.ipnum, &count); break; case DIP_PKTS: count = (skBag_counter_t)rwrec.pkts; err = skBag_addToCounter(bag_io[i].bag, (skBag_key_t*)&rwrec.dIP.ipnum, &count); break; case NHIP_PKTS: count = (skBag_counter_t)rwrec.pkts; err = skBag_addToCounter(bag_io[i].bag, (skBag_key_t*)&rwrec.nhIP.ipnum, &count); break; case SIP_BYTES: count = (skBag_counter_t)rwrec.bytes; err = skBag_addToCounter(bag_io[i].bag, (skBag_key_t*)&rwrec.sIP.ipnum, &count); break; case DIP_BYTES: count = (skBag_counter_t)rwrec.bytes; err = skBag_addToCounter(bag_io[i].bag, (skBag_key_t*)&rwrec.dIP.ipnum, &count); break; case NHIP_BYTES: count = (skBag_counter_t)rwrec.bytes; err = skBag_addToCounter(bag_io[i].bag, (skBag_key_t*)&rwrec.nhIP.ipnum, &count); break; default: assert(0); abort(); } if (err != SKBAG_OK) { if (err == SKBAG_ERR_OP_BOUNDS) { overflow_warning[i] = 1; skAppPrintErr("**WARNING** %s overflow for bag %s", appOptions[i].name, skStreamGetPathname(bag_io[i].stream)); } else { if (err == SKBAG_ERR_MEMORY) { skAppPrintErr(("Out of memory for bag %s\n" "\tCleaning up and exiting"), skStreamGetPathname(bag_io[i].stream)); } else { skAppPrintErr("Error setting %s value for bag %s: %s", appOptions[i].name, skStreamGetPathname(bag_io[i].stream), skBag_strerror(err)); } goto END; } } } for (i = 0; i < NUM_OUTPUTS_ARRAY; ++i) { if (array_io[i].array == NULL || array_overflow_warning[i]) { continue; } switch (i + AB_OFFSET) { case SPORT_FLOWS: key = (skBag_key_t)rwrec.sPort; count = 1; break; case DPORT_FLOWS: key = (skBag_key_t)rwrec.dPort; count = 1; break; case PROTO_FLOWS: key = (skBag_key_t)rwrec.proto; count = 1; break; case SID_FLOWS: key = (skBag_key_t)rwrec.sID; count = 1; break; case INPUT_FLOWS: key = (skBag_key_t)rwrec.input; count = 1; break; case OUTPUT_FLOWS: key = (skBag_key_t)rwrec.output; count = 1; break; case SPORT_PKTS: key = (skBag_key_t)rwrec.sPort; count = (skBag_counter_t)rwrec.pkts; break; case DPORT_PKTS: key = (skBag_key_t)rwrec.dPort; count = (skBag_counter_t)rwrec.pkts; break; case PROTO_PKTS: key = (skBag_key_t)rwrec.proto; count = (skBag_counter_t)rwrec.pkts; break; case SID_PKTS: key = (skBag_key_t)rwrec.sID; count = (skBag_counter_t)rwrec.pkts; break; case INPUT_PKTS: key = (skBag_key_t)rwrec.input; count = (skBag_counter_t)rwrec.pkts; break; case OUTPUT_PKTS: key = (skBag_key_t)rwrec.output; count = (skBag_counter_t)rwrec.pkts; break; case SPORT_BYTES: key = (skBag_key_t)rwrec.sPort; count = (skBag_counter_t)rwrec.bytes; break; case DPORT_BYTES: key = (skBag_key_t)rwrec.dPort; count = (skBag_counter_t)rwrec.bytes; break; case PROTO_BYTES: key = (skBag_key_t)rwrec.proto; count = (skBag_counter_t)rwrec.bytes; break; case SID_BYTES: key = (skBag_key_t)rwrec.sID; count = (skBag_counter_t)rwrec.bytes; break; case INPUT_BYTES: key = (skBag_key_t)rwrec.input; count = (skBag_counter_t)rwrec.bytes; break; case OUTPUT_BYTES: key = (skBag_key_t)rwrec.output; count = (skBag_counter_t)rwrec.bytes; break; default: assert(0); abort(); } if ((max_val - count) < array_io[i].array[key]) { array_overflow_warning[i] = 1; skAppPrintErr("**WARNING** %s overflow for bag %s", appOptions[i+AB_OFFSET].name, skStreamGetPathname(array_io[i].stream)); array_io[i].array[key] = max_val; } else { array_io[i].array[key] += count; } } } /* Successful if we make it here */ rv = 0; END: rwCloseFile(rwIOS); return rv; } int main(int argc, char **argv) { int i; appSetup(argc, argv); /* never returns on error */ if (arg_index == argc) { if (0 != processFile("stdin")) { return 1; } } else { for (i = arg_index; i < argc; ++i) { if (0 != processFile(argv[i])) { skAppPrintErr("Error processing input file %s", argv[i]); return 1; } } } /* write out the IP-based bags */ for (i = 0; i < NUM_OUTPUTS_IP; ++i) { if (bag_io[i].bag) { if (SKBAG_OK != skBag_writeBinary(bag_io[i].bag, bag_io[i].stream)) { skAppPrintErr("Error writing to %s", skStreamGetPathname(bag_io[i].stream)); } } } /* write out the array-based-bags */ for (i = 0; i < NUM_OUTPUTS_ARRAY; ++i) { if (array_io[i].array) { if (0 != skBag_writeArray(array_io[i].array, array_io[i].size, array_io[i].stream)) { skAppPrintErr("Error writing to %s", skStreamGetPathname(array_io[i].stream)); } } } /* done */ appTeardown(); return 0; } /* ** Local Variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */