/* ** Copyright (C) 2006-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** Interface to pull a single flow from IPFIX ** */ #include "silk.h" RCSIDENT("$SiLK: ipfixsource.c 7475 2007-06-11 18:28:37Z mwd $"); #include "utils.h" #include "circbuf.h" #include "ipfixsource.h" #include "skipfix.h" #include "convert-flowcap.h" #ifdef TEST_PRINTF_FORMATS # define IF_LOGFN(f) printf #else # define IF_LOGFN(f) if (! f) { } else f #endif #define LOG IF_LOGFN(source->logfn) struct ipfix_source_t { flowcap_stats_t statistics; pthread_mutex_t stats_mutex; in_addr_t listen_addr; int port; fbListener_t *listener; fbConnSpec_t *connspec; pthread_t thread; uint32_t bufsize; circBuf_t data_buffer; void *tmp_buffer; sk_msg_fn_t logfn; pthread_mutex_t mutex; pthread_cond_t cond; unsigned destroyed : 1; }; /* Mutex around calls to skiCreateListener. */ static pthread_mutex_t create_listener_mutex = PTHREAD_MUTEX_INITIALIZER; #if 0 static gboolean fixbufConnect( fbListener_t *UNUSED(listener), void **UNUSED(ctx), int UNUSED(fd), struct sockaddr *peer, size_t peerlen, GError **UNUSED(err)) { } #endif static void *ipfix_reader(void *vsource) { ipfixSource_t source = (ipfixSource_t)vsource; GError *err = NULL; fBuf_t *ipfix_buf = NULL; int circbuf_advanced = 0; pthread_mutex_lock(&source->mutex); pthread_cond_signal(&source->cond); pthread_mutex_unlock(&source->mutex); while (!source->destroyed) { ipfix_buf = fbListenerWait(source->listener, &err); if (!ipfix_buf) { if (err && err->domain == FB_ERROR_DOMAIN && err->code == FB_ERROR_NLREAD) { g_clear_error(&err); continue; } if (err) { LOG("fbListenerWait: %s", err->message); } g_clear_error(&err); break; } while (!source->destroyed) { rwRec reverse; gboolean rb; /* Advance the circular buffer */ if (!circbuf_advanced) { source->tmp_buffer = (void *)circBufNextHead(source->data_buffer); circbuf_advanced = 1; } if (source->tmp_buffer == NULL) { assert(source->destroyed); break; } /* Insert the next record */ rb = skiRwNextRecord(ipfix_buf, (rwRec *)source->tmp_buffer, &reverse, &err); if (rb == 0) { if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD) || g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_EOM)) { g_clear_error(&err); break; } fBufFree(ipfix_buf); ipfix_buf = NULL; LOG("fbListenerWait: %s", err->message); g_clear_error(&err); break; } circbuf_advanced = 0; /* Deal with the reverse flow */ if (reverse.pkts != 0) { source->tmp_buffer = (void *)circBufNextHead(source->data_buffer); if (source->tmp_buffer == NULL) { assert(source->destroyed); break; } memcpy(source->tmp_buffer, &reverse, sizeof(reverse)); } } } if (ipfix_buf) { fBufFree(ipfix_buf); } pthread_mutex_lock(&source->mutex); while (!source->destroyed) { pthread_cond_wait(&source->cond, &source->mutex); } pthread_cond_signal(&source->cond); fbListenerFree(source->listener); source->listener = NULL; pthread_mutex_unlock(&source->mutex); return NULL; } static void free_conspec(fbConnSpec_t *conspec) { if (conspec->host) { free(conspec->host); } if (conspec->svc) { free(conspec->svc); } free(conspec); } ipfixSource_t ipfixSourceCreate( int port, in_addr_t listen_on_address, probe_proto_t protocol, uint32_t max_flows, sk_msg_fn_t logfn) { ipfixSource_t source; GError *err = NULL; char port_string[7]; char ipname[SK_NUM2DOT_STRLEN]; int rv; source = (ipfixSource_t)calloc(1, sizeof(*source)); if (source == NULL) { return NULL; } source->logfn = logfn; source->listen_addr = listen_on_address; source->port = port; source->connspec = (fbConnSpec_t *)calloc(1, sizeof(*source->connspec)); if (source->connspec == NULL) { free(source); return NULL; } switch (protocol) { case PROBE_PROTO_SCTP: source->connspec->transport = FB_SCTP; break; case PROBE_PROTO_TCP: source->connspec->transport = FB_TCP; break; case PROBE_PROTO_UDP: source->connspec->transport = FB_UDP; break; default: assert(0); abort(); } if (source->listen_addr != INADDR_ANY) { source->connspec->host = strdup(num2dot_r(source->listen_addr, ipname)); if (source->connspec->host == NULL) { free_conspec(source->connspec); free(source); return NULL; } } rv = snprintf(port_string, sizeof(port_string), "%i", source->port); assert(rv < (int)sizeof(port_string)); source->connspec->svc = strdup(port_string); if (source->connspec->svc == NULL) { free_conspec(source->connspec); free(source); return NULL; } pthread_mutex_lock(&create_listener_mutex); source->listener = skiCreateListener(source->connspec, NULL, NULL, &err); pthread_mutex_unlock(&create_listener_mutex); if (source->listener == NULL) { if (err) { LOG("skiCreateListener: %s", err->message); } g_clear_error(&err); free_conspec(source->connspec); free(source); return NULL; } pthread_mutex_init(&source->stats_mutex, NULL); pthread_mutex_init(&source->mutex, NULL); pthread_cond_init(&source->cond, NULL); source->data_buffer = circBufCreate(sizeof(rwRec), max_flows); if (source->data_buffer == NULL) { fbListenerFree(source->listener); free_conspec(source->connspec); free(source); return NULL; } source->bufsize = max_flows; pthread_mutex_lock(&source->mutex); rv = pthread_create(&source->thread, NULL, ipfix_reader, (void *)source); if (rv != 0) { circBufDestroy(source->data_buffer); free_conspec(source->connspec); fbListenerFree(source->listener); free(source); return NULL; } pthread_cond_wait(&source->cond, &source->mutex); if (source->destroyed) { circBufDestroy(source->data_buffer); pthread_mutex_unlock(&source->mutex); pthread_join(source->thread, NULL); fbListenerFree(source->listener); free_conspec(source->connspec); free(source); return NULL; } pthread_mutex_unlock(&source->mutex); return source; } void ipfixSourceDestroy( ipfixSource_t source) { assert(source); pthread_mutex_lock(&source->mutex); source->destroyed = 1; if (source->listener) { fbListenerInterrupt(source->listener); } circBufDestroy(source->data_buffer); pthread_cond_broadcast(&source->cond); pthread_cond_wait(&source->cond, &source->mutex); pthread_join(source->thread, NULL); pthread_mutex_unlock(&source->mutex); pthread_mutex_destroy(&source->mutex); pthread_cond_destroy(&source->cond); free_conspec(source->connspec); free(source); } int ipfixSourceGetGeneric( ipfixSource_t source, rwRec *rwrec) { rwRec *rec; assert(source); assert(rwrec); rec = (rwRec *)circBufNextTail(source->data_buffer); if (rec == NULL) { return -1; } memcpy(rwrec, rec, sizeof(*rwrec)); return 0; } int ipfixSourceGetFlowcap( ipfixSource_t source, flowcapRec_t *fc_rec, int fc_version) { rwRec *rec; assert(source); assert(fc_rec); rec = (rwRec *)circBufNextTail(source->data_buffer); if (rec == NULL) { return -1; } return genericToFlowcap(rec, fc_rec, fc_version); } /* ** Local Variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */