/* ** skipfix.c ** SiLK rwio IPFIX translation core ** ** ------------------------------------------------------------------------ ** Copyright (C) 2007 Carnegie Mellon University. All Rights Reserved. ** ------------------------------------------------------------------------ ** Authors: Brian Trammell ** ------------------------------------------------------------------------ ** GNU General Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ------------------------------------------------------------------------ ** $SiLK: skipfix.c 6907 2007-04-14 13:03:06Z mthomas $ ** ------------------------------------------------------------------------ */ #define _SKIPFIX_SOURCE_ #include "skipfix.h" /* IPFIX information elements in 6871 space for SiLK */ static fbInfoElement_t ski_info_elements[] = { FB_IE_INIT("initialTCPFlags", 6871, 14, 1, FB_IE_F_ENDIAN | FB_IE_F_REVERSIBLE), FB_IE_INIT("unionTCPFlags", 6871, 15, 1, FB_IE_F_ENDIAN | FB_IE_F_REVERSIBLE), FB_IE_INIT("reverseFlowDeltaMilliseconds", 6871, 21, 4, FB_IE_F_ENDIAN), FB_IE_INIT("silkFlowType", 6871, 30, 1, FB_IE_F_ENDIAN), FB_IE_INIT("silkFlowSensor", 6871, 31, 2, FB_IE_F_ENDIAN), FB_IE_INIT("silkTCPState", 6871, 32, 1, FB_IE_F_ENDIAN), FB_IE_INIT("silkAppLabel", 6871, 33, 2, FB_IE_F_ENDIAN), FB_IE_NULL }; static fbInfoElement_t ski_init_flags_ie = FB_IE_INIT("initialTCPFlags", 6871, 14, 1, FB_IE_F_ENDIAN); static fbInfoElementSpec_t ski_flow_spec[] = { /* Millisecond start and end (epoch) (native time) */ { "flowStartMilliseconds", 0, 0 }, { "flowEndMilliseconds", 0, 0 }, /* 4-tuple */ { "sourceIPv4Address", 0, 0 }, { "destinationIPv4Address", 0, 0 }, { "sourceTransportPort", 0, 0 }, { "destinationTransportPort", 0, 0 }, /* Router interface information */ { "ipNextHopIPv4Address", 0, 0 }, { "ingressInterface", 0, 0 }, { "egressInterface", 0, 0 }, /* Counters (reduced length encoding for SiLK) */ { "packetDeltaCount", 0, 0 }, { "octetDeltaCount", 0, 0 }, /* Protocol; sensor information */ { "protocolIdentifier", 0, 0 }, { "silkFlowType", 0, 0 }, { "silkFlowSensor", 0, 0 }, /* Flags */ { "tcpControlBits", 0, 0 }, { "initialTCPFlags", 0, 0 }, { "unionTCPFlags", 0, 0 }, { "silkTCPState", 0, 0 }, { "silkAppLabel", 0, 0 }, /* pad record to 64-bit boundary */ { "paddingOctets", 6, 0 }, }; typedef struct rwIpfixRec_V3 { uint64_t sTime_ems; uint64_t eTime_ems; uint32_t sIP; uint32_t dIP; uint16_t sPort; uint16_t dPort; uint32_t nhIP; uint32_t input; uint32_t output; uint64_t pkts; uint64_t bytes; uint8_t proto; flowtypeID_t flow_type; sensorID_t sID; uint8_t flags; uint8_t init_flags; uint8_t rest_flags; uint8_t tcp_state; uint16_t application; uint8_t pad[6]; } rwIpfixRec_V3_t; static fbInfoElementSpec_t ski_extflow_spec[] = { /* Total counter support */ { "packetTotalCount", 0, 0 }, { "octetTotalCount", 0, 0 }, /* Reverse counter support */ { "reversePacketTotalCount", 0, 0 }, { "reverseOctetTotalCount", 0, 0 }, /* Microsecond start and end (RFC1305-style) (extended time) */ { "flowStartMicroseconds", 0, 0 }, { "flowEndMicroseconds", 0, 0 }, /* Second start and end (extended time) */ { "flowStartSeconds", 0, 0 }, { "flowEndSeconds", 0, 0 }, /* Flow durations (extended time) */ { "flowDurationMicroseconds", 0, 0 }, { "flowDurationMilliseconds", 0, 0 }, /* Microsecond delta start and end (extended time) */ { "flowStartDeltaMicroseconds", 0, 0 }, { "flowEndDeltaMicroseconds", 0, 0 }, /* Initial packet roundtrip */ { "reverseFlowDeltaMilliseconds", 0, 0 }, /* Reverse flags */ { "reverseTcpControlBits", 0, 0 }, { "reverseInitialTCPFlags", 0, 0 }, { "reverseUnionTCPFlags", 0, 0 }, /* End reason */ { "flowEndReason", 0, 0 }, FB_IESPEC_NULL }; typedef struct rwIpfixExtRec_V3 { rwIpfixRec_V3_t rw; uint64_t pkts_total; uint64_t bytes_total; uint64_t rev_pkts_total; uint64_t rev_bytes_total; uint64_t sTime_nus; uint64_t eTime_nus; uint32_t sTime_es; uint32_t eTime_es; uint32_t elapsed_us; uint32_t elapsed_ms; uint32_t sTime_dus; uint32_t eTime_dus; uint32_t rev_dms; uint8_t rev_flags; uint8_t rev_init_flags; uint8_t rev_rest_flags; uint8_t end_reason; } rwIpfixExtRec_V3_t; #define SKI_END_IDLE 1 #define SKI_END_ACTIVE 2 #define SKI_END_CLOSED 3 #define SKI_END_FORCED 4 #define SKI_END_RESOURCE 5 #define SKI_END_MASK 0x7f #define SKI_END_ISCONT 0x80 static fbInfoModel_t *skiInfoModel(void) { static fbInfoModel_t *ski_model = NULL; if (!ski_model) { ski_model = fbInfoModelAlloc(); fbInfoModelAddElementArray(ski_model, ski_info_elements); } return ski_model; } fbListener_t *skiCreateListener( fbConnSpec_t *spec, fbListenerAppInit_fn appinit, fbListenerAppFree_fn appfree, GError **err) { fbInfoModel_t *model = skiInfoModel(); fbSession_t *session = NULL; fbTemplate_t *tmpl = NULL; fbListener_t *listener = NULL; /* Allocate a session */ session = fbSessionAlloc(model); /* Add the full record template */ tmpl = fbTemplateAlloc(model); if (!fbTemplateAppendSpecArray(tmpl, ski_flow_spec, 0, err)) { goto err; } if (!fbSessionAddTemplate(session, TRUE, SKI_FLOW_TID, tmpl, err)) { goto err; } /* Add the extended record template */ tmpl = fbTemplateAlloc(model); if (!fbTemplateAppendSpecArray(tmpl, ski_flow_spec, 0, err)) { goto err; } if (!fbTemplateAppendSpecArray(tmpl, ski_extflow_spec, 0, err)) { goto err; } if (!fbSessionAddTemplate(session, TRUE, SKI_EXTFLOW_TID, tmpl, err)) { goto err; } /* Allocate a listener */ if (!(listener = fbListenerAlloc(spec, session, appinit, appfree, err))) { goto err; } return listener; err: fbTemplateFreeUnused(tmpl); if (session) { fbSessionFree(session); } return NULL; } fBuf_t *skiCreateReadBuffer( fbCollector_t *collector, GError **err) { fbInfoModel_t *model = skiInfoModel(); fbSession_t *session = NULL; fbTemplate_t *tmpl = NULL; fBuf_t *fbuf = NULL; /* Allocate a session */ session = fbSessionAlloc(model); /* Add the full record template */ tmpl = fbTemplateAlloc(model); if (!fbTemplateAppendSpecArray(tmpl, ski_flow_spec, 0, err)) { goto err; } if (!fbSessionAddTemplate(session, TRUE, SKI_FLOW_TID, tmpl, err)) { goto err; } /* Add the extended record template */ tmpl = fbTemplateAlloc(model); if (!fbTemplateAppendSpecArray(tmpl, ski_flow_spec, 0, err)) { goto err; } if (!fbTemplateAppendSpecArray(tmpl, ski_extflow_spec, 0, err)) { goto err; } if (!fbSessionAddTemplate(session, TRUE, SKI_EXTFLOW_TID, tmpl, err)) { goto err; } /* Create a buffer with the session and supplied collector */ fbuf = fBufAllocForCollection(session, collector); /* done */ return fbuf; err: if (fbuf) { fBufFree(fbuf); } else { fbTemplateFreeUnused(tmpl); if (session) { fbSessionFree(session); } } return NULL; } fBuf_t *skiCreateReadBufferForFP( FILE *fp, GError **err) { return skiCreateReadBuffer(fbCollectorAllocFP(NULL, fp), err); } fBuf_t *skiCreateWriteBuffer( fbExporter_t *exporter, uint32_t domain, GError **err) { fbInfoModel_t *model = skiInfoModel(); fbSession_t *session = NULL; fbTemplate_t *tmpl = NULL; fBuf_t *fbuf = NULL; /* Allocate a session */ session = fbSessionAlloc(model); /* set observation domain */ fbSessionSetDomain(session, domain); /* Add the full record template */ tmpl = fbTemplateAlloc(model); if (!fbTemplateAppendSpecArray(tmpl, ski_flow_spec, 0, err)) { goto err; } if (!fbSessionAddTemplate(session, TRUE, SKI_FLOW_TID, tmpl, err)) { goto err; } if (!fbSessionAddTemplate(session, FALSE, SKI_FLOW_TID, tmpl, err)) { goto err; } /* Create a buffer with the session and supplied exporter */ fbuf = fBufAllocForExport(session, exporter); /* write RW base flow template */ if (!fbSessionExportTemplates(session, err)) { goto err; } /* set default templates */ if (!fBufSetInternalTemplate(fbuf, SKI_FLOW_TID, err)) { goto err; } if (!fBufSetExportTemplate(fbuf, SKI_FLOW_TID, err)) { goto err; } /* done */ return fbuf; err: if (fbuf) { fBufFree(fbuf); } else { fbTemplateFreeUnused(tmpl); if (session) { fbSessionFree(session); } } return NULL; } fBuf_t *skiCreateWriteBufferForFP( FILE *fp, uint32_t domain, GError **err) { return skiCreateWriteBuffer(fbExporterAllocFP(fp), domain, err); } static uint64_t skiNTPDecode( uint64_t ntp) { double dntp; uint64_t millis; if (!ntp) { return 0; } dntp = (ntp & 0xFFFFFFFF00000000LL) >> 32; dntp += ((ntp & 0x00000000FFFFFFFFLL) * 1.0) / (2LL << 32); millis = dntp * 1000; return millis; } gboolean skiRwNextRecord( fBuf_t *fbuf, rwRec *rec, rwRec *revRec, GError **err) { rwIpfixExtRec_V3_t fixrec; size_t len; uint64_t sTime, eTime; uint64_t pkts, bytes; /* set internal template */ if (!fBufSetInternalTemplate(fbuf, SKI_EXTFLOW_TID, err)) { return FALSE; } /* read a record */ len = sizeof(fixrec); if (!fBufNext(fbuf, (uint8_t *)&fixrec, &len, err)) { return FALSE; } /* Run the Gauntlet of Time */ if (fixrec.rw.sTime_ems) { /* Flow start time in epoch milliseconds */ rec->sTime = fixrec.rw.sTime_ems / 1000; rec->sTime_msec = fixrec.rw.sTime_ems % 1000; if (fixrec.rw.eTime_ems >= fixrec.rw.sTime_ems) { /* Flow end time in epoch milliseconds */ rec->elapsed = (fixrec.rw.eTime_ems - fixrec.rw.sTime_ems) / 1000; rec->elapsed_msec = (fixrec.rw.eTime_ems - fixrec.rw.sTime_ems) % 1000; } else { /* Flow duration in milliseconds */ rec->elapsed = fixrec.elapsed_ms / 1000; rec->elapsed_msec = fixrec.elapsed_ms % 1000; } } else if (fixrec.sTime_nus) { /* Flow start time in NTP microseconds */ sTime = skiNTPDecode(fixrec.sTime_nus); rec->sTime = sTime / 1000; rec->sTime_msec = sTime % 1000; if (fixrec.eTime_nus >= fixrec.sTime_nus) { /* Flow end time in NTP microseconds */ eTime = skiNTPDecode(fixrec.eTime_nus); rec->elapsed = (eTime - sTime) / 1000; rec->elapsed_msec = (eTime - sTime) % 1000; } else { /* Flow duration in microseconds */ rec->elapsed = fixrec.elapsed_us / 1000000; rec->elapsed_msec = (fixrec.elapsed_us / 1000) % 1000; } } else if (fixrec.sTime_es) { /* Seconds? Sure, why not. */ rec->sTime = fixrec.sTime_es; rec->sTime_msec = 0; rec->elapsed = fixrec.eTime_es - fixrec.sTime_es; rec->elapsed_msec = 0; } else if (fixrec.sTime_dus) { /* Flow start time in delta microseconds */ sTime = fBufGetExportTime(fbuf) * 1000 - fixrec.sTime_dus / 1000; rec->sTime = sTime / 1000; rec->sTime_msec = sTime % 1000; if (fixrec.eTime_dus && (fixrec.eTime_dus <= fixrec.sTime_dus)) { /* Flow end time in delta microseconds */ eTime = fBufGetExportTime(fbuf) * 1000 - fixrec.eTime_dus / 1000; rec->elapsed = (eTime - sTime) / 1000; rec->elapsed_msec = (eTime - sTime) % 1000; } else { /* Flow duration in microseconds */ rec->elapsed = fixrec.elapsed_us / 1000000; rec->elapsed_msec = (fixrec.elapsed_us / 1000) % 1000; } } else { /* No per-flow time information. Assume message export is flow end time. */ if (fixrec.elapsed_ms) { /* Flow duration in milliseconds */ rec->elapsed = fixrec.elapsed_ms / 1000; rec->elapsed_msec = fixrec.elapsed_ms % 1000; } else if (fixrec.elapsed_us) { /* Flow duration in microseconds */ rec->elapsed = fixrec.elapsed_us / 1000000; rec->elapsed_msec = (fixrec.elapsed_us / 1000) % 1000; } else { /* Presume zero duration flow */ rec->elapsed = 0; rec->elapsed_msec = 0; } /* Set start time based on export and elapsed time */ rec->sTime = fBufGetExportTime(fbuf) - rec->elapsed; rec->sTime_msec = 0; } /* Convert counters */ pkts = fixrec.rw.pkts; if (!pkts) { pkts = fixrec.pkts_total; } bytes = fixrec.rw.bytes; if (!bytes) { bytes = fixrec.bytes_total; } rec->pkts = (pkts > UINT32_MAX) ? UINT32_MAX : pkts; rec->bytes = (bytes > UINT32_MAX) ? UINT32_MAX : bytes; /* Copy rest of record */ rec->sIP.ipnum = fixrec.rw.sIP; rec->dIP.ipnum = fixrec.rw.dIP; rec->sPort = fixrec.rw.sPort; rec->dPort = fixrec.rw.dPort; rec->nhIP.ipnum = fixrec.rw.nhIP; rec->input = (uint16_t)fixrec.rw.input; rec->output = (uint16_t)fixrec.rw.output; rec->proto = fixrec.rw.proto; rec->flow_type = fixrec.rw.flow_type; rec->sID = fixrec.rw.sID; rec->flags = fixrec.rw.flags; rec->init_flags = fixrec.rw.init_flags; rec->rest_flags = fixrec.rw.rest_flags; rec->tcp_state = fixrec.rw.tcp_state; rec->application = fixrec.rw.application; /* Convert flags if necessary */ if (!rec->flags) { rec->flags = rec->init_flags | rec->rest_flags; } /* Convert TCP state info */ if (!rec->tcp_state) { /* Note expanded flags */ if ((rec->init_flags | rec->rest_flags) || fbTemplateContainsElement(fBufGetCollectionTemplate(fbuf, NULL), &ski_init_flags_ie)) { rec->tcp_state |= SK_TCPSTATE_EXPANDED; } /* Note active timeout */ if ((fixrec.end_reason & SKI_END_MASK) == SKI_END_ACTIVE) { rec->tcp_state |= SK_TCPSTATE_TIMEOUT_KILLED; } /* Note continuation */ if (fixrec.end_reason & SKI_END_ISCONT) { rec->tcp_state |= SK_TCPSTATE_TIMEOUT_STARTED; } } /* Zero bpp */ rec->bpp = 0; /* Fill in reverse record if available */ if (revRec) { if (fixrec.rev_bytes_total) { /* start with forward record */ memcpy(revRec, rec, sizeof(*rec)); /* reverse key */ revRec->sIP.ipnum = rec->dIP.ipnum; revRec->dIP.ipnum = rec->sIP.ipnum; revRec->sPort = rec->dPort; revRec->dPort = rec->sPort; /* convert time */ revRec->elapsed -= (fixrec.rev_dms / 1000); while (revRec->elapsed_msec < (fixrec.rev_dms % 1000)) { revRec->elapsed -= 1; revRec->elapsed_msec += 1000; } revRec->elapsed_msec -= (fixrec.rev_dms % 1000); revRec->sTime += (fixrec.rev_dms / 1000); revRec->sTime_msec += (fixrec.rev_dms % 1000); while (revRec->sTime_msec >= 1000) { revRec->sTime += 1; revRec->sTime_msec -= 1000; } /* fill in counters */ revRec->pkts = ((fixrec.rev_pkts_total > UINT32_MAX) ? UINT32_MAX : fixrec.rev_pkts_total); revRec->bytes = ((fixrec.rev_bytes_total > UINT32_MAX) ? UINT32_MAX : fixrec.rev_bytes_total); /* fill in tcp flags */ revRec->flags = fixrec.rev_flags; revRec->init_flags = fixrec.rev_init_flags; revRec->rest_flags = fixrec.rev_rest_flags; /* Convert flags if necessary */ if (!revRec->flags) { revRec->flags = revRec->init_flags | revRec->rest_flags; } } else { /* want a reverse record but none available; zero it. */ memset(revRec, 0, sizeof(*revRec)); } } /* all done */ return TRUE; } gboolean skiRwAppendRecord( fBuf_t *fbuf, rwRec *rec, GError **err) { rwIpfixRec_V3_t fixrec; /* Convert time */ fixrec.sTime_ems = ((uint64_t)rec->sTime * 1000) + rec->sTime_msec; fixrec.eTime_ems = ((uint64_t)fixrec.sTime_ems) + (rec->elapsed * 1000) + rec->elapsed_msec; /* Copy rest of record */ fixrec.sIP = rec->sIP.ipnum; fixrec.dIP = rec->dIP.ipnum; fixrec.sPort = rec->sPort; fixrec.dPort = rec->dPort; fixrec.nhIP = rec->nhIP.ipnum; fixrec.input = rec->input; fixrec.output = rec->output; fixrec.pkts = rec->pkts; fixrec.bytes = rec->bytes; fixrec.proto = rec->proto; fixrec.flow_type = rec->flow_type; fixrec.sID = rec->sID; fixrec.flags = rec->flags; fixrec.init_flags = rec->init_flags; fixrec.rest_flags = rec->rest_flags; fixrec.tcp_state = rec->tcp_state; fixrec.application = rec->application; /* append record to buffer */ if (!fBufAppend(fbuf, (uint8_t *)&fixrec, sizeof(fixrec), err)) { return FALSE; } /* all done */ return TRUE; }