/* ** Copyright (C) 2004-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** A "plug-in" (currently compile-time only) for rwflowpack to ** process FloCap records from a directory. ** */ #include "silk.h" RCSIDENT("$SiLK: fcfilesreader.c 6513 2007-03-01 21:43:26Z mwd $"); #include "sksite.h" #include "rwflowpack.h" #if SK_ENABLE_FLOWCAP #include "skpolldir.h" #include "flowcap-source.h" #include "convert-flowcap.h" #include "lzo-file.h" /* MACROS and DATA TYPES */ /* A name for this reader type. */ #define READER_TYPE_NAME "FlowCap Files Reader" /* PRIVATE FUNCTIONS */ static int readerSetup( fp_daemon_mode_t *is_daemon, const sk_vector_t *probe_vec, reader_options_t *options); static int readerStart(flow_proc_t *fproc); static int readerStop(flow_proc_t *fproc); static fp_get_record_result_t readerGetRecord( rwRec *out_rwrec, const probe_def_t **out_probe, flow_proc_t *fproc); static int readerWantProbe(probe_def_t *probe); /* PRIVATE VARIABLES */ /* The directory flowcap files mode will poll for new flowcap * files to process */ static const char *incoming_directory = NULL; /* Vector of probes to operate on; needed for Flowcap V1 */ static const sk_vector_t *g_probe_vec = NULL; /* Directory polling information */ static skPollDirQueue_t pdq = NULL; static skPollDir_t polldir = NULL; static uint32_t polling_interval; /* FUNCTION DEFINITIONS */ /* * readerStart(); * * Creates a polldir object and queue */ static int readerStart(flow_proc_t UNUSED(*fproc)) { /* Set up directory polling */ pdq = skPollDirQueueCreate(); if (NULL == pdq) { CRITMSG("Could not create polldir queue."); return 1; } polldir = skPollDirCreate(incoming_directory, polling_interval, pdq); if (NULL == polldir) { CRITMSG("Could not initiate polling for %s", incoming_directory); return 1; } return 0; } /* * pduReaderStop(); * * Destroy the receiver. * * Invoked by reader_type->stop_fn(); */ static int readerStop(flow_proc_t UNUSED(*fproc)) { /* End the file thread. */ if (polldir) { skPollDirDestroy(polldir); polldir = NULL; } if (pdq) { skPollDirQueueDestroy(pdq); pdq = NULL; } return 0; } /* * readerGetNextValidFile(&fc_src, &probe); * * Pull the next file name off of the valid-queue and create a * flowsource object to read the flowcap records in it. Fills * 'fc_src' with the new flowcap-source object; fills 'probe' with * the skProbe object that represents the flows in the file. */ static int readerGetNextValidFile(flow_proc_t *fproc) { flowcapSource_t fc_src; char *filename; char path[PATH_MAX]; const char *sensor_name; const char *probe_name; int rv; /* Pop next file off of the valid queue */ rv = skPollDirGetNextFile(pdq, path, &filename); if (rv != 0) { if (rv == -1 && skPollDirGetError(polldir) != PDERR_NONE) { CRITMSG("Polldir error ocurred."); abort(); } return -1; } logMsg((READER_TYPE_NAME " processing %s"), filename); /* open 'filename' to create a source of records */ fc_src = flowcapSourceCreateFromFile(path, &logMsg); if (fc_src == NULL) { return -1; } /* * Figure out which probe this source is associated with. * * Get the name of the sensor and of the probe from the flowcap * source. Flowcap Files V2 and above know their sensor and probe * names; version 1 files did not. */ sensor_name = flowcapSourceSensorName(fc_src); probe_name = flowcapSourceProbeName(fc_src); if (sensor_name[0] != '\0' && probe_name[0] != '\0') { /* We have sensor and probe names; get the skProbe object for * this sensor and probe. Abort if we don't recognize it. */ fproc->probe = probeConfGetProbeByName(sensor_name, probe_name); if (fproc->probe == NULL) { logMsg("Cannot get valid probe for sensor %s, probe %s, file %s", sensor_name, probe_name, flowcapSourceFileName(fc_src)); abort(); } } else { logMsg("File '%s' appears to be unsupported v1 flowcap file. Abort", flowcapSourceFileName(fc_src)); exit(EXIT_FAILURE); } fproc->flow_src = fc_src; return 0; } /* * readerGetRecord(&out_rwrec, &out_probe); * * Fill 'out_rwrec' with an rw generic record from the underlying * flowsource object. If there is no flowsource object, create one * by pulling a file off the queue. Fill 'out_probe' with the * probe where the flow was collected. * * Invoked by reader_type->get_record_fn(); */ fp_get_record_result_t readerGetRecord( rwRec *out_rwrec, const probe_def_t **out_probe, flow_proc_t *fproc) { static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; flowcapSource_t fc_src; const char *filename; fp_get_record_result_t retVal = FP_GET_ERROR; uint32_t rec_count; pthread_mutex_lock(&mutex); /* If we don't have a source, pop a file off the valid-queue * and start processing it. */ if (fproc->flow_src == NULL) { if (readerGetNextValidFile(fproc) != 0) { /* error getting file */ goto END; } } fc_src = (flowcapSource_t)fproc->flow_src; /* Assume we can get a record from the probe. */ retVal = FP_RECORD; *out_probe = fproc->probe; /* Try to get the record */ if (flowcapSourceGetGeneric(fc_src, out_rwrec)) { /* get failed: either at EOF or got an error; assume EOF. */ retVal = FP_FILE_BREAK; *out_probe = NULL; /* Print results for the file we just processed. */ filename = flowcapSourceFileName(fc_src); rec_count = flowcapSourceNumRecs(fc_src); logMsg("Processed file %s, %d records.", filename, rec_count); flowcapSourceClose(fc_src); /* Either archive the file or remove it */ if (archiveFile(filename) == 1) { /* archiving not requested */ if (unlink(filename) == -1) { logMsg("Couldn't remove %s.", filename); } } /* All done with the flow source */ flowcapSourceDestroy(fc_src); fproc->flow_src = NULL; fproc->probe = NULL; } END: pthread_mutex_unlock(&mutex); return retVal; } /* * status = fcFilesReaderInitialize(processor_properties); * * Register this flow reader's options and do other initialization. * * Invoked by reader_type->initialize_fn(); */ int fcFilesReaderInitialize(reader_type_t *reader_type) { /* Set my name */ reader_type->reader_name = READER_TYPE_NAME; /* Set function pointers */ reader_type->setup_fn = &readerSetup; reader_type->teardown_fn = NULL; reader_type->start_fn = &readerStart; reader_type->stop_fn = &readerStop; reader_type->get_record_fn = &readerGetRecord; reader_type->want_probe_fn = &readerWantProbe; return 0; /* OK */ } /* * yes_or_no = readerWantProbe(probe); * * Return a TRUE value this reader_type is able to process the data * from the 'probe'; return a FALSE value otherwise. */ static int readerWantProbe(probe_def_t UNUSED(*probe)) { /* Flowcap mode is handled specially by rwflowpack, and this * function shouldn't be called; in other modes, this function * should refuse all probes. */ return 0; } /* * status = readerSetup(processor_properties); * * Do any setup required before starting the processor. * * Invoked by reader_type->setup_fn(); */ static int readerSetup( fp_daemon_mode_t *is_daemon, const sk_vector_t *probe_vec, reader_options_t *options) { /* store the vector of probes */ g_probe_vec = probe_vec; /* pull values out of options */ incoming_directory = options->fcfiles.incoming_directory; polling_interval = options->fcfiles.polling_interval; /* set up compressor library */ if (lzo_init()) { skAppPrintErr("unable to initialize lzo library"); return -1; } *is_daemon = FP_DAEMON_ON; return 0; } #endif /* SK_ENABLE_FLOWCAP */ /* ** Local variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */