/* ** Copyright (C) 2003-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** Reads PDUs from a router and writes the flows into packed files. ** */ #include "silk.h" RCSIDENT("$SiLK: rwflowpack.c 7800 2007-07-06 15:32:43Z mthomas $"); #include "sksite.h" #include "rwflowpack.h" #include "stream-cache.h" #include "sktimer.h" #include "skvector.h" #include "skdaemon.h" /* MACROS AND DATA TYPES */ /* #undef DEBUGGING */ /* #define DEBUGGING 1 */ /* Where to write usage (--help) information */ #define USAGE_FH stdout /* When (at what hour) to rotate the logs. We will disable rotation * when not running as a daemon. */ #define LOG_ROTATION_HOUR 0 /* How big to make the global file cache, g_streamCache */ #define STREAM_CACHE_DEPTH 5 /* The number of reader types to allow for */ #define MAX_READER_TYPE_COUNT 6 /* How often, in seconds, to flush the files in the g_streamCache */ #define FLUSH_CACHE_TIMEOUT 300 #define FLUSH_CACHE_TIMEOUT_STRING "300" /* Default file version to write. silk_site_*.h may have set this. */ #ifndef RWFLOWPACK_DEFAULT_VERSION # define RWFLOWPACK_DEFAULT_VERSION SK_FILE_VERSION_ANY #endif /* Must be greater than the number of switches defined */ #define MAX_OPTION_COUNT 32 #if SK_ENABLE_FLOWCAP /* Number of seconds to wait between polling the incoming directory */ #define DEFAULT_POLL_INTERVAL 15 #define DEFAULT_POLL_INTERVAL_STRING "15" #endif /* In each of the above mode, an option can be required, optional, * illegal, or non-sensical. This enumerates those values; the * mode_options[][] array holds the values for each option for each * mode. */ typedef enum { MODOPT_ILLEGAL = 0, MODOPT_REQUIRED, MODOPT_OPTIONAL, MODOPT_NONSENSE } mode_option_t; typedef void (*cleanupHandler)(void *); /* signal list */ struct siglist { int signal; char *name; }; /* Create an array of these to cache the options we get from the * user */ typedef struct { int seen; char *value; } opt_cache_t; /* PRIVATE FUNCTIONS */ static void appUsageLong(void); static void appTeardown(void); static void appSetup(int argc, char **argv); static int appOptionsHandler(clientData cData, int opt_index, char *opt_arg); static int appOptionsProcessOpt(int opt_index, char *opt_arg); static int optionsDirCheck(const char *opt_name, const char *opt_arg); static int byteOrderParse(const char *endian_string); static int validateOptions(opt_cache_t *ocache, size_t arg_count); static int startProcessor(void); static void stopProcessor(void); static void closeFiles(void); static int writeRecord(const rwRec *rwrec); static int noRwioLocking(rwIOStruct_t *rwios); static int getAllProbes(sk_vector_t *probe_vec); static int getProbesBySensor(sk_vector_t *probe_vec, const char *sensor_list); static int assignProbesToReaderTypes(void); static void nullSigHandler(int sig); static int removeFilesFromDir(const char *dir_name); static void sendFiles(void); static int defineRunModeOptions(void); /* PRIVATE VARIABLES */ static fr_initialize_fn_t reader_type_inits[] = { #if SK_ENABLE_FLOWCAP &fcReaderInitialize, &fcFilesReaderInitialize, #endif #ifdef HAVE_FIXBUF_PUBLIC_H &ipfixReaderInitialize, #endif #if SK_ENABLE_PDU_FILES &pduFileReaderInitialize, #endif &pduReaderInitialize }; /* The possible reader types */ static reader_type_t reader_types[MAX_READER_TYPE_COUNT]; /* Get a total count of the reader types actually implemented. */ static size_t num_reader_types = (sizeof(reader_type_inits) / sizeof(fr_initialize_fn_t)); /* The flow processors; one per probe */ static flow_proc_t *flow_processors = NULL; /* The number of flow_processors */ static size_t num_flow_processors = 0; /* The number of flow_processor threads currently running */ static int fproc_thread_count = 0; /* Mutex controlling access to 'fproc_thread_count' */ static pthread_mutex_t fproc_thread_count_mutex = PTHREAD_MUTEX_INITIALIZER; /* the compression method to use when writing the file. * sksiteCompmethodOptionsRegister() will set this to the default or * to the value the user specifies. */ static sk_compmethod_t comp_method; /* Command line option giving the sensor---from the probeconf * configuration file---that this rwflowpack is packing. If none * given, use first probe in the probeconf file. */ static const char *sensor_name = NULL; #if SK_ENABLE_FLOWCAP /* Which of the reader_types is for flowcap? */ static size_t fcreader_position = UINT32_MAX; /* And for flowcap files? */ static size_t fcfilesreader_position = UINT32_MAX; #endif /* True as long as we are reading. */ static uint8_t reading = 0; /* non-zero when rwflowpack is shutting down */ static volatile int shuttingDown = 0; /* The rwIOStruct_t locking function to use: either rwioLockFile() or * the local noRwioLocking() when --no-file-locking is specified. */ static int (*rwio_lock_fn)(rwIOStruct_t*); /* control thread */ static pthread_t main_thread; /* Timer that flushes files every so often */ static skTimer_t timing_thread = NULL; /* Number of seconds between cache flushes */ static uint32_t flush_timeout = FLUSH_CACHE_TIMEOUT; /* All open files to which we are writing */ static streamCache_t *g_streamCache; /* Mutex controlling access to the g_streamCache */ static pthread_mutex_t cache_mutex = PTHREAD_MUTEX_INITIALIZER; /* Byte order for newly packed files. When appending to existing * files, use the files' existing byte order. */ static silk_endian_t byte_order = SILK_ENDIAN_NATIVE; /* When non-zero, ignore the packing rules specified in the * silk_site_*.h headers and pack using the FT_RWROUTED format which * includes rounter SNMP interface numbers. */ static uint8_t pack_interfaces = 0; /* The directory where files successfully processed by rwflowpack are * stashed (for later debugging or re-packing). Administrator should * create an an external cron job to clean this directory.*/ static const char *archive_directory = NULL; /* When not in sending mode, rwflowpack writes the flows to files * located under the root directory (sksiteGetRootDir()). */ /* When in sending mode, rwflowpack uses this directory temporarily to * create the packed files while it is processing the flowcap input * file. Files in this directory are incomplete. */ static const char *incremental_directory = NULL; /* When in sending mode, rwflowpack copies the complete incremental * files to this directory and informs the sender about them. */ static const char *sender_directory = NULL; static reader_options_t reader_opts; /* options from the user's command line */ static opt_cache_t *opt_cache = NULL; /* How to run: Input and Output Modes */ typedef enum { INPUT_STREAM, #if SK_ENABLE_PDU_FILES INPUT_PDUFILE, #endif #if SK_ENABLE_FLOWCAP INPUT_FLOWCAP, INPUT_FLOWCAP_FILES, #endif OUTPUT_LOCAL_STORAGE, OUTPUT_SENDING } io_mode_t; /* The number of modes */ #define NUM_MODES \ (3 + (SK_ENABLE_PDU_FILES) + (2*(SK_ENABLE_FLOWCAP))) /* input and output modes */ static io_mode_t input_mode = INPUT_STREAM; static io_mode_t output_mode = OUTPUT_LOCAL_STORAGE; /* The index of the first Output Mode */ static const io_mode_t first_output_mode = OUTPUT_LOCAL_STORAGE; static const struct _available_modes { const char *name; const char *title; } available_modes[NUM_MODES] = { {"stream", "Stream Input"}, #if SK_ENABLE_PDU_FILES {"file", "PDU-File Input"}, #endif #if SK_ENABLE_FLOWCAP {"flowcap", "Flowcap Input"}, {"fcfiles", "Flowcap Files Input"}, #endif {"local-storage", "Local-Storage Output"}, {"sending", "Sending Output"} }; /* which options are valid in which modes. */ static mode_option_t mode_options[NUM_MODES][MAX_OPTION_COUNT]; /* Options for byte-order switch */ static struct { const char *name; silk_endian_t value; } byte_order_opts[] = { {"native", SILK_ENDIAN_NATIVE}, {"little", SILK_ENDIAN_LITTLE}, {"big", SILK_ENDIAN_BIG}, {NULL, SILK_ENDIAN_ANY} /* sentinel */ }; /* OPTIONS SETUP */ typedef enum { OPT_INPUT_MODE, OPT_OUTPUT_MODE, OPT_SENSOR_CONFIG, OPT_BYTE_ORDER, OPT_PACK_INTERFACES, OPT_NO_FILE_LOCKING, OPT_FLUSH_TIMEOUT, OPT_READER_FUNCTION, OPT_SENSOR_NAME, #if SK_ENABLE_FLOWCAP OPT_FLOWCAP_ADDRESS, OPT_FLOWCAP_PORT, OPT_WORK_DIRECTORY, OPT_VALID_DIRECTORY, OPT_INCOMING_DIRECTORY, OPT_POLLING_INTERVAL, #endif OPT_NETFLOW_FILE, OPT_ARCHIVE_DIRECTORY, OPT_ROOT_DIRECTORY, OPT_SENDER_DIRECTORY, OPT_INCREMENTAL_DIRECTORY } appOptionsEnum; static struct option appOptions[] = { {"input-mode", REQUIRED_ARG, 0, OPT_INPUT_MODE}, {"output-mode", REQUIRED_ARG, 0, OPT_OUTPUT_MODE}, {"sensor-configuration", REQUIRED_ARG, 0, OPT_SENSOR_CONFIG}, {"byte-order", REQUIRED_ARG, 0, OPT_BYTE_ORDER}, {"pack-interfaces", NO_ARG, 0, OPT_PACK_INTERFACES}, {"no-file-locking", NO_ARG, 0, OPT_NO_FILE_LOCKING}, {"flush-timeout", REQUIRED_ARG, 0, OPT_FLUSH_TIMEOUT}, {"reader-function", REQUIRED_ARG, 0, OPT_READER_FUNCTION}, {"sensor-name", REQUIRED_ARG, 0, OPT_SENSOR_NAME}, #if SK_ENABLE_FLOWCAP {"flowcap-address", REQUIRED_ARG, 0, OPT_FLOWCAP_ADDRESS}, {"flowcap-port", REQUIRED_ARG, 0, OPT_FLOWCAP_PORT}, {"work-directory", REQUIRED_ARG, 0, OPT_WORK_DIRECTORY}, {"valid-directory", REQUIRED_ARG, 0, OPT_VALID_DIRECTORY}, {"incoming-directory", REQUIRED_ARG, 0, OPT_INCOMING_DIRECTORY}, {"polling-interval", REQUIRED_ARG, 0, OPT_POLLING_INTERVAL}, #endif {"netflow-file", REQUIRED_ARG, 0, OPT_NETFLOW_FILE}, {"archive-directory", REQUIRED_ARG, 0, OPT_ARCHIVE_DIRECTORY}, {"root-directory", REQUIRED_ARG, 0, OPT_ROOT_DIRECTORY}, {"sender-directory", REQUIRED_ARG, 0, OPT_SENDER_DIRECTORY}, {"incremental-directory", REQUIRED_ARG, 0, OPT_INCREMENTAL_DIRECTORY}, #if SK_ENABLE_FLOWCAP {"fc-address", REQUIRED_ARG, 0, OPT_FLOWCAP_ADDRESS}, {"fc-port", REQUIRED_ARG, 0, OPT_FLOWCAP_PORT}, #endif {0,0,0,0} /* sentinel entry */ }; static const char *appHelp[] = { ("Select the source of flow records"), ("Select the destination for SiLK flow records"), ("Read sensor configuration from named file"), ("Byte order to use for newly packed files:\n" "\tChoices: 'native', 'little', or 'big'. Def. native"), ("Include SNMP interface indices in packed records\n" "\t(useful for debugging the router configuration). Def. No"), ("Do not attempt to lock the files prior to writing\n" "\trecords to them. Def. Use locking"), ("Time (in seconds) between periodic flushes of data\n" "\tto disk. Def. " FLUSH_CACHE_TIMEOUT_STRING), ("DEPRECATED. Ignored; for backward compatibility only"), ("Ignore all sensors in the sensor-configuration file\n" "\texcept this sensor"), #if SK_ENABLE_FLOWCAP ("Connect to each of these hostname:port pair(s) and\n" "\treceive flowcap files as they are generated by flowcap process(es).\n" "\tArgument syntax is: hostname[:port][,hostname[:port]]*\n" "\tWhen the port is not supplied, the --flowcap-port value is used"), ("Default port on which to connect to the machines\n" "\tspecified by the --flowcap-address option"), ("Directory used while receiving flowcap files;\n" "\tfiles in this directory are incomplete"), ("Directory where successfully received flowcap\n" "\tfiles are stored until processed by rwflowpack"), ("Directory which is monitored for flowcap files"), ("Interval (in seconds) between checks of the\n" "\tincoming directory for new files. Def. " DEFAULT_POLL_INTERVAL_STRING), #endif ("Read NetFlow v5 flow records from the named file,\n" "\tpack the flows, and exit rwflowpack"), ("Root of the directory tree into which successfully\n" "\tprocessed input files are moved Def. None"), ("Store the packed files locally under the directory\n" "\ttree tree rooted at this location"), ("Move the packed files into this directory for\n" "\tprocessing by rwsender"), ("Temporary directory to use while generating\n" "\tpacked SiLK data files before moving the completed files to the\n" "\tsender-directory"), #if SK_ENABLE_FLOWCAP ("DEPRECATED. Alias for --flowcap-address"), ("DEPRECATED. Alias for --flowcap-port"), #endif (char *)NULL /* sentinel entry */ }; static const appOptionsEnum last_common_option = OPT_READER_FUNCTION; /* FUNCTION DEFINITIONS */ /* * appUsageLong(); * * Print complete usage information to USAGE_FH. Pass this * function to skOptionsSetUsageCallback(); optionsParse() will * call this funciton and then exit the program when the --help * option is given. */ static void appUsageLong(void) { #define USAGE_MSG \ ("\n" \ "\tRead flow records generated by NetFlow(v5), IPFIX, or flowcap\n" \ "\tfrom a socket or from a file and pack the flow records into\n" \ "\thourly flat-files organized in a time-based directory structure.\n") FILE *fh = USAGE_FH; unsigned int i, j; fprintf(fh, "%s %s", skAppName(), USAGE_MSG); fprintf(fh, "\nCommon switches:\n"); skOptionsDefaultUsage(fh); sksiteOptionsUsage(fh); for (i = 0; i <= last_common_option; ++i) { fprintf(fh, "--%s %s. ", appOptions[i].name, SK_OPTION_HAS_ARG(appOptions[i])); switch (appOptions[i].val) { case OPT_INPUT_MODE: fprintf(fh, "%s\n\tChoices: %s", appHelp[i], available_modes[0].name); for (j = 1; j < first_output_mode; ++j) { fprintf(fh, ", %s", available_modes[j].name); } for (j = 0; j < first_output_mode; ++j) { if (j == input_mode) { fprintf(fh, ". Def. %s", available_modes[j].name); break; } } break; case OPT_OUTPUT_MODE: fprintf(fh, "%s\n\tChoices: %s", appHelp[i], available_modes[first_output_mode].name); for (j = 1+first_output_mode; j < NUM_MODES; ++j) { fprintf(fh, ", %s", available_modes[j].name); } for (j = first_output_mode; j < NUM_MODES; ++j) { if (j == output_mode) { fprintf(fh, ". Def. %s", available_modes[j].name); break; } } break; default: fprintf(fh, "%s", appHelp[i]); break; } fprintf(fh, "\n"); } sksiteCompmethodOptionsUsage(fh); skdaemonOptionsUsage(fh); for (j = 0; j < NUM_MODES; ++j) { fprintf(fh, "\n%s Mode (--%s=%s)", available_modes[j].title, appOptions[(j < first_output_mode) ? OPT_INPUT_MODE : OPT_OUTPUT_MODE].name, available_modes[j].name); if ((j == input_mode) || (j == output_mode)) { fprintf(fh, " [default]"); } fprintf(fh, "\n"); for (i = last_common_option+1; appOptions[i].name; ++i) { switch (mode_options[j][appOptions[i].val]) { case MODOPT_REQUIRED: case MODOPT_OPTIONAL: fprintf(fh, "--%s %s. %s\n", appOptions[i].name, SK_OPTION_HAS_ARG(appOptions[i]), appHelp[i]); break; case MODOPT_ILLEGAL: case MODOPT_NONSENSE: break; } } } } /* * appTeardown() * * Teardown all modules, close all files, and tidy up all * application state. * * This function is idempotent. */ static void appTeardown(void) { static int teardownFlag = 0; size_t i; reader_type_t *rt; if (teardownFlag) { return; } teardownFlag = 1; #if SK_ENABLE_PDU_FILES if (input_mode == INPUT_PDUFILE) { INFOMSG("Finishing rwflowpack..."); } else #endif { INFOMSG("Begin shutting down..."); } shuttingDown = 1; stopProcessor(); closeFiles(); for (i = 0; i < num_reader_types; ++i) { rt = &reader_types[i]; if (rt->teardown_fn != NULL) { rt->teardown_fn(); rt->teardown_fn = NULL; } if (rt->probes != NULL) { skVectorDestroy(rt->probes); rt->probes = NULL; } } if (flow_processors) { free(flow_processors); flow_processors = NULL; } if (opt_cache) { free(opt_cache); opt_cache = NULL; } probeConfTeardown(); #if SK_ENABLE_PDU_FILES if (input_mode == INPUT_PDUFILE) { INFOMSG("Finished processing PDU file."); } else #endif { INFOMSG("Finished shutting down."); } skdaemonTeardown(); skAppUnregister(); } /* * appSetup(argc, argv); * * Perform all the setup for this application include setting up * required modules, parsing options, etc. This function should be * passed the same arguments that were passed into main(). * * Returns to the caller if all setup succeeds. If anything fails, * this function will cause the application to exit with a FAILURE * exit status. */ static void appSetup(int argc, char **argv) { int arg_index; size_t i; reader_type_t *rt; size_t arg_count = sizeof(appOptions)/sizeof(struct option); struct sigaction action; /* verify same number of options and help strings */ assert((sizeof(appHelp)/sizeof(char *)) == arg_count); assert(arg_count < MAX_OPTION_COUNT); /* verify reader_types array is big enough */ assert(num_reader_types <= MAX_READER_TYPE_COUNT); /* register the application */ skAppRegister(argv[0]); skOptionsSetUsageCallback(&appUsageLong); /* initialize globals */ rwio_lock_fn = &rwioLockFile; memset(reader_types, 0, sizeof(reader_types)); memset(&reader_opts, 0, sizeof(reader_opts)); /* create an array to hold the user's command line options */ opt_cache = calloc(arg_count, sizeof(opt_cache_t)); if (NULL == opt_cache) { skAppPrintErr("memory allocation failure"); exit(EXIT_FAILURE); } /* Set which switches are valid for which modes */ if (defineRunModeOptions()) { skAppPrintErr("unable to initialize modes"); exit(EXIT_FAILURE); } /* register the options */ if (optionsRegister(appOptions, (optHandler)appOptionsHandler, (clientData)opt_cache) || sksiteCompmethodOptionsRegister(&comp_method) || sksiteOptionsRegister(SK_SITE_FLAG_CONFIG_FILE)) { skAppPrintErr("unable to register options"); exit(EXIT_FAILURE); } /* rwflowcap runs as a daemon; use the threaded logger */ if (skdaemonSetup((SKLOG_FEATURE_LEGACY | SKLOG_FEATURE_SYSLOG), argc, argv) || sklogEnableThreadedLogging()) { exit(EXIT_FAILURE); } /* Allow each reader to do its initial setup and register its options */ for (i = 0; i < num_reader_types; ++i) { rt = &reader_types[i]; #if SK_ENABLE_FLOWCAP /* is this the reader for flowcap? */ if (&fcReaderInitialize == reader_type_inits[i]) { assert(fcreader_position == UINT32_MAX); fcreader_position = i; } /* is this the reader for flowcap files? */ if (&fcFilesReaderInitialize == reader_type_inits[i]) { assert(fcfilesreader_position == UINT32_MAX); fcfilesreader_position = i; } #endif /* call it's initialize function */ if (0 != ((*reader_type_inits[i])(rt))) { /* initialize failed. print error and exit. */ if (rt->reader_name) { skAppPrintErr("unable to setup the %s flow reader", reader_types[i].reader_name); } else { skAppPrintErr("unable to setup the flow reader number %u", (unsigned int)i); } exit(EXIT_FAILURE); } } /* parse the options */ arg_index = optionsParse(argc, argv); if (arg_index < 0) { /* options handler has printed error */ skAppUsage(); } /* check for additional arguments */ if (argc != arg_index) { skAppPrintErr("too many or unrecognized argument specified '%s'", argv[arg_index]); skAppUsage(); } /* check for or missing inconsistent switches; we check i against * the options value to skip the deprecated options. */ for (i = 0; (i < arg_count && i == (size_t)appOptions[i].val); ++i) { if (opt_cache[i].seen == 0) { /* option 'i' not given; see if it is required */ if (mode_options[input_mode][i] == MODOPT_REQUIRED) { skAppPrintErr("--%s switch is required in %s Mode", appOptions[i].name, available_modes[input_mode].title); skAppUsage(); } if (mode_options[output_mode][i] == MODOPT_REQUIRED) { skAppPrintErr("--%s switch is required in %s Mode", appOptions[i].name, available_modes[output_mode].title); skAppUsage(); } } else { /* option 'i' was given; see if it is illegal */ if (mode_options[input_mode][i] == MODOPT_ILLEGAL) { skAppPrintErr("--%s switch is illegal in %s Mode", appOptions[i].name, available_modes[input_mode].title); skAppUsage(); } if (mode_options[output_mode][i] == MODOPT_ILLEGAL) { skAppPrintErr("--%s switch is illegal in %s Mode", appOptions[i].name, available_modes[output_mode].title); skAppUsage(); } } } /* validate the options */ if (validateOptions(opt_cache, arg_count)) { skAppUsage(); /* never returns */ } /* set the mask so that the mode is 0644 */ (void)umask((mode_t)0022); if (output_mode == OUTPUT_SENDING) { /* clean out incremental dir */ if (removeFilesFromDir(incremental_directory) == -1) { skAppPrintErr("couldn't open %s '%s' for cleaning", appOptions[OPT_INCREMENTAL_DIRECTORY].name, incremental_directory); exit(EXIT_FAILURE); } } /* Set the application to process SIGUSR2 */ memset(&action, 0, sizeof(action)); /* mask any further signals while we're inside the handler */ sigfillset(&action.sa_mask); action.sa_handler = &nullSigHandler; if (sigaction(SIGUSR2, &action, NULL) == -1) { skAppPrintErr("Couldn't handle SIGUSR2: %s", strerror(errno)); exit(EXIT_FAILURE); } /* who am I? */ main_thread = pthread_self(); return; /* OK */ } /* * status = appOptionsHandler(cData, opt_index, opt_arg); * * This function is passed to optionsRegister(); it will be called * by optionsParse() for each user-specified switch that the * application has registered; it should handle the switch as * required---typically by setting global variables---and return 1 * if the switch processing failed or 0 if it succeeded. Returning * a non-zero from from the handler causes optionsParse() to return * a negative value. * * The clientData in 'cData' is typically ignored; 'opt_index' is * the index number that was specified as the last value for each * struct option in appOptions[]; 'opt_arg' is the user's argument * to the switch for options that have a REQUIRED_ARG or an * OPTIONAL_ARG. */ static int appOptionsHandler( clientData cData, int opt_index, char *opt_arg) { static int arg_count = 0; opt_cache_t *ocache = (opt_cache_t*)cData; unsigned int i; int found_mode; switch ((appOptionsEnum)opt_index) { case OPT_INPUT_MODE: found_mode = 0; for (i = 0; i < first_output_mode; ++i) { if (0 == strcmp(opt_arg, available_modes[i].name)) { found_mode = 1; input_mode = (io_mode_t)i; break; } } if (!found_mode) { skAppPrintErr("Invalid %s '%s'", appOptions[opt_index].name, opt_arg); return 1; } break; case OPT_OUTPUT_MODE: found_mode = 0; for (i = first_output_mode; i < NUM_MODES; ++i) { if (0 == strcmp(opt_arg, available_modes[i].name)) { found_mode = 1; output_mode = (io_mode_t)i; break; } } if (!found_mode) { skAppPrintErr("Invalid %s '%s'", appOptions[opt_index].name, opt_arg); return 1; } break; case OPT_READER_FUNCTION: /* ignored */ break; default: if (ocache[opt_index].seen) { skAppPrintErr("Switch %s already seen", appOptions[opt_index].name); return 1; } ++arg_count; ocache[opt_index].seen = arg_count; ocache[opt_index].value = opt_arg; break; } return 0; } static int appOptionsProcessOpt( int opt_index, char *opt_arg) { uint32_t opt_val; switch ((appOptionsEnum)opt_index) { case OPT_ROOT_DIRECTORY: if (optionsDirCheck(appOptions[opt_index].name, opt_arg)) { return 1; } sksiteSetRootDir(opt_arg); break; case OPT_ARCHIVE_DIRECTORY: if (optionsDirCheck(appOptions[opt_index].name, opt_arg)) { return 1; } archive_directory = opt_arg; break; case OPT_SENDER_DIRECTORY: if (optionsDirCheck(appOptions[opt_index].name, opt_arg)) { return 1; } sender_directory = opt_arg; break; case OPT_INCREMENTAL_DIRECTORY: if (optionsDirCheck(appOptions[opt_index].name, opt_arg)) { return 1; } incremental_directory = opt_arg; break; case OPT_BYTE_ORDER: return byteOrderParse(opt_arg); break; case OPT_PACK_INTERFACES: pack_interfaces = 1; break; case OPT_NO_FILE_LOCKING: rwio_lock_fn = &noRwioLocking; break; case OPT_SENSOR_CONFIG: if (probeConfParse(opt_arg)) { return 1; } break; case OPT_SENSOR_NAME: sensor_name = opt_arg; break; #if SK_ENABLE_FLOWCAP case OPT_FLOWCAP_PORT: if (skStringParseUint32(&opt_val, opt_arg, 0, 0xFFFF)) { skAppPrintErr("Invalid %s value '%s'", appOptions[opt_index].name, opt_arg); return 1; } reader_opts.flowcap.flowcap_default_port = (int)opt_val; break; case OPT_FLOWCAP_ADDRESS: reader_opts.flowcap.flowcap_address_string = opt_arg; break; case OPT_WORK_DIRECTORY: if (optionsDirCheck(appOptions[opt_index].name, opt_arg)) { return 1; } reader_opts.flowcap.work_directory = opt_arg; break; case OPT_VALID_DIRECTORY: if (optionsDirCheck(appOptions[opt_index].name, opt_arg)) { return 1; } reader_opts.flowcap.valid_directory = opt_arg; break; case OPT_INCOMING_DIRECTORY: if (optionsDirCheck(appOptions[opt_index].name, opt_arg)) { return 1; } reader_opts.fcfiles.incoming_directory = opt_arg; break; case OPT_POLLING_INTERVAL: if (skStringParseUint32(&opt_val, opt_arg, 1, 0)) { skAppPrintErr("Invalid argument to --%s switch: '%s'", appOptions[opt_index].name, opt_arg); return 1; } reader_opts.fcfiles.polling_interval = opt_val; break; #endif /* SK_ENABLE_FLOWCAP */ case OPT_FLUSH_TIMEOUT: if (skStringParseUint32(&opt_val, opt_arg, 1, 0)) { skAppPrintErr("Invalid argument to --%s switch: '%s'", appOptions[opt_index].name, opt_arg); return 1; } flush_timeout = opt_val; break; case OPT_NETFLOW_FILE: if (opt_arg[0] == '\0') { skAppPrintErr("Empty filename supplied"); return 1; } reader_opts.pdu_file.netflow_file = opt_arg; break; case OPT_INPUT_MODE: case OPT_OUTPUT_MODE: case OPT_READER_FUNCTION: /* ain't supposed to happen */ assert(0); abort(); } return 0; /* OK */ } /* * dir_exists = optionsDirCheck(opt_name, opt_arg); * * Verify that the directory in 'opt_arg' exists and that we have a * full path to the directory. If so, return 0; otherwise, print * an error that the option named by 'opt_name' was bad and return * -1; */ static int optionsDirCheck(const char *opt_name, const char *opt_arg) { if (!opt_arg || !opt_arg[0]) { skAppPrintErr("the directory argument to %s is empty", opt_name); return -1; } if (!dirExists(opt_arg)) { skAppPrintErr("the %s '%s' does not exist", opt_name, opt_arg); return -1; } if (opt_arg[0] != '/') { skAppPrintErr(("must use complete path to %s.\n" "\t('%s' does not begin with slash)"), opt_name, opt_arg); return -1; } return 0; } /* * ok = byteOrderParse(argument) * * parse the argument to the --byte-order switch */ static int byteOrderParse(const char *endian_string) { static int option_seen = 0; int i; size_t len; /* only process option one time */ if (option_seen != 0) { skAppPrintErr("Option %s given multiple times", appOptions[OPT_BYTE_ORDER].name); return 1; } option_seen = 1; len = strlen(endian_string); if (len == 0) { skAppPrintErr("Empty string given for %s option", appOptions[OPT_BYTE_ORDER].name); return 1; } /* initialize byte order */ byte_order = SILK_ENDIAN_ANY; /* parse user's input */ for (i = 0; byte_order_opts[i].name != NULL; ++i) { if ((len <= strlen(byte_order_opts[i].name)) && (0 == strncmp(byte_order_opts[i].name, endian_string, len))) { if (byte_order != SILK_ENDIAN_ANY) { skAppPrintErr("Ambiguous %s value '%s'", byte_order_opts[i].name, endian_string); return 1; } byte_order = byte_order_opts[i].value; } } if (byte_order == SILK_ENDIAN_ANY) { skAppPrintErr("Cannot parse %s value '%s'", appOptions[OPT_BYTE_ORDER].name, endian_string); return 1; } return 0; } /* * ok = validateOptions(argc, argv); * * Call the options parser. If options parsing succeeds, validate * that all the required arguments are present, and that the user * didn't give inconsistent arguments. Returns 0 on success, or -1 * otherwise. */ static int validateOptions(opt_cache_t *ocache, size_t arg_count) { size_t i; int daemon_seen; fp_daemon_mode_t old_deamon_mode = FP_DAEMON_OFF; fp_daemon_mode_t deamon_mode = FP_DAEMON_OFF; reader_type_t *rt; /* Process the root directory first */ if (ocache[OPT_ROOT_DIRECTORY].seen) { if (appOptionsProcessOpt(OPT_ROOT_DIRECTORY, ocache[OPT_ROOT_DIRECTORY].value)) { return 1; } ocache[OPT_ROOT_DIRECTORY].seen = 0; } /* ensure the site config is available; do this after setting the * root directory */ if (sksiteConfigure(1)) { exit(EXIT_FAILURE); } /* setup the probe configuration parser */ if (probeConfSetup()) { skAppPrintErr("unable to setup probe config file parser"); exit(EXIT_FAILURE); } /* Call the "real" options handler for every option we saw */ for (i = 0; i < arg_count; ++i) { if (ocache[i].seen) { if (appOptionsProcessOpt(i, ocache[i].value)) { return 1; } } } #if SK_ENABLE_FLOWCAP /* Set the polling-interval default value */ if (input_mode == INPUT_FLOWCAP_FILES && ocache[OPT_POLLING_INTERVAL].seen == 0) { reader_opts.fcfiles.polling_interval = DEFAULT_POLL_INTERVAL; } #endif /* make certain we have at least one probe from the configuration * file. */ if (probeConfProbeCount() == 0) { skAppPrintErr("No probes were read from the configuration file."); return -1; } if (assignProbesToReaderTypes()) { return -1; } /* verify the required options for logging */ if (skdaemonOptionsVerify()) { return -1; } /* Call the setup function for each active reader; ignore readers * that have no probes. */ daemon_seen = 0; for (i = 0; i < num_reader_types; ++i) { rt = &reader_types[i]; if (NULL == rt->probes) { /* Call teardown_fn for this reader now? */ continue; } if (rt->setup_fn(&deamon_mode, rt->probes, &reader_opts)) { return -1; } /* All the active reader_types must have same "daemon-ness" */ if (daemon_seen == 0) { daemon_seen = 1; old_deamon_mode = deamon_mode; } else if (old_deamon_mode != deamon_mode) { skAppPrintErr("cannot mix probes that work as daemons with\n" "\tprobes that do not."); } } assert(daemon_seen); if (deamon_mode == FP_DAEMON_OFF) { skdaemonDontFork(); } return 0; } /* * status = defineRunModeOptions(); * * Set the values mode_options[][] array. */ static int defineRunModeOptions(void) { unsigned int i, j; memset(mode_options, MODOPT_ILLEGAL, sizeof(mode_options)); /* common options; all are optional except sensor-config */ for (i = 0; i < NUM_MODES; ++i) { for (j = 0; j <= last_common_option; ++j) { mode_options[i][j] = MODOPT_OPTIONAL; } mode_options[i][OPT_SENSOR_CONFIG] = MODOPT_REQUIRED; } /* for the input modes, we don't care whether the output options * were specified. */ for (i = 0; i < first_output_mode; ++i) { mode_options[i][OPT_SENDER_DIRECTORY] = MODOPT_NONSENSE; mode_options[i][OPT_INCREMENTAL_DIRECTORY] = MODOPT_NONSENSE; mode_options[i][OPT_ROOT_DIRECTORY] = MODOPT_NONSENSE; } /* for the output modes, we don't care about input-only options */ for (i = first_output_mode; i < NUM_MODES; ++i) { #if SK_ENABLE_FLOWCAP mode_options[i][OPT_WORK_DIRECTORY] = MODOPT_NONSENSE; mode_options[i][OPT_VALID_DIRECTORY] = MODOPT_NONSENSE; mode_options[i][OPT_FLOWCAP_ADDRESS] = MODOPT_NONSENSE; mode_options[i][OPT_FLOWCAP_PORT] = MODOPT_NONSENSE; mode_options[i][OPT_INCOMING_DIRECTORY] = MODOPT_NONSENSE; mode_options[i][OPT_POLLING_INTERVAL] = MODOPT_NONSENSE; #endif mode_options[i][OPT_ARCHIVE_DIRECTORY] = MODOPT_NONSENSE; mode_options[i][OPT_NETFLOW_FILE] = MODOPT_NONSENSE; mode_options[i][OPT_SENSOR_NAME] = MODOPT_NONSENSE; } #if SK_ENABLE_FLOWCAP mode_options[INPUT_FLOWCAP][OPT_WORK_DIRECTORY] = MODOPT_REQUIRED; mode_options[INPUT_FLOWCAP][OPT_VALID_DIRECTORY] = MODOPT_REQUIRED; mode_options[INPUT_FLOWCAP][OPT_FLOWCAP_ADDRESS] = MODOPT_REQUIRED; mode_options[INPUT_FLOWCAP][OPT_FLOWCAP_PORT] = MODOPT_OPTIONAL; mode_options[INPUT_FLOWCAP][OPT_ARCHIVE_DIRECTORY] = MODOPT_OPTIONAL; mode_options[INPUT_FLOWCAP_FILES][OPT_INCOMING_DIRECTORY]=MODOPT_REQUIRED; mode_options[INPUT_FLOWCAP_FILES][OPT_POLLING_INTERVAL] = MODOPT_OPTIONAL; mode_options[INPUT_FLOWCAP_FILES][OPT_ARCHIVE_DIRECTORY] =MODOPT_OPTIONAL; #endif #if SK_ENABLE_PDU_FILES mode_options[INPUT_PDUFILE][OPT_NETFLOW_FILE] = MODOPT_REQUIRED; mode_options[INPUT_PDUFILE][OPT_ARCHIVE_DIRECTORY] = MODOPT_OPTIONAL; mode_options[INPUT_PDUFILE][OPT_SENSOR_NAME] = MODOPT_OPTIONAL; #endif mode_options[INPUT_STREAM][OPT_SENSOR_NAME] = MODOPT_OPTIONAL; mode_options[OUTPUT_SENDING][OPT_SENDER_DIRECTORY] = MODOPT_REQUIRED; mode_options[OUTPUT_SENDING][OPT_INCREMENTAL_DIRECTORY] = MODOPT_REQUIRED; mode_options[OUTPUT_LOCAL_STORAGE][OPT_ROOT_DIRECTORY] = MODOPT_REQUIRED; return 0; } /* * ok = noRwioLocking(&rwios); * * Do nothing. Used in place rwioLockFile() when --no-file-locking * is specified on the command line. */ static int noRwioLocking(rwIOStruct_t UNUSED(*rwIOS)) { return LIBRW_OK; } /* * nullSigHandler(signal); * * Do nothing. Called when SIGUSR2 is signaled. */ void nullSigHandler(int UNUSED(s)) { return; } /* * flushFiles(NULL); * * THREAD ENTRY POINT * * Flushes all the files in global stream cache g_streamCache. * * Called every 'flush_timeout' seconds by the timing_thread. */ static skTimerRepeat_t flushFiles(void *UNUSED(dummy)) { pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&cache_mutex); pthread_mutex_lock(&cache_mutex); INFOMSG("Flushing files after %d seconds.", flush_timeout); skStreamCacheFlush(g_streamCache); pthread_cleanup_pop(1); return SK_TIMER_REPEAT; } /* * timedSendFiles(NULL); * * THREAD ENTRY POINT * * Closes all the open files in the stream cache and moves all the * files in the incremental-directory to the sender-directory. * * Called every 'flush_timeout' seconds by the timing_thread. */ static skTimerRepeat_t timedSendFiles(void *UNUSED(dummy)) { sendFiles(); return SK_TIMER_REPEAT; } /* * closeFiles(); * * Close all files in the global stream cache, g_streamCache, and * destroy the cache. */ static void closeFiles(void) { pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&cache_mutex); pthread_mutex_lock(&cache_mutex); INFOMSG("Closing all files."); /* Destroy the cache, flushing, closing and freeing all the open * streams */ skStreamCacheDestroy(g_streamCache); pthread_cleanup_pop(1); } /* * */ static int assignProbesToReaderTypes(void) { sk_vector_t *probe_vec; size_t count; size_t i, j; reader_type_t *rt; reader_type_t *probe_rt; probe_def_t *p; int rv = -1; /* create vector to hold probes */ probe_vec = skVectorNew(sizeof(probe_def_t*)); if (NULL == probe_vec) { skAppPrintErr("vector create failed"); goto END; } /* either get all probes or apply the --sensor-name to the probes * we read from the configuration file */ if (NULL == sensor_name) { if (getAllProbes(probe_vec)) { goto END; } } else { if (getProbesBySensor(probe_vec, sensor_name)) { goto END; } } /* determine number of probes */ count = skVectorGetCount(probe_vec); #if SK_ENABLE_FLOWCAP /* how we create the flow_processors depends on whether we're in * flowcap mode. */ if (input_mode == INPUT_FLOWCAP || input_mode == INPUT_FLOWCAP_FILES) { /* in flowcap mode, the flowcap reader_type handles all * probes, and there is one flow_processor per * flowcap_address. */ /* get the correct reader */ if (input_mode == INPUT_FLOWCAP) { assert(fcreader_position != UINT32_MAX); probe_rt = &reader_types[fcreader_position]; } else { assert(input_mode == INPUT_FLOWCAP_FILES); assert(fcfilesreader_position != UINT32_MAX); probe_rt = &reader_types[fcfilesreader_position]; } /* Currently, only one flow_processor is allowed. */ num_flow_processors = 1; flow_processors = calloc(num_flow_processors, sizeof(flow_proc_t)); if (!flow_processors) { skAppPrintErr("allocation error"); goto END; } flow_processors[0].probe = NULL; flow_processors[0].reader_type = probe_rt; /* need to set the 'probes' field on the reader_type to a * non-NULL value, so we know the flowcap reader_type is in * use. Create an empty vector; no need to fill it. */ probe_rt->probes = skVectorNew(sizeof(probe_def_t*)); if (probe_rt->probes == NULL) { skAppPrintErr("Cannot create probes vector"); goto END; } } else #endif /* SK_ENABLE_FLOWCAP */ { /* since there will never be more flow_processors than the * number of probes, allocate enough space for each probe to * have its own flow_processor */ num_flow_processors = 0; flow_processors = calloc(count, sizeof(flow_proc_t)); if (!flow_processors) { skAppPrintErr("allocation error"); goto END; } /* assign each probe to a reader_type */ for (j = 0; 0 == skVectorGetValue(&p, probe_vec, j); ++j) { probe_rt = NULL; for (i = 0; i < num_reader_types; ++i) { rt = &reader_types[i]; if (rt->want_probe_fn(p)) { /* reader 'rt' can process probe 'p' */ if (probe_rt == NULL) { probe_rt = rt; } else { skAppPrintErr("Multiple readers can process probe %s.", skProbeGetUniqueName(p)); goto END; } } } if (probe_rt == NULL) { skAppPrintErr("No reader wants to process probe %s.", skProbeGetUniqueName(p)); goto END; } assert(num_flow_processors < count); flow_processors[num_flow_processors].probe = p; flow_processors[num_flow_processors].reader_type = probe_rt; num_flow_processors++; /* add the probe to the 'probes' vector on the reader_type, * creating the vector if it does not exist. */ if (probe_rt->probes == NULL) { probe_rt->probes = skVectorNew(sizeof(probe_def_t*)); if (probe_rt->probes == NULL) { skAppPrintErr("Cannot create probes vector"); goto END; } } if (skVectorAppendValue(probe_rt->probes, &p)) { skAppPrintErr("Cannot append probe to vector"); goto END; } } /* sanity check that we processed every probe. */ if (j != count) { skAppPrintErr("Error getting probe %u from vector", (unsigned int)j); goto END; } /* should we realloc num_flow_processors here? */ } /* success */ rv = 0; END: if (rv != 0) { /* failure. clean up the vectors and the flow_processors[] */ for (i = 0; i < num_reader_types; ++i) { if (reader_types[i].probes != NULL) { skVectorDestroy(reader_types[i].probes); reader_types[i].probes = NULL; } } if (flow_processors) { free(flow_processors); flow_processors = NULL; } } if (probe_vec) { skVectorDestroy(probe_vec); } return rv; } /* * status = getAllProbes(out_probe_vector); * * Append all the probes defined in the probe-configuration file to * the 'out_probe_vector'. Return 0 on success, or nonzero on * error: memory allocation related. */ static int getAllProbes(sk_vector_t *probe_vec) { const probe_def_t *p; size_t count; size_t i; int rv = -1; count = probeConfProbeCount(); /* get each probe from probeConf and append to 'probe_vec' */ for (i = 0; i < count; ++i) { p = probeConfProbeGetByPosition(i); if (NULL == p) { skAppPrintErr("Cannot get probe #%u from configuration file", (unsigned int)i); goto END; } if (skVectorAppendValue(probe_vec, &p)) { skAppPrintErr("Cannot append probe #%u to the probe vector", (unsigned int)i); goto END; } } rv = 0; END: return rv; } /* * status = getProbesBySensor(out_probe_vector, sensor_list); * * Treat 'sensor_list' as a C string containing a comma separated * list of Sensor Names, Sensor IDs and Sensor ID-ranges. Append * all the probes defined in the probe-configuration file that * appear in the sensor-list to the 'out_probe_vector'. Return 0 * on success, or nonzero otherwise: memory allocation failure, * unable to parse 'sensor_list', no probes existing for a * specified sensor. */ static int getProbesBySensor(sk_vector_t *probe_vec, const char *sensor_list) { sk_vector_t *sensor_vec; sensorID_t sensor_id; ssize_t sensor_count; size_t i; ssize_t probe_count; int rv = -1; char sensor_name[SK_MAX_STRLEN_SENSOR+1]; /* create vector to hold list of sensors */ sensor_vec = skVectorNew(sizeof(sensorID_t)); if (NULL == sensor_vec) { skAppPrintErr("vector create failed"); goto END; } /* parse the textual 'sensor_list' to get a vector of sensor-ids */ sensor_count = sksiteParseSensorList(sensor_vec, sensor_list); if (sensor_count < 1) { skAppPrintErr("Bad sensor list '%s'", sensor_list); goto END; } /* for each sensor, add its probes to the 'probe_vec' */ for (i = 0; 0 == skVectorGetValue(&sensor_id, sensor_vec, i); ++i) { /* get probes for this sensor */ probe_count = probeConfGetProbesForSensor(probe_vec, sensor_id); if (probe_count < 1) { sksiteSensorGetName(sensor_name, sizeof(sensor_name), sensor_id); skAppPrintErr("No probes exist for sensor '%s'", sensor_name); goto END; } } if (i != (size_t)sensor_count) { skAppPrintErr("Cannot read value %u from sensor vector", (unsigned int)i); goto END; } /* successful */ rv = 0; END: if (sensor_vec) { skVectorDestroy(sensor_vec); } return rv; } /* * rwios = openOutputStream(flowtype, timestamp, sensor_id); * * Creates an rwIO Stream to store flow records of the specified * 'flowtype', hourly 'timestamp', and 'sensor_id'. * * The file's location is determined by the times_tamp and by using * 'flowtype' to index into the outFInfo[] array defined in * silk_site_*.h. * * The file output format is determined by 'flowtype', though the * global variable 'pack_interfaces' may be override the format. * * The format-version and byte order of the file are determined by * a CPP macro and a global variable. * * Gets a write lock on the file. * * Returns the stream on success, or NULL on error. */ static rwIOStruct_t *openOutputStream( uint8_t flowtype, uint32_t time_stamp, sensorID_t sensor_id) { char filename[PATH_MAX]; rwIOStruct_t *rwIOS = NULL; int file_format; int errnum = 0; int rv; /* Build the file name--WHERE the records will be written onto * disk. */ if (output_mode == OUTPUT_SENDING) { char tmpbuf[PATH_MAX]; char *fname; sksiteGeneratePathname(tmpbuf, sizeof(tmpbuf), flowtype, sensor_id, time_stamp, "", NULL, &fname); snprintf(filename, sizeof(filename), "%s/%s", incremental_directory, fname); } else { sksiteGeneratePathname(filename, sizeof(filename), flowtype, sensor_id, time_stamp, "", NULL, NULL); } /* Get the file output format---HOW the records will be written to * disk. */ if (pack_interfaces == 0) { file_format = sksite_filetypeFormats[flowtype]; } else { /* Use a format with SNMP interface information */ #ifdef SK_ENABLE_INITIAL_TCPFLAGS file_format = FT_RWAUGROUTING; #else file_format = FT_RWROUTED; #endif } if (fileExists(filename)) { /* Open existing file for append */ if ((rv = rwioCreate(&rwIOS, filename, SK_RWIO_APPEND)) || (rv = rwioOpen(rwIOS)) || (rv = (*rwio_lock_fn)(rwIOS))) { rwioPrintLastErr(rwIOS, rv, &CRITMSG); if (rwIOS) { errnum = rwIOS->errnum; } rwioDestroy(&rwIOS); goto END; } if (file_format != rwGetFileType(rwIOS)) { NOTICEMSG("Warning: File Format of %s does not match expected", rwGetFileName(rwIOS)); } if (time_stamp != rwGetFileSTime(rwIOS)) { NOTICEMSG("Warning: Timestamp of file %s does not match expected", rwGetFileName(rwIOS)); } } else { /* Open a new file */ INFOMSG("Opening new output file %s", filename); if ((rv = rwioCreate(&rwIOS, filename, SK_RWIO_WRITE)) || (rv = rwioMakeDirectory(rwIOS)) || (rv = rwioSetFileType(rwIOS, file_format)) || (rv = rwioSetFileVersion(rwIOS, RWFLOWPACK_DEFAULT_VERSION)) || (rv = rwioSetCompression(rwIOS, comp_method)) || (rv = rwioSetFileByteorder(rwIOS, byte_order)) || (rv = rwioSetFileSTime(rwIOS, time_stamp, 0)) || (rv = rwioOpen(rwIOS)) || (rv = (*rwio_lock_fn)(rwIOS)) || (rv = rwioWriteHeader(rwIOS))) { rwioPrintLastErr(rwIOS, rv, &CRITMSG); if (rwIOS) { errnum = rwIOS->errnum; } rwioDestroy(&rwIOS); if (fileExists(filename)) { unlink(filename); } goto END; } } /* rwioOpen() will map the sensor-name part of the file name to a * sensor-id based on the data in silk.conf. If the rwioOpen() * mapping does not match the sensor_id that was given, consider * it a fatal error UNLESS the rwioOpen() mapping gives an invalid * sensor ID, which probably means we are dealing with a temporary * sensor. */ if (rwIOS->sID != sensor_id) { if (rwIOS->sID == SK_INVALID_SENSOR) { rwIOS->sID = sensor_id; } else { WARNINGMSG(("Sensor ID from file name (%u)" " != specified sensor id (%u)"), rwIOS->sID, sensor_id); rwioDestroy(&rwIOS); goto END; } } END: if (errnum == ENOLCK) { INFOMSG("Unable to get write lock; consider using the --%s switch", appOptions[OPT_NO_FILE_LOCKING].name); } return rwIOS; } /* * ok = writeRecord(&rwrec); * * Given the time, sensor, and flowtype values as set in the SiLK * flow record 'rwrec', pack the record into the correct file using * the appropriate file output format. Return 0 on success; return * -1 to indicate a problem with the record, or -2 to indicate a * problem with the file. */ static int writeRecord(const rwRec *rwrec) { uint8_t flowtype; uint32_t time_stamp; rwIOStruct_t *rwIOS; int rv = 0; /* Only one thread accesses the file queue at a time */ pthread_mutex_lock(&cache_mutex); /* The flowtype (class/type) says where the flow was collected and * where to write the flow. */ flowtype = rwrec->flow_type; /* Determine the hour this data is associated with. This is a UTC * value expressed in seconds, rounded (down) to the hour. */ time_stamp = rwrec->sTime - rwrec->sTime % 3600; /* Get the stream to write the record to: first see if we have an * already open stream in the cache */ rwIOS = skStreamCacheLookup(g_streamCache,flowtype,time_stamp,rwrec->sID); /* When the file is not in the cache, bring it in. Opens or * creates file as necessary */ if (rwIOS == NULL) { rwIOS = openOutputStream(flowtype, time_stamp, rwrec->sID); if (rwIOS == NULL) { rv = -2; goto END; } /* Add the stream to the cache */ skStreamCacheAdd(g_streamCache, flowtype, rwIOS); } /* Write record */ rv = rwWrite(rwIOS, rwrec); if (rv != LIBRW_OK) { if (LIBRW_ERROR_IS_FATAL(rv)) { rwioPrintLastErr(rwIOS, rv, &ERRMSG); rv = -2; } else { rwioPrintLastErr(rwIOS, rv, &WARNINGMSG); rv = -1; } } END: pthread_mutex_unlock(&cache_mutex); return rv; } /* * manageProcessor(NULL); * * THREAD ENTRY POINT for the 'process_thread'. * * Gets a flow record from the flowsource object and passes the * record to writeRecord() for storage. Runs until the flow * source stream is exhausted (for file-based readers), until the * global variable 'reading' is set to 0, or until an error occurs. */ static void *manageProcessor(void *vp_fproc) { flow_proc_t *fproc = (flow_proc_t*)vp_fproc; reader_type_t *reader_type = fproc->reader_type; flowtypeID_t ftypes[MAX_SPLIT_FLOWTYPES]; sensorID_t sensorids[MAX_SPLIT_FLOWTYPES]; sigset_t sigs; rwRec rec; const probe_def_t *probe; int count; int rec_is_bad; int i; sigfillset(&sigs); pthread_sigmask(SIG_SETMASK, &sigs, NULL); while(1) { switch (reader_type->get_record_fn(&rec, &probe, fproc)) { case FP_FILE_BREAK: /* We've processed one input file; there may be more input * files. Tell the sender to send the packed files we've * created. This is a safe place to quit, so if we are no * longer reading, break out of the while(). Otherwise we * try again to get a record---we didn't get a record this * time. */ sendFiles(); if (!reading) { goto END; } continue; case FP_END_STREAM: /* We've processed all the input; there is no more. Tell * the sender to send the packed files we've created. * Begin the shutdown process by disabling 'reading' and * setting shuttingDown. */ sendFiles(); reading = 0; shuttingDown = 1; goto END; case FP_GET_ERROR: /* An error occurred and we did not get a record. Break * out of the while() if we are no longer reading, * otherwise try again to get a record. */ if (!reading) { goto END; } continue; case FP_BREAK_POINT: /* We got a record, and this is a safe place to quit if we * are no longer reading. If not shutting down, process * the record. */ if (!reading) { goto END; } /* FALLTHROUGH */ case FP_RECORD: /* We got a record and we may NOT stop processing. * Process the record. */ ++fproc->rec_count_total; /* Get the record's sensor(s) and flow_type(s) */ count = skProbeDetermineFlowtype(probe, &rec, ftypes, sensorids); assert(count >= -1); assert(count < MAX_SPLIT_FLOWTYPES); if (count == -1) { NOTICEMSG(("Cannot determine flowtype of record from" " probe %s: input %d; output %d"), skProbeGetUniqueName(probe), rec.input, rec.output); ++fproc->rec_count_bad; continue; } /* have we logged this record as bad? */ rec_is_bad = 0; /* Store the record in each flowtype/sensor file. */ for (i = 0; i < count; ++i) { rec.flow_type = ftypes[i]; rec.sID = sensorids[i]; switch (writeRecord(&rec)) { case 0: break; case -1: if (0 == rec_is_bad) { /* Count bad records, but only once */ ++fproc->rec_count_bad; rec_is_bad = 1; } break; case -2: default: /* error writing to disk--exit this thread */ ERRMSG(("Stopping the processing of flows from" " probe '%s' due to file system errors."), skProbeGetUniqueName(probe)); fproc->reader_type->stop_fn(fproc); goto END; } } break; } /*switch*/ } /*while*/ END: /* thread is ending, decrement the count */ pthread_mutex_lock(&fproc_thread_count_mutex); --fproc_thread_count; pthread_mutex_unlock(&fproc_thread_count_mutex); /* tell the main thread to check the thread count */ pthread_kill(main_thread, SIGUSR2); return NULL; } /* * status = startProcessor(); * * Create the stream cache, spawn the timing_thread, start the * reader_type, and finally spawn the process_thread to process * records. * * Return 1 if something fails, or 0 for success. */ static int startProcessor(void) { skTimerRepeat_t (*timer_func)(void*) = &flushFiles; flow_proc_t *fproc; size_t i; /* Create a cache of streams (file handles) so we don't have to * incur the expense of reopening files */ INFOMSG("Creating stream cache"); g_streamCache = skStreamCacheCreate(SK_MAX_NUM_FLOWTYPES, STREAM_CACHE_DEPTH); if (NULL == g_streamCache) { ERRMSG("Unable to create stream cache."); return 1; } reading = 1; /* Start each flow_processor and create a thread to manage it */ for (i = 0; i < num_flow_processors; ++i) { fproc = &flow_processors[i]; if (fproc->reader_type->start_fn(fproc) != 0) { ERRMSG("Unable to start the flow reader."); reading = 0; return 1; } pthread_mutex_lock(&fproc_thread_count_mutex); ++fproc_thread_count; pthread_mutex_unlock(&fproc_thread_count_mutex); if (pthread_create(&fproc->thread, NULL, manageProcessor, fproc)) { ERRMSG("Unable to create the manageProcessor thread."); reading = 0; return 1; } } if (output_mode == OUTPUT_SENDING) { if (input_mode == INPUT_STREAM) { timer_func = &timedSendFiles; } else { /* disable timer */ timer_func = NULL; } } /* Start timer */ if (timer_func) { if (skTimerCreate(&timing_thread, flush_timeout, timer_func, NULL) == -1) { ERRMSG("Unable to start flush timer."); return 1; } } return 0; } static void stopProcessor(void) { flow_proc_t *fproc; size_t i; if (reading) { INFOMSG("Stopping processors"); reading = 0; /* Give the threads a chance to quit on their own---by their * checking the 'reading' variable. */ sleep(2); if (timing_thread != NULL) { skTimerDestroy(timing_thread); } /* stop each flow processor and join its thread */ INFOMSG("Waiting for record handlers."); for (i = 0; i < num_flow_processors; ++i) { fproc = &flow_processors[i]; fproc->reader_type->stop_fn(fproc); pthread_join(fproc->thread, NULL); /* join */ } } } /* * sendFiles(); * * Closes all the open files in the stream cache and moves all the * files in the incremental-directory to the sender-directory. */ static void sendFiles(void) { int tmp_fd; char path[PATH_MAX]; char newpath[PATH_MAX]; struct dirent *entry; DIR *dir; int file_count = 0; int successfully_sent = 0; int rv; /* Return if sending mode not specified */ if (output_mode != OUTPUT_SENDING) { return; } INFOMSG("Preparing to move files for sending..."); /* Lock stream cache */ pthread_cleanup_push((cleanupHandler)pthread_mutex_unlock, (void *)&cache_mutex); pthread_mutex_lock(&cache_mutex); /* Close all the output files. */ INFOMSG("Closing incremental files..."); skStreamCacheCloseAll(g_streamCache); /* Open the directory holding the files to send and loop over the * files in the directory. */ dir = opendir(incremental_directory); if (NULL == dir) { CRITMSG("Fatal error: Unable to open incremental directory %s: %s", incremental_directory, strerror(errno)); exit(EXIT_FAILURE); } INFOMSG("Moving files to %s...", sender_directory); while ((entry = readdir(dir)) != NULL) { /* ignore dot-files */ if ('.' == entry->d_name[0]) { continue; } ++file_count; /* Copy each file to a unique name */ snprintf(path, sizeof(path), "%s/%s", incremental_directory, entry->d_name); snprintf(newpath, sizeof(newpath), "%s/%s.XXXXXX", sender_directory, entry->d_name); tmp_fd = mkstemp(newpath); if (tmp_fd == -1) { ERRMSG("couldn't create and open temporary file %s: %s", newpath, strerror(errno)); continue; } close(tmp_fd); rv = moveFile(path, newpath); if (rv != 0) { ERRMSG("couldn't move file '%s' to '%s' for sending: %s", path, newpath, strerror(rv)); continue; } /* Moving a file to the sender_directory is considered a * successful send, even when no sender is running. */ ++successfully_sent; INFOMSG("%s", newpath); } closedir(dir); /* Print status message */ if (file_count == 0) { INFOMSG("No files to send."); } else { INFOMSG("Successfully moved %d/%d file%s.", successfully_sent, file_count, ((file_count == 1) ? "" : "s")); } pthread_cleanup_pop(1); } /* * removeFilesFromDir(dir_name); * * Remove all the files from dir_name. Return the number of * directory entries that COULD NOT be removed, or -1 if function * is unable to open dir_name; i.e. 0 == fully successful, >0 == * unusual condition; <0 == error. */ static int removeFilesFromDir(const char *dir_name) { char path[PATH_MAX]; struct dirent *entry; DIR *dir; int count = 0; dir = opendir(dir_name); if (dir == NULL) { return -1; } while ((entry = readdir(dir))) { if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { continue; } snprintf(path, sizeof(path), "%s/%s", dir_name, entry->d_name); if (unlink(path) == -1) { skAppPrintErr("couldn't remove the file '%s': %s", path, strerror(errno)); ++count; } } closedir(dir); return count; } /* * status = archiveFile(filename); * * Moves 'filename' into the directory tree rooted at * 'archive_directory'. Files are stored in a subdirectory that is * based on the current time. * * Returns 0 on success; -1 on error; 1 if the archive_directory is * NULL. */ int archiveFile(const char *fn) { const char *c; char *s; char path[PATH_MAX]; time_t curtime; struct tm ctm; if ( !archive_directory) { return 1; } /* basename */ if ((c = strrchr(fn, '/')) == NULL) { c = fn; } else { c++; } /* create archive path based on current UTC time */ curtime = time(NULL); gmtime_r(&curtime, &ctm); snprintf(path, sizeof(path), "%s/%04d/%02d/%02d/%02d/%s", archive_directory, ctm.tm_year + 1900, ctm.tm_mon + 1, ctm.tm_mday, ctm.tm_hour, c); /* make the directory */ s = strrchr(path, '/'); *s = '\0'; mkDirPath(path); *s = '/'; /* move file */ if (rename(fn, path) == -1) { ERRMSG("Couldn't move %s to archive: %s", fn, strerror(errno)); return -1; } return 0; } int main(int argc, char **argv) { appSetup(argc, argv); /* never returns on error */ /* start the logger and become a daemon */ #ifdef DEBUGGING skdaemonDontFork(); #endif if (skdaemonize(&shuttingDown, &appTeardown) == -1) { exit(EXIT_FAILURE); } /* Choose which processing thread to run based on user choice */ if (startProcessor() != 0) { PRINT_AND_LOG(("unable to start flow processor")); exit(EXIT_FAILURE); } /* Any skAppPrintErr() messages should go to the log */ skAppSetFuncPrintErr(&ERRMSG_v); /* We now run forever, excepting signals, until the shuttingDown * flag is set or until all flow-processor threads exit. */ while (!shuttingDown && fproc_thread_count > 0) { pause(); } /* done */ appTeardown(); return 0; } /* ** Local Variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */