/* ** Copyright (C) 2001-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. 
** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** rwcount.c ** ** This is a counting application; given the records read from stdin ** or named files, it generates counting results for the time period ** covered. 
*/ #include "silk.h" RCSIDENT("$SiLK: rwcount.c 8269 2007-08-03 18:54:48Z mthomas $"); #include "rwcount.h" /* LOCAL DEFINES AND TYPEDEFS */ /* Where to write filenames if --print-file specified */ #define PRINT_FILENAMES_FH stderr /* note that the variables are defined below */ #define GET_BIN(t) \ (ROW_BINS * \ (((t) - countData.fileHeader->initOffset) / countData.fileHeader->binSize)) /* This macro is TRUE if the time 't' is too large (or too small) to * fit into the current countFile 'countData' */ #define TIME_OUT_OF_RANGE(t) \ (((t) < countData.fileHeader->initOffset) || \ ((GET_BIN(t) + ROW_BINS - 1) >= \ (ROW_BINS * countData.fileHeader->totalBins))) /* The number of seconds of data to allow for when re/allocating the * bins; allow a 30 day buffer */ #define BINS_EXTRA_SECONDS (30 * DAY_SECS) /* EXPORTED VARIABLES */ iochecksInfoStruct_t *ioISP; rwIOStruct_t *rwIOS; countFile countData; int printFNameFlag = 0; int dryRunFlag = 0; /* name of program to run to page output */ char *pager = NULL; /* LOCAL FUNCTIONS */ static int countPackFile(const char *inFName); static int initFileHeader(void); static void reallocBins(uint32_t time); /* LOCAL VARIABLES */ static int (*addFunction)(const rwRec *); /* OPTIONS SETUP */ /* options defined in rwcountutils.c */ /* * void initFileHeader(void) * * DESCRIPTION * * Allocate memory for the fileHeader and prep the generic header * * PARAMETERS * * none * * RETURNS * * 0 on success. 1 if allocation fails. */ static int initFileHeader(void) { /* Why do we dynamically allocate this? 
*/ countData.fileHeader = (countFileHeader*)calloc(1,sizeof(countFileHeader)); if (!countData.fileHeader) { return 1; } /* * prep the generic header */ PREPHEADER(&(countData.fileHeader->gHdr)); countData.fileHeader->gHdr.isBigEndian = IS_BIG_ENDIAN; countData.fileHeader->gHdr.type = FT_RWCOUNT; countData.fileHeader->gHdr.version = RWCO_VERSION; countData.fileHeader->bin_load_scheme = DEFAULT_LOAD_SCHEME; countData.fileHeader->bin_label_flag = BIN_LABEL_GMT; countData.fileHeader->no_titles = 0; countData.fileHeader->no_columns = 0; countData.fileHeader->skip_zeroes = 0; countData.fileHeader->start_epoch = RWCO_START_EPOCH_UNSET; countData.fileHeader->delimiter = '|'; countData.fileHeader->binSize = DEFAULT_BINSIZE; return 0; } /* * void reallocBins(uint32_t time) * * DESCRIPTION * * Reallocate memory for the bins. * * PARAMETERS * * time: the time for the bin we want to insert data into * * RETURNS * * n/a * * SIDE EFFECTS * * Exits application if realloc fails */ static void reallocBins(uint32_t time) { countFileHeader *cfh = countData.fileHeader; double *newptr; uint32_t extensionBins; size_t old_array_len; assert(TIME_OUT_OF_RANGE(time)); /* Always extend the rear of array, no matter which end we * actually overflow on. Afterwards, we'll check if it's the * front, and shift data around. */ if ( time < cfh->initOffset ) { /* To extend front, we want to add enough room to cover the time * we're trying to insert--plus an additional 30 days. */ extensionBins = ((cfh->initOffset - time + BINS_EXTRA_SECONDS) / cfh->binSize); } else { /* To extend rear, we want to add enough room to cover the * time we're trying to insert, plus an additional 30 * days. Slightly different calc since we don't have the * finalOffset. 
*/ extensionBins = (((time - cfh->initOffset + BINS_EXTRA_SECONDS) / cfh->binSize) - cfh->totalBins); } old_array_len = cfh->totalBins * ROW_BINS; cfh->totalBins += extensionBins; countData.binMallocSize = ROW_BINS * cfh->totalBins * sizeof(double); newptr = (double *) realloc(countData.bins, countData.binMallocSize); if (!newptr) { skAppPrintErr("bin size isn't large enough" " and can't be reallocated"); exit(EXIT_FAILURE); } if ( time < cfh->initOffset ) { /* Since realloc maintains the front of the array, we need * to shift things around a bit now, so that the empty * space is at the front. */ memmove(newptr + (extensionBins * ROW_BINS), newptr, old_array_len * sizeof(double)); /* And we need to clear out all of the space before what we * just moved into place. */ memset(newptr, 0, (extensionBins * ROW_BINS * sizeof(double))); /* And adjust the initOffset for the new setup. */ cfh->initOffset = cfh->initOffset - (extensionBins * cfh->binSize); } else { /* bzero the newly allocated space */ memset((newptr + old_array_len), 0, (extensionBins * ROW_BINS * sizeof(double))); } countData.bins = newptr; } /* * int initBins(start_time) * * initBins allocates time bins based on an initial guess time and * the range of days its told to allocate for. * * parameters: * guessTime: the guess Time. This is the pivot time used for the guessing * process. * length: the length of the bins, in seconds. * * notes: * DO NOT CALL THIS TWICE. * * initBins is the clean allocation function - it sets initial * environmental variables in addition to memory allocation. Calling * it twice will trigger an error. * * returns: * 0 on success * */ static int initBins(uint32_t start_time) { if (! countData.fileHeader) { initFileHeader(); } countData.fileHeader->initOffset = start_time; countData.fileHeader->totalBins = 1 + (BINS_EXTRA_SECONDS / countData.fileHeader->binSize); /* * Now do the allocation. 
*/ countData.binMallocSize = ROW_BINS * countData.fileHeader->totalBins * sizeof(double); countData.bins = (double *)calloc(countData.binMallocSize, 1); /* * and we're done! */ if (!countData.bins) { return RWCO_ERR_MALLOC; } else { return 0; } } /* * t = calcStartTime(rec_stime); * * Given a start-time for an rwrec 'rec_stime', calculate the time * to use as the starting time for the bins. */ static uint32_t calcStartTime(uint32_t t) { /* If the user specified the start_epoch, use it unconditionally. * Note that this may cause problems memory problems later if the * user's start_epoch is much earlier than the times on the * records she is reading. */ if (countData.fileHeader->start_epoch != 0) { return (countData.fileHeader->start_epoch); } /* Move 't' to the start of the day before yesterday */ return (t - (t % DAY_SECS) - 2 * DAY_SECS); } /* * status = startAdd(rwrec); * * Front-loaded record addition. In the front-loaded * implementation, the bytes and packets are added to the first bin * relevant to the record. * * Returns 0 on success. */ static int startAdd(const rwRec *rwrec) { uint32_t tgtBin; uint32_t t = rwrec->sTime; if (t < countData.fileHeader->start_epoch) { /* user not interested in this flow */ return 0; } if ( TIME_OUT_OF_RANGE(t) ) { reallocBins(t); } tgtBin = GET_BIN(t); countData.bins[tgtBin + RECS]++; countData.bins[tgtBin + BYTES] += rwrec->bytes; countData.bins[tgtBin + PKTS] += rwrec->pkts; return 0; } /* * endAdd(rwrec); * * Add the bytes and packets to the final bin relevant to the * record. * * Returns 0 on success. 
*/ static int endAdd(const rwRec *rwrec) { uint32_t tgtBin; uint32_t t = rwrec->sTime + rwrec->elapsed; if (t < countData.fileHeader->start_epoch) { /* user not interested in this flow */ return 0; } if ( TIME_OUT_OF_RANGE(t) ) { reallocBins(t); } tgtBin = GET_BIN(t); countData.bins[tgtBin + RECS]++; countData.bins[tgtBin + BYTES] += rwrec->bytes; countData.bins[tgtBin + PKTS] += rwrec->pkts; return 0; } /* * midAdd(rwrec); * * Add the bytes and packets to the middle bin relevant to the * flow. * * Returns 0 on success. */ static int midAdd(const rwRec *rwrec) { uint32_t tgtBin; uint32_t t = rwrec->sTime + (rwrec->elapsed / 2); if (t < countData.fileHeader->start_epoch) { /* user not interested in this flow */ return 0; } if ( TIME_OUT_OF_RANGE(t) ) { reallocBins(t); } tgtBin = GET_BIN(t); countData.bins[tgtBin + RECS]++; countData.bins[tgtBin + BYTES] += rwrec->bytes; countData.bins[tgtBin + PKTS] += rwrec->pkts; return 0; } /* * status = meanAdd(rwrec); * * Equally distribute the record among all the BINs by adding the * mean of the bytes and packets to each bin. Note that a * particularly placed 32 second record will be equally distributed * among three 30 second bins. * * Returns 0 on success. */ static int meanAdd(const rwRec *rwrec) { uint32_t start_bin, end_bin, i; uint32_t negative_bins = 0; uint32_t eTime = rwrec->sTime + rwrec->elapsed; double flows, bytes, pkts; if (eTime < countData.fileHeader->start_epoch) { /* user not interested in this flow */ return 0; } if (rwrec->sTime < countData.fileHeader->start_epoch) { /* ugh. user is interested in part of the flow; set * negative_bins to the number of bins the flow covers before * the start_epoch (which equals the initOffset). We don't * want to allow the bins to grow to earlier times. Must * expand GET_BIN() macro here to reverse the times. 
*/ negative_bins = ((((countData.fileHeader->initOffset - rwrec->sTime) / countData.fileHeader->binSize) + 1) * ROW_BINS); } else { /* maybe grow the bins to allow for the start time */ if ( TIME_OUT_OF_RANGE(rwrec->sTime) ) { reallocBins(rwrec->sTime); } } if ( TIME_OUT_OF_RANGE(eTime) ) { reallocBins(eTime); } end_bin = GET_BIN(eTime); if (negative_bins > 0) { start_bin = 0; } else { start_bin = GET_BIN(rwrec->sTime); if (start_bin == end_bin) { /* handle simple case where everything is in one bin */ countData.bins[start_bin + RECS]++; countData.bins[start_bin + BYTES] += rwrec->bytes; countData.bins[start_bin + PKTS] += rwrec->pkts; return 0; } } /* * Compute the amount of the flow to allocate to each bin. * Logically, this value is 1/number-of-bins which is given by the * following, but because the bins are multiples of ROW_BINS, the * actual formula is a bit more complicated. * * 1 / (end_bin - start_bin + negative_bins + 1) */ flows = (((double)ROW_BINS) / ((double)(end_bin - start_bin + negative_bins + ROW_BINS))); bytes = (double)rwrec->bytes * flows; pkts = (double)rwrec->pkts * flows; for (i = start_bin; i <= end_bin; i += ROW_BINS) { countData.bins[i + RECS] += flows; countData.bins[i + BYTES] += bytes; countData.bins[i + PKTS] += pkts; } return 0; } /* * status = durationAdd(rwrec); * * Divide the flow evenly across each second in the flow, and * then apply that value to each bin according to the number of * seconds the flow spent in that bin. * * Returns 0 on success. */ static int durationAdd(const rwRec *rwrec) { uint32_t start_bin, end_bin, i; uint32_t bin_time; uint32_t eTime = rwrec->sTime + rwrec->elapsed; double flows, bytes, pkts; if (eTime < countData.fileHeader->start_epoch) { /* user not interested in this flow */ return 0; } /* grow the bins to allow for this record's start time, but don't * move the bins earlier than the start_epoch. 
*/ if ((rwrec->sTime >= countData.fileHeader->start_epoch) && TIME_OUT_OF_RANGE(rwrec->sTime)) { reallocBins(rwrec->sTime); } /* grow the bins to allow for the end time */ if (TIME_OUT_OF_RANGE(eTime)) { reallocBins(eTime); } /* get the amount of data per second */ flows = 1.0 / (1.0 + (double)rwrec->elapsed); bytes = (double)rwrec->bytes * flows; pkts = (double)rwrec->pkts * flows; /* find the ending and starting bins */ end_bin = GET_BIN(eTime); if (rwrec->sTime < countData.fileHeader->start_epoch) { /* flow started before start_epoch; we will use all of this * bin---unless eTime occurs in this bin, which we will handle * below */ start_bin = 0; } else { start_bin = GET_BIN(rwrec->sTime); if (start_bin == end_bin) { /* handle simple case where everything is in one bin */ countData.bins[start_bin + RECS]++; countData.bins[start_bin + BYTES] += rwrec->bytes; countData.bins[start_bin + PKTS] += rwrec->pkts; return 0; } /* handle the part of the flow that occurs in the start_bin: * find the start of the next bin, and subtract sTime from it * to get number of relevant seconds in this bin. */ bin_time = (((1 + (start_bin / ROW_BINS)) * countData.fileHeader->binSize) + countData.fileHeader->initOffset); assert(bin_time > rwrec->sTime); countData.bins[start_bin + RECS] += ((double)(bin_time - rwrec->sTime) * flows); countData.bins[start_bin + BYTES] += ((double)(bin_time - rwrec->sTime) * bytes); countData.bins[start_bin + PKTS] += ((double)(bin_time - rwrec->sTime) * pkts); /* move start_bin to the next row */ start_bin += ROW_BINS; } /* handle the part of the flow that occurs in the end_bin: find * the start of the this bin, and subtract it from eTime to get * number of relevant seconds in this bin. Add a second here * since at least part of the flow must be active in this * interval. 
*/ bin_time = (((end_bin / ROW_BINS) * countData.fileHeader->binSize) + countData.fileHeader->initOffset); assert(bin_time <= eTime); countData.bins[end_bin + RECS] += flows * (double)(eTime - bin_time + 1); countData.bins[end_bin + BYTES] += bytes * (double)(eTime - bin_time + 1); countData.bins[end_bin + PKTS] += pkts * (double)(eTime - bin_time + 1); if (start_bin == end_bin) { /* flow started and ended in adjacent bins */ return 0; } /* Handle the bins that had complete coverage */ flows *= (double)countData.fileHeader->binSize; bytes *= (double)countData.fileHeader->binSize; pkts *= (double)countData.fileHeader->binSize; for (i = start_bin; i < end_bin; i += ROW_BINS) { countData.bins[i + RECS] += flows; countData.bins[i + BYTES] += bytes; countData.bins[i + PKTS] += pkts; } return 0; } /* * countFile: * This is the actuall filtering of a file. Also handles * dumping out the header if required. */ static int countPackFile(const char *inFName) { /* protect initBins from multiple calls */ static int initDone = 0; rwRec rwrec; if (dryRunFlag) { fprintf(stdout, "%s\n", inFName); return 0; } /* open the file */ rwIOS = rwOpenFile(inFName, ioISP->inputCopyFD); if (rwIOS == NULL) { /* some error. the library would have dumped a msg */ return 1; } if (printFNameFlag) { fprintf(PRINT_FILENAMES_FH, "%s\n", rwGetFileName(rwIOS)); } /* read records, initializing if necessary */ if(!initDone) { uint32_t t; initDone = 1; if (! rwRead(rwIOS, &rwrec)) { return 0; } t = calcStartTime(rwrec.sTime); if (initBins(t) == RWCO_ERR_MALLOC) { skAppPrintErr("cannot allocate space for bins. " "Try a larger bin size or fewer records"); return 1; } addFunction(&rwrec); } while (rwRead(rwIOS, &rwrec)) { addFunction(&rwrec); } /* close file */ rwCloseFile(rwIOS); rwIOS = NULL; return 0; } /* * int main(int argc, char **argv) * * DESCRIPTION * * main function, this is currently a bit too bulky since this app is * still more or less being felt out. 
*
 * PARAMETERS
 *
 * argc, argv: the command line, handed to appSetup()
 *
 * RETURNS
 *
 * 0 after the bins have been written.  Exits with EXIT_FAILURE if
 * the header cannot be allocated or the pager cannot be opened.
 */
int main(int argc, char ** argv)
{
    FILE *out_fp;
    int pager_status = 0;
    int file_idx;

    /* Options will (re)set values in the header, so initialize it first */
    if (0 != initFileHeader()) {
        skAppPrintErr("Cannot initialize header.");
        exit(EXIT_FAILURE);
    }

    appSetup(argc, argv);
    out_fp = ioISP->passFD[0];

    /* choose the function that adds a record to the bins */
    switch ((bin_load_scheme_enum_t)countData.fileHeader->bin_load_scheme) {
      case LOAD_START:
        addFunction = &startAdd;
        break;

      case LOAD_END:
        addFunction = &endAdd;
        break;

      case LOAD_MIDDLE:
        addFunction = &midAdd;
        break;

      case LOAD_MEAN:
        addFunction = &meanAdd;
        break;

      case LOAD_DURATION:
        addFunction = &durationAdd;
        break;

      default:
        skAppPrintErr("Option %d not handled in switch() at %s:%d",
                      countData.fileHeader->bin_load_scheme,
                      __FILE__, __LINE__);
        assert(0);
        abort();
    }

    /* get files to process from the command line; stop at the first
     * file that reports an error */
    file_idx = ioISP->firstFile;
    while (file_idx < ioISP->fileCount) {
        if (countPackFile(ioISP->fnArray[file_idx])) {
            break;
        }
        ++file_idx;
    }

    /* Print the records */
    if (isRunningOnNode()) {
        /* binary output goes directly to stdout */
        dumpBinary(stdout, &countData);
        appTeardown();
        return 0;
    }

    /* Invoke the pager */
    pager_status = skOpenPagerWhenStdoutTty(&out_fp, &pager);
    if (pager_status < 0) {
        exit(EXIT_FAILURE);
    }

    /* print records */
    dumpText(out_fp, &countData);

    /* close pager */
    if (pager_status) {
        skClosePager(out_fp, pager);
    }

    appTeardown();

    return 0;
}


/*
** Local Variables:
** mode:c
** indent-tabs-mode:nil
** c-basic-offset:4
** End:
*/