/* ** Copyright (C) 2004-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** Interface to pull a single flow from a NetFlow v5 PDU ** */ #include "silk.h" RCSIDENT("$SiLK: pdusource.c 7800 2007-07-06 15:32:43Z mthomas $"); #include "utils.h" #include "v5pdu.h" #include "convert-cflowd.h" #include "pdusource.h" #include "udpsource.h" #include "libflowsource.h" #ifdef TEST_PRINTF_FORMATS # define IF_LOGFN(f) printf #else # define IF_LOGFN(f) if (! f) { } else f #endif typedef struct pdu_source_t { flowcap_stats_t statistics; pthread_mutex_t stats_mutex; udpSource_t source; int source_index; cflowdTimeInfo_t ti; sk_msg_fn_t logfn; uint8_t count; /* Number of recs left in pdu */ v5PDU *pdu; /* Current pdu */ uint32_t flowSeqNumbers[65536]; BITMAP_DECLARE( engineMasks, 65536); int sockbufsize; in_addr_t listen_addr; int port; uint8_t logopt; unsigned file : 1; /* File-based source */ } pdu_source_t; typedef struct pdu_source_pool_item_t { const probe_def_t *probe; udpSource_t source; int index; } pdu_source_pool_item_t; /* FUNCTION PROTOTYPES */ static udpSource_t pduUdpSourceCreate( int port, in_addr_t from_address, in_addr_t listen_address, uint32_t max_pkts, int sockbufsize, sk_msg_fn_t logfn); #ifdef HAVE_ZLIB_H static udpSource_t pduUdpFileSourceCreate( const char *path, sk_msg_fn_t logfn); #endif pduSource_t pduSourceCreate( int port, in_addr_t from_address, in_addr_t listen_address, uint32_t max_pkts, int sockbufsize, sk_msg_fn_t logfn) { udpSource_t udpsource; pduSource_t source; udpsource = pduUdpSourceCreate(port, from_address, listen_address, max_pkts, sockbufsize, logfn); if (udpsource == NULL) { return NULL; } source = (pduSource_t)calloc(1, sizeof(*source)); if (source == NULL) { udpSourceDestroy(udpsource); return NULL; } source->source = udpsource; source->logfn = logfn; source->listen_addr = listen_address; source->sockbufsize = sockbufsize; source->port = port; pthread_mutex_init(&source->stats_mutex, NULL); source->logopt = SOURCE_LOG_ALL; return source; } #ifdef HAVE_ZLIB_H pduSource_t pduFileSourceCreate( const char *path, sk_msg_fn_t logfn) { udpSource_t udpsource; pduSource_t source; udpsource = pduUdpFileSourceCreate(path, logfn); if (udpsource == NULL) { return NULL; } source = (pduSource_t)calloc(1, sizeof(*source)); if (source == NULL) { udpSourceDestroy(udpsource); return NULL; } source->file = 1; source->source = udpsource; source->logfn = logfn; return source; } #endif /* HAVE_ZLIB_H */ void pduSourceDestroy(pduSource_t source) { udpSourceDestroy(source->source); pthread_mutex_destroy(&source->stats_mutex); free(source); } static void handleBadPacket( pduSource_t source, v5PDU *UNUSED(pdu)) { pthread_mutex_lock(&source->stats_mutex); source->statistics.badPkts++; pthread_mutex_unlock(&source->stats_mutex); } static void handleBadRecord( pduSource_t source, v5PDU *UNUSED(pdu)) { pthread_mutex_lock(&source->stats_mutex); source->statistics.badRecs++; pthread_mutex_unlock(&source->stats_mutex); } static v5PDU *next_pdu(pduSource_t source) { #define LOG IF_LOGFN(source->logfn) assert (source != NULL); /* Infloop; exit by return only */ while (1) { v5PDU *pdu; uint16_t count; uint16_t engine; uint32_t flow_sequence; pdu = (v5PDU *)udpNextByIndex(source->source, source->source_index); if (pdu == NULL) { return NULL; } pthread_mutex_lock(&source->stats_mutex); source->statistics.procPkts++; pthread_mutex_unlock(&source->stats_mutex); if (ntohs(pdu->hdr.version) != 5) { /* reject packet */ LOG("PDU record was not marked as version 5."); handleBadPacket(source, pdu); continue; } count = ntohs(pdu->hdr.count); if (count > V5PDU_MAX_RECS) { /* reject packet */ LOG("PDU reported more than %d records. Rejecting.", V5PDU_MAX_RECS); handleBadPacket(source, pdu); continue; } if (count == 0) { /* reject packet */ LOG("PDU reported zero records. Rejecting."); handleBadPacket(source, pdu); continue; } /* handle seq numbers here */ flow_sequence = ntohl(pdu->hdr.flow_sequence); engine = (pdu->hdr.engine_type << 8) | pdu->hdr.engine_id; if (BITMAP_GETBIT(source->engineMasks, engine)) { pthread_mutex_lock(&source->stats_mutex); /* check hdr seqnum against expected seq num */ if (flow_sequence < source->flowSeqNumbers[engine]) { /* ** Out of sequence packet. Reduce missing flow count. ** However, do not change the expected seq num. */ source->statistics.missingRecs -= count; } else { /* Check if this is a later packet and handling missing recs */ if (flow_sequence > source->flowSeqNumbers[engine]) { /* Increase missing flow count */ source->statistics.missingRecs += flow_sequence - source->flowSeqNumbers[engine]; if (source->logopt & SOURCE_LOG_MISSING) { LOG(("Missing netflow records: " "%" PRId64 "/%" PRIu64 " == %7.4g%%"), source->statistics.missingRecs, (source->statistics.goodRecs + source->statistics.badRecs), ((float)source->statistics.missingRecs / (float)(source->statistics.goodRecs + source->statistics.missingRecs + source->statistics.badRecs)) * 100.0); } } /* Update the next expected seq */ source->flowSeqNumbers[engine] = flow_sequence + count; } pthread_mutex_unlock(&source->stats_mutex); } else { /* A new engine. Mark and record */ BITMAP_SETBIT(source->engineMasks, engine); source->flowSeqNumbers[engine] = flow_sequence + count; } cflowdTimeInfoSetup(&pdu->hdr, &source->ti); return pdu; } #undef LOG } static int pduSourceGetNextRec(pduSource_t source) { #define LOG IF_LOGFN(source->logfn) assert (source != NULL); /* Infloop; exit by return only */ while (1) { v5Record *v5RPtr; uint32_t SysUptime, dF, dL; /* If we need a pdu, get a new one, otherwise we are not finished with the last. */ if (source->count == 0) { source->pdu = next_pdu(source); if (source->pdu == NULL) { return -1; } source->count = ntohs(source->pdu->hdr.count); } /* Get next record, and decrement counter*/ v5RPtr = &source->pdu->data[ntohs(source->pdu->hdr.count) - source->count--]; /* Check for zero packets or bytes. No need for byteswapping when checking zero. */ if (v5RPtr->dPkts == 0 || v5RPtr->dOctets == 0) { if (source->logopt & SOURCE_LOG_BAD) { LOG("Netflow record has zero packets or bytes."); } handleBadRecord(source, source->pdu); } /* Check to see if more packets them bytes. */ if (ntohl(v5RPtr->dPkts) > ntohl(v5RPtr->dOctets)) { if (source->logopt & SOURCE_LOG_BAD) { LOG("Netflow record has more packets them bytes."); } handleBadRecord(source, source->pdu); } /* Check for correct ordering of the First, Last, and SysUptime values. The correct ordering is First, Last, SysUptime, possibly modified by wraparound. This is done by calculating the distances of First and Last from SysUptime in a leftwards direction on a modulo 2^32 number line. If the distance from the SysUptime to First is less than the distance from SysUptime to Last, then Last is happening before First, which is invalid. */ SysUptime = ntohl(source->pdu->hdr.SysUptime) + sysUptimeAdjustMsecs; dF = SysUptime - ntohl(v5RPtr->First); dL = SysUptime - ntohl(v5RPtr->Last); if (dF < dL) { if (source->logopt & SOURCE_LOG_BAD) { LOG("Netflow record has earlier end time than start time."); } handleBadRecord(source, source->pdu); } /* Check for bogosities in how the ICMP type/code are set. It should be in dest port, but sometimes it is backwards in src port. */ if (v5RPtr->prot == 1 && /* ICMP */ v5RPtr->dstport == 0) /* No byteswapping for check against 0 */ { uint32_t *ports = (uint32_t *)&v5RPtr->srcport; *ports = BSWAP32(*ports); /* This will swap src into dest, while byteswapping. */ } pthread_mutex_lock(&source->stats_mutex); source->statistics.goodRecs++; pthread_mutex_unlock(&source->stats_mutex); return 0; } #undef LOG } pdu_source_breakable_t pduSourceGetFlowcap( pduSource_t source, flowcapRec_t *fc_rec, int fc_version) { #define LOG IF_LOGFN(source->logfn) assert(FC_REC_SIZE(fc_version) != -1); if (pduSourceGetNextRec(source)) { LOG("PDU source has ended"); return PDUS_ABORT; } if (cflowdToFlowcap(&source->pdu->data[ntohs(source->pdu->hdr.count) - (source->count + 1)], fc_rec, fc_version, &source->ti)) { LOG("PDU conversion failure"); return PDUS_ABORT; } return PDUS_BREAKABLE; #undef LOG } int pduSourceGetGeneric( pduSource_t source, rwRec *rwrec) { if (pduSourceGetNextRec(source)) { return -1; } return cflowdToGeneric(&source->pdu->data[ntohs(source->pdu->hdr.count) - (source->count + 1)], rwrec, &source->ti); } static udpSource_t pduUdpSourceCreate( int port, in_addr_t from_address, in_addr_t listen_address, uint32_t max_pkts, int sockbufsize, sk_msg_fn_t logfn) { #define LOG IF_LOGFN(logfn) struct sockaddr_in addr; int sock; /* Get a socket */ sock = socket(AF_INET, SOCK_DGRAM, 0); if (sock == -1) { LOG("Failed to allocate socket: %s", strerror(errno)); return NULL; } /* Make the socket receiver buffer size as close as we can to sockbufsize. */ bigsockbuf(sock, SO_RCVBUF, sockbufsize); /* Set up the address structure */ memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(listen_address); addr.sin_port = htons(port); /* Bind socket to port */ if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) { LOG("Failed to bind address: %s", strerror(errno)); close(sock); return NULL; } return udpSourceCreate(sock, from_address, V5PDU_LEN, max_pkts, logfn); #undef LOG } #ifdef HAVE_ZLIB_H static udpSource_t pduUdpFileSourceCreate( const char *path, sk_msg_fn_t logfn) { return udpFileSourceCreate(path, V5PDU_LEN, logfn); } #endif pduSourcePool_t pduSourcePoolCreate(void) { return skVectorNew(sizeof(pdu_source_pool_item_t)); } void pduSourcePoolDestroy(pduSourcePool_t pool) { skVectorDestroy(pool); } pduSource_t pduSourceCreateFromProbeDef( pduSourcePool_t pool, const probe_def_t *probe, uint32_t max_pkts, int sockbufsize, sk_msg_fn_t logfn) { uint8_t i; in_addr_t paddr; in_addr_t haddr; uint16_t pport; pduSource_t source; pdu_source_pool_item_t current; int rv; uint8_t flags; assert(pool); assert(probe); flags = skProbeGetLogFlags(probe); rv = skProbeGetListenAsHost(&paddr, &pport, probe); if (rv == -1) { return NULL; } skProbeGetAcceptFromHost(&haddr, probe); /* Loop through the udpSources to see if one matches the probe. */ for (i = 0; i < skVectorGetCount(pool); i++) { in_addr_t addr; uint16_t port; skVectorGetValue(¤t, pool, i); rv = skProbeGetListenAsHost(&addr, &port, current.probe); if (rv == -1) { continue; } if (port == pport && addr == paddr) { in_addr_t from_addr; skProbeGetAcceptFromHost(&from_addr, current.probe); if (from_addr == haddr) { /* Matches in all respects. Create a duplicate. */ source = (pduSource_t)calloc(1, sizeof(pdu_source_t)); if (source == NULL) { return NULL; } source->source = current.source; udpSourceIncRef(current.source); source->source_index = current.index; source->logfn = logfn; pduSourceSetLogopt(source, flags); return source; } if (from_addr == INADDR_ANY || haddr == INADDR_ANY) { /* Can't mix INADDR_ANY and specific ips */ return NULL; } /* Matches in all but from_addr. Add that addr. */ source = (pduSource_t)calloc(1, sizeof(pdu_source_t)); if (source == NULL) { return NULL; } source->source = current.source; udpSourceIncRef(current.source); source->source_index = udpSourceAddAddress(source->source, haddr); source->logfn = logfn; pduSourceSetLogopt(source, flags); return source; } } /* No match. Add new source. */ source = pduSourceCreate(pport, haddr, paddr, max_pkts, sockbufsize, logfn); if (source == NULL) { return NULL; } pduSourceSetLogopt(source, flags); current.source = source->source; current.index = source->source_index; current.probe = probe; skVectorAppendValue(pool, ¤t); return source; } /* Get statistics associated with a pdu source. */ void pduSourceGetStats(pduSource_t source, flowStats_t stats) { pthread_mutex_lock(&source->stats_mutex); *stats = source->statistics; pthread_mutex_unlock(&source->stats_mutex); } /* Clear out current statistics */ void pduSourceClearStats(pduSource_t source) { pthread_mutex_lock(&source->stats_mutex); memset(&source->statistics, 0, sizeof(source->statistics)); pthread_mutex_unlock(&source->stats_mutex); } /* Set logging options */ void pduSourceSetLogopt(pduSource_t source, uint8_t opt) { source->logopt = opt; } /* ** Local Variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */