/* ** Copyright (C) 2003-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ #include "silk.h" RCSIDENT("$SiLK: flowcap.c 7800 2007-07-06 15:32:43Z mthomas $"); #include #ifdef HAVE_SYS_STATVFS_H #include #endif #include "flowcap.h" #include "utils.h" #include "fcinternals.h" #include "sksite.h" #include "skdeque.h" #include "sklog.h" #include "skdaemon.h" #include "sktransfer.h" /* TYPEDEFS AND DEFINES */ /* Size of the end-of-compression marker */ #define END_COMPRESSION_MARKER sizeof(uint32_t) /* Whether a probe's priority is high */ #define FC_PROBE_IS_HIGH_PRIORITY(probe) \ (skProbeGetPriority(probe) > 50) /* EXPORTED VARIABLES */ /* Amount of disk space which has not already been allocated to files {DISK, RAM} */ uint64_t unallocated_space[2]; /* High (RAM) and low (DISK) speed disk locations {DISK, RAM} */ char *destDirs[2] = {NULL, NULL}; /* File write timeout in seconds. A file will be closed out if this amount of time passes without getting too large. */ uint32_t writeTimeout = 60; /* ACK timout in seconds. */ uint32_t ack_timeout = 10; /* Address of the flowcap client (consumer). */ in_addr_t fc_client_addr = INADDR_ANY; /* Port upon which flowcap listens for connecting clients (consumers). */ uint16_t fc_listen_port = 0xFFFF; /* Files will be split into multiple files if they reach this size. */ uint32_t maxFileSize = 0; /* Ammount of disk space to allocate to a file before we know its actual size. */ uint64_t allocFileSize = 0; /* The version of flowcap to produce */ uint8_t flowcap_version = FC_VERSION_DEFAULT; /* The list of probes we care about */ sk_vector_t *probe_vec = NULL; /* Whether to send files at all */ int server_mode_flag = 0; #ifdef HAVE_STATVFS /* leave at least this much free space on the disk; specified by * --freespace-minimum. Gets set to DEFAULT_FREESPACE_MINIMUM */ int64_t freespace_minimum = -1; /* take no more that this amount of the disk; as a percentage. * specified by --space-maximum-percent */ double space_maximum_percent = DEFAULT_SPACE_MAXIMUM_PERCENT; #endif /* HAVE_STATVFS */ /* LOCAL VARIABLES */ /* Reader shut down flag (0 == stop) */ static volatile uint8_t reading; /* Indicator of whether flowcap is in the process of shutting down */ static volatile int shuttingDown = 0; /* Main thread id. */ static pthread_t main_thread; /* The array of write file structures, and the array's size. */ static writeFileStruct_t *write_files; static size_t num_write_files; /* LOCAL FUNCTION DECLARATIONS */ static void appTeardown(void); static skTimerRepeat_t timerHandler(void *t); static void createWriteFiles(void); static void closeWriteFiles(void); static void openWFile(writeFileStruct_t *wfPtr); static void openWFileBase(writeFileStruct_t *wfPtr); static void closeWFileBase(writeFileStruct_t *wfPtr, uint8_t open); static uint32_t dumpHeader(writeFileStruct_t* wfPtr); static fileStruct_t *getFileStruct(void); static void *reader(void *); /* FUNCTION DEFINITIONS */ /* * appTeardown() * * Teardown all modules, close all files, and tidy up all * application state. * * This function is idempotent. */ static void appTeardown(void) { static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static uint8_t teardownFlag = 0; pthread_mutex_lock(&mutex); if (teardownFlag) { pthread_mutex_unlock(&mutex); return; } teardownFlag = 1; pthread_mutex_unlock(&mutex); logMsg("Shutting down"); shuttingDown = 1; stopReader(); if (server_mode_flag) { stopSender(); } logMsg("Closing all files"); closeWriteFiles(); if (server_mode_flag) { logMsg("Unloading files"); unloadFiles(); destroyQueues(); } probeConfTeardown(); skdaemonTeardown(); skAppUnregister(); } /* * timerHandler: * Timer fired for a file. Ship it. * IMPORTANT: We must set the struct's timerHandle to NULL since, * otherwise, closeWFile() will free it and then the selector * lib will free it again. */ static skTimerRepeat_t timerHandler(void *t) { register writeFileStruct_t *wfPtr = (writeFileStruct_t*)t; if (shuttingDown) { return SK_TIMER_END; } /* Set the close flag first. */ wfPtr->close = 1; /* Timer handler stuff */ logMsg("Timer fired for %s", wfPtr->fPtr->fName); wfPtr->timerHandle = NULL; /* Close the file, and open a new one. */ closeWFile(wfPtr, 1); return SK_TIMER_END; } /* * createWriteFiles: * Creates the write file structures */ static void createWriteFiles(void) { probe_def_t *probe; size_t i; num_write_files = skVectorGetCount(probe_vec); write_files = (writeFileStruct_t*)calloc(num_write_files, sizeof(writeFileStruct_t)); if (write_files == NULL) { logMsg("Cannot create write_files: out of memory"); exit(EXIT_FAILURE); } for (i = 0; i < num_write_files; i++) { skVectorGetValue(&probe, probe_vec, i); /* Fill in the probe */ write_files[i].probe = probe; write_files[i].type = FT_FLOWCAP; /* The rest of the initialization */ write_files[i].version = flowcap_version; write_files[i].HWM = maxFileSize; write_files[i].lzofile = lzo_create_compr_buffer(); pthread_mutex_init(&write_files[i].mutex, NULL); /* Create the file */ openWFile(&write_files[i]); } } /* * closeWriteFiles(void); * * Close all write files. */ static void closeWriteFiles(void) { size_t i; /* close all files */ for (i = 0; i < num_write_files; i++) { write_files[i].close = 1; closeWFile(&write_files[i], 0); } } static void openWFile(writeFileStruct_t *wfPtr) { LOGDEBUG(("Locking in openWfile.")); pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&wfPtr->mutex); pthread_mutex_lock(&wfPtr->mutex); LOGDEBUG(("Locked in openWfile.")); openWFileBase(wfPtr); LOGDEBUG(("Unlocking in openWfile.")); pthread_cleanup_pop(1); LOGDEBUG(("Unlocked in openWfile.")); } /* * openWFileBase * Open the next available file as a write file. * Take care of file naming, cycling the global file seqnum * Open file with gzopen. * Stop the reader if no files are available. * If we are actively reading, install a timeout. * Write the appropriate header depending on wfPtr->type. */ static void openWFileBase(writeFileStruct_t *wfPtr) { char buffer[PATH_MAX]; char dotname[PATH_MAX]; char dotbuffer[PATH_MAX]; char ts[FC_TIMESTAMP_MAX + 1]; /* timestamp */ time_t ct; struct tm ut; int fd; logMsg("Opening new file."); /* Get an empty fptr */ if (server_mode_flag) { wfPtr->fPtr = getFileStructServer(); } else { wfPtr->fPtr = getFileStruct(); } assert(flowcap_version >= FC_VERSION_MIN); assert(flowcap_version <= FC_VERSIONS); assert(FC_REC_SIZE(flowcap_version) != -1); /* Create a timestamp */ ct = time(NULL); gmtime_r(&ct, &ut); strftime(ts, sizeof(ts), "%Y%m%d%H%M%S", &ut); /* Create a filename from the timestamp and probe */ if (snprintf(wfPtr->fPtr->fName, sizeof(wfPtr->fPtr->fName), "%s_%s_%s.XXXXXX", ts, skProbeGetSensorName(wfPtr->probe), skProbeGetName(wfPtr->probe)) >= (int)sizeof(wfPtr->fPtr->fName)) { logMsg("Filename exceeded transfer filename size."); exit(EXIT_FAILURE); } /* Add in the directory */ if (snprintf(buffer, sizeof(buffer), "%s/%s", destDirs[wfPtr->fPtr->isRamDisk], wfPtr->fPtr->fName) >= (int)sizeof(buffer)) { logMsg("Filename exceeded buffer size."); exit(EXIT_FAILURE); } /* Open the file; making sure it is unique */ fd = mkstemp(buffer); if (fd == -1) { logMsg("Write file creation (%s) error: %s", buffer, strerror(errno)); exit(EXIT_FAILURE); } /* Set the permissions */ fchmod(fd, 0644); if (!server_mode_flag) { close(fd); /* Get the changed filename */ baseName_r(dotname, buffer, sizeof(dotname)); /* Add a dot */ if (snprintf(dotbuffer, sizeof(dotbuffer), "%s/.%s", destDirs[wfPtr->fPtr->isRamDisk], dotname) >= (int)sizeof(buffer)) { logMsg("Filename exceeded buffer size."); exit(EXIT_FAILURE); } fd = open(dotbuffer, O_WRONLY | O_CREAT | O_EXCL, 0644); if (fd == -1) { logMsg("Write file creation (%s) error: %s", dotbuffer, strerror(errno)); unlink(buffer); exit(EXIT_FAILURE); } } /* Put the uniquified dot name back in the fptr */ baseName_r(wfPtr->fPtr->fName, buffer, sizeof(wfPtr->fPtr->fName)); DEBUGMSG("File path: %s", buffer); DEBUGMSG("File name: %s", wfPtr->fPtr->fName); /* Get a FILE * handle to the file */ if ((wfPtr->gzFILE = fdopen(fd, "w")) == NULL) { logMsg("Write file open (%s) error: %s", buffer, strerror(errno)); exit(EXIT_FAILURE); } LOGDEBUG(("%s opened for write", server_mode_flag ? buffer : dotbuffer)); /* write the header, and retrieve the size of the header */ wfPtr->fPtr->size = wfPtr->fPtr->uncompressSize = wfPtr->headerSize = dumpHeader(wfPtr); /* Bind the compression buffer to the file */ lzo_bind_compr_buffer(wfPtr->lzofile, wfPtr->gzFILE); /* Set up default values */ wfPtr->fPtr->startTime = time((time_t *)NULL); wfPtr->fPtr->sendingTries = 0; wfPtr->fPtr->endTime = wfPtr->fPtr->sendStartTime = wfPtr->fPtr->sentTime = 0; wfPtr->records = 0; wfPtr->bufLen = 0; wfPtr->closing = 0; wfPtr->close = 0; /* set the timer to writeTimeout + some random # of secs <= 10 */ skTimerCreate(&wfPtr->timerHandle, writeTimeout, timerHandler, (void *) wfPtr); logMsg("Opened new file %s", wfPtr->fPtr->fName); return; } void closeWFile(writeFileStruct_t *wfPtr, uint8_t open) { static pthread_mutex_t close_lock = PTHREAD_MUTEX_INITIALIZER; uint8_t quit = 0; /* Ah, the perils of threads. wfPtr->closing keeps us from double-closing a wfPtr. wfPtr->close makes sure we don't honor a request to close a wfPtr that has been closed and reopened since the request. */ pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&close_lock); pthread_mutex_lock(&close_lock); if (wfPtr->closing || !wfPtr->close) { quit = 1; } else { wfPtr->closing = 1; } pthread_cleanup_pop(1); /* unlock close_lock */ if (quit) { logMsg("Avoiding duplicate call to closeWFile."); return; } LOGDEBUG(("Locking in closeWFile.")); pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&wfPtr->mutex); pthread_mutex_lock(&wfPtr->mutex); LOGDEBUG(("Locked in closeWFile.")); closeWFileBase(wfPtr, open); LOGDEBUG(("Unlocking in closeWFile.")); pthread_cleanup_pop(1); /* unlock wfPtr->mutex */ LOGDEBUG(("Unlocked in closeWFile.")); } /* * closeWFileBase: * Close a writer file. */ static void closeWFileBase(writeFileStruct_t *wfPtr, uint8_t open) { char buffer[PATH_MAX]; off_t size; double change; logMsg("Closing %s", wfPtr->fPtr->fName); if(wfPtr->timerHandle) { skTimerDestroy(wfPtr->timerHandle); wfPtr->timerHandle = NULL; } if ((size = lzo_flush(wfPtr->lzofile)) == -1) { logMsg("Error flushing file."); logMsg(lzo_compr_strerror(wfPtr->lzofile)); exit(EXIT_FAILURE); } wfPtr->fPtr->size += size; if (server_mode_flag) { broadcastFreeSpace(wfPtr, size); } wfPtr->fPtr->endTime = time((time_t *)NULL); if (wfPtr->fPtr->uncompressSize == 0) { change = 0.0; } else { change = (100.0 * ((double)wfPtr->fPtr->uncompressSize - (double)wfPtr->fPtr->size) / (double)wfPtr->fPtr->uncompressSize); } logMsg(("Closing file %s: %u seconds, %u records, " "%" PRIu64 " bytes, %4.1f%% compression"), wfPtr->fPtr->fName, wfPtr->fPtr->endTime - wfPtr->fPtr->startTime, wfPtr->records, wfPtr->fPtr->size, change); /* Close the file */ if (fclose(wfPtr->gzFILE) == EOF) { logMsg("fclose error of %s/%s: %s", destDirs[wfPtr->fPtr->isRamDisk], wfPtr->fPtr->fName, strerror(errno)); exit(EXIT_FAILURE); } sprintf(buffer, "%s/%s", destDirs[wfPtr->fPtr->isRamDisk], wfPtr->fPtr->fName); if (!server_mode_flag) { char dotbuffer[PATH_MAX]; int rv; sprintf(dotbuffer, "%s/.%s", destDirs[wfPtr->fPtr->isRamDisk], wfPtr->fPtr->fName); if (wfPtr->fPtr->size <= wfPtr->headerSize + END_COMPRESSION_MARKER) { unlink(dotbuffer); } else { rv = rename(dotbuffer, buffer); if (rv != 0) { CRITMSG("Failed to replace %s with %s", buffer, dotbuffer); exit(EXIT_FAILURE); } } } wfPtr->gzFILE = NULL; if (wfPtr->fPtr->size <= wfPtr->headerSize + END_COMPRESSION_MARKER) { logMsg("Removing empty file %s", wfPtr->fPtr->fName); DEBUGMSG("%s:%d Unlinking %s", __FILE__, __LINE__, buffer); unlink(buffer); if (server_mode_flag) { addToFreeQueue(wfPtr->fPtr); } else { free(wfPtr->fPtr); } } else if (!server_mode_flag) { free(wfPtr->fPtr); } else if (FC_PROBE_IS_HIGH_PRIORITY(wfPtr->probe)) { addToHighPriorityQueue(wfPtr->fPtr); } else { addToLowPriorityQueue(wfPtr->fPtr); } logMsg("Finished closing %s", wfPtr->fPtr->fName); if (open) { openWFileBase(wfPtr); } return; } /* * dumpHeader: * Write the header to the file. All info required is in wfPtr */ static uint32_t dumpHeader(writeFileStruct_t *wfPtr) { fcHeader_V2_t hdr; assert(FC_REC_SIZE(wfPtr->version) != -1); memset(&hdr, 0, sizeof(hdr)); PREPHEADER(&hdr.gHdr); hdr.gHdr.isBigEndian = 1; hdr.gHdr.version = wfPtr->version; hdr.gHdr.type = wfPtr->type; hdr.gHdr.compMethod = SK_COMPMETHOD_LZO1X; strcpy(hdr.sensorID, skProbeGetSensorName(wfPtr->probe)); strcpy(hdr.probeID, skProbeGetName(wfPtr->probe)); fwrite(&hdr, sizeof(hdr), 1, wfPtr->gzFILE); return sizeof(hdr); } /* * getFileStruct: Get a free file structure for local mode */ static fileStruct_t *getFileStruct(void) { #ifdef HAVE_STATVFS struct statvfs vfs; int64_t free, total, newfree; int rv; double percent_used; rv = statvfs(destDirs[FC_DISK], &vfs); if (rv != 0) { CRITMSG("Could not statvfs %s", destDirs[FC_DISK]); exit(EXIT_FAILURE); } /* free bytes is fundamental block size multiplied by the * available (non-privileged) blocks. */ free = ((int64_t)vfs.f_frsize * (int64_t)vfs.f_bavail); /* to compute the total (non-privileged) blocks, subtract the * available blocks from the free (privileged) blocks to get * the count of privileged-only blocks, subtract that from the * total blocks, and multiply the result by the block size. */ total = (((int64_t)vfs.f_blocks - ((int64_t)vfs.f_bfree - (int64_t)vfs.f_bavail)) * (int64_t)vfs.f_frsize); newfree = free - allocFileSize * num_write_files; percent_used = ((double)(total - newfree) / ((double)total / 100.0)); if (newfree < freespace_minimum) { CRITMSG(("Free disk space limit overrun: " "free=%" PRId64 " < min=%" PRId64 " (used %.4f%%"), newfree, freespace_minimum, percent_used); /* TODO: Create a wait routine instead of exiting? */ abort(); } if (percent_used > space_maximum_percent) { CRITMSG(("Free disk space limit overrun: " "used=%.4f%% > max=%.4f%% (free %" PRId64 " bytes)"), percent_used, space_maximum_percent, newfree); /* TODO: Create a wait routine instead of exiting? */ abort(); } #endif /* HAVE_STATVFS */ return calloc(1, sizeof(fileStruct_t)); } /* * reader: * Reads records from a wfPtr source. * Packs and write all real records according to type. * If full (i.e., past the high water mark), close file and put * into send queue at tail. * Open a new file. */ static void *reader(void *vwfPtr) { sigset_t sigs; writeFileStruct_t *wfPtr = (writeFileStruct_t*)vwfPtr; volatile pdu_source_breakable_t rv = PDUS_BREAKABLE; probe_enum_t type; volatile ssize_t rec_size; assert(wfPtr); /* Turn off all signals except for SIGPIPE. */ sigfillset(&sigs); sigdelset(&sigs, SIGPIPE); pthread_sigmask(SIG_SETMASK, &sigs, NULL); /* Only handle SIGPIPE */ logMsg("%s packer for %s started.", skProbeGetName(wfPtr->probe), skProbeGetSensorName(wfPtr->probe)); type = skProbeGetType(wfPtr->probe); rec_size = FC_REC_SIZE(wfPtr->version); /* Infloop */ while (reading) { uint8_t rec[FC_MAX_SIZE]; flowcapRec_t *fcr = (flowcapRec_t *)rec; /* Get the next flowcap record */ switch (type) { case PROBE_ENUM_NETFLOW: rv = pduSourceGetFlowcap(wfPtr->source.pdu, fcr, wfPtr->version); if (rv == PDUS_ABORT) { goto endloop; } break; #ifdef HAVE_FIXBUF_PUBLIC_H case PROBE_ENUM_IPFIX: { int iv = ipfixSourceGetFlowcap(wfPtr->source.ipfix, fcr, wfPtr->version); if (iv == -1) { goto endloop; } } break; #endif default: assert(0); abort(); } pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&wfPtr->mutex); pthread_mutex_lock(&wfPtr->mutex); /* Write the record to the file */ if (lzo_write(wfPtr->lzofile, rec, rec_size) != rec_size) { logMsg("Failed compressed write."); logMsg(lzo_compr_strerror(wfPtr->lzofile)); exit(EXIT_FAILURE); } wfPtr->fPtr->uncompressSize += rec_size; wfPtr->records++; /* Check to see if we have reached the size limit */ if ((rv == PDUS_BREAKABLE) && (lzo_compr_size_upper_bound(wfPtr->lzofile) >= wfPtr->HWM)) { wfPtr->close = 1; } pthread_cleanup_pop(1); /* unlock wfPtr->mutex */ /* Close and acquire new file if necessary */ if (wfPtr->close) { /* Close and ship the file */ closeWFile(wfPtr, 1); } } endloop: logMsg("%s packer for %s ended.", skProbeGetName(wfPtr->probe), skProbeGetSensorName(wfPtr->probe)); /* End thread */ return NULL; } /* * startReader: * Setup for reading from the netflow IP + UDP port socket. * Inputs: None * Outputs: 1 if failure. 0 if ok */ int startReader(void) { size_t i; pduSourcePool_t pdupool; pdupool = pduSourcePoolCreate(); if (pdupool == NULL) { return 1; } reading = 1; for (i = 0 ; i < num_write_files; i++) { switch (skProbeGetType(write_files[i].probe)) { case PROBE_ENUM_NETFLOW: { write_files[i].source.pdu = pduSourceCreateFromProbeDef(pdupool, write_files[i].probe, 32768, 8388608 / num_write_files, logMsg); if (write_files[i].source.pdu == NULL) { return 1; } } break; #ifdef HAVE_FIXBUF_PUBLIC_H case PROBE_ENUM_IPFIX: { in_addr_t addr; uint16_t port; probe_proto_t proto; int rv; rv = skProbeGetListenAsHost(&addr, &port, write_files[i].probe); if (rv != 0) { return 1; } proto = skProbeGetProtocol(write_files[i].probe); write_files[i].source.ipfix = ipfixSourceCreate(port, addr, proto, 8388608 / num_write_files, logMsg); if (write_files[i].source.ipfix == NULL) { return 1; } } break; #endif default: logMsg("Unsupported probe type %d", (int)skProbeGetType(write_files[i].probe)); assert(0); abort(); } /* switch () */ write_files[i].valid_source = 1; if (pthread_create(&write_files[i].reader_thread, NULL, reader, (void *)&write_files[i])) { return 1; } write_files[i].running = 1; } /* for (i) */ pduSourcePoolDestroy(pdupool); return 0; } /* Stop the reader */ void stopReader(void) { size_t i; if (!reading) { return; } logMsg("Stopping readers."); if (server_mode_flag) { dumpQueueLengths(); } reading = 0; for (i = 0; i < num_write_files; i++) { if (write_files[i].valid_source) { switch (skProbeGetType(write_files[i].probe)) { case PROBE_ENUM_NETFLOW: pduSourceDestroy(write_files[i].source.pdu); break; #ifdef HAVE_FIXBUF_PUBLIC_H case PROBE_ENUM_IPFIX: ipfixSourceDestroy(write_files[i].source.ipfix); break; #endif default: assert(0); } write_files[i].valid_source = 0; } if (write_files[i].running) { pthread_join(write_files[i].reader_thread, NULL); write_files[i].running = 0; } } return; } /* Program entry point. */ int main(int argc, char **argv) { appSetup(argc, argv); /* never returns on failure */ sklogOpen(); if (server_mode_flag) { /* Initialize the queues */ initQueues(); } /* start the logger and become a daemon */ #ifdef DEBUG skdaemonDontFork(); #endif if (skdaemonize(&shuttingDown, &appTeardown) == -1) { exit(EXIT_FAILURE); } if (server_mode_flag) { /* Populate the queues */ loadFiles(); } /* Create the write files */ createWriteFiles(); /* Store the main thread ID */ main_thread = pthread_self(); if (server_mode_flag) { /* Start the sender thread */ if (startSender()) { logMsg("Failed to start sender. Exiting."); skAppPrintErr("Failed to start sender. Exiting."); exit(EXIT_FAILURE); } } /* Start the reader threads */ if (startReader()) { logMsg("Failed to start reader. Exiting."); skAppPrintErr("Failed to start reader. Exiting."); exit(EXIT_FAILURE); } /* We now run forever, excepting signals */ while (!shuttingDown) { pause(); } appTeardown(); exit(EXIT_SUCCESS); /* Make sure we exit even if we have to tear down threads. */ return 0; } /* ** Local Variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */