/*
** Copyright (C) 2004-2007 by Carnegie Mellon University.
**
** @OPENSOURCE_HEADER_START@
**
** Use of the SILK system and related source code is subject to the terms
** of the following licenses:
**
** GNU Public License (GPL) Rights pursuant to Version 2, June 1991
** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013
**
** NO WARRANTY
**
** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER
** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY
** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN
** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY
** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT
** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE,
** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE
** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT,
** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY
** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF
** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES.
** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF
** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON
** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE
** DELIVERABLES UNDER THIS LICENSE.
**
** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie
** Mellon University, its trustees, officers, employees, and agents from
** all claims or demands made against them (and any related losses,
** expenses, or attorney's fees) arising out of, or relating to Licensee's
** and/or its sub licensees' negligent use or willful misuse of or
** negligent conduct or willful misconduct regarding the Software,
** facilities, or other rights or assistance granted by Carnegie Mellon
** University under this License, including, but not limited to, any
** claims of product liability, personal injury, death, damage to
** property, or violation of any laws or regulations.
**
** Carnegie Mellon University Software Engineering Institute authored
** documents are sponsored by the U.S. Department of Defense under
** Contract F19628-00-C-0003. Carnegie Mellon University retains
** copyrights in all material produced under this contract. The U.S.
** Government retains a non-exclusive, royalty-free license to publish or
** reproduce these documents, or allow others to do so, for U.S.
** Government purposes only pursuant to the copyright license under the
** contract clause at 252.227.7013.
**
** @OPENSOURCE_HEADER_END@
*/

/*
**  A "plug-in" (currently compile-time only) for rwflowpack to
**  process FlowCap records from the network.
**
*/

#include "silk.h"

RCSIDENT("$SiLK: fcreader.c 7165 2007-05-15 16:13:04Z mthomas $");

#include "sksite.h"
#include "rwflowpack.h"

#if SK_ENABLE_FLOWCAP

#include "sktransfer.h"
#include "flowcap-source.h"
#include "convert-flowcap.h"
#include "lzo-file.h"


/* MACROS and DATA TYPES */

/* The number of flowcaps that can be connected to simultaneously. */
#define FLOWCAP_MAX 25

/* A name for this reader type. */
#define READER_TYPE_NAME "FlowCap Reader"

#define MINBAUD 300             /* Minimum transfer rate (bps) */

typedef void (*cleanupHandler)(void *);

typedef struct flowcap_info_t {
    in_addr_t    address;   /* Address of machine creating the flowcap
                             * files that rwflowpack will connect to. */
    int          port;      /* Port on which to contact the flowcap
                             * machine. */
    skTransfer_t fcClient;  /* Receiver transfer object */
    char        *name;      /* Textual representation of address */
} flowcap_info_t;


/* PRIVATE FUNCTIONS */

static int readerSetup(
    fp_daemon_mode_t   *is_daemon,
    const sk_vector_t  *probe_vec,
    reader_options_t   *options);
static int readerStart(flow_proc_t *fproc);
static int readerStop(flow_proc_t *fproc);
static fp_get_record_result_t readerGetRecord(
    rwRec                  *out_rwrec,
    const probe_def_t     **out_probe,
    flow_proc_t            *fproc);
static int readerWantProbe(probe_def_t *probe);


/* PRIVATE VARIABLES */

/* Valid file queue */
static skDeque_t vq = NULL;

/* The flowcap address string */
static char *flowcap_addresses = NULL;

/* The flowcap default port */
static int flowcap_default_port = -1;

/* The temporary directory to use while receiving a flowcap file.
 * Files in this directory are incomplete. */
static const char *work_directory = NULL;

/* The directory where received flowcap files (ACK was successful) are
 * kept until processed by rwflowpack */
static const char *valid_directory = NULL;

/* Vector of probes to operate on; needed for Flowcap V1 */
static const sk_vector_t *g_probe_vec = NULL;

/* Information for each flowcap we are connecting to. */
static flowcap_info_t flowcap_info[FLOWCAP_MAX];

/* Number of flowcaps we are connecting to. */
static int flowcap_num = 0;

/* Thread-local key for storing the name of the last file */
static pthread_key_t last_file_key;


/* FUNCTION DEFINITIONS */

/*
 *  loadValidQueue();
 *
 *    Put the names of the files that are in the valid directory onto
 *    the valid queue.  Will exit the application if the
 *    valid_directory cannot be read, or if files cannot be added to
 *    the queue.  An entry whose full path does not fit in PATH_MAX is
 *    reported and skipped rather than queued under a truncated name.
 */
static void loadValidQueue(void)
{
    struct dirent *entry;
    DIR *dir;
    char buffer[PATH_MAX];
    char *name;
    int len;

    dir = opendir(valid_directory);
    if (dir == NULL) {
        skAppPrintErr("Couldn't open %s for queueing", valid_directory);
        exit(EXIT_FAILURE);
    }

    while ((entry = readdir(dir)) != NULL) {
        if (strcmp(entry->d_name, ".") == 0
            || strcmp(entry->d_name, "..") == 0)
        {
            continue;
        }

        /* Build the complete path; refuse to queue a truncated one */
        len = snprintf(buffer, sizeof(buffer), "%s/%s",
                       valid_directory, entry->d_name);
        if (len < 0 || (size_t)len >= sizeof(buffer)) {
            skAppPrintErr("Path too long; ignoring %s/%s",
                          valid_directory, entry->d_name);
            continue;
        }

        /* The queue takes ownership of its own copy of the name */
        name = strdup(buffer);
        if (name == NULL) {
            skAppPrintErr("Memory failure");
            exit(EXIT_FAILURE);
        }
        if (skDequePushFront(vq, name) != SKDQ_SUCCESS) {
            skAppPrintErr("Failed to add '%s' to file queue.", buffer);
            exit(EXIT_FAILURE);
        }
    }

    closedir(dir);
}


/*
 *  receiver_client_callback(file, item, status, transfer);
 *
 *    The sktransfer library will invoke this function once 'file' has
 *    been received (or receiving failed): 'status' gives the status
 *    of the transfer.
 *
 *    If the file was successfully received, this function will move
 *    the file from the 'work_directory' to the 'valid_directory' and
 *    add the file to the global valid queue, 'vq'.
 *
 *    A file whose destination name equals the name remembered for
 *    this thread in 'last_file_key' is treated as a duplicate: the
 *    newly received copy is removed and nothing is queued.  The check
 *    is made before the rename so that a copy that may still be
 *    waiting in the valid directory is not overwritten and deleted.
 *
 *    If the rename fails, the error is logged and the file is not
 *    queued.
 */
static void receiver_client_callback(
    char               *file,
    void        UNUSED(*item),
    sktErr_t            status,
    skTransfer_t UNUSED(transfer))
{
    char newname[PATH_MAX];
    char *baseloc;
    char *oldname;
    char *last_copy;
    char *queue_copy;
    int len;

    if (status != SK_TRANSFER_ESUCCESS) {
        return;
    }

    /* Build "<valid_directory>/<basename of file>" with bounds checks;
     * require room for at least one character of the base name. */
    len = snprintf(newname, sizeof(newname), "%s/", valid_directory);
    if (len < 0 || (size_t)len >= sizeof(newname) - 1) {
        logMsg("Valid directory name is too long; not moving %s.", file);
        return;
    }
    baseloc = newname + len;
    baseName_r(baseloc, file, (sizeof(newname) - (baseloc - newname)));

    /* Check to see if the file is a duplicate of the last file. */
    oldname = (char *)pthread_getspecific(last_file_key);
    if (oldname != NULL && strcmp(oldname, newname) == 0) {
        NOTICEMSG("Discarding duplicate flowcap file: %s", newname);
        unlink(file);
        return;
    }

    if (rename(file, newname) == -1) {
        logMsg("Error occured renaming %s to %s.", file, newname);
        return;
    }

    /* One copy is remembered as this thread's last file; a second
     * copy is handed to the queue. */
    last_copy = strdup(newname);
    queue_copy = strdup(newname);
    if (last_copy == NULL || queue_copy == NULL) {
        logMsg("Memory failure");
        exit(EXIT_FAILURE);
    }

    /* Install the new value before releasing the old one so the key
     * never refers to freed memory. */
    if (pthread_setspecific(last_file_key, last_copy) == 0) {
        free(oldname);
    } else {
        free(last_copy);
    }

    /* Add the new filename to the queue */
    if (skDequePushFront(vq, queue_copy) != SKDQ_SUCCESS) {
        logMsg("Failed to add '%s' to file queue.", newname);
        free(queue_copy);
    }
}


/*
 *  readerStart();
 *
 *    Create a receiver client to download flowcap files from the
 *    flowcap server into the work_directory.  Entries that already
 *    have a client are left alone.  Returns 0 on success, or -1 as
 *    soon as one client cannot be created, configured, or started.
 *
 *    Invoked by reader_type->start_fn();
 */
static int readerStart(flow_proc_t UNUSED(*fproc))
{
    sktErr_t err;
    int i;
    skTransfer_t fcClient;

    for (i = 0; i < flowcap_num; i++) {
        if (flowcap_info[i].fcClient != NULL) {
            continue;
        }

        /* Start from a known value so the error path below never
         * examines an indeterminate handle. */
        fcClient = NULL;

        /* Create client and set its properties */
        err = skCreateReceiverClient(&fcClient,
                                     flowcap_info[i].port,
                                     flowcap_info[i].address,
                                     work_directory);
        if (err == SK_TRANSFER_ESUCCESS) {
            err = skTransferSetAckTimeout(fcClient, 5);
        }
        if (err == SK_TRANSFER_ESUCCESS) {
            err = skTransferSetMinimumBitrate(fcClient, MINBAUD);
        }
        if (err == SK_TRANSFER_ESUCCESS) {
            err = skTransferSetCallback(fcClient, receiver_client_callback);
        }
        if (err == SK_TRANSFER_ESUCCESS) {
            err = skTransferSetLogFn(fcClient, logMsg);
        }
        if (err == SK_TRANSFER_ESUCCESS) {
            err = skTransferStart(fcClient);
        }

        /* If something didn't work; destroy the transfer object */
        if (err != SK_TRANSFER_ESUCCESS) {
            if (fcClient) {
                skDestroyTransfer(fcClient);
            }
            PRINT_AND_LOG(("Failed to create receiver client for %s.",
                           flowcap_info[i].name));
            return -1;
        }

        flowcap_info[i].fcClient = fcClient;
    }

    return 0;
}


/*
 *  readerStop();
 *
 *    Destroy the receiver clients and unblock the valid queue.
 *    Returns 0 when every client was destroyed successfully, -1
 *    otherwise.
 *
 *    Invoked by reader_type->stop_fn();
 */
static int readerStop(flow_proc_t UNUSED(*fproc))
{
    int i;
    int retval = 0;     /* must be signed: -1 is returned on failure */

    for (i = 0; i < flowcap_num; i++) {
        sktErr_t err = SK_TRANSFER_ESUCCESS;

        /* Destroy the receiver client */
        logMsg("Shutting down connection to %s", flowcap_info[i].name);
        skAppPrintErr("Shutting down connection to %s",
                      flowcap_info[i].name);
        if (flowcap_info[i].fcClient) {
            skTransferStop(flowcap_info[i].fcClient, 5);
            err = skDestroyTransfer(flowcap_info[i].fcClient);
            flowcap_info[i].fcClient = NULL;
        }
        if (err != SK_TRANSFER_ESUCCESS) {
            logMsg("Failed to destroy receiver client for %s.",
                   flowcap_info[i].name);
            skAppPrintErr("Failed to destroy receiver client for %s.",
                          flowcap_info[i].name);
            retval = -1;
        } else {
            logMsg("Connection to %s has been shut down",
                   flowcap_info[i].name);
            skAppPrintErr("Connection to %s has been shut down",
                          flowcap_info[i].name);
        }
    }

    /* Unblock the queue, if necessary */
    skDequeUnblock(vq);

    return retval;
}


/*
 *  status = readerGetNextValidFile(fproc);
 *
 *    Pull the next file name off of the valid-queue and create a
 *    flowcap-source object to read the flowcap records in it.  On
 *    success, sets 'fproc->flow_src' to the new flowcap-source and
 *    'fproc->probe' to the probe named by the file, and returns 0.
 *
 *    Returns -1 if no file name could be popped or the file could
 *    not be opened.  Aborts if the queue reports SKDQ_ERROR or if the
 *    sensor/probe named in the file is not known; exits if the file
 *    carries no sensor or probe name.
 */
static int readerGetNextValidFile(flow_proc_t *fproc)
{
    flowcapSource_t fc_src;
    skDQErr_t dqerr;
    char *filename = NULL;
    const char *sensor_name;
    const char *probe_name;

    /* Pop next file off of the valid queue */
    dqerr = skDequePopBack(vq, (void *)&filename);
    if (dqerr != SKDQ_SUCCESS) {
        if (dqerr == SKDQ_ERROR) {
            abort();
        }
        return -1;
    }

    logMsg((READER_TYPE_NAME " processing %s"), filename);

    /* open 'filename' to create a source of records; this function
     * owns the popped name and releases it once the source exists */
    fc_src = flowcapSourceCreateFromFile(filename, &logMsg);
    free(filename);
    if (fc_src == NULL) {
        return -1;
    }

    /*
     *  Figure out which probe this source is associated with.
     *
     *  Get the name of the sensor and of the probe from the flowcap
     *  source.  Flowcap Files V2 and above know their sensor and probe
     *  names; version 1 files did not.
     */
    sensor_name = flowcapSourceSensorName(fc_src);
    probe_name = flowcapSourceProbeName(fc_src);

    if (sensor_name != NULL && sensor_name[0] != '\0'
        && probe_name != NULL && probe_name[0] != '\0')
    {
        /* We have sensor and probe names; get the skProbe object for
         * this sensor and probe.  Abort if we don't recognize it. */
        fproc->probe = probeConfGetProbeByName(sensor_name, probe_name);
        if (fproc->probe == NULL) {
            logMsg("Cannot get valid probe for sensor %s, probe %s, file %s",
                   sensor_name, probe_name, flowcapSourceFileName(fc_src));
            abort();
        }
    } else {
        logMsg("File '%s' appears to be unsupported v1 flowcap file.  Abort",
               flowcapSourceFileName(fc_src));
        exit(EXIT_FAILURE);
    }

    fproc->flow_src = fc_src;

    return 0;
}


/*
 *  result = readerGetRecord(&out_rwrec, &out_probe, fproc);
 *
 *    Fill 'out_rwrec' with an rw generic record from the underlying
 *    flowsource object.  If there is no flowsource object, create one
 *    by pulling a file off the queue.  Fill 'out_probe' with the
 *    probe where the flow was collected.
 *
 *    Returns FP_RECORD when a record was read; FP_FILE_BREAK when the
 *    current file yielded no further record (the file is then
 *    archived or removed and the source destroyed); FP_GET_ERROR when
 *    no file could be opened.  The whole call is serialized by a
 *    function-local mutex.
 *
 *    Invoked by reader_type->get_record_fn();
 */
static fp_get_record_result_t readerGetRecord(
    rwRec                  *out_rwrec,
    const probe_def_t     **out_probe,
    flow_proc_t            *fproc)
{
    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    flowcapSource_t fc_src;
    const char *filename;
    fp_get_record_result_t retVal = FP_GET_ERROR;
    uint32_t rec_count;

    pthread_mutex_lock(&mutex);

    /* If we don't have a source, pop a file off the valid-queue
     * and start processing it. */
    if (fproc->flow_src == NULL) {
        if (readerGetNextValidFile(fproc) != 0) {
            /* error getting file */
            goto END;
        }
    }
    fc_src = (flowcapSource_t)fproc->flow_src;

    /* Assume we can get a record from the probe. */
    retVal = FP_RECORD;
    *out_probe = fproc->probe;

    /* Try to get the record */
    if (flowcapSourceGetGeneric(fc_src, out_rwrec)) {
        /* get failed: either at EOF or got an error; assume EOF. */
        retVal = FP_FILE_BREAK;
        *out_probe = NULL;

        /* Print results for the file we just processed. */
        filename = flowcapSourceFileName(fc_src);
        rec_count = flowcapSourceNumRecs(fc_src);
        /* rec_count is unsigned 32-bit; widen it to match the format */
        logMsg("Processed file %s, %lu records.",
               filename, (unsigned long)rec_count);

        flowcapSourceClose(fc_src);

        /* Either archive the file or remove it */
        if (archiveFile(filename) == 1) {
            /* archiving not requested */
            if (unlink(filename) == -1) {
                logMsg("Couldn't remove %s.", filename);
            }
        }

        /* All done with the flow source */
        flowcapSourceDestroy(fc_src);
        fproc->flow_src = NULL;
        fproc->probe = NULL;
    }

  END:
    pthread_mutex_unlock(&mutex);

    return retVal;
}


/*
 *  status = fcReaderInitialize(reader_type);
 *
 *    Create the thread-specific 'last_file_key' (its values are
 *    released with free()) and fill in the name and function
 *    pointers of 'reader_type'.  Returns 0 on success, -1 if the key
 *    cannot be created.
 *
 *    Invoked by reader_type->initialize_fn();
 */
int fcReaderInitialize(reader_type_t *reader_type)
{
    int rv;

    /* Create the last_file key */
    rv = pthread_key_create(&last_file_key, free);
    if (rv != 0) {
        return -1;
    }

    /* Set my name */
    reader_type->reader_name = READER_TYPE_NAME;

    /* Set function pointers */
    reader_type->setup_fn      = &readerSetup;
    reader_type->teardown_fn   = NULL;
    reader_type->start_fn      = &readerStart;
    reader_type->stop_fn       = &readerStop;
    reader_type->get_record_fn = &readerGetRecord;
    reader_type->want_probe_fn = &readerWantProbe;

    return 0;                   /* OK */
}


/*
 *  yes_or_no = readerWantProbe(probe);
 *
 *    Return a TRUE value this reader_type is able to process the data
 *    from the 'probe'; return a FALSE value otherwise.
 */
static int readerWantProbe(probe_def_t UNUSED(*probe))
{
    /* Flowcap mode is handled specially by rwflowpack, and this
     * function shouldn't be called; in other modes, this function
     * should refuse all probes. */
    return 0;
}


/*
 *  status = parseFlowcapAddress(addr);
 *
 *    Parse a list of hostname[:port][,hostname[:port]]* items as
 *    flowcap address/port pairs, appending one entry to
 *    'flowcap_info' (and incrementing 'flowcap_num') per item.
 *
 *    'addr' is modified while parsing.  An explicit port must be a
 *    decimal number in the range 1-65535.  An item without a port
 *    uses 'flowcap_default_port' when that value is positive;
 *    otherwise the item is rejected.
 *
 *    Returns 0 on success.  Returns -1 after printing an error when a
 *    host cannot be resolved to an address of size in_addr_t, a port
 *    is invalid or missing, or more than FLOWCAP_MAX items are given.
 *    Exits on memory failure.
 */
static int parseFlowcapAddress(char *addr)
{
    struct hostent *he;
    char *next;
    char *port;
    char *end;
    long portnum;
    int explicit_port;

    /* Loop around addresses */
    for (;;) {
        /* Never write beyond the end of flowcap_info[] */
        if (flowcap_num >= FLOWCAP_MAX) {
            skAppPrintErr("Too many flowcap addresses (max %d)",
                          FLOWCAP_MAX);
            return -1;
        }

        /* next points to the border for the next address
         * specification */
        next = strchr(addr, ',');
        if (next) {
            /* Change the comma to a null so it looks like we're only
             * parsing one address specification */
            *next = '\0';
        }

        /* Look for a port specification in the current address */
        port = strchr(addr, ':');
        if (port) {
            /* Change the colon to a null.  This makes addr hold just
             * the host name. */
            *port = '\0';
        }

        /* Parse the host name into an address */
        he = gethostbyname(addr);
        if (he == NULL || he->h_length != sizeof(in_addr_t)) {
            skAppPrintErr("illegal address %s", addr);
            return -1;
        }

        /* Save the host address right away; 'he' is not referenced
         * again after this point. */
        memcpy(&flowcap_info[flowcap_num].address, he->h_addr,
               sizeof(in_addr_t));

        /* Parse the port number.  Set to default port if no
         * specification. */
        explicit_port = 0;
        if (port && *(port + 1)) {
            portnum = strtol(port + 1, &end, 10);
            if (end != port + 1) {
                /* tolerate trailing white space after the digits */
                while (isspace((unsigned char)*end)) {
                    end++;
                }
            }
            if (end == port + 1 || *end != '\0'
                || portnum < 1 || portnum > 65535)
            {
                skAppPrintErr("illegal port '%s' for address %s",
                              port + 1, addr);
                return -1;
            }
            flowcap_info[flowcap_num].port = (int)portnum;
            explicit_port = 1;
        } else if (flowcap_default_port > 0) {
            flowcap_info[flowcap_num].port = flowcap_default_port;
        } else {
            skAppPrintErr("The address %s requires a port specification.",
                          addr);
            return -1;
        }

        /* If there is a valid port specification, put the colon back
         * in so the address name will contain the port number. */
        if (explicit_port) {
            *port = ':';
        }

        /* Save the address name. */
        flowcap_info[flowcap_num].name = strdup(addr);
        if (flowcap_info[flowcap_num].name == NULL) {
            skAppPrintErr("Memory failure");
            exit(EXIT_FAILURE);
        }

        flowcap_info[flowcap_num].fcClient = NULL;

        /* Increment the number of addresses. */
        flowcap_num++;

        if (next == NULL) {
            break;
        }

        /* Find the beginning of the next address */
        next++;
        while (isspace((unsigned char)*next)) {
            next++;
        }
        addr = next;
    }

    return 0;
}


/*
 *  status = readerSetup(is_daemon, probe_vec, options);
 *
 *    Do any setup required before starting the processor: copy the
 *    flowcap settings out of 'options', parse the flowcap address
 *    list, initialize the lzo library, create the valid-file queue,
 *    and load it with the files already in the valid directory.
 *    Sets '*is_daemon' to FP_DAEMON_ON and returns 0 on success;
 *    returns -1 after printing an error otherwise.
 *
 *    Invoked by reader_type->setup_fn();
 */
static int readerSetup(
    fp_daemon_mode_t   *is_daemon,
    const sk_vector_t  *probe_vec,
    reader_options_t   *options)
{
    int rv;

    /* store the vector of probes */
    g_probe_vec = probe_vec;

    /* pull values out of options */
    flowcap_default_port = options->flowcap.flowcap_default_port;
    work_directory = options->flowcap.work_directory;
    valid_directory = options->flowcap.valid_directory;

    /* strdup() must not be handed a NULL string */
    if (options->flowcap.flowcap_address_string == NULL) {
        skAppPrintErr("Must specify the address of at least one flowcap.");
        return -1;
    }

    /* The parser modifies its argument, so give it a private copy */
    flowcap_addresses = strdup(options->flowcap.flowcap_address_string);
    if (flowcap_addresses == NULL) {
        skAppPrintErr("Memory failure");
        return -1;
    }

    /* The parser stores its own copies of what it keeps, so the
     * scratch copy can be released as soon as parsing is finished. */
    rv = parseFlowcapAddress(flowcap_addresses);
    free(flowcap_addresses);
    flowcap_addresses = NULL;
    if (rv) {
        return -1;
    }

    if (flowcap_num < 1) {
        skAppPrintErr("Must specify the address of at least one flowcap.");
        return -1;
    }

    /* set up compressor library */
    if (lzo_init()) {
        skAppPrintErr("unable to initialize lzo library");
        return -1;
    }

    /* Set up valid file queue */
    vq = skDequeCreate();
    if (vq == NULL) {
        skAppPrintErr("Failed to create file queue.");
        return -1;
    }

    /* Populate the initial file queue */
    loadValidQueue();

    *is_daemon = FP_DAEMON_ON;

    return 0;
}

#endif /* SK_ENABLE_FLOWCAP */


/*
** Local variables:
** mode:c
** indent-tabs-mode:nil
** c-basic-offset:4
** End:
*/