/*
** Copyright (C) 2006-2007 by Carnegie Mellon University.
**
** @OPENSOURCE_HEADER_START@
**
** Use of the SILK system and related source code is subject to the terms
** of the following licenses:
**
** GNU Public License (GPL) Rights pursuant to Version 2, June 1991
** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013
**
** NO WARRANTY
**
** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER
** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY
** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN
** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY
** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT
** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE,
** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE
** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT,
** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY
** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF
** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES.
** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF
** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON
** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE
** DELIVERABLES UNDER THIS LICENSE.
**
** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie
** Mellon University, its trustees, officers, employees, and agents from
** all claims or demands made against them (and any related losses,
** expenses, or attorney's fees) arising out of, or relating to Licensee's
** and/or its sub licensees' negligent use or willful misuse of or
** negligent conduct or willful misconduct regarding the Software,
** facilities, or other rights or assistance granted by Carnegie Mellon
** University under this License, including, but not limited to, any
** claims of product liability, personal injury, death, damage to
** property, or violation of any laws or regulations.
**
** Carnegie Mellon University Software Engineering Institute authored
** documents are sponsored by the U.S. Department of Defense under
** Contract F19628-00-C-0003. Carnegie Mellon University retains
** copyrights in all material produced under this contract. The U.S.
** Government retains a non-exclusive, royalty-free license to publish or
** reproduce these documents, or allow others to do so, for U.S.
** Government purposes only pursuant to the copyright license under the
** contract clause at 252.227.7013.
**
** @OPENSOURCE_HEADER_END@
*/

#include "silk.h"

RCSIDENT("$SiLK: rwscan.c 7600 2007-06-20 19:05:24Z tonyc $");

#include "rwscan.h"
#include "rwscan_db.h"
#include "rwscan_workqueue.h"


/* EXTERNAL VARIABLE DEFINITIONS */

/* program options structure */
options_t options;

/* structure for reporting totals for each run */
summary_metrics_t summary_metrics;

/* input files */
iochecksInfoStruct_t *ioISP = NULL;

/* Lock to prevent interleaved output from threads */
pthread_mutex_t output_mutex;

/* stream that scan records are written to */
FILE *out_scans = NULL;


/* LOCAL FUNCTION DECLARATIONS */

static int process_file(
    const char *infile);
static int invoke_trw_model(
    worker_thread_data_t *work);
static int invoke_blr_model(
    worker_thread_data_t *work);
static int submit_event(
    rwRec              *flows,
    event_metrics_t    *metrics);


/* LOCAL VARIABLE DEFINITIONS */

/* number of worker threads that were started and not yet joined */
static int numthreads = 0;

/* queue of events waiting to be classified by a worker thread */
static work_queue_t *work_queue;

/* queue on which worker threads post their node when they exit */
static work_queue_t *cleanup_queue;


/* FUNCTION DEFINITIONS */

/*
 *  event_class = invoke_trw_model(work);
 *
 *    Walk the flows of the event in 'work', updating the TRW counters
 *    in work->counters, and classify the event.  The chosen class is
 *    stored in work->metrics->event_class and also returned.
 *
 *    A flow is treated as a new destination whenever its destination
 *    address differs from that of the flow immediately before it.
 *    For each new destination the hit/miss counters are updated and
 *    the likelihood ratio is recomputed from scratch.
 *
 *    While every flow seen so far is a bare SYN, the event is declared
 *    EVENT_SCAN as soon as the likelihood exceeds TRW_ETA1, or
 *    EVENT_BENIGN as soon as it drops below TRW_ETA0; the source
 *    address is recorded in the matching trw_data IP tree.  If neither
 *    threshold is crossed, the counters are checked against the
 *    backscatter and flood patterns, and EVENT_UNKNOWN is the
 *    fallback.
 *
 *    The caller must provide zeroed counters in work->counters.
 */
static int
invoke_trw_model(
    worker_thread_data_t   *work)
{
    rwRec              *flows = NULL;
    event_metrics_t    *metrics = NULL;
    trw_counters_t     *counters = NULL;
    rwRec              *rwcurr = NULL;
    uint32_t            i;
    uint32_t            dip_prev = 0xffffffff, dip_curr = 0;

    flows = work->flows;
    metrics = work->metrics;
    counters = work->counters;

    metrics->model = RWSCAN_MODEL_TRW;

    for (i = 0; i < metrics->event_size; i++) {
        uint32_t j;

        rwcurr = &(flows[i]);
        dip_curr = rwcurr->dIP.ipnum;

        if (options.verbose_flows) {
            printf("%4u/%4u ", i + 1, metrics->event_size);
            print_flow(rwcurr);
        }

        counters->flows++;

        if (dip_curr != dip_prev) {
            /* New destination: it is a "hit" when the address is in the
             * tree of existing hosts or the flow is anything other than
             * a bare SYN; otherwise it is a "miss". */
            pthread_mutex_lock(&trw_data.mutex);
            if (skIPTreeCheckAddress(rwcurr->dIP.ipnum, trw_data.existing)) {
                counters->hits++;
            } else {
                if (rwcurr->flags == TCP_FLAGS_SYN) {
                    counters->misses++;
                } else {
                    counters->hits++;
                }
            }
            pthread_mutex_unlock(&trw_data.mutex);
            counters->dips++;
        }

        if (rwcurr->flags == TCP_FLAGS_SYN) {
            counters->syns++;
        }

        /* flag combinations counted toward backscatter */
        if (rwcurr->flags == TCP_FLAGS_RST
            || rwcurr->flags == TCP_FLAGS_SYNACK
            || rwcurr->flags == TCP_FLAGS_RSTACK)
        {
            counters->bs++;
        }

        /* flag combinations counted as responses to a flood */
        if (rwcurr->flags == TCP_FLAGS_RST
            || rwcurr->flags == TCP_FLAGS_SYNRST
            || rwcurr->flags == TCP_FLAGS_RSTACK)
        {
            counters->floodresponse++;
        }

        if (dip_curr != dip_prev) {
            /* Recompute the likelihood ratio from the hit and miss
             * totals.  The multiplication order is kept as-is so the
             * floating point result does not change. */
            for (j = 0, counters->likelihood = 1.0; j < counters->hits; j++) {
                counters->likelihood =
                    counters->likelihood * (TRW_THETA1 / TRW_THETA0);
            }
            for (j = 0; j < counters->misses; j++) {
                counters->likelihood =
                    counters->likelihood
                    * ((1.0 - TRW_THETA1) / (1.0 - TRW_THETA0));
            }
        }

        if (i > RWSCAN_FLOW_CUTOFF) {
            printf("warning: TRW giving up after %d flows\n",
                   RWSCAN_FLOW_CUTOFF);
            break;
        }

        /* Thresholds are only consulted while every flow has been a
         * bare SYN. */
        if (counters->syns == counters->flows) {
            if (counters->likelihood > TRW_ETA1) {
                /* add to scanners iptree */
                pthread_mutex_lock(&trw_data.mutex);
                skIPTreeAddAddress(rwcurr->sIP.ipnum, trw_data.scanners);
                pthread_mutex_unlock(&trw_data.mutex);
                metrics->scan_probability = counters->likelihood;
                calculate_shared_metrics(flows, metrics);
                return (metrics->event_class = EVENT_SCAN);
            } else if (counters->likelihood < TRW_ETA0) {
                /* add to benign iptree */
                pthread_mutex_lock(&trw_data.mutex);
                skIPTreeAddAddress(rwcurr->sIP.ipnum, trw_data.benign);
                pthread_mutex_unlock(&trw_data.mutex);
                metrics->scan_probability = counters->likelihood;
                return (metrics->event_class = EVENT_BENIGN);
            }
        }

        dip_prev = dip_curr;
    }

    /* backscatter: only response-type flows, several destinations,
     * many flows */
    if (counters->bs == counters->flows
        && counters->dips > 3
        && counters->flows > 100)
    {
        return (metrics->event_class = EVENT_BACKSCATTER);
    }

    /* flood: one destination, at least half SYNs, everything else a
     * flood response */
    if (counters->dips == 1
        && (counters->syns >= (counters->flows * 0.5))
        && ((counters->syns + counters->floodresponse) == counters->flows)
        && counters->flows > 10)
    {
        return (metrics->event_class = EVENT_FLOOD);
    }

    return (metrics->event_class = EVENT_UNKNOWN);
}


/*
 *  event_class = invoke_blr_model(work);
 *
 *    Run the BLR model over the event in 'work'.  Events with fewer
 *    than EVENT_FLOW_THRESHOLD flows are not examined.  For larger
 *    events, per-protocol counters are accumulated for every flow, the
 *    flows are re-sorted with rwrec_compare_dip_sport(), and the
 *    per-protocol metric and scan-probability routines are invoked.
 *
 *    Returns work->metrics->event_class.  This function does not
 *    assign that field itself; any change comes from the helper
 *    routines it calls.
 *
 *    The program exits if a flow, or the event itself, has a protocol
 *    other than ICMP, TCP or UDP.
 */
static int
invoke_blr_model(
    worker_thread_data_t   *work)
{
    uint32_t            i;
    rwRec              *flows;
    event_metrics_t    *metrics;
    rwRec              *rwcurr = NULL;

    flows = work->flows;
    metrics = work->metrics;

    metrics->model = RWSCAN_MODEL_BLR;

    if (metrics->event_size < EVENT_FLOW_THRESHOLD) {
        return metrics->event_class;
    }

    /* Loop through each RW record in the event, incrementing various
     * counters which will be used later. */
    for (i = 0; i < metrics->event_size; i++) {
        rwcurr = &(flows[i]);

        if (options.verbose_flows) {
            printf("%4u/%4u ", i + 1, metrics->event_size);
            print_flow(rwcurr);
        }

        switch (rwcurr->proto) {
          case IPPROTO_ICMP:
            increment_icmp_counters(rwcurr, metrics);
            break;
          case IPPROTO_TCP:
            increment_tcp_counters(rwcurr, metrics);
            break;
          case IPPROTO_UDP:
            increment_udp_counters(rwcurr, metrics);
            break;
          default:
            /* we only detect scans in ICMP, TCP, and UDP protocols */
            skAppPrintErr("%s:%d: invalid protocol", __FILE__, __LINE__);
            exit(EXIT_FAILURE);
        }
    }

    /* Sort by dest IP and source port (or for ICMP, just dest IP) so
     * the remaining metrics can be computed. */
    qsort(flows, metrics->event_size, sizeof(rwRec),
          rwrec_compare_dip_sport);

    switch (metrics->protocol) {
      case IPPROTO_ICMP:
        calculate_icmp_metrics(flows, metrics);
        calculate_icmp_scan_probability(metrics);
        break;
      case IPPROTO_TCP:
        calculate_tcp_metrics(flows, metrics);
        calculate_tcp_scan_probability(metrics);
        break;
      case IPPROTO_UDP:
        calculate_udp_metrics(flows, metrics);
        calculate_udp_scan_probability(metrics);
        break;
      default:
        skAppPrintErr("%s:%d: invalid protocol", __FILE__, __LINE__);
        exit(EXIT_FAILURE);
    }

    return metrics->event_class;
}


/*
 *  worker_thread(cleanup_node);
 *
 *    Thread entry point.  'myarg' is the cleanup_node_t that
 *    create_worker_threads() allocated for this thread.
 *
 *    The thread repeatedly takes an event off 'work_queue', runs the
 *    models selected by options.scan_model, updates summary_metrics,
 *    writes a record to 'out_scans' for events classified as scans,
 *    and frees the event.  When the work queue is no longer active the
 *    thread posts its cleanup node on 'cleanup_queue' so that
 *    join_threads() can join it.  Always returns NULL.
 *
 *    The TRW counters and the scan record are stack objects, so
 *    classifying an event cannot fail for lack of memory.  The thread
 *    therefore never exits without posting its cleanup node.
 */
void *
worker_thread(
    void               *myarg)
{
    work_queue_node_t      *mynode;
    worker_thread_data_t   *mywork;
    cleanup_node_t         *cleanup_node;
    rwRec                  *flows;
    event_metrics_t        *metrics;

    cleanup_node = (cleanup_node_t *) myarg;

    pthread_mutex_lock(&work_queue->mutex);

    while (work_queue->active) {
        /* work_queue->mutex is held here */
        while (workqueue_depth(work_queue) == 0 && work_queue->active) {
            pthread_cond_wait(&work_queue->cond_posted, &work_queue->mutex);
        }

        if (!work_queue->active) {
            printf("work queue no longer active\n");
            break;
        }

        workqueue_get(work_queue, &mynode);
        /* NOTE(review): this cast relies on 'node' being the first
         * member of worker_thread_data_t -- confirm in rwscan.h */
        mywork = (worker_thread_data_t *) mynode;
        flows = mywork->flows;
        metrics = mywork->metrics;

        /* classification runs without the queue lock */
        pthread_mutex_unlock(&work_queue->mutex);

        if ((metrics->protocol == IPPROTO_TCP)
            && (options.scan_model == RWSCAN_MODEL_HYBRID
                || options.scan_model == RWSCAN_MODEL_TRW))
        {
            trw_counters_t trw_counters;

            memset(&trw_counters, 0, sizeof(trw_counters_t));
            mywork->counters = &trw_counters;
            invoke_trw_model(mywork);
            /* the counters live on this stack frame; make sure the
             * cleanup code below never sees the pointer */
            mywork->counters = NULL;
        }

        /* Fall back to BLR when TRW did not produce a definite
         * scan/flood/backscatter verdict (or did not run). */
        if ((metrics->event_class != EVENT_SCAN
             && metrics->event_class != EVENT_FLOOD
             && metrics->event_class != EVENT_BACKSCATTER)
            && (options.scan_model == RWSCAN_MODEL_HYBRID
                || options.scan_model == RWSCAN_MODEL_BLR))
        {
            qsort(flows, metrics->event_size, sizeof(rwRec),
                  rwrec_compare_proto_stime);
            invoke_blr_model(mywork);
        }

        switch (metrics->event_class) {
          case EVENT_SCAN:
            {
                scan_info_t scan;

                /* yup, it's a scan */
                pthread_mutex_lock(&summary_metrics.mutex);
                summary_metrics.scanners++;
                pthread_mutex_unlock(&summary_metrics.mutex);

                memset(&scan, 0, sizeof(scan_info_t));
                scan.ip = metrics->sip;
                scan.model = metrics->model;
                scan.stime = metrics->stime;
                scan.etime = metrics->etime;
                scan.flows = metrics->event_size;
                scan.pkts = metrics->pkts;
                scan.bytes = metrics->bytes;
                scan.proto = metrics->protocol;
                scan.scan_prob = metrics->scan_probability;
                assert(scan.scan_prob > 0);

                pthread_mutex_lock(&output_mutex);
                write_scan_record(&scan, out_scans, options.no_columns,
                                  options.delimiter, options.model_fields);
                pthread_mutex_unlock(&output_mutex);
            }
            break;

          case EVENT_BENIGN:
            pthread_mutex_lock(&summary_metrics.mutex);
            summary_metrics.benign++;
            pthread_mutex_unlock(&summary_metrics.mutex);
            break;

          case EVENT_BACKSCATTER:
            pthread_mutex_lock(&summary_metrics.mutex);
            summary_metrics.backscatter++;
            pthread_mutex_unlock(&summary_metrics.mutex);
            break;

          case EVENT_FLOOD:
            pthread_mutex_lock(&summary_metrics.mutex);
            summary_metrics.flooders++;
            pthread_mutex_unlock(&summary_metrics.mutex);
            break;

          case EVENT_UNKNOWN:
            pthread_mutex_lock(&summary_metrics.mutex);
            summary_metrics.unknown++;
            pthread_mutex_unlock(&summary_metrics.mutex);
            break;
        }

        /* this thread owns the work item and everything it points to */
        free(mywork->flows);
        free(mywork->metrics);
        free(mywork->counters);
        free(mywork);

        /* re-acquire the queue lock for the next loop iteration */
        pthread_mutex_lock(&work_queue->mutex);
        work_queue->pending--;
        pthread_cond_signal(&work_queue->cond_avail);
    }

    printf("work queue deactivated\n");
    pthread_mutex_unlock(&work_queue->mutex);

    workqueue_put(cleanup_queue, &(cleanup_node->node));
    pthread_cond_signal(&cleanup_queue->cond_posted);
    printf("thread %d shutting down...\n", cleanup_node->threadnum);

    return NULL;
}


/*
 *  status = submit_event(flows, metrics);
 *
 *    Wrap 'flows' and 'metrics' in a newly allocated
 *    worker_thread_data_t and put it on 'work_queue'.
 *
 *    Returns 0 on success; the worker thread that takes the item frees
 *    'flows' and 'metrics'.  Returns -1 if the wrapper cannot be
 *    allocated, in which case the caller still owns both pointers.
 */
static int
submit_event(
    rwRec              *flows,
    event_metrics_t    *metrics)
{
    worker_thread_data_t *work;

    work = calloc(1, sizeof(worker_thread_data_t));
    if (work == NULL) {
        skAppPrintErr("Out of memory allocating worker thread data");
        return -1;
    }
    work->flows = flows;
    work->metrics = metrics;
    workqueue_put(work_queue, &(work->node));
    return 0;
}


/*
 *  status = process_file(infile);
 *
 *    Read every record from 'infile', group consecutive ICMP, TCP and
 *    UDP records that share a source address and protocol into events,
 *    and queue each completed event on 'work_queue'.  Records of any
 *    other protocol are counted in summary_metrics.ignored_flows and
 *    skipped; they do not end the current event.
 *
 *    Returns 0 on success.  Returns -1 if the file cannot be opened or
 *    memory runs out; everything this function still owns is released
 *    before it returns.
 */
static int
process_file(
    const char         *infile)
{
    rwIOStruct_t       *in;
    rwRec              *event_flows = NULL; /* flows for one sip/proto */
    rwRec              *grown;              /* result of realloc() */
    rwRec               rwrec;              /* holds each record read */
    char                ipstr[16];
    uint32_t            last_sip = 0;
    uint32_t            last_proto = 0;
    event_metrics_t    *metrics = NULL;
    int                 rv = -1;

    metrics = calloc(1, sizeof(event_metrics_t));
    if (metrics == NULL) {
        skAppPrintErr("Out of memory allocating metrics data");
        return -1;
    }

    memset(&rwrec, 0, sizeof(rwRec));

    /* open the input file */
    in = rwOpenFile(infile, ioISP->inputCopyFD);
    if (!in) {
        /* rwOpenFile dumps error */
        free(metrics);
        return -1;
    }

    /* The main program runloop. */
    for (;;) {
        /* Read in a single RW record. */
        if (!rwRead(in, &rwrec)) {
            /* End of input.  Queue the event that is still open; the
             * contents of 'rwrec' are stale and must not be used. */
            if (metrics->event_size > 0) {
                if (submit_event(event_flows, metrics)) {
                    goto END;
                }
                event_flows = NULL;
                metrics = NULL;
            }
            break;
        }

        pthread_mutex_lock(&summary_metrics.mutex);
        summary_metrics.total_flows++;
        pthread_mutex_unlock(&summary_metrics.mutex);

        /* If the proto is one we don't care about, read the next record. */
        if ((rwrec.proto != IPPROTO_ICMP)
            && (rwrec.proto != IPPROTO_TCP)
            && (rwrec.proto != IPPROTO_UDP))
        {
            pthread_mutex_lock(&summary_metrics.mutex);
            summary_metrics.ignored_flows++;
            pthread_mutex_unlock(&summary_metrics.mutex);
            continue;
        }

        /* A change of source address or protocol ends the current event
         * (if there is one) and begins a new one. */
        if (rwrec.sIP.ipnum != last_sip || rwrec.proto != last_proto) {
            /* If we have flows to examine, hand them to a worker. */
            if (metrics->event_size > 0) {
                /* options.verbose_progress is used as a bit mask on the
                 * source address; report when the masked value changes */
                if ((last_sip & options.verbose_progress)
                    != (rwrec.sIP.ipnum & options.verbose_progress))
                {
                    num2dot_r(rwrec.sIP.ipnum & options.verbose_progress,
                              ipstr);
                    printf("progress: %s\n", ipstr);
                }

                if (submit_event(event_flows, metrics)) {
                    goto END;
                }
                metrics = NULL;
                event_flows = NULL;
            }

            /* begin new event; realloc(NULL, n) behaves like malloc(n) */
            grown = realloc(event_flows, RWSCAN_ALLOC_SIZE * sizeof(rwRec));
            if (grown == NULL) {
                skAppPrintErr("Out of memory allocating event flow data");
                goto END;
            }
            event_flows = grown;

            if (metrics == NULL) {
                metrics = malloc(sizeof(event_metrics_t));
                if (metrics == NULL) {
                    skAppPrintErr("Out of memory allocating metrics data");
                    goto END;
                }
            }
            memset(metrics, 0, sizeof(event_metrics_t));
            metrics->protocol = rwrec.proto;
            metrics->sip = rwrec.sIP.ipnum;
            metrics->stime = rwrec.sTime;
            metrics->etime = rwrec.sTime + rwrec.elapsed;
        } else {
            /* No new event, so keep adding flows to the current event
             * and widen its time window when needed.  The end time is
             * compared against the end of the flow, not its start, so
             * a flow that starts early but finishes last still extends
             * the event. */
            if (rwrec.sTime < metrics->stime) {
                metrics->stime = rwrec.sTime;
            }
            if (rwrec.sTime + rwrec.elapsed > metrics->etime) {
                metrics->etime = rwrec.sTime + rwrec.elapsed;
            }
        }

        /* Grow the flow array by RWSCAN_ALLOC_SIZE records each time it
         * fills up. */
        if (!(metrics->event_size % RWSCAN_ALLOC_SIZE)
            && (metrics->event_size != 0))
        {
            grown = realloc(event_flows,
                            ((metrics->event_size + RWSCAN_ALLOC_SIZE)
                             * sizeof(rwRec)));
            if (grown == NULL) {
                skAppPrintErr("Out of memory reallocating event flow data");
                goto END;
            }
            event_flows = grown;
        }

        metrics->event_size++;
        memcpy(&(event_flows[metrics->event_size - 1]), &rwrec,
               sizeof(rwRec));

        last_sip = rwrec.sIP.ipnum;
        last_proto = rwrec.proto;
    }

    rv = 0;

  END:
    rwCloseFile(in);
    free(event_flows);
    free(metrics);
    return rv;
}


/*
 *  status = create_worker_threads();
 *
 *    Start options.worker_threads threads running worker_thread().
 *    Each thread gets its own cleanup_node_t holding its thread id and
 *    a 1-based thread number; 'numthreads' counts the threads started.
 *
 *    Returns 0 on success, or 1 if a node cannot be allocated or a
 *    thread cannot be created.  Threads started before the failure
 *    keep running.
 */
int
create_worker_threads(
    void)
{
    uint32_t            x;
    cleanup_node_t     *curnode;

    for (x = 1; x <= options.worker_threads; x++) {
        curnode = calloc(1, sizeof(cleanup_node_t));
        if (!curnode) {
            return 1;
        }
        curnode->threadnum = x;
        if (pthread_create(&curnode->tid, NULL, worker_thread,
                           (void *) curnode))
        {
            /* no thread will ever post this node; release it here */
            free(curnode);
            return 1;
        }
        printf("created worker thread %u\n", x);
        numthreads++;
    }

    return 0;
}


/*
 *  join_threads();
 *
 *    Wait for every started worker thread to post its node on
 *    'cleanup_queue', join the thread, and free the node.  Returns
 *    once 'numthreads' reaches zero.
 */
void
join_threads(
    void)
{
    cleanup_node_t     *curnode;
    work_queue_node_t  *mynode;

    printf("joining threads...\n");

    while (numthreads) {
        pthread_mutex_lock(&cleanup_queue->mutex);
        while (workqueue_depth(cleanup_queue) == 0) {
            pthread_cond_wait(&cleanup_queue->cond_posted,
                              &cleanup_queue->mutex);
        }
        workqueue_get(cleanup_queue, &mynode);
        /* NOTE(review): this cast relies on 'node' being the first
         * member of cleanup_node_t -- confirm in rwscan.h */
        curnode = (cleanup_node_t *) mynode;
        pthread_mutex_unlock(&cleanup_queue->mutex);

        pthread_join(curnode->tid, NULL);
        printf("joined with thread %d\n", curnode->threadnum);
        free(curnode);
        numthreads--;
    }
}


/*
 *  main(argc, argv);
 *
 *    Set up the application, start the worker threads, feed every
 *    input file through process_file(), wait for the work queue to
 *    drain, shut the workers down, and print the summary totals.
 *
 *    Returns 0 when every input file was processed successfully and
 *    EXIT_FAILURE when at least one file failed; the remaining files
 *    are still processed in that case.
 */
int
main(
    int                 argc,
    char              **argv)
{
    int rv = 0;
    int i;

    /* set up for application */
    appSetup(argc, argv);

    pthread_mutex_init(&output_mutex, NULL);
    pthread_mutex_init(&summary_metrics.mutex, NULL);

    cleanup_queue = workqueue_create(options.worker_threads);
    work_queue = workqueue_create(options.work_queue_depth);
    /* both queues are dereferenced below, so a null result must not
     * get any further */
    if (cleanup_queue == NULL || work_queue == NULL) {
        skAppPrintErr("Unable to create work queues");
        exit(EXIT_FAILURE);
    }

    if (!options.no_titles) {
        write_scan_header(out_scans, options.no_columns, options.delimiter,
                          options.model_fields);
    }

    if (create_worker_threads()) {
        printf("Error starting worker threads!\n");
        abort();
    }

    for (i = ioISP->firstFile; i < ioISP->fileCount; i++) {
        printf("processing: %s\n", ioISP->fnArray[i]);
        if (process_file(ioISP->fnArray[i])) {
            /* remember the failure but keep going with other files */
            rv = EXIT_FAILURE;
        }
    }

    /* Wait until the workers have taken everything off the queue.  A
     * worker that is in the middle of an event finishes it before it
     * notices that the queue has been deactivated. */
    pthread_mutex_lock(&work_queue->mutex);
    while (workqueue_depth(work_queue) > 0) {
        printf("waiting for worker threads to finish...\n");
        pthread_cond_wait(&work_queue->cond_avail, &work_queue->mutex);
    }
    pthread_mutex_unlock(&work_queue->mutex);

    workqueue_deactivate(work_queue);
    join_threads();

    workqueue_destroy(work_queue);
    workqueue_destroy(cleanup_queue);

    printf("Read %u flows\n", summary_metrics.total_flows);
    printf("\t%u scanners\n", summary_metrics.scanners);
    printf("\t%u benign\n", summary_metrics.benign);
    printf("\t%u unknown\n", summary_metrics.unknown);
    printf("\t\t%u backscatter\n", summary_metrics.backscatter);
    printf("\t\t%u SYN flooders\n", summary_metrics.flooders);

    /* done */
    appTeardown();

    return rv;
}


/*
** Local Variables:
** mode:c
** indent-tabs-mode:nil
** c-basic-offset:4
** End:
*/