/*
** Copyright (C) 2004-2006 by Carnegie Mellon University.
**
** @OPENSOURCE_HEADER_START@
**
** Use of the SILK system and related source code is subject to the terms
** of the following licenses:
**
** GNU Public License (GPL) Rights pursuant to Version 2, June 1991
** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013
**
** NO WARRANTY
**
** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER
** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY
** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN
** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY
** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT
** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE,
** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE
** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT,
** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY
** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF
** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES.
** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF
** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON
** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE
** DELIVERABLES UNDER THIS LICENSE.
**
** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie
** Mellon University, its trustees, officers, employees, and agents from
** all claims or demands made against them (and any related losses,
** expenses, or attorney's fees) arising out of, or relating to Licensee's
** and/or its sub licensees' negligent use or willful misuse of or
** negligent conduct or willful misconduct regarding the Software,
** facilities, or other rights or assistance granted by Carnegie Mellon
** University under this License, including, but not limited to, any
** claims of product liability, personal injury, death, damage to
** property, or violation of any laws or regulations.
**
** Carnegie Mellon University Software Engineering Institute authored
** documents are sponsored by the U.S. Department of Defense under
** Contract F19628-00-C-0003. Carnegie Mellon University retains
** copyrights in all material produced under this contract. The U.S.
** Government retains a non-exclusive, royalty-free license to publish or
** reproduce these documents, or allow others to do so, for U.S.
** Government purposes only pursuant to the copyright license under the
** contract clause at 252.227.7013.
**
** @OPENSOURCE_HEADER_END@
*/

#include "silk.h"

RCSIDENT("$SiLK: stream-cache.c 4301 2006-07-19 21:50:10Z mthomas $");

#include "utils.h"
#include "sklog.h"
#include "rwpack.h"
#include "stream-cache.h"
#include "silk_site.h"


/* MACROS and DATA TYPES */

#define MAX_STREAM_TYPES 64

struct _streamCache {
    /* Pointers to the streams the stream cache manages; indexed by
     * the type (FT_RWROUTED, FT_RWSPLIT, etc) of stream.  streams[0]
     * owns one contiguous allocation; streams[i] for i > 0 point
     * into that same allocation. */
    rwIOStruct    **streams[MAX_STREAM_TYPES];
    /* The record count for each stream, as of the time the stream
     * was added to the cache or last flushed.  Laid out the same way
     * as 'streams'. */
    uint64_t       *s_reccount[MAX_STREAM_TYPES];
    /* Number of each type of stream currently in cache */
    int             counts[MAX_STREAM_TYPES];
    /* Max number of any one type that is allowed */
    int             size;
    /* Number of stream types this cache handles.
     * 1 <= type_count <= MAX_STREAM_TYPES */
    int             type_count;
};


/* LOCAL FUNCTIONS */

/*
 *  new_count = skStreamLogRecordCount(rwios, old_count);
 *
 *    Write a message to the log giving the name of the file that is
 *    bound to 'rwios' and number of records written to that file
 *    since it was opened or last flushed.  Nothing is logged when
 *    the record count has not changed.
 *
 *    The value of 's_reccount' for this stream should be passed in as
 *    the 'old_count' value; the value of the new count will be
 *    returned.
 */
static uint64_t skStreamLogRecordCount(
    const rwIOStruct   *rwios,
    uint64_t            old_count)
{
    uint64_t new_count = rwGetRecordCount(rwios);

    if (old_count == new_count) {
        return old_count;
    }
    assert(old_count < new_count);

    /* The difference is a uint64_t, so the unsigned conversion
     * specifier is the one that matches the argument. */
    logMsg(("%s: %" PRIu64 " recs"),
           rwGetFileName(rwios), (new_count - old_count));

    return new_count;
}


/*
 *  skStreamCloseFile(rwios, old_count);
 *
 *    Log the number of records written to 'rwios' since 'old_count'
 *    was recorded, close the stream (logging any error reported by
 *    rwioClose()), and hand the object to rwioDestroy().  The caller
 *    must not use 'rwios' after this call.
 */
static void skStreamCloseFile(
    rwIOStruct         *rwios,
    uint64_t            old_count)
{
    int rv;

    assert(rwios);

    (void)skStreamLogRecordCount(rwios, old_count);

    rv = rwioClose(rwios);
    if (rv) {
        rwioPrintLastErr(rwios, rv, &logMsg);
    }
    rwioDestroy(&rwios);
}


/*
 *  streamCacheCloseStreams(cache);
 *
 *    Close and destroy every stream currently held by 'cache' and
 *    reset the per-type counts to zero.  The cache itself and its
 *    arrays remain allocated.
 */
static void streamCacheCloseStreams(streamCache_t *cache)
{
    int i;
    int j;

    for (i = 0; i < cache->type_count; i++) {
        for (j = 0; j < cache->counts[i]; j++) {
            skStreamCloseFile(cache->streams[i][j], cache->s_reccount[i][j]);
        }
        cache->counts[i] = 0;
    }
}


/* PUBLIC FUNCTIONS */

/*
 *  cache = skStreamCacheCreate(type_count, size);
 *
 *    Creates a streamcache.  A stream cache is actually an array of
 *    caches, indexed by the stream type.  'type_count' is the number
 *    of stream types (1 <= type_count <= MAX_STREAM_TYPES) and 'size'
 *    is the maximum number of streams held for any one type
 *    (size >= 1).  Returns NULL on error, after logging a message.
 */
streamCache_t *skStreamCacheCreate(int type_count, int size)
{
    streamCache_t *cache = NULL;
    size_t num_types;
    size_t per_type;
    size_t total;
    int i;

    /* verify input */
    if (type_count < 1 || type_count > MAX_STREAM_TYPES) {
        logMsg(("Illegal number of types %d for stream cache; "
                "use 1 <= count <= %d"),
               type_count, MAX_STREAM_TYPES);
        return NULL;
    }
    if (size < 1) {
        logMsg("Illegal size %d for stream cache; use size >= 1", size);
        return NULL;
    }

    /* Compute the number of slots in size_t instead of int so that a
     * large 'size' cannot overflow a signed int, and verify the
     * product did not wrap on platforms with a narrow size_t. */
    num_types = (size_t)type_count;
    per_type = (size_t)size;
    total = num_types * per_type;
    if (total / num_types != per_type) {
        logMsg("Stream cache of %d types and size %d is too large",
               type_count, size);
        return NULL;
    }

    cache = (streamCache_t*)calloc(1, sizeof(streamCache_t));
    if (cache == NULL) {
        logMsg("malloc() failed creating stream cache");
        return NULL;
    }

    cache->type_count = type_count;
    cache->size = size;
    cache->counts[0] = 0;

    /* Allocate one big block for all of the streams, then make each
     * streams[i] point into that block. */
    cache->streams[0] = (rwIOStruct**)calloc(total, sizeof(rwIOStruct*));
    if (NULL == cache->streams[0]) {
        logMsg("calloc() failed creating stream cache streams");
        free(cache);
        return NULL;
    }
    for (i = 1; i < type_count; i++) {
        cache->counts[i] = 0;
        cache->streams[i] = cache->streams[0] + ((size_t)i * per_type);
    }

    /* Allocate one big block for all of the record counts, then make
     * each s_reccount[i] point into that block. */
    cache->s_reccount[0] = (uint64_t*)calloc(total, sizeof(uint64_t));
    if (NULL == cache->s_reccount[0]) {
        logMsg("calloc() failed creating stream cache record counts");
        free(cache->streams[0]);
        free(cache);
        return NULL;
    }
    for (i = 1; i < type_count; i++) {
        cache->s_reccount[i] = cache->s_reccount[0] + ((size_t)i * per_type);
    }

    return cache;
}


/*
 *  skStreamCacheDestroy(cache);
 *
 *    Close all streams and free all memory associated with the
 *    streams.  Free the memory associated with the cache.  The cache
 *    pointer is invalid after a call to this function.  A NULL
 *    'cache' is logged as a warning and otherwise ignored.
 */
void skStreamCacheDestroy(streamCache_t *cache)
{
    int i;

    if (cache == NULL) {
        logMsg("Warning: tried to destroy unitialized stream cache.");
        return;
    }

    /* Close and destroy the streams */
    streamCacheCloseStreams(cache);

    /* Entries above 0 only point into the blocks owned by entry 0 */
    for (i = 1; i < cache->type_count; i++) {
        cache->streams[i] = NULL;
        cache->s_reccount[i] = NULL;
    }

    free(cache->streams[0]);
    cache->streams[0] = NULL;
    free(cache->s_reccount[0]);
    cache->s_reccount[0] = NULL;

    /* Free the structure itself */
    free(cache);
}


/*
 *  skStreamCacheFlush(cache);
 *
 *    Flush all the streams in the cache, and log the number of
 *    records processed since the most recent flush or open.  Flush
 *    errors are logged; the remaining streams are still flushed.
 */
void skStreamCacheFlush(streamCache_t *cache)
{
    int i;
    int j;
    int rv;

    assert(cache);

    for (i = 0; i < cache->type_count; i++) {
        for (j = 0; j < cache->counts[i]; j++) {
            rwIOStruct *rwios = cache->streams[i][j];

            rv = rwioFlush(rwios);
            if (rv) {
                rwioPrintLastErr(rwios, rv, &logMsg);
            }
            /* Remember the new count so the next flush or close logs
             * only the records written after this point. */
            cache->s_reccount[i][j]
                = skStreamLogRecordCount(rwios, cache->s_reccount[i][j]);
        }
    }
}


/*
 *  skStreamCacheCloseAll(cache);
 *
 *    Close all the streams in the cache and remove them from the
 *    cache.  Log the number of records processed since the most
 *    recent flush or open.  The cache itself remains usable.
 */
void skStreamCacheCloseAll(streamCache_t *cache)
{
    assert(cache);

    streamCacheCloseStreams(cache);
}


/*
 *  skStreamCacheAdd(cache, type, stream);
 *
 *    Insert 'rwios' at the front of the list of streams for
 *    'stream_type'.  When that list is already full, the stream in
 *    the final slot is closed and destroyed first.
 *
 *    Note the cache will own stream memory and will free it when the
 *    stream falls off the end of the cache, or when the cache is
 *    destroyed.
 */
void skStreamCacheAdd(
    streamCache_t      *cache,
    int                 stream_type,
    rwIOStruct         *rwios)
{
    rwIOStruct *last;

    assert(cache);
    assert(rwios);
    assert(0 <= stream_type && stream_type < cache->type_count);

    /* If the cache is full, flush, close and free the last stream */
    if (cache->size == cache->counts[stream_type]) {
        last = cache->streams[stream_type][cache->size-1];
        skStreamCloseFile(last, cache->s_reccount[stream_type][cache->size-1]);
        cache->counts[stream_type]--;
    }

    /* Make room for the new stream: shift slots [0, size-2] one
     * position toward the end.  The final slot is unused here. */
    memmove(&(cache->streams[stream_type][1]),
            &(cache->streams[stream_type][0]),
            (cache->size-1)*sizeof(rwIOStruct*));
    memmove(&(cache->s_reccount[stream_type][1]),
            &(cache->s_reccount[stream_type][0]),
            (cache->size-1)*sizeof(uint64_t));

    cache->streams[stream_type][0] = rwios;
    cache->s_reccount[stream_type][0] = rwGetRecordCount(rwios);
    cache->counts[stream_type]++;
}


/*
 *  stream = skStreamCacheLookup(cache, type, time_stamp, sensor_id);
 *
 *    Return the stream for the specified type, time, and sensor_id.
 *    A stream that is found is moved to the front of the list for
 *    its type.  Return NULL if no stream for the specified type,
 *    time, and sensor is found.
 */
rwIOStruct *skStreamCacheLookup(
    streamCache_t      *cache,
    int                 stream_type,
    uint32_t            time_stamp,
    sensorID_t          sensor_id)
{
    int j;

    assert(cache);
    assert(0 <= stream_type && stream_type < cache->type_count);

    /* loop over all streams for this stream_type until we find one
     * where the timestamps and sensor IDs match.  If found, move it
     * to the front of the array of streams for this stream_type. */
    for (j = 0; j < cache->counts[stream_type]; j++) {
        rwIOStruct *rwios = cache->streams[stream_type][j];

        if ((rwGetFileSTime(rwios) == time_stamp)
            && (sensor_id == rwios->sID))
        {
            /* Move to the head of the array */
            if (j > 0) {
                uint64_t count = cache->s_reccount[stream_type][j];

                /* Shift slots [0, j-1] one position toward the end,
                 * overwriting slot j whose contents were saved. */
                memmove(&(cache->streams[stream_type][1]),
                        &(cache->streams[stream_type][0]),
                        j*sizeof(rwIOStruct*));
                memmove(&(cache->s_reccount[stream_type][1]),
                        &(cache->s_reccount[stream_type][0]),
                        j*sizeof(uint64_t));

                cache->streams[stream_type][0] = rwios;
                cache->s_reccount[stream_type][0] = count;
            }
            return rwios;
        }
    }

    return NULL;
}