/* ** Copyright (C) 2003-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ #include "silk.h" RCSIDENT("$SiLK: flowcapserver.c 7167 2007-05-15 16:23:28Z mthomas $"); #include "flowcap.h" #include "utils.h" #include "fcinternals.h" #include "sksite.h" #include "skdeque.h" #include "sklog.h" #include "skdaemon.h" #include "sktransfer.h" #include /* TYPEDEFS AND DEFINES */ /* It seems that solaris does not define SUN_LEN (yet) */ #ifndef SUN_LEN #define SUN_LEN(su) \ (sizeof(*(su)) - sizeof((su)->sun_path) + strlen((su)->sun_path)) #endif /* LOCAL VARIABLES */ /* Send queue */ static skDeque_t sendQ; /* Send queues split into priorites*/ static skDeque_t lowQ, highQ; /* Queue in which no longer needed fileStructs can be recycled. */ static skDeque_t freeQ; /* File which is currently being sent */ static fileStruct_t *curSendFPtr; /* Whether the interface is currently up. (Boolean) */ static uint8_t interfaceup = 1; /* The file sender. */ static skTransfer_t sender; /* Mutex and condition variable to protect the unallocated_space variable. This allows threads to wait (on spaceCond) until there is enough unallocated space. */ static pthread_mutex_t spaceMutex; static pthread_cond_t spaceCond; /* LOCAL FUNCTION DECLARATIONS */ static void shutdownInterface(void *); static void restartInterface(void *); static void dumpStats(fileStruct_t *fPtr); /* FUNCTION DEFINITIONS */ /* * loadFiles: * Load files from real disk into their data structures renaming * as we go along. Then enqueue them for sending. * Finally, grab files from the free queue to open writers. */ void loadFiles(void) { fileStruct_t *fPtr; uint32_t i; char newName[PATH_MAX]; char *fileName; glob_t files; int rv; /* Empty the ramdir */ if (destDirs[FC_RAM]) { sprintf(newName, "%s/*", destDirs[FC_RAM]); if ((rv = glob(newName, GLOB_ERR, NULL, &files)) != 0 #if HAVE_DECL_GLOB_NOMATCH && rv != GLOB_NOMATCH #endif ) { skAppPrintErr("Error doing globbing on %s", newName); perror(NULL); exit(EXIT_FAILURE); } #if HAVE_DECL_GLOB_NOMATCH if (rv == GLOB_NOMATCH) { files.gl_pathc = 0; } #endif for (i = 0; i < (uint32_t)files.gl_pathc; i++) { fileName = files.gl_pathv[i]; if (fileName[0] == '.') { continue; } DEBUGMSG("%s:%d Unlinking %s", __FILE__, __LINE__, fileName); unlink(fileName); } globfree(&files); } /* Get a list of current files from the disk dir*/ sprintf(newName, "%s/*", destDirs[FC_DISK]); if ((rv = glob(newName, GLOB_ERR, NULL, &files)) != 0 #if HAVE_DECL_GLOB_NOMATCH && rv != GLOB_NOMATCH #endif ) { skAppPrintErr("Error doing globbing on %s", newName); perror(NULL); exit(EXIT_FAILURE); } #if HAVE_DECL_GLOB_NOMATCH if (rv == GLOB_NOMATCH) { files.gl_pathc = 0; } #endif /* Add these files to the send queues */ for (i = 0; i < (uint32_t)files.gl_pathc; i++) { const probe_def_t *found = NULL; const char *sensor_start, *sensor_end; const char *probe_start, *probe_end; const char *name_start; fileName = files.gl_pathv[i]; /* Skip directory special files */ if (fileName[0] == '.') { continue; } /* If the file is empty (header only or smaller), delete. The extra four bytes are for the uint32_t zero which is added for an empty compressed block. */ if (fileSize(fileName) <= (off_t)(sizeof(fcHeader_V2_t) + 4)) { DEBUGMSG("%s:%d Unlinking %s", __FILE__, __LINE__, fileName); unlink(fileName); LOGDEBUG(("Deleting %s", fileName)); continue; } name_start = strrchr(fileName, '/'); if (name_start == NULL) { name_start = fileName; } else { name_start++; } /* Allocate the fPtr structure */ fPtr = (fileStruct_t*) calloc(1, sizeof(fileStruct_t)); fPtr->startTime = fPtr->endTime = (uint32_t)time(NULL); fPtr->size = fileSize(fileName); snprintf(fPtr->fName, sizeof(fPtr->fName), "%s", name_start); /* This isn't protected because it should run before the other main flowcap threads are created. */ unallocated_space[FC_DISK] -= fPtr->size; /* Here we parse the sensor name and probe name from the filename */ if ((sensor_start = strchr(name_start, '_'))) { sensor_start++; if (*sensor_start != '\0' && (sensor_end = strchr(sensor_start, '_'))) { probe_start = sensor_end + 1; if (*probe_start != '\0' && (probe_end = strchr(probe_start, '.'))) { /* Here, the sensor name and probe name have been successfully parsed, so we now look for a probe match. */ char sensor_nm[FC_SENSOR_MAX]; char probe_nm[FC_PROBE_MAX]; /* Copy the names */ strncpy(sensor_nm, sensor_start, sensor_end - sensor_start); sensor_nm[sensor_end - sensor_start] = 0; strncpy(probe_nm, probe_start, probe_end - probe_start); probe_nm[probe_end - probe_start] = 0; /* Find the probe */ found = probeConfGetProbeByName(sensor_nm, probe_nm); } } /* Add to the appropriate queue. Unknown is assumed to be high priority. */ if (!found || FC_PROBE_IS_HIGH_PRIORITY(found)) { addToHighPriorityQueue(fPtr); } else { addToLowPriorityQueue(fPtr); } } } globfree(&files); return; } /* * unloadFiles(); * * Must call closeWriteFiles() prior to calling this function. * * Flush write ram disk files to real disk. Then, flush all ram * disk files in the send queue. Since the file currently being * sent is not in the send queue, this should be OK; i.e., we wont * save and hence, double send the currently being sent file. */ void unloadFiles(void) { fileStruct_t *fPtr; char srcFile[PATH_MAX], destFile[PATH_MAX]; skDeque_t tmp; /* now check the send queues for ram disk files */ tmp = skDequeCreate(); while (!skDequePopFrontNB(highQ, (void **)&fPtr)) { if (fPtr->isRamDisk) { /* Copy to hard disk */ sprintf(srcFile, "%s/%s", destDirs[FC_RAM], fPtr->fName); sprintf(destFile, "%s/%s", destDirs[FC_DISK], fPtr->fName); logMsg("Moving %s to %s.", srcFile, destFile); if (copyFile(srcFile, destFile) == 0) { DEBUGMSG("%s:%d Unlinking %s", __FILE__, __LINE__,srcFile); unlink(srcFile); } else { logMsg("Unable to copy %s/%s to %s", destDirs[FC_RAM], fPtr->fName, destDirs[FC_DISK]); } } skDequePushBack(tmp, (void *)fPtr); } while (!skDequePopFrontNB(tmp, (void **)&fPtr)) { skDequePushBack(highQ, (void *)fPtr); } while (!skDequePopFrontNB(lowQ, (void **)&fPtr)) { if (fPtr->isRamDisk) { /* Copy to hard disk */ sprintf(srcFile, "%s/%s", destDirs[FC_RAM], fPtr->fName); sprintf(destFile, "%s/%s", destDirs[FC_DISK], fPtr->fName); logMsg("Moving %s to %s.", srcFile, destFile); if (copyFile(srcFile, destFile) == 0) { DEBUGMSG("%s:%d Unlinking %s", __FILE__, __LINE__,srcFile); unlink(srcFile); } else { logMsg("Unable to copy %s/%s to %s", destDirs[FC_RAM], fPtr->fName, destDirs[FC_DISK]); } } skDequePushBack(tmp, (void *)fPtr); } while (!skDequePopFrontNB(tmp, (void **)&fPtr)) { skDequePushBack(lowQ, (void *)fPtr); } skDequeDestroy(tmp); } /* * addToLowPriorityQueue: * Add a fileStructPtr to the acl send queue. * If possible, initiate another file send. */ void addToLowPriorityQueue(fileStruct_t *fPtr) { LOGDEBUG(("Adding %s to acl send queue", fPtr->fName)); skDequePushFront(lowQ, (void *)fPtr); } /* * addToHighPriorityQueue: * Add a fileStructPtr to the routed send queue. * If possible, initiate another file send. */ void addToHighPriorityQueue(fileStruct_t *fPtr) { LOGDEBUG(("Adding %s to routed send queue", fPtr->fName)); skDequePushFront(highQ, (void *)fPtr); } /* called by sender when sending failed */ void addToSendQueue(fileStruct_t *fPtr) { skDequePushFront(sendQ, (void *)fPtr); } /* * addToFreeQueue: * Add fPtr to the free queue. */ void addToFreeQueue(fileStruct_t *fPtr) { pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&spaceMutex); pthread_mutex_lock(&spaceMutex); unallocated_space[fPtr->isRamDisk] += fPtr->size; skDequePushFront(freeQ, (void *)fPtr); /* Announce the availability of more space */ pthread_cond_broadcast(&spaceCond); pthread_cleanup_pop(1); } void initQueues(void) { pthread_mutex_init(&spaceMutex, NULL); pthread_cond_init(&spaceCond, NULL); lowQ = skDequeCreate(); highQ = skDequeCreate(); freeQ = skDequeCreate(); sendQ = skDequeCreateMerged(lowQ, highQ); } void destroyQueues(void) { skDequeDestroy(sendQ); skDequeDestroy(freeQ); skDequeDestroy(highQ); skDequeDestroy(lowQ); } void dumpQueueLengths(void) { DEBUGMSG("highQ.len = %d lowQ.len = %d", skDequeSize(highQ), skDequeSize(lowQ)); } void broadcastFreeSpace(writeFileStruct_t *wfPtr, off_t size) { pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&spaceMutex); pthread_mutex_lock(&spaceMutex); unallocated_space[wfPtr->fPtr->isRamDisk] += allocFileSize - size; pthread_cond_broadcast(&spaceCond); pthread_cleanup_pop(1); } /* * getFileStruct: Get a free file structure for server mode */ fileStruct_t *getFileStructServer(void) { fileStruct_t *ptr; skDQErr_t err; uint8_t type; pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&spaceMutex); pthread_mutex_lock(&spaceMutex); do { if (unallocated_space[FC_RAM] >= allocFileSize) { type = FC_RAM; break; } else if (unallocated_space[FC_DISK] >= allocFileSize) { type = FC_DISK; break; } if (interfaceup) { shutdownInterface(NULL); } /* Wait for more space */ pthread_cond_wait(&spaceCond, &spaceMutex); } while (1); if ((err = skDequePopFrontNB(freeQ, (void **)&ptr))) { if (err == SKDQ_EMPTY) { ptr = (fileStruct_t*) calloc(1, sizeof(fileStruct_t)); } else { logMsg("Error: popFreeQueue"); return NULL; } } ptr->isRamDisk = type; assert(unallocated_space[type] >= allocFileSize); unallocated_space[type] -= allocFileSize; if (!interfaceup) { restartInterface(NULL); } pthread_cleanup_pop(1); return ptr; } static void fileStructPtr_name(void *item, char *path, char *name) { size_t size; fileStruct_t *fPtr = (fileStruct_t*)item; curSendFPtr = fPtr; size = snprintf(path, PATH_MAX, "%s/%s", destDirs[fPtr->isRamDisk], fPtr->fName); if (size >= PATH_MAX) { logMsg("Path too long. Exiting."); exit(EXIT_FAILURE); } name[0] = '\0'; fPtr->sendStartTime = time((time_t *)NULL); } static void sender_callback( char *path, void *item, sktErr_t err, skTransfer_t UNUSED(sender)) { fileStruct_t *fPtr = (fileStruct_t*)item; char *retry = ""; switch (err) { case SK_TRANSFER_ESUCCESS: logMsg("Successfully sent %s", path); fPtr->sentTime = time((time_t *)NULL); dumpStats(fPtr); curSendFPtr = NULL; addToFreeQueue(fPtr); DEBUGMSG("%s:%d Unlinking %s", __FILE__, __LINE__, path); unlink(path); break; case SK_TRANSFER_ETIMEOUT: case SK_TRANSFER_EFAILED: fPtr->sendingTries++; if (fPtr->sendingTries < 3) { retry = " (Retrying)"; addToSendQueue(fPtr); } logMsg("Failed to transfer %s.%s", path, retry); break; case SK_TRANSFER_EFATAL: logMsg("Fatally failed to transfer %s.", path); curSendFPtr = NULL; addToFreeQueue(fPtr); DEBUGMSG("%s:%d Unlinking %s", __FILE__, __LINE__, path); unlink(path); break; case SK_TRANSFER_ESYSTEM: case SK_TRANSFER_ESYSTEM_THREAD: logMsg("Fatal system error in file transfer. Shutting down."); exit(EXIT_FAILURE); case SK_TRANSFER_ESHUTDOWN_EXTERNAL: logMsg("File transfer aborted."); break; default: abort(); /* Should never get here */ } } /* return 1 if failure. 0 if ok */ int startSender(void) { if (skCreateSenderServer(&sender, fc_listen_port) != SK_TRANSFER_ESUCCESS) { return 1; } if (fc_client_addr == INADDR_ANY) { if (skServerValidClientAny(sender) != SK_TRANSFER_ESUCCESS) { goto error; } } else if (skServerValidClientAdd(sender, fc_client_addr) != SK_TRANSFER_ESUCCESS) { goto error; } if (skTransferSetQueue(sender, sendQ) != SK_TRANSFER_ESUCCESS) { goto error; } if (skTransferSetAckTimeout(sender, ack_timeout) != SK_TRANSFER_ESUCCESS) { goto error; } if (skTransferSetCallback(sender, sender_callback) != SK_TRANSFER_ESUCCESS) { goto error; } if (skTransferSetLogFn(sender, logMsg) != SK_TRANSFER_ESUCCESS) { goto error; } if (skSenderSetFilenameFn(sender, fileStructPtr_name) != SK_TRANSFER_ESUCCESS) { goto error; } if (skTransferStart(sender) != SK_TRANSFER_ESUCCESS) { goto error; } return 0; error: skDestroyTransfer(sender); return 1; } void stopSender(void) { if (sender != NULL) { logMsg("Stopping sender."); skTransferStop(sender, 10); skDestroyTransfer(sender); } sender = NULL; return; } static void dumpStats(fileStruct_t *fPtr) { logMsg("%s Write %u secs. Wait %u secs. Send %u secs.", fPtr->fName, (fPtr->endTime - fPtr->startTime), (fPtr->sendStartTime - fPtr->endTime), (fPtr->sentTime - fPtr->sendStartTime)); } static void shutdownInterface(void *UNUSED(dummy)) { logMsg("Shutting down netflow interface."); stopReader(); interfaceup = 0; } static void restartInterface(void *UNUSED(dummy)) { logMsg("Restarting netflow interface."); if (startReader()) { logMsg("Couldn't restart reader."); exit(EXIT_FAILURE); } interfaceup = 1; } /* ** Local Variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */