/* ** Copyright (C) 2003-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** A "plug-in" (currently compile-time only) for rwflowpack to ** process NetFlow v5 PDUs from the network. ** */ #include "silk.h" RCSIDENT("$SiLK: pdureader.c 6070 2007-01-22 18:03:49Z mthomas $"); #include "rwflowpack.h" #include "pdusource.h" /* MACROS and DATA TYPES */ #define READER_TYPE_NAME "PDU Reader" /* Number of PDUs, used to create buffer for reading */ #define PDU_BUFFER_SIZE 60000 /* We need big socket buffers. Let's try 8MB */ #define BIG_SOCKET_BUF (1024 * 1024 * 8) /* PRIVATE FUNCTIONS */ static int readerSetup( fp_daemon_mode_t *is_daemon, const sk_vector_t *probe_vec, reader_options_t *options); static void readerTeardown(void); static int readerStart(flow_proc_t *fproc); static int readerStop(flow_proc_t *fproc); static fp_get_record_result_t readerGetRecord( rwRec *out_rec, const probe_def_t **out_probe, flow_proc_t *fproc); static int readerWantProbe(probe_def_t *probe); /* PRIVATE VARIABLES */ /* The pool allows PDU sources to share ports. */ static pduSourcePool_t pool = NULL; /* Number of probes we expect to process */ static int probe_count = 0; /* FUNCTION DEFINITIONS */ /* * readerTeardown(); * * Destroy the PDU reader. * * Invoked by reader_type->teardown_fn(); */ static void readerTeardown(void) { if (pool) { pduSourcePoolDestroy(pool); pool = NULL; } } /* * ok = readerStart(flow_processor) * * Create a flowsource object that will read PDU records from a * Berkeley socket. * * Invoked by reader_type->start_fn(); */ static int readerStart(flow_proc_t *fproc) { pduSource_t pdu_src; in_addr_t bind_addr; uint16_t bind_port; /* if a pdu_src already exists, just return. */ if (fproc->flow_src != NULL) { return 0; } assert(probe_count > 0); if (pool == NULL) { pool = pduSourcePoolCreate(); if (pool == NULL) { logMsg("Unable to create source pool."); return 1; } } pdu_src = pduSourceCreateFromProbeDef(pool, fproc->probe, PDU_BUFFER_SIZE, BIG_SOCKET_BUF / probe_count, &logMsg); if (pdu_src) { /* success. return */ fproc->flow_src = pdu_src; return 0; } /* failed. print error */ if (0 != skProbeGetListenAsHost(&bind_addr, &bind_port, fproc->probe)) { PRINT_AND_LOG(("Probe not configured for listening to network")); } else { PRINT_AND_LOG(("Could not create PDU listener on %s:%d", num2dot(bind_addr), bind_port)); } return -1; } /* * readerStop(); * * Stop reading records. * * Invoked by reader_type->stop_fn(); */ static int readerStop(flow_proc_t *fproc) { pduSource_t pdu_src = (pduSource_t)fproc->flow_src; if (pdu_src) { pduSourceDestroy(pdu_src); fproc->flow_src = NULL; } return 0; } /* * readerGetRecord(&out_rwrec, &out_probe, fproc); * * Fill 'out_rwrec' with an rw generic record from the underlying * flowsource object connected to a socket. Fill 'out_probe' with * the probe where the flow was collected. * * Invoked by reader_type->get_record_fn(); */ static fp_get_record_result_t readerGetRecord( rwRec *out_rwrec, const probe_def_t **out_probe, flow_proc_t *fproc) { pduSource_t pdu_src = (pduSource_t)fproc->flow_src; if (-1 == pduSourceGetGeneric(pdu_src, out_rwrec)) { return FP_GET_ERROR; } *out_probe = fproc->probe; /* When reading from the network, any point is a valid stopping * point */ return FP_BREAK_POINT; } /* * pduReaderInitialize: * * Set initial state of this flow reader. * * Invoked by reader_type->initialize_fn(); */ int pduReaderInitialize(reader_type_t *reader_type) { /* Set my name */ reader_type->reader_name = READER_TYPE_NAME; /* Set function pointers */ reader_type->setup_fn = &readerSetup; reader_type->teardown_fn = &readerTeardown; reader_type->start_fn = &readerStart; reader_type->stop_fn = &readerStop; reader_type->get_record_fn = &readerGetRecord; reader_type->want_probe_fn = &readerWantProbe; return 0; } /* * yes_or_no = readerWantProbe(probe); * * Return a TRUE value this reader_type is able to process the data * from the 'probe'; return a FALSE value otherwise. */ static int readerWantProbe(probe_def_t *probe) { /* This is what we expect: a network based NetFlow listener */ if ((0 == skProbeGetListenAsHost(NULL, NULL, probe)) && (PROBE_ENUM_NETFLOW == skProbeGetType(probe))) { return 1; } return 0; } /* * status = readerSetup(&out_daemon_mode, probe_vector, options); * * The reader_type should do any setup it requires prior to * starting its flow_processor instances: Validate and/or store the * reader_type's 'options'. 'probe_vector' is the list of probes * that this reader_type said it would process * (readerWantProbe). 'out_daemon_mode' should be set according * to how this reader_type expects to run. * * Invoked by reader_type->setup_fn(); */ static int readerSetup( fp_daemon_mode_t *is_daemon, const sk_vector_t *probe_vec, reader_options_t UNUSED(*options)) { /* this function should only be called if we actually have probes * to process */ probe_count = skVectorGetCount(probe_vec); if (0 == probe_count) { return 1; } /* Is a daemon */ *is_daemon = FP_DAEMON_ON; return 0; } /* ** Local variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */