/*
** Copyright (C) 2004-2007 by Carnegie Mellon University.
**
** @OPENSOURCE_HEADER_START@
**
** Use of the SILK system and related source code is subject to the terms
** of the following licenses:
**
** GNU Public License (GPL) Rights pursuant to Version 2, June 1991
** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013
**
** NO WARRANTY
**
** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER
** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY
** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN
** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY
** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT
** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE,
** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE
** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT,
** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY
** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF
** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES.
** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF
** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON
** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE
** DELIVERABLES UNDER THIS LICENSE.
**
** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie
** Mellon University, its trustees, officers, employees, and agents from
** all claims or demands made against them (and any related losses,
** expenses, or attorney's fees) arising out of, or relating to Licensee's
** and/or its sub licensees' negligent use or willful misuse of or
** negligent conduct or willful misconduct regarding the Software,
** facilities, or other rights or assistance granted by Carnegie Mellon
** University under this License, including, but not limited to, any
** claims of product liability, personal injury, death, damage to
** property, or violation of any laws or regulations.
**
** Carnegie Mellon University Software Engineering Institute authored
** documents are sponsored by the U.S. Department of Defense under
** Contract F19628-00-C-0003. Carnegie Mellon University retains
** copyrights in all material produced under this contract. The U.S.
** Government retains a non-exclusive, royalty-free license to publish or
** reproduce these documents, or allow others to do so, for U.S.
** Government purposes only pursuant to the copyright license under the
** contract clause at 252.227.7013.
**
** @OPENSOURCE_HEADER_END@
*/

/*
**  Functions to create and read from a UDP socket.
**
*/

#include "silk.h"

RCSIDENT("$SiLK: udpsource.c 7399 2007-06-06 14:48:53Z mthomas $");

#include "circbuf.h"
#include "udpsource.h"
#include "v5pdu.h"
#include "skvector.h"

#ifdef HAVE_ZLIB_H
#include <zlib.h>
#endif

/*
 *  LOG(fmt, ...) calls the source's logging callback when one is set
 *  and is a no-op otherwise.  It requires a variable named 'source'
 *  in scope.  Because it expands to an if/else, always use it inside
 *  a braced block.  With TEST_PRINTF_FORMATS defined it becomes
 *  printf() so the compiler can check the format strings.
 */
#ifdef TEST_PRINTF_FORMATS
#  define IF_LOGFN(f)  printf
#else
#  define IF_LOGFN(f)  if (!(f)) { } else (f)
#endif
#define LOG IF_LOGFN(source->logfn)


/* Per-address storage: one circular buffer for each address we accept
 * packets from. */
typedef struct udp_data_store_t {
    /* Address compared against ntohl() of the sender's address;
     * INADDR_ANY disables sender matching. */
    in_addr_t   address;
    /* Buffer the reader thread fills and udpNextByIndex() drains. */
    circBuf_t   data_buffer;
    /* Slot in data_buffer that will receive the next packet. */
    void       *tmp_buffer;
} udp_data_store_t;


typedef struct udp_source_struct_t {
    sk_msg_fn_t      logfn;        /* optional logging callback */
    sk_vector_t     *store;        /* vector of udp_data_store_t* */
    uint8_t         *file_buffer;  /* record buffer (file sources) */
    pthread_t        thread;       /* reader thread (socket sources) */
    pthread_mutex_t  mutex;
    pthread_cond_t   cond;         /* reader <-> controller handshake */
    size_t           data_size;    /* size of one record, in bytes */
    uint32_t         bufsize;      /* passed through to circBufCreate() */
#ifdef HAVE_ZLIB_H
    gzFile           udpfile;
#endif
    int              fd;           /* UDP socket */
    int              read_pipe;    /* reader wakes when this is readable */
    int              write_pipe;   /* written to in order to wake reader */
    uint8_t          ref;          /* reference count */
    unsigned         file      : 1; /* source reads from a file */
    unsigned         destroyed : 1; /* reader has been asked to stop */
    unsigned         running   : 1; /* reader thread exists and has not
                                     * yet acknowledged a stop request;
                                     * written only with mutex held */
} udp_source_struct_t;


/*
 *  Create the control pipe used to wake the reader thread and store
 *  its descriptors on 'source'.  The write end is made non-blocking
 *  on a best-effort basis.  Returns 0 on success, -1 if pipe() fails.
 */
static int udp_open_control_pipe(udpSource_t source)
{
    int pipefd[2];
    int flags;

    if (pipe(pipefd) == -1) {
        return -1;
    }

    /* Make write pipe non-blocking */
    flags = fcntl(pipefd[1], F_GETFL, 0);
    if (flags != -1) {
        fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK);
    }

    source->read_pipe = pipefd[0];
    source->write_pipe = pipefd[1];
    return 0;
}


/*
 *  Close both ends of the control pipe, if open, and mark them
 *  invalid so that they cannot be closed a second time.
 */
static void udp_close_control_pipe(udpSource_t source)
{
    if (source->write_pipe != -1) {
        close(source->write_pipe);
        source->write_pipe = -1;
    }
    if (source->read_pipe != -1) {
        close(source->read_pipe);
        source->read_pipe = -1;
    }
}


/*
 *  Body of the reader thread: wait for a packet on the socket or a
 *  wake-up on the control pipe, and copy each accepted packet into
 *  the circular buffer of the matching store.  'data' is a scratch
 *  buffer of source->data_size bytes.  When 'match_sender' is
 *  non-zero, packets whose sender matches no store are dropped;
 *  otherwise every packet goes to the first store.
 *
 *  Returns when asked to stop or on an unrecoverable error.
 */
static void udp_reader_loop(
    udpSource_t     source,
    uint8_t        *data,
    int             match_sender)
{
    udp_data_store_t *store;
    struct sockaddr_in addr;
    socklen_t len;
    fd_set readset;
    ssize_t rv;
    int maxfd;
    int count;
    int i;

    /* Determine maximum file descriptor for select */
    maxfd = ((source->fd > source->read_pipe)
             ? source->fd : source->read_pipe) + 1;

    /* The flag is read here without the mutex; the control pipe is
     * what actually wakes a blocked select(). */
    while (!source->destroyed) {
        /* Set up select data (must be redone every loop) */
        FD_ZERO(&readset);
        FD_SET(source->fd, &readset);
        FD_SET(source->read_pipe, &readset);

        /* Wait for data */
        if (select(maxfd, &readset, NULL, NULL, NULL) == -1) {
            if (errno == EINTR) {
                /* Interrupted by a signal, try again. */
                continue;
            }
            LOG("Select error (%d) [%s]", errno, strerror(errno));
            return;
        }

        /* See if we were requested to stop */
        if (FD_ISSET(source->read_pipe, &readset)) {
            return;
        }

        /* We must have gotten real data */
        assert(FD_ISSET(source->fd, &readset));

        /* Read the data */
        if (match_sender) {
            len = sizeof(addr);
            rv = recvfrom(source->fd, data, source->data_size, 0,
                          (struct sockaddr *)&addr, &len);
        } else {
            rv = recvfrom(source->fd, data, source->data_size, 0,
                          NULL, NULL);
        }
        if (rv == -1) {
            if (errno == EINTR) {
                /* Interrupted by a signal, try again. */
                continue;
            }
            LOG("recvfrom error (%d) [%s]", errno, strerror(errno));
            return;
        }

        /* A short datagram must not expose bytes left over from the
         * previous packet, since the whole record is copied below. */
        if ((size_t)rv < source->data_size) {
            memset(data + rv, 0, source->data_size - (size_t)rv);
        }

        /* Find the store for this sender */
        i = 0;
        if (match_sender) {
            count = skVectorGetCount(source->store);
            for (i = 0; i < count; i++) {
                skVectorGetValue(&store, source->store, i);
                if (store->address == ntohl(addr.sin_addr.s_addr)) {
                    break;
                }
            }
            if (i == count) {
                /* Packet from an address we do not care about */
                continue;
            }
        }

        /* Hand the packet over and acquire the next location */
        skVectorGetValue(&store, source->store, i);
        memcpy(store->tmp_buffer, data, source->data_size);
        store->tmp_buffer = (void *)circBufNextHead(store->data_buffer);
        if (store->tmp_buffer == NULL) {
            LOG("Non-existant data buffer,");
            return;
        }
    }
}


/*
 *  Thread entry point for socket sources.  'vsource' is the
 *  udpSource_t to service.
 *
 *  Handshake, all under source->mutex: the thread sets 'running' and
 *  signals the condition variable once it is initialized.  After the
 *  read loop ends for any reason, it waits until 'destroyed' is set,
 *  then clears 'running' and signals again so the controller knows it
 *  is finished.
 */
static void *udp_reader(void *vsource)
{
    udpSource_t source = (udpSource_t)vsource;
    udp_data_store_t *store;
    sigset_t sigs;
    uint8_t *data;
    int match_sender = 1;
    int count;
    int i;

    assert(source != NULL);

    /* Lock for initialization */
    pthread_mutex_lock(&source->mutex);

    /* Don't handle signals */
    sigfillset(&sigs);
    pthread_sigmask(SIG_SETMASK, &sigs, NULL);

    /* Disable cancelling */
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

    data = (uint8_t *)malloc(source->data_size);

    /* Acquire the first write location of every store.  A store for
     * INADDR_ANY turns sender matching off. */
    count = skVectorGetCount(source->store);
    for (i = 0; i < count; i++) {
        skVectorGetValue(&store, source->store, i);
        store->tmp_buffer = (void *)circBufNextHead(store->data_buffer);
        assert(store->tmp_buffer != NULL);
        if (store->address == INADDR_ANY) {
            match_sender = 0;
        }
    }

    /* Signal completion of initialization */
    source->running = 1;
    pthread_cond_broadcast(&source->cond);
    pthread_mutex_unlock(&source->mutex);

    /* Main loop */
    if (data == NULL) {
        LOG("Unable to allocate %lu byte receive buffer",
            (unsigned long)source->data_size);
    } else {
        udp_reader_loop(source, data, match_sender);
        free(data);
    }

    pthread_mutex_lock(&source->mutex);

    /* Wait to be explicitly told to stop */
    while (!source->destroyed) {
        pthread_cond_wait(&source->cond, &source->mutex);
    }

    /* Signal that we are finished */
    source->running = 0;
    pthread_cond_broadcast(&source->cond);
    pthread_mutex_unlock(&source->mutex);

    return NULL;
}


/*
 *  Start the reader thread and wait until it has initialized.
 *  The caller must hold source->mutex; it is still held on return.
 *  Returns 0 on success, -1 if the thread could not be created.
 */
static int udp_reader_start(udpSource_t source)
{
    source->destroyed = 0;
    source->running = 0;

    if (pthread_create(&source->thread, NULL, udp_reader, (void *)source)
        != 0)
    {
        return -1;
    }

    /* Loop on the flag: condition waits may wake spuriously */
    while (!source->running) {
        pthread_cond_wait(&source->cond, &source->mutex);
    }
    return 0;
}


/*
 *  Ask the reader thread to stop, wait until it has acknowledged,
 *  and join it.  Does nothing beyond setting 'destroyed' when no
 *  reader is running.  The caller must hold source->mutex; it is
 *  still held on return.
 */
static void udp_reader_stop(udpSource_t source)
{
    source->destroyed = 1;
    if (!source->running) {
        return;
    }

    /* Break out of any blocking io.  The pipe is non-blocking; a
     * failed write because it is full means a wake-up is already
     * pending, so the result is deliberately ignored. */
    if (write(source->write_pipe, "", 1) == -1) {
        /* nothing useful to do */
    }

    /* The reader may already have left its loop after an error and
     * be parked on the condition variable; wake it so that it sees
     * the 'destroyed' flag. */
    pthread_cond_broadcast(&source->cond);

    /* Wait for thread to end */
    while (source->running) {
        pthread_cond_wait(&source->cond, &source->mutex);
    }

    /* The reader released the mutex before we got it back and has
     * nothing left to do but return, so joining here cannot block
     * on the mutex we hold. */
    pthread_join(source->thread, NULL);
}


/*
 *  Create a source that reads records of 'itemsize' bytes from the
 *  UDP socket 'fd' on a dedicated thread.  'address' is the sender to
 *  accept packets from, or INADDR_ANY to accept every sender.
 *  'itemsize' and 'bufsize' are handed to circBufCreate().  'logfn'
 *  may be NULL.
 *
 *  Returns the new source, or NULL on failure.  The source takes
 *  ownership of 'fd': it is closed on failure and by
 *  udpSourceDestroy().
 */
udpSource_t udpSourceCreate(
    int             fd,
    in_addr_t       address,
    uint32_t        itemsize,
    uint32_t        bufsize,
    sk_msg_fn_t     logfn)
{
    udpSource_t source;
    udp_data_store_t *store;

    /* Create thread structure */
    source = (udpSource_t)calloc(1, sizeof(*source));
    if (source == NULL) {
        close(fd);
        return NULL;
    }

    /* Fill the data structure */
    source->file = 0;           /* Not a file */
    source->fd = fd;
    source->read_pipe = -1;
    source->write_pipe = -1;
    source->store = NULL;
    source->data_size = itemsize;
    source->bufsize = bufsize;
    source->ref = 1;
    source->logfn = logfn;

    store = (udp_data_store_t *)calloc(1, sizeof(*store));
    if (store == NULL) {
        goto fail;
    }
    store->address = address;
    store->tmp_buffer = NULL;
    store->data_buffer = circBufCreate(itemsize, bufsize);
    if (store->data_buffer == NULL) {
        goto fail;
    }

    if (udp_open_control_pipe(source) == -1) {
        goto fail;
    }

    source->store = skVectorNew(sizeof(udp_data_store_t *));
    if (source->store == NULL) {
        goto fail;
    }
    if (skVectorAppendValue(source->store, &store) == -1) {
        goto fail;
    }

    pthread_mutex_init(&source->mutex, NULL);
    pthread_cond_init(&source->cond, NULL);

    /* Start the thread and wait for it to finish initializing before
     * returning. */
    pthread_mutex_lock(&source->mutex);
    if (udp_reader_start(source) != 0) {
        pthread_mutex_unlock(&source->mutex);
        pthread_mutex_destroy(&source->mutex);
        pthread_cond_destroy(&source->cond);
        goto fail;
    }
    pthread_mutex_unlock(&source->mutex);

    return source;

  fail:
    close(fd);
    udp_close_control_pipe(source);
    if (store != NULL) {
        if (store->data_buffer != NULL) {
            circBufDestroy(store->data_buffer);
        }
        free(store);
    }
    if (source->store != NULL) {
        skVectorDestroy(source->store);
    }
    free(source);
    return NULL;
}


/*
 *  Add a further sender 'address' to a socket source, with its own
 *  circular buffer.
 *
 *  Returns the index to pass to udpNextByIndex() for this address,
 *  or -1 if 'source' is NULL or a file source, 'address' is
 *  INADDR_ANY, or resources cannot be allocated.
 */
int udpSourceAddAddress(udpSource_t source, in_addr_t address)
{
    udp_data_store_t *store;
    int retval = -1;
    int appended;

    if (source == NULL || source->file != 0 || address == INADDR_ANY) {
        return -1;
    }

    store = (udp_data_store_t *)calloc(1, sizeof(*store));
    if (store == NULL) {
        return -1;
    }
    store->address = address;

    store->data_buffer = circBufCreate(source->data_size, source->bufsize);
    if (store->data_buffer == NULL) {
        free(store);
        return -1;
    }

    /* The reader thread only primes tmp_buffer for the stores that
     * exist when it starts, so prime this one here, before the
     * reader can see the store. */
    store->tmp_buffer = (void *)circBufNextHead(store->data_buffer);
    if (store->tmp_buffer == NULL) {
        circBufDestroy(store->data_buffer);
        free(store);
        return -1;
    }

    pthread_mutex_lock(&source->mutex);
    appended = (skVectorAppendValue(source->store, &store) != -1);
    if (appended) {
        retval = skVectorGetCount(source->store) - 1;
    }
    pthread_mutex_unlock(&source->mutex);

    if (!appended) {
        circBufDestroy(store->data_buffer);
        free(store);
        return -1;
    }
    return retval;
}


#ifdef HAVE_ZLIB_H
/*
 *  Create a source that reads records of 'itemsize' bytes from the
 *  file 'path', opened with gzopen().  No thread is started; records
 *  are read on demand by udpNextByIndex().  'logfn' may be NULL.
 *
 *  Returns the new source, or NULL on failure.
 */
udpSource_t udpFileSourceCreate(
    const char     *path,
    uint32_t        itemsize,
    sk_msg_fn_t     logfn)
{
    udpSource_t source;

    /* Create thread structure */
    source = (udpSource_t)calloc(1, sizeof(*source));
    if (source == NULL) {
        return NULL;
    }
    source->file = 1;           /* Source is a file */
    source->destroyed = 0;
    source->data_size = itemsize;
    source->logfn = logfn;
    source->store = NULL;
    source->fd = -1;
    source->read_pipe = -1;
    source->write_pipe = -1;
    source->ref = 1;

    source->udpfile = gzopen(path, "r");
    if (source->udpfile == NULL) {
        free(source);
        return NULL;
    }

    source->file_buffer = (uint8_t *)malloc(source->data_size);
    if (source->file_buffer == NULL) {
        gzclose(source->udpfile);
        free(source);
        return NULL;
    }

    pthread_mutex_init(&source->mutex, NULL);

    return source;
}
#endif  /* HAVE_ZLIB_H */


/*
 *  Release one reference to 'source'.  For a socket source nothing
 *  else happens until the last reference is released; then the
 *  reader thread is stopped and joined, the socket and control pipe
 *  are closed, and all memory is freed.  A file source is closed and
 *  freed immediately.  A NULL 'source' is ignored.
 */
void udpSourceDestroy(udpSource_t source)
{
    udp_data_store_t *store;
    int count;
    int i;

    if (source == NULL) {
        return;
    }

    if (!source->file) {
        if (--source->ref > 0) {
            return;
        }
        /* Break out of waits for buffer space.  Doing this before
         * acquiring the lock makes sure calls to udpNext() do
         * complete, thus relinquishing their hold on the lock. */
        count = skVectorGetCount(source->store);
        for (i = 0; i < count; i++) {
            skVectorGetValue(&store, source->store, i);
            circBufDestroy(store->data_buffer);
        }
    }

    pthread_mutex_lock(&source->mutex);

    /* Signal that we are destroyed */
    source->destroyed = 1;

    if (!source->file) {
        /* Stop the reader and wait for it to end */
        udp_reader_stop(source);

        /* Close all handles */
        close(source->fd);
        udp_close_control_pipe(source);

        /* Free the stores */
        count = skVectorGetCount(source->store);
        for (i = 0; i < count; i++) {
            skVectorGetValue(&store, source->store, i);
            free(store);
        }
        skVectorDestroy(source->store);
    } else {
#ifdef HAVE_ZLIB_H
        /* Close the file */
        gzclose(source->udpfile);
        free(source->file_buffer);
#else
        assert(0);
        abort();
#endif  /* HAVE_ZLIB_H */
    }

    /* Unlock and destroy mutex */
    pthread_mutex_unlock(&source->mutex);
    pthread_mutex_destroy(&source->mutex);
    if (!source->file) {
        pthread_cond_destroy(&source->cond);
    }

    /* Destroy source */
    free(source);
}


/*
 *  Return the next record from 'source'.  For a socket source,
 *  'index' selects the store (0 for the address given at creation,
 *  otherwise a value returned by udpSourceAddAddress()) and the
 *  record comes from circBufNextTail().  For a file source 'index'
 *  is ignored and the next data_size bytes are read from the file
 *  into a buffer that is reused on every call.
 *
 *  Returns NULL when 'index' is invalid, when the file yields less
 *  than a full record, or when circBufNextTail() returns NULL.
 *
 *  The source mutex is held for the duration of the call.
 */
uint8_t *udpNextByIndex(udpSource_t source, int index)
{
    uint8_t *data = NULL;

    assert(source);

    pthread_mutex_lock(&source->mutex);

    if (source->file) {
#ifdef HAVE_ZLIB_H
        int size = gzread(source->udpfile, source->file_buffer,
                          source->data_size);
        if (size <= 0 || (uint32_t)size < source->data_size) {
            data = NULL;
        } else {
            data = source->file_buffer;
        }
#else
        assert(0);
        abort();
#endif  /* HAVE_ZLIB_H */
    } else {
        udp_data_store_t *store;

        if (skVectorGetValue(&store, source->store, index) == -1) {
            data = NULL;
        } else {
            data = circBufNextTail(store->data_buffer);
        }
    }

    pthread_mutex_unlock(&source->mutex);

    return data;
}


/*
 *  Return the next record from the first store of 'source'; see
 *  udpNextByIndex().
 */
uint8_t *udpNext(udpSource_t source)
{
    return udpNextByIndex(source, 0);
}


/*
 *  Add a reference to 'source'.  Each reference must be released
 *  with udpSourceDestroy().  The count is not protected by the
 *  source mutex.
 */
void udpSourceIncRef(udpSource_t source)
{
    source->ref++;
}


/*
 *  Stop the reader thread of 'source' so that it can be restarted
 *  with udpSourceResetStart().  The circular buffers are destroyed,
 *  the control pipe is closed, and the socket is closed as well when
 *  'close_sock' is non-zero.
 *
 *  NOTE: this function returns with source->mutex LOCKED.  The
 *  matching udpSourceResetStart() call, made from the same thread,
 *  releases it.
 */
void udpSourceResetStop(udpSource_t source, uint8_t close_sock)
{
    udp_data_store_t *store;
    int count;
    int i;

    if (!source->file) {
        /* Break out of waits for buffer space.  Doing this before
         * acquiring the lock makes sure calls to udpNext() do
         * complete, thus relinquishing their hold on the lock. */
        count = skVectorGetCount(source->store);
        for (i = 0; i < count; i++) {
            skVectorGetValue(&store, source->store, i);
            circBufDestroy(store->data_buffer);
        }
    }

    pthread_mutex_lock(&source->mutex);

    if (!source->file) {
        /* Signal that we are stopping and wait for thread to end.
         * Without setting the flag the reader would park forever
         * waiting for it. */
        udp_reader_stop(source);

        /* The reader is gone and we hold the lock, so nobody can be
         * using the buffers destroyed above; forget them so that
         * udpSourceResetStart() knows which ones to rebuild. */
        count = skVectorGetCount(source->store);
        for (i = 0; i < count; i++) {
            skVectorGetValue(&store, source->store, i);
            store->data_buffer = NULL;
            store->tmp_buffer = NULL;
        }

        /* Close control handles */
        udp_close_control_pipe(source);
        if (close_sock) {
            close(source->fd);
            source->fd = -1;
        }
    } else {
        /* What should we do for files?  Currently nothing. */
    }
}


/*
 *  Restart a source previously stopped with udpSourceResetStop(),
 *  recreating the parts that were destroyed.  When 'fd' is not NULL,
 *  '*fd' replaces the source's socket.  For a file source there is
 *  nothing to restart; the lock is simply released.
 *
 *  Returns 0 on success and releases the mutex that
 *  udpSourceResetStop() left locked.  Returns -1 on failure; in that
 *  case the source stays stopped and locked, exactly as
 *  udpSourceResetStop() left it, and the call may be retried.
 */
int udpSourceResetStart(udpSource_t source, int *fd)
{
    udp_data_store_t *store;
    int count;
    int i;

    if (source->file) {
        pthread_mutex_unlock(&source->mutex);
        return 0;
    }

    /* Now, recreate the parts we destroyed.  Buffers that survived
     * an earlier failed attempt are kept. */
    count = skVectorGetCount(source->store);
    for (i = 0; i < count; i++) {
        skVectorGetValue(&store, source->store, i);
        if (store->data_buffer == NULL) {
            store->data_buffer = circBufCreate(source->data_size,
                                               source->bufsize);
            if (store->data_buffer == NULL) {
                return -1;
            }
        }
    }

    if (source->read_pipe == -1 && udp_open_control_pipe(source) == -1) {
        return -1;
    }

    if (fd) {
        source->fd = *fd;
    }

    /* Restart the thread and wait for it to finish initializing
     * before returning. */
    if (udp_reader_start(source) != 0) {
        udp_close_control_pipe(source);
        return -1;
    }

    pthread_mutex_unlock(&source->mutex);

    return 0;
}


/*
** Local Variables:
** mode:c
** indent-tabs-mode:nil
** c-basic-offset:4
** End:
*/