/*
** Copyright (C) 2005-2007 by Carnegie Mellon University.
**
** @OPENSOURCE_HEADER_START@
**
** Use of the SILK system and related source code is subject to the terms
** of the following licenses:
**
** GNU Public License (GPL) Rights pursuant to Version 2, June 1991
** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013
**
** NO WARRANTY
**
** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER
** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY
** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN
** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY
** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT
** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE,
** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE
** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT,
** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY
** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF
** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES.
** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF
** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON
** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE
** DELIVERABLES UNDER THIS LICENSE.
**
** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie
** Mellon University, its trustees, officers, employees, and agents from
** all claims or demands made against them (and any related losses,
** expenses, or attorney's fees) arising out of, or relating to Licensee's
** and/or its sub licensees' negligent use or willful misuse of or
** negligent conduct or willful misconduct regarding the Software,
** facilities, or other rights or assistance granted by Carnegie Mellon
** University under this License, including, but not limited to, any
** claims of product liability, personal injury, death, damage to
** property, or violation of any laws or regulations.
**
** Carnegie Mellon University Software Engineering Institute authored
** documents are sponsored by the U.S. Department of Defense under
** Contract F19628-00-C-0003. Carnegie Mellon University retains
** copyrights in all material produced under this contract. The U.S.
** Government retains a non-exclusive, royalty-free license to publish or
** reproduce these documents, or allow others to do so, for U.S.
** Government purposes only pursuant to the copyright license under the
** contract clause at 252.227.7013.
**
** @OPENSOURCE_HEADER_END@
*/

/*
**  rwgroup
**
**  rwgroup is a 'fuzzy uniq' utility that gathers records into 'groups'.
**  It reads records from stdin and compares each record only with a
**  single reference record it has already seen.  It cannot look further
**  back, so the input must be sorted such that the members of a group
**  are adjacent (presumably sorted on the id fields and then on the
**  delta field).
**
**  Command switches:
**
**  --id-fields:     fields that must be identical in every record of a
**                   group; accepts a list of field IDs.
**  --delta-field:   the single field whose value is allowed to differ
**                   within a group.
**  --delta-value:   the maximum difference between the delta-field
**                   values of the compared records before they are
**                   considered parts of different groups.  It is an
**                   integer and the test is inclusive.  For example, if
**                   delta is 1 and the field is stime, records that are
**                   one second apart are part of the same group.
**  --rec-threshold: minimum number of records a group must contain
**                   before it is written to the output.
**  --objective:     compare the delta field against the first record of
**                   the group instead of against the previous record.
**  --summarize:     instead of writing every record tagged with its
**                   group id, write one summary record per group with
**                   stime = min start time, etime = max end time, bytes
**                   and packets summed, and TCP flags OR-ed together.
**
**  A 'group' is a run of records whose id-fields are identical and
**  whose delta-field changes by at most the delta value.  Each group is
**  identified by a group id stored in the next hop IP field; group ids
**  start at 0 and increase by one for each new group.
**
*/

#include "silk.h"

RCSIDENT("$SiLK: rwgroup.c 6081 2007-01-22 19:25:22Z mthomas $");

#include "utils.h"
#include "rwpack.h"


/* TYPEDEFS AND DEFINES */

/* File handle for --help output */
#define USAGE_FH stdout

/* maximum number of field IDs accepted by --id-fields */
#define MAX_FIELDS 8

/* largest value accepted by --rec-threshold */
#define MAX_THRESH 65535

/*
 * Field identifiers accepted by --id-fields and --delta-field.  The
 * numbering starts at 1 and matches the list printed in the --help
 * text for --id-fields.
 */
typedef enum _fieldsEnum {
    RWSEQ_FID_SIP = 1,
    RWSEQ_FID_DIP,
    RWSEQ_FID_SPORT,
    RWSEQ_FID_DPORT,
    RWSEQ_FID_PROTO,
    RWSEQ_FID_PKTS,
    RWSEQ_FID_BYTES,
    RWSEQ_FID_FLAGS,
    RWSEQ_FID_STIME,
    RWSEQ_FID_DUR,
    RWSEQ_FID_ETIME,
    RWSEQ_FID_SENSOR
} fieldsEnum;


/* LOCAL FUNCTIONS */

/* --help callback; the options library exits after calling it */
static void appUsageLong(void);
static void appTeardown(void);
/* returns only on success; exits the program on any error */
static void appSetup(int argc, char **argv);
static int  appOptionsHandler(clientData cData, int opt_index, char *opt_arg);
static int
cmp_field(
    int             field_id,
    const rwRec    *current_rec,
    const rwRec    *ref_rec,
    uint32_t        delta);
static int  process_rec(rwRec *input_rec, int force);


/* LOCAL VARIABLES */

/* field IDs given to --id-fields, and how many of them there are */
static uint8_t mask_field_id[MAX_FIELDS];
static uint8_t field_count = 0;

/* field ID given to --delta-field; 0 when the switch was not given */
static uint8_t delta_field_id = 0;

/* value of --rec-threshold */
static uint16_t threshold;

/* delta: value of --delta-value; group_id: id of the current group;
 * rec_id: number of records processed so far */
static uint32_t delta, group_id, rec_id;

/* holds the first records of a group until the group reaches
 * 'threshold' records (used when --summarize is not given) */
static rwRec *rec_thresh_buffer;

/* output and input streams, opened by appSetup() */
static rwIOStruct *out_rwios, *in_rwios;

/* set by --objective and --summarize respectively */
static uint8_t objective_flag = 0;
static uint8_t summarize_flag = 0;

/* input/output names; appSetup() defaults these to stdin/stdout */
static const char *in_fname = NULL;
static const char *out_fname = NULL; /* OPTIONS SETUP */ typedef enum _appOptionsEnum { OPT_ID_FIELDS, OPT_DELTA_FIELD, OPT_DELTA_VALUE, OPT_REC_THRESHOLD, OPT_OBJECTIVE, OPT_INPIPE, OPT_OUTPIPE, OPT_SUMMARIZE } appOptionsEnum; static struct option appOptions[] = { {"id-fields", REQUIRED_ARG, 0, OPT_ID_FIELDS}, {"delta-field", REQUIRED_ARG, 0, OPT_DELTA_FIELD}, {"delta-value", REQUIRED_ARG, 0, OPT_DELTA_VALUE}, {"rec-threshold", REQUIRED_ARG, 0, OPT_REC_THRESHOLD}, {"objective", NO_ARG, 0, OPT_OBJECTIVE}, {"summarize", NO_ARG, 0, OPT_SUMMARIZE}, {0,0,0,0} /* sentinel entry */ }; static const char *appHelp[] = { ("List of fields that must be identical among all records\n" "\tin a group; accepts multiple fields. Field IDs:\n" "\t 1 source IP 5 protocol 9 start time \n" "\t 2 destination IP 6 packets 10 duration \n" "\t 3 source port 7 bytes 11 end time \n" "\t 4 destination port 8 TCP flags 12 sensor "), "Field which can differ within the group, only one allowed", ("Maximum difference allowed between delta-field values\n" "\tbefore the records are considered as being in different groups"), ("Minimum number of records a group must have before\n" "\tit will be written to the output stream. Def. 1"), ("Count values objectively; all delta-field values must be\n" "\twithin the delta-value of the first record in the group. Normally,\n" "\tthe delta-field value of consecutive records is compared. Def. No"), ("Output a summary (a single record) for each group rather\n" "\tthan a all the records in the group. Def. No"), (char *)NULL }; /* FUNCTION DEFINITIONS */ /* * appUsageLong(); * * Print complete usage information to USAGE_FH. Pass this * function to skOptionsSetUsageCallback(); optionsParse() will * call this funciton and then exit the program when the --help * option is given. 
*/ static void appUsageLong(void) { #define USAGE_MSG \ ("[SWITCHES]\n" \ "\tGroups flows together by specified id-fields and delta-field;\n" \ "\tmarks the group ID in next hop IP; requires pre-sorting.\n") FILE *fh = USAGE_FH; skAppStandardUsage(fh, USAGE_MSG, appOptions, appHelp); } /* * appSetup(argc, argv); * * Perform all the setup for this application include setting up * required modules, parsing options, etc. This function should be * passed the same arguments that were passed into main(). * * Returns to the caller if all setup succeeds. If anything fails, * this function will cause the application to exit with a FAILURE * exit status. */ static void appSetup(int argc, char **argv) { int arg_index; /* verify same number of options and help strings */ assert((sizeof(appHelp)/sizeof(char *)) == (sizeof(appOptions)/sizeof(struct option))); /* register the application */ skAppRegister(argv[0]); skOptionsSetUsageCallback(&appUsageLong); /* initialize globals */ group_id = rec_id = 0; threshold = 0; rec_thresh_buffer = NULL; /* register the options */ if (optionsRegister(appOptions, (optHandler)appOptionsHandler, NULL)) { skAppPrintErr("unable to register options"); exit(EXIT_FAILURE); } /* parse options */ arg_index = optionsParse(argc, argv); if (arg_index < 0) { skAppUsage(); /* never returns */ } /* check for extra options */ if (arg_index != argc) { skAppPrintErr(("Too many arguments or unrecognized switch '%s'\n" "\trwgroup's input must come from stdin."), argv[arg_index]); skAppUsage(); /* never returns */ } /* must have fields to match */ if ((field_count == 0) && (delta_field_id == 0)) { skAppPrintErr("No fields specified; must specify --%s or --%s", appOptions[OPT_ID_FIELDS].name, appOptions[OPT_DELTA_FIELD].name); skAppUsage(); /* never returns */ } /* if a delta field, a delta value is required, and vice versa */ if ((delta_field_id > 0) && (delta == 0)) { skAppPrintErr("No %s specified for the %s", appOptions[OPT_DELTA_VALUE].name, 
appOptions[OPT_DELTA_FIELD].name); skAppUsage(); /* never returns */ } if ((delta != 0) && (delta_field_id == 0)) { skAppPrintErr("--%s only allowed when --%s is specified", appOptions[OPT_DELTA_VALUE].name, appOptions[OPT_DELTA_FIELD].name); skAppUsage(); /* never returns */ } /* create threshold buffer */ if (threshold) { rec_thresh_buffer = (rwRec *)calloc(threshold + 1, sizeof(rwRec)); if (rec_thresh_buffer == NULL) { skAppPrintErr("Out of memory"); exit(EXIT_FAILURE); } } /* * Opens output IO Stream; if that fails, it terminates */ if (out_fname == NULL) { out_fname = "stdout"; } if (rwioCreate(&out_rwios, out_fname, SK_RWIO_WRITE) || rwioOpen(out_rwios)) { skAppPrintErr("Couldn't open %s for writing. Exiting", out_fname); exit(EXIT_FAILURE); } /* * Deprecated for now, but functional. IO input and output defaults * and can only be stdin/stdout. */ if (in_fname == NULL) { in_fname = "stdin"; } if (rwioCreate(&in_rwios, in_fname, SK_RWIO_READ) || rwioOpen(in_rwios)) { skAppPrintErr("Couldn't open %s for reading. Exiting.", in_fname); exit(EXIT_FAILURE); } if (atexit(appTeardown) < 0) { skAppPrintErr("unable to register appTeardown() with atexit()"); appTeardown(); exit(EXIT_FAILURE); } return; /* OK */ } /* * appTeardown() * * Teardown all modules, close all files, and tidy up all * application state. * * This function is idempotent. */ static void appTeardown(void) { static int teardownFlag = 0; if (teardownFlag) { return; } teardownFlag = 1; skAppUnregister(); } /* * status = appOptionsHandler(cData, opt_index, opt_arg); * * This function is passed to optionsRegister(); it will be called * by optionsParse() for each user-specified switch that the * application has registered; it should handle the switch as * required---typically by setting global variables---and return 1 * if the switch processing failed or 0 if it succeeded. Returning * a non-zero from from the handler causes optionsParse() to return * a negative value. 
* * The clientData in 'cData' is typically ignored; 'opt_index' is * the index number that was specified as the last value for each * struct option in appOptions[]; 'opt_arg' is the user's argument * to the switch for options that have a REQUIRED_ARG or an * OPTIONAL_ARG. */ static int appOptionsHandler( clientData UNUSED(cData), int opt_index, char *opt_arg) { uint32_t i; uint32_t *parsed_list; uint32_t count; uint32_t opt_value; int rv; switch ((appOptionsEnum)opt_index) { case OPT_ID_FIELDS: if (field_count > 0) { skAppPrintErr("--%s switch provided multiple times", appOptions[opt_index].name); return 1; } if (skStringParseNumberList(&parsed_list, &count, opt_arg, RWSEQ_FID_SIP, RWSEQ_FID_SENSOR, MAX_FIELDS)) { /* Error: failed to successfully parse */ return 1; } /* else number list was successfully parsed */ assert(count > 0 && count <= MAX_FIELDS); for (i = 0 ; i < count ; i++) { mask_field_id[field_count] = (uint8_t)parsed_list[i]; ++field_count; } free(parsed_list); break; case OPT_DELTA_FIELD: rv = skStringParseUint32(&opt_value, opt_arg, RWSEQ_FID_SIP, RWSEQ_FID_SENSOR); if (rv < -10) { skAppPrintErr("--%s='%u' illegal; valid range %u-%u", appOptions[opt_index].name, opt_value, RWSEQ_FID_SIP, RWSEQ_FID_SENSOR); return 1; } if (rv != 0) { skAppPrintErr("Cannot parse --%s value '%s' as a single integer", appOptions[opt_index].name, opt_arg); return 1; } delta_field_id = (uint8_t)(opt_value & 0x000000FF); break; case OPT_DELTA_VALUE: /* allow any 32bit value */ rv = skStringParseUint32(&opt_value, opt_arg, 0, 0); if (rv != 0) { skAppPrintErr("Cannot parse --%s value '%s' as a single integer", appOptions[opt_index].name, opt_arg); return 1; } delta = opt_value; break; case OPT_REC_THRESHOLD: rv = skStringParseUint32(&opt_value, opt_arg, 0, MAX_THRESH); if (rv < -10) { skAppPrintErr("--%s='%u' illegal; valid range 0-%u", appOptions[opt_index].name, opt_value, MAX_THRESH); return 1; } if (rv != 0) { skAppPrintErr("Cannot parse --%s value '%s' as a single 
integer", appOptions[opt_index].name, opt_arg); return 1; } threshold = (uint16_t)opt_value; break; case OPT_SUMMARIZE: summarize_flag = 1; break; case OPT_OBJECTIVE: objective_flag = 1; return 0; case OPT_INPIPE: in_fname = opt_arg; break; case OPT_OUTPIPE: out_fname = opt_arg; break; } return 0; /* OK */ } /* * cmp_field * * There's probably somebody gasping in surprise that I didn't write this * as a giant macro, but I'm chanting "power law distribution" "power law * distribution" to myself here. * * cmp_field takes an id, a value and a delta and returns 0 if the value is * equal to the field +/- delta, it returns 0 if the values are equal (within * delta), 1 otherwise. */ static int cmp_field( int field_id, const rwRec *current_rec, const rwRec *ref_rec, uint32_t delta) { uint32_t result, ref_value, field_value; field_value = 0; ref_value = 0; switch ((fieldsEnum)field_id) { case RWSEQ_FID_SIP: field_value = current_rec->sIP.ipnum; ref_value = ref_rec->sIP.ipnum; break; case RWSEQ_FID_DIP: field_value = current_rec->dIP.ipnum; ref_value = ref_rec->dIP.ipnum; break; case RWSEQ_FID_SPORT: field_value = current_rec->sPort; ref_value = ref_rec->sPort; break; case RWSEQ_FID_DPORT: field_value = current_rec->dPort; ref_value = ref_rec->dPort; break; case RWSEQ_FID_PROTO: field_value = current_rec->proto; ref_value = ref_rec->proto; break; case RWSEQ_FID_PKTS: field_value = current_rec->pkts; ref_value = ref_rec->pkts; break; case RWSEQ_FID_BYTES: field_value = current_rec->bytes; ref_value = ref_rec->bytes; break; case RWSEQ_FID_FLAGS: field_value = current_rec->flags; ref_value = ref_rec->flags; break; case RWSEQ_FID_STIME: field_value = current_rec->sTime; ref_value = ref_rec->sTime; break; case RWSEQ_FID_DUR: field_value = current_rec->elapsed; ref_value = ref_rec->elapsed; break; case RWSEQ_FID_ETIME: field_value = current_rec->sTime + current_rec->elapsed; ref_value = ref_rec->sTime + current_rec->elapsed; break; case RWSEQ_FID_SENSOR: field_value = 
current_rec->sID; ref_value = ref_rec->sID; break; } /* get difference as a positive value and compare to delta */ result = ((field_value > ref_value) ? (field_value - ref_value) : (ref_value - field_value)); if (result <= delta) { return 0; } else { return 1; } } /* * int process_recs * Called on all records except the first; it compares the * new records against the group and generates a group * number as necessary. * * flags: current_rec: the record that we're actually processing * force: instead of processing the record, output whatever we would * at this time. force handles the terminal corner case and * makes sure we print out all records in the file. */ static int process_rec(rwRec *current_rec, int force) { uint8_t i; uint8_t increase = 0; static rwRec last_rec; static rwRec accumulator; static uint32_t current_group_count; uint32_t old_etime, new_etime; if (! force) { increase = 0; if (rec_id > 0) { for (i = 0 ; i < field_count ; i++) { if (cmp_field(mask_field_id[i], current_rec, &last_rec, 0)) { increase = 1; break; } } if (!increase && delta > 0) { if (cmp_field(delta_field_id, current_rec, &last_rec, delta)) { increase = 1; } } } else { /* * Escape clause to manage the first record case, csc is set * to 0 */ memcpy(&last_rec, current_rec, sizeof(rwRec)); memcpy(&accumulator, current_rec, sizeof(rwRec)); accumulator.bytes = 0; accumulator.pkts = 0; accumulator.flags = 0; current_group_count = 1; } group_id += increase; }; /* * Reset the current group count if we've moved on to the * next value */ /* * Backup - store in last rec; if the threshold is being * used, then copy to the threshold buffer. */ if(summarize_flag) { /* * 2005/09/28: New functionality added by MC. This extends * functionality by adding the ability to just output one * record if the requirements are met. 
* * 2006/07/18: Updated functionality by MC; the previous version * updated the last_rec value in place with cumulative values; I've * slapped in this accumulator which does that for me. */ current_rec->nhIP.ipnum = group_id; if(!objective_flag) { memcpy(&last_rec, current_rec, sizeof(rwRec)); }; if (increase || force) { accumulator.nhIP.ipnum = group_id - (force ? 0: 1); if((current_group_count >= threshold) || force) { rwWrite(out_rwios, &accumulator); memcpy(&last_rec, current_rec, sizeof(rwRec)); } memcpy(&accumulator, current_rec, sizeof(rwRec)); } else { accumulator.pkts += current_rec->pkts; accumulator.bytes += current_rec->bytes; accumulator.flags |= current_rec->flags; if (accumulator.sTime >= current_rec->sTime) { accumulator.sTime = current_rec->sTime; } old_etime = accumulator.sTime + accumulator.elapsed; new_etime = current_rec->sTime + current_rec->elapsed; if (accumulator.sTime > current_rec->sTime) { accumulator.sTime = current_rec->sTime; } if (old_etime < new_etime) { accumulator.elapsed = new_etime - accumulator.sTime; } } /* * Note the movement of CGC increase to the end of this clause; this is * due to accumulation being checked and printed here */ current_group_count = ((increase || (!rec_id)) ? 1 : (current_group_count + 1)); } else { /* * Note the movement of current group count into two * subclauses; the reason for this is that cumulative output * (via summarize) can only be printed when an increase is * specified. That means that the cgc can't be update until * *after* summarization takes place */ current_group_count = ((increase || (!rec_id)) ? 
1 : (current_group_count + 1)); if ((increase) || ((!increase) && (!objective_flag))) { memcpy(&last_rec, current_rec, sizeof(rwRec)); } /* * if the group count exceeds the threshold, print the current record */ current_rec->nhIP.ipnum = group_id; if (current_group_count > threshold) { rwWrite(out_rwios, current_rec); } else { /* * If it's less, we just copy the record to the threshold buffer */ if (current_group_count < threshold) { memcpy(rec_thresh_buffer + (current_group_count - 1), current_rec, sizeof(rwRec)); } else { for (i = 0 ; i < threshold - 1; i++) { rwWrite(out_rwios, rec_thresh_buffer + i); } rwWrite(out_rwios, current_rec); } } } rec_id++; return 0; } int main(int argc, char **argv) { rwRec input_rec; appSetup(argc, argv); /* never returns on error */ while (rwRead(in_rwios, &input_rec)) { process_rec(&input_rec, 0); }; if(summarize_flag) { process_rec(&input_rec, 1); }; rwCloseFile(in_rwios); rwCloseFile(out_rwios); /* done */ appTeardown(); return 0; } /* ** Local variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */