//=========================================================================== // @(#) $Name: cflowd-2-1-b1 $ // @(#) $Id: CflowdRawFlow.cc,v 1.20 2000/01/08 00:57:33 dwm Exp $ //=========================================================================== // CAIDA Copyright Notice // // By accessing this software, cflowd++, you are duly informed // of and agree to be bound by the conditions described below in this // notice: // // This software product, cflowd++, is developed by Daniel W. McRobb, and // copyrighted(C) 1998 by the University of California, San Diego // (UCSD), with all rights reserved. UCSD administers the CAIDA grant, // NCR-9711092, under which part of this code was developed. // // There is no charge for cflowd++ software. You can redistribute it // and/or modify it under the terms of the GNU General Public License, // v. 2 dated June 1991 which is incorporated by reference herein. // cflowd++ is distributed WITHOUT ANY WARRANTY, IMPLIED OR EXPRESS, OF // MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE or that the use // of it will not infringe on any third party's intellectual property // rights. // // You should have received a copy of the GNU GPL along with cflowd++. // Copies can also be obtained from: // // http://www.gnu.org/copyleft/gpl.html // // or by writing to: // // University of California, San Diego // // SDSC/CAIDA // 9500 Gilman Dr., MS-0505 // La Jolla, CA 92093 - 0505 USA // // Or contact: // // info@caida.org //=========================================================================== extern "C" { #include "aclocal.h" #include } using namespace std; #include #include "CflowdRawFlow.hh" #include "ArtsPrimitive.hh" extern ArtsPrimitive g_CfdArtsPrimitive; static const string rcsid = "@(#) $Name: cflowd-2-1-b1 $ $Id: CflowdRawFlow.cc,v 1.20 2000/01/08 00:57:33 dwm Exp $"; //------------------------------------------------------------------------- // CflowdRawFlow::CflowdRawFlow() //......................................................................... // //------------------------------------------------------------------------- CflowdRawFlow::CflowdRawFlow() { memset(&(this->data),0,sizeof(this->data)); this->data._isHostOrder = true; } //------------------------------------------------------------------------- // CflowdRawFlow::CflowdRawFlow(ipv4addr_t ciscoIp, // const CiscoFlowHeaderV1_t * flowHeader, // const CiscoFlowEntryV1_t * flowEntry) //......................................................................... // //------------------------------------------------------------------------- CflowdRawFlow::CflowdRawFlow(ipv4addr_t ciscoIp, const CiscoFlowHeaderV1_t * flowHeader, const CiscoFlowEntryV1_t * flowEntry) { this->Init(ciscoIp,flowHeader,flowEntry); } //------------------------------------------------------------------------- // void CflowdRawFlow::Init(ipv4addr_t ciscoIp, // const CiscoFlowHeaderV1_t * flowHeader, // const CiscoFlowEntryV1_t * flowEntry) //......................................................................... // //------------------------------------------------------------------------- void CflowdRawFlow::Init(ipv4addr_t ciscoIp, const CiscoFlowHeaderV1_t * flowHeader, const CiscoFlowEntryV1_t * flowEntry) { this->data._index = 0; this->data._isHostOrder = true; this->Version(ntohs(flowHeader->version)); assert(this->Version() == 1); this->Router(ciscoIp); this->StartTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->first)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->EndTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->last)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->IpNextHop(flowEntry->nexthop); this->SrcIpAddr(flowEntry->srcaddr); this->DstIpAddr(flowEntry->dstaddr); this->InputIfIndex(ntohs(flowEntry->input)); this->OutputIfIndex(ntohs(flowEntry->output)); this->SrcPort(ntohs(flowEntry->srcport)); this->DstPort(ntohs(flowEntry->dstport)); this->Protocol(flowEntry->prot); this->Tos(flowEntry->tos); this->Pkts(ntohl(flowEntry->pkts)); this->Bytes(ntohl(flowEntry->bytes)); return; } //--------------------------------------------------------------------------- // CflowdRawFlow::CflowdRawFlow(ipv4addr_t ciscoIp, // const CiscoFlowHeaderV6_t * flowHeader, // const CiscoFlowEntryV6_t * flowEntry) //........................................................................... // //--------------------------------------------------------------------------- CflowdRawFlow::CflowdRawFlow(ipv4addr_t ciscoIp, const CiscoFlowHeaderV6_t * flowHeader, const CiscoFlowEntryV6_t * flowEntry) { this->data._index = 0; this->data._isHostOrder = true; this->Version(ntohs(flowHeader->version)); assert(this->Version() == 6); this->Router(ciscoIp); this->StartTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->first)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->EndTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->last)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->IpNextHop(flowEntry->nexthop); this->SrcIpAddr(flowEntry->srcaddr); this->DstIpAddr(flowEntry->dstaddr); this->InputIfIndex(ntohs(flowEntry->input)); this->OutputIfIndex(ntohs(flowEntry->output)); this->SrcPort(ntohs(flowEntry->srcport)); this->DstPort(ntohs(flowEntry->dstport)); this->Protocol(flowEntry->prot); this->Tos(flowEntry->tos); this->TcpFlags(flowEntry->tcp_flags); this->SrcAs(ntohs(flowEntry->src_as)); this->DstAs(ntohs(flowEntry->dst_as)); this->SrcMaskLen(flowEntry->src_mask); this->DstMaskLen(flowEntry->dst_mask); this->Pkts(ntohl(flowEntry->pkts)); this->Bytes(ntohl(flowEntry->bytes)); this->InputEncap(flowEntry->in_encaps); this->OutputEncap(flowEntry->out_encaps); this->PeerNextHop(flowEntry->peer_nexthop); this->EngineType(flowHeader->engine_type); this->EngineId(flowHeader->engine_id); } //------------------------------------------------------------------------- // void CflowdRawFlow::Init(ipv4addr_t ciscoIp, // const CiscoFlowHeaderV6_t * flowHeader, // const CiscoFlowEntryV6_t * flowEntry) //......................................................................... // //------------------------------------------------------------------------- void CflowdRawFlow::Init(ipv4addr_t ciscoIp, const CiscoFlowHeaderV6_t * flowHeader, const CiscoFlowEntryV6_t * flowEntry) { this->data._index = 0; this->data._isHostOrder = true; this->Version(ntohs(flowHeader->version)); assert(this->Version() == 6); this->Router(ciscoIp); this->StartTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->first)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->EndTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->last)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->IpNextHop(flowEntry->nexthop); this->SrcIpAddr(flowEntry->srcaddr); this->DstIpAddr(flowEntry->dstaddr); this->InputIfIndex(ntohs(flowEntry->input)); this->OutputIfIndex(ntohs(flowEntry->output)); this->SrcPort(ntohs(flowEntry->srcport)); this->DstPort(ntohs(flowEntry->dstport)); this->Protocol(flowEntry->prot); this->Tos(flowEntry->tos); this->TcpFlags(flowEntry->tcp_flags); this->SrcAs(ntohs(flowEntry->src_as)); this->DstAs(ntohs(flowEntry->dst_as)); this->SrcMaskLen(flowEntry->src_mask); this->DstMaskLen(flowEntry->dst_mask); this->Pkts(ntohl(flowEntry->pkts)); this->Bytes(ntohl(flowEntry->bytes)); this->InputEncap(flowEntry->in_encaps); this->OutputEncap(flowEntry->out_encaps); this->PeerNextHop(flowEntry->peer_nexthop); this->EngineType(flowHeader->engine_type); this->EngineId(flowHeader->engine_id); return; } //---------------------------------------------------------------------------- // void CflowdRawFlow::Init(ipv4addr_t ciscoIp, // const CiscoFlowHeaderV8_t * flowHeader, // const CiscoFlowEntryV8AsAggV2_t * flowEntry) //............................................................................ // //---------------------------------------------------------------------------- void CflowdRawFlow::Init(ipv4addr_t ciscoIp, const CiscoFlowHeaderV8_t * flowHeader, const CiscoFlowEntryV8AsAggV2_t * flowEntry) { assert(flowHeader->agg_method == k_CiscoV8FlowExportASAggType); this->data._index = 0; this->data._isHostOrder = true; this->Version(ntohs(flowHeader->version)); assert(this->Version() == 8); this->Router(ciscoIp); this->StartTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->first)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->EndTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->last)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->InputIfIndex(ntohs(flowEntry->input)); this->OutputIfIndex(ntohs(flowEntry->output)); this->SrcAs(ntohs(flowEntry->src_as)); this->DstAs(ntohs(flowEntry->dst_as)); this->Pkts(ntohl(flowEntry->pkts)); this->Bytes(ntohl(flowEntry->bytes)); this->EngineType(flowHeader->engine_type); this->EngineId(flowHeader->engine_id); return; } //---------------------------------------------------------------------------- // void CflowdRawFlow::Init(ipv4addr_t ciscoIp, // const CiscoFlowHeaderV8_t * flowHeader, // const CiscoFlowEntryV8ProtocolPortAggV2_t * flowEntry) //............................................................................ // //---------------------------------------------------------------------------- void CflowdRawFlow::Init(ipv4addr_t ciscoIp, const CiscoFlowHeaderV8_t * flowHeader, const CiscoFlowEntryV8ProtocolPortAggV2_t * flowEntry) { assert(flowHeader->agg_method == k_CiscoV8FlowExportProtocolPortAggType); this->data._index = 0; this->data._isHostOrder = true; this->Version(ntohs(flowHeader->version)); assert(this->Version() == 8); this->Router(ciscoIp); this->StartTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->first)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->EndTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->last)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->SrcPort(ntohs(flowEntry->srcport)); this->DstPort(ntohs(flowEntry->dstport)); this->Protocol(flowEntry->prot); this->Pkts(ntohl(flowEntry->pkts)); this->Bytes(ntohl(flowEntry->bytes)); this->EngineType(flowHeader->engine_type); this->EngineId(flowHeader->engine_id); return; } //---------------------------------------------------------------------------- // void CflowdRawFlow::Init(ipv4addr_t ciscoIp, // const CiscoFlowHeaderV8_t * flowHeader, // const CiscoFlowEntryV8NetMatrixAggV2_t * flowEntry) //............................................................................ // //---------------------------------------------------------------------------- void CflowdRawFlow::Init(ipv4addr_t ciscoIp, const CiscoFlowHeaderV8_t * flowHeader, const CiscoFlowEntryV8NetMatrixAggV2_t * flowEntry) { assert(flowHeader->agg_method == k_CiscoV8FlowExportNetMatrixAggType); this->data._index = 0; this->data._isHostOrder = true; this->Version(ntohs(flowHeader->version)); assert(this->Version() == 8); this->Router(ciscoIp); this->StartTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->first)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->EndTime(ntohl(flowHeader->unix_secs) + (ntohl(flowEntry->last)/1000 - ntohl(flowHeader->sysUptime)/1000)); this->InputIfIndex(ntohs(flowEntry->input)); this->OutputIfIndex(ntohs(flowEntry->output)); this->SrcIpAddr(flowEntry->srcnet); this->DstIpAddr(flowEntry->dstnet); this->SrcMaskLen(flowEntry->src_mask); this->DstMaskLen(flowEntry->dst_mask); this->SrcAs(ntohs(flowEntry->src_as)); this->DstAs(ntohs(flowEntry->dst_as)); this->Pkts(ntohl(flowEntry->pkts)); this->Bytes(ntohl(flowEntry->bytes)); this->EngineType(flowHeader->engine_type); this->EngineId(flowHeader->engine_id); } //------------------------------------------------------------------------- // istream & CflowdRawFlow::Read(istream & is) //......................................................................... // //------------------------------------------------------------------------- istream & CflowdRawFlow::Read(istream & is) { index_type flowIndex; this->data._isHostOrder = false; is.read((char *)&(this->data._index),sizeof(this->data._index)); if (! is) return(is); flowIndex = ntohl(this->data._index); if (flowIndex & CflowdRawFlow::k_routerMask) { is.read((char *)&(this->data._router),sizeof(this->data._router)); } if (flowIndex & CflowdRawFlow::k_srcIpAddrMask) { is.read((char *)&(this->data._srcIpAddr),sizeof(this->data._srcIpAddr)); } if (flowIndex & CflowdRawFlow::k_dstIpAddrMask) { is.read((char *)&(this->data._dstIpAddr),sizeof(this->data._dstIpAddr)); } if (flowIndex & CflowdRawFlow::k_inputIfIndexMask) { is.read((char *)&(this->data._inputIfIndex),sizeof(this->data._inputIfIndex)); } if (flowIndex & CflowdRawFlow::k_outputIfIndexMask) { is.read((char *)&(this->data._outputIfIndex),sizeof(this->data._outputIfIndex)); } if (flowIndex & CflowdRawFlow::k_srcPortMask) { is.read((char *)&(this->data._srcPort),sizeof(this->data._srcPort)); } if (flowIndex & CflowdRawFlow::k_dstPortMask) { is.read((char *)&(this->data._dstPort),sizeof(this->data._dstPort)); } if (flowIndex & CflowdRawFlow::k_pktsMask) { is.read((char *)&(this->data._pkts),sizeof(this->data._pkts)); } if (flowIndex & CflowdRawFlow::k_bytesMask) { is.read((char *)&(this->data._bytes),sizeof(this->data._bytes)); } if (flowIndex & CflowdRawFlow::k_ipNextHopMask) { is.read((char *)&(this->data._ipNextHop),sizeof(this->data._ipNextHop)); } if (flowIndex & CflowdRawFlow::k_startTimeMask) { is.read((char *)&(this->data._startTime),sizeof(this->data._startTime)); } if (flowIndex & CflowdRawFlow::k_endTimeMask) { is.read((char *)&(this->data._endTime),sizeof(this->data._endTime)); } if (flowIndex & CflowdRawFlow::k_protocolMask) { is.read((char *)&(this->data._protocol),sizeof(this->data._protocol)); } if (flowIndex & CflowdRawFlow::k_tosMask) { is.read((char *)&(this->data._tos),sizeof(this->data._tos)); } if (flowIndex & CflowdRawFlow::k_srcAsMask) { is.read((char *)&(this->data._srcAs),sizeof(this->data._srcAs)); } if (flowIndex & CflowdRawFlow::k_dstAsMask) { is.read((char *)&(this->data._dstAs),sizeof(this->data._dstAs)); } if (flowIndex & CflowdRawFlow::k_srcMaskLenMask) { is.read((char *)&(this->data._srcMaskLen),sizeof(this->data._srcMaskLen)); } if (flowIndex & CflowdRawFlow::k_dstMaskLenMask) { is.read((char *)&(this->data._dstMaskLen),sizeof(this->data._dstMaskLen)); } if (flowIndex & CflowdRawFlow::k_tcpFlagsMask) { is.read((char *)&(this->data._tcpFlags),sizeof(this->data._tcpFlags)); } if (flowIndex & CflowdRawFlow::k_inputEncapMask) { is.read((char *)&(this->data._inputEncap),sizeof(this->data._inputEncap)); } if (flowIndex & CflowdRawFlow::k_outputEncapMask) { is.read((char *)&(this->data._outputEncap),sizeof(this->data._outputEncap)); } if (flowIndex & CflowdRawFlow::k_peerNextHopMask) { is.read((char *)&(this->data._peerNextHop),sizeof(this->data._peerNextHop)); } if (flowIndex & CflowdRawFlow::k_engineTypeMask) { is.read((char *)&(this->data._engineType),sizeof(this->data._engineType)); } if (flowIndex & CflowdRawFlow::k_engineIdMask) { is.read((char *)&(this->data._engineId),sizeof(this->data._engineId)); } // convert to host byte order this->ToHostByteOrder(); return(is); } #ifndef CANT_GATHER_SCATTER_SOCKIO //------------------------------------------------------------------------- // int CflowdRawFlow::Read(int fd) //......................................................................... // //------------------------------------------------------------------------- int CflowdRawFlow::Read(int fd) { struct iovec iov[sizeof(CflowdRawFlow::index_type) * 8]; int rc; int bytesRead = 0; int iovecCount = 0; ssize_t readvBytesExpected = 0; index_type flowIndex; this->data._isHostOrder = false; // ideally I'd read the index, then do a scatter read (readv()). rc = ::read(fd,&(this->data._index),sizeof(this->data._index)); if (rc < (int)sizeof(this->data._index)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._index),sizeof(this->data._index),__FILE__,__LINE__); return(-1); } bytesRead += rc; flowIndex = ntohl(this->data._index); // now set up my iovec if (flowIndex & CflowdRawFlow::k_routerMask) { iov[iovecCount].iov_base = (char *)&(this->data._router); iov[iovecCount].iov_len = sizeof(this->data._router); readvBytesExpected += sizeof(this->data._router); iovecCount++; } if (flowIndex & CflowdRawFlow::k_srcIpAddrMask) { iov[iovecCount].iov_base = (char *)&(this->data._srcIpAddr); iov[iovecCount].iov_len = sizeof(this->data._srcIpAddr); readvBytesExpected += sizeof(this->data._srcIpAddr); iovecCount++; } if (flowIndex & CflowdRawFlow::k_dstIpAddrMask) { iov[iovecCount].iov_base = (char *)&(this->data._dstIpAddr); iov[iovecCount].iov_len = sizeof(this->data._dstIpAddr); readvBytesExpected += sizeof(this->data._dstIpAddr); iovecCount++; } if (flowIndex & CflowdRawFlow::k_inputIfIndexMask) { iov[iovecCount].iov_base = (char *)&(this->data._inputIfIndex); iov[iovecCount].iov_len = sizeof(this->data._inputIfIndex); readvBytesExpected += sizeof(this->data._inputIfIndex); iovecCount++; } if (flowIndex & CflowdRawFlow::k_outputIfIndexMask) { iov[iovecCount].iov_base = (char *)&(this->data._outputIfIndex); iov[iovecCount].iov_len = sizeof(this->data._outputIfIndex); readvBytesExpected += sizeof(this->data._outputIfIndex); iovecCount++; } if (flowIndex & CflowdRawFlow::k_srcPortMask) { iov[iovecCount].iov_base = (char *)&(this->data._srcPort); iov[iovecCount].iov_len = sizeof(this->data._srcPort); readvBytesExpected += sizeof(this->data._srcPort); iovecCount++; } if (flowIndex & CflowdRawFlow::k_dstPortMask) { iov[iovecCount].iov_base = (char *)&(this->data._dstPort); iov[iovecCount].iov_len = sizeof(this->data._dstPort); readvBytesExpected += sizeof(this->data._dstPort); iovecCount++; } if (flowIndex & CflowdRawFlow::k_pktsMask) { iov[iovecCount].iov_base = (char *)&(this->data._pkts); iov[iovecCount].iov_len = sizeof(this->data._pkts); readvBytesExpected += sizeof(this->data._pkts); iovecCount++; } if (flowIndex & CflowdRawFlow::k_bytesMask) { iov[iovecCount].iov_base = (char *)&(this->data._bytes); iov[iovecCount].iov_len = sizeof(this->data._bytes); readvBytesExpected += sizeof(this->data._bytes); iovecCount++; } if (flowIndex & CflowdRawFlow::k_ipNextHopMask) { iov[iovecCount].iov_base = (char *)&(this->data._ipNextHop); iov[iovecCount].iov_len = sizeof(this->data._ipNextHop); readvBytesExpected += sizeof(this->data._ipNextHop); iovecCount++; } if (flowIndex & CflowdRawFlow::k_startTimeMask) { iov[iovecCount].iov_base = (char *)&(this->data._startTime); iov[iovecCount].iov_len = sizeof(this->data._startTime); readvBytesExpected += sizeof(this->data._startTime); iovecCount++; } if (flowIndex & CflowdRawFlow::k_endTimeMask) { iov[iovecCount].iov_base = (char *)&(this->data._endTime); iov[iovecCount].iov_len = sizeof(this->data._endTime); readvBytesExpected += sizeof(this->data._endTime); iovecCount++; } if (flowIndex & CflowdRawFlow::k_protocolMask) { iov[iovecCount].iov_base = (char *)&(this->data._protocol); iov[iovecCount].iov_len = sizeof(this->data._protocol); readvBytesExpected += sizeof(this->data._protocol); iovecCount++; } if (flowIndex & CflowdRawFlow::k_tosMask) { iov[iovecCount].iov_base = (char *)&(this->data._tos); iov[iovecCount].iov_len = sizeof(this->data._tos); readvBytesExpected += sizeof(this->data._tos); iovecCount++; } if (flowIndex & CflowdRawFlow::k_srcAsMask) { iov[iovecCount].iov_base = (char *)&(this->data._srcAs); iov[iovecCount].iov_len = sizeof(this->data._srcAs); readvBytesExpected += sizeof(this->data._srcAs); iovecCount++; } if (flowIndex & CflowdRawFlow::k_dstAsMask) { iov[iovecCount].iov_base = (char *)&(this->data._dstAs); iov[iovecCount].iov_len = sizeof(this->data._dstAs); readvBytesExpected += sizeof(this->data._dstAs); iovecCount++; } if (flowIndex & CflowdRawFlow::k_srcMaskLenMask) { iov[iovecCount].iov_base = (char *)&(this->data._srcMaskLen); iov[iovecCount].iov_len = sizeof(this->data._srcMaskLen); readvBytesExpected += sizeof(this->data._srcMaskLen); iovecCount++; } if (flowIndex & CflowdRawFlow::k_dstMaskLenMask) { iov[iovecCount].iov_base = (char *)&(this->data._dstMaskLen); iov[iovecCount].iov_len = sizeof(this->data._dstMaskLen); readvBytesExpected += sizeof(this->data._dstMaskLen); iovecCount++; } if (flowIndex & CflowdRawFlow::k_tcpFlagsMask) { iov[iovecCount].iov_base = (char *)&(this->data._tcpFlags); iov[iovecCount].iov_len = sizeof(this->data._tcpFlags); readvBytesExpected += sizeof(this->data._tcpFlags); iovecCount++; } if (flowIndex & CflowdRawFlow::k_inputEncapMask) { iov[iovecCount].iov_base = (char *)&(this->data._inputEncap); iov[iovecCount].iov_len = sizeof(this->data._inputEncap); readvBytesExpected += sizeof(this->data._inputEncap); iovecCount++; } if (flowIndex & CflowdRawFlow::k_outputEncapMask) { iov[iovecCount].iov_base = (char *)&(this->data._outputEncap); iov[iovecCount].iov_len = sizeof(this->data._outputEncap); readvBytesExpected += sizeof(this->data._outputEncap); iovecCount++; } if (flowIndex & CflowdRawFlow::k_peerNextHopMask) { iov[iovecCount].iov_base = (char *)&(this->data._peerNextHop); iov[iovecCount].iov_len = sizeof(this->data._peerNextHop); readvBytesExpected += sizeof(this->data._peerNextHop); iovecCount++; } if (flowIndex & CflowdRawFlow::k_engineTypeMask) { iov[iovecCount].iov_base = (char *)&(this->data._engineType); iov[iovecCount].iov_len = sizeof(this->data._engineType); readvBytesExpected += sizeof(this->data._engineType); iovecCount++; } if (flowIndex & CflowdRawFlow::k_engineIdMask) { iov[iovecCount].iov_base = (char *)&(this->data._engineId); iov[iovecCount].iov_len = sizeof(this->data._engineId); readvBytesExpected += sizeof(this->data._engineId); iovecCount++; } // Now I'm ready to read. int readvRc; if ((readvRc = readv(fd,iov,iovecCount)) < readvBytesExpected) { syslog(LOG_ERR, "[E] readv(%d,iov,%d) expected %d (in %d targets), got %d: " "%m {%s:%d}", fd,iovecCount,readvBytesExpected,iovecCount,readvRc, __FILE__,__LINE__); this->ToHostByteOrder(); return(-1); } bytesRead += readvBytesExpected; // convert to host byte order this->ToHostByteOrder(); return(bytesRead); } #else // We can't use readv() on our named UNIX stream socket. :-( //------------------------------------------------------------------------- // int CflowdRawFlow::Read(int fd) //......................................................................... // //------------------------------------------------------------------------- int CflowdRawFlow::Read(int fd) { uint8_t buf[2048], *bufPtr; int rc; int bytesRead = 0; index_type flowIndex; this->data._isHostOrder = false; rc = ::read(fd,&(this->data._index),sizeof(this->data._index)); if (rc < sizeof(this->data._index)) { return(-1); } bytesRead += rc; flowIndex = ntohl(this->data._index); if (flowIndex & CflowdRawFlow::k_routerMask) { if ((rc = ::read(fd,&(this->data._router),sizeof(this->data._router))) < sizeof(this->data._router)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._router),sizeof(this->data._router), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._router); } if (flowIndex & CflowdRawFlow::k_srcIpAddrMask) { if ((rc = ::read(fd,&(this->data._srcIpAddr),sizeof(this->data._srcIpAddr))) < sizeof(this->data._srcIpAddr)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._srcIpAddr),sizeof(this->data._srcIpAddr), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._srcIpAddr); } if (flowIndex & CflowdRawFlow::k_dstIpAddrMask) { if ((rc = ::read(fd,&(this->data._dstIpAddr),sizeof(this->data._dstIpAddr))) < sizeof(this->data._dstIpAddr)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._dstIpAddr),sizeof(this->data._dstIpAddr), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._dstIpAddr); } if (flowIndex & CflowdRawFlow::k_inputIfIndexMask) { if ((rc = ::read(fd,&(this->data._inputIfIndex),sizeof(this->data._inputIfIndex))) < sizeof(this->data._inputIfIndex)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._inputIfIndex),sizeof(this->data._inputIfIndex), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._inputIfIndex); } if (flowIndex & CflowdRawFlow::k_outputIfIndexMask) { if ((rc = ::read(fd,&(this->data._outputIfIndex), sizeof(this->data._outputIfIndex))) < sizeof(this->data._outputIfIndex)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._outputIfIndex),sizeof(this->data._outputIfIndex), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._outputIfIndex); } if (flowIndex & CflowdRawFlow::k_srcPortMask) { if ((rc = ::read(fd,&(this->data._srcPort),sizeof(this->data._srcPort))) < sizeof(this->data._srcPort)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._srcPort),sizeof(this->data._srcPort), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._srcPort); } if (flowIndex & CflowdRawFlow::k_dstPortMask) { if ((rc = ::read(fd,&(this->data._dstPort),sizeof(this->data._dstPort))) < sizeof(this->data._dstPort)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._dstPort),sizeof(this->data._dstPort), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._dstPort); } if (flowIndex & CflowdRawFlow::k_pktsMask) { if ((rc = ::read(fd,&(this->data._pkts),sizeof(this->data._pkts))) < sizeof(this->data._pkts)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._pkts),sizeof(this->data._pkts), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._pkts); } if (flowIndex & CflowdRawFlow::k_bytesMask) { if ((rc = ::read(fd,&(this->data._bytes),sizeof(this->data._bytes))) < sizeof(this->data._bytes)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._bytes),sizeof(this->data._bytes), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._bytes); } if (flowIndex & CflowdRawFlow::k_ipNextHopMask) { if ((rc = ::read(fd,&(this->data._ipNextHop),sizeof(this->data._ipNextHop))) < sizeof(this->data._ipNextHop)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._ipNextHop),sizeof(this->data._ipNextHop), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._ipNextHop); } if (flowIndex & CflowdRawFlow::k_startTimeMask) { if ((rc = ::read(fd,&(this->data._startTime),sizeof(this->data._startTime))) < sizeof(this->data._startTime)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._startTime),sizeof(this->data._startTime), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._startTime); } if (flowIndex & CflowdRawFlow::k_endTimeMask) { if ((rc = ::read(fd,&(this->data._endTime),sizeof(this->data._endTime))) < sizeof(this->data._endTime)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._endTime),sizeof(this->data._endTime), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._endTime); } if (flowIndex & CflowdRawFlow::k_protocolMask) { if ((rc = ::read(fd,&(this->data._protocol),sizeof(this->data._protocol))) < sizeof(this->data._protocol)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._protocol),sizeof(this->data._protocol), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._protocol); } if (flowIndex & CflowdRawFlow::k_tosMask) { if ((rc = ::read(fd,&(this->data._tos),sizeof(this->data._tos))) < sizeof(this->data._tos)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._tos),sizeof(this->data._tos), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._tos); } if (flowIndex & CflowdRawFlow::k_srcAsMask) { if ((rc = ::read(fd,&(this->data._srcAs),sizeof(this->data._srcAs))) < sizeof(this->data._srcAs)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._srcAs),sizeof(this->data._srcAs), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._srcAs); } if (flowIndex & CflowdRawFlow::k_dstAsMask) { if ((rc = ::read(fd,&(this->data._dstAs),sizeof(this->data._dstAs))) < sizeof(this->data._dstAs)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._dstAs),sizeof(this->data._dstAs), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._dstAs); } if (flowIndex & CflowdRawFlow::k_srcMaskLenMask) { if ((rc = ::read(fd,&(this->data._srcMaskLen),sizeof(this->data._srcMaskLen))) < sizeof(this->data._srcMaskLen)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._srcMaskLen),sizeof(this->data._srcMaskLen), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._srcMaskLen); } if (flowIndex & CflowdRawFlow::k_dstMaskLenMask) { if ((rc = ::read(fd,&(this->data._dstMaskLen),sizeof(this->data._dstMaskLen))) < sizeof(this->data._dstMaskLen)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._dstMaskLen),sizeof(this->data._dstMaskLen), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._dstMaskLen); } if (flowIndex & CflowdRawFlow::k_tcpFlagsMask) { if ((rc = ::read(fd,&(this->data._tcpFlags),sizeof(this->data._tcpFlags))) < sizeof(this->data._tcpFlags)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._tcpFlags),sizeof(this->data._tcpFlags), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._tcpFlags); } if (flowIndex & CflowdRawFlow::k_inputEncapMask) { if ((rc = ::read(fd,&(this->data._inputEncap),sizeof(this->data._inputEncap))) < sizeof(this->data._inputEncap)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._inputEncap),sizeof(this->data._inputEncap), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._inputEncap); } if (flowIndex & CflowdRawFlow::k_outputEncapMask) { if ((rc = ::read(fd,&(this->data._outputEncap),sizeof(this->data._outputEncap))) < sizeof(this->data._outputEncap)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._outputEncap),sizeof(this->data._outputEncap), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._outputEncap); } if (flowIndex & CflowdRawFlow::k_peerNextHopMask) { if ((rc = ::read(fd,&(this->data._peerNextHop),sizeof(this->data._peerNextHop))) < sizeof(this->data._peerNextHop)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._peerNextHop),sizeof(this->data._peerNextHop), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._peerNextHop); } if (flowIndex & CflowdRawFlow::k_engineTypeMask) { if ((rc = ::read(fd,&(this->data._engineType),sizeof(this->data._engineType))) < sizeof(this->data._engineType)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._engineType),sizeof(this->data._engineType), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._engineType); } if (flowIndex & CflowdRawFlow::k_engineIdMask) { if ((rc = ::read(fd,&(this->data._engineId),sizeof(this->data._engineId))) < sizeof(this->data._engineId)) { syslog(LOG_ERR,"[E] read(%d,%p,%d) failed: %m {%s:%d}", fd,&(this->data._engineId),sizeof(this->data._engineId), __FILE__,__LINE__); return(-1); } bytesRead += sizeof(this->data._engineId); } // convert to host byte order this->ToHostByteOrder(); return(bytesRead); } #endif // CANT_GATHER_SCATTER_SOCKIO //------------------------------------------------------------------------- // void CflowdRawFlow::ToHostByteOrder() //......................................................................... // //------------------------------------------------------------------------- void CflowdRawFlow::ToHostByteOrder() { if (this->data._isHostOrder) return; this->data._index = ntohl(this->data._index); if (this->Index() & CflowdRawFlow::k_inputIfIndexMask) this->data._inputIfIndex = ntohs(this->data._inputIfIndex); if (this->Index() & CflowdRawFlow::k_outputIfIndexMask) this->data._outputIfIndex = ntohs(this->data._outputIfIndex); if (this->Index() & CflowdRawFlow::k_srcPortMask) this->data._srcPort = ntohs(this->data._srcPort); if (this->Index() & CflowdRawFlow::k_dstPortMask) this->data._dstPort = ntohs(this->data._dstPort); if (this->Index() & CflowdRawFlow::k_pktsMask) this->data._pkts = ntohl(this->data._pkts); if (this->Index() & CflowdRawFlow::k_bytesMask) this->data._bytes = ntohl(this->data._bytes); if (this->Index() & CflowdRawFlow::k_startTimeMask) this->data._startTime = ntohl(this->data._startTime); if (this->Index() & CflowdRawFlow::k_endTimeMask) this->data._endTime = ntohl(this->data._endTime); if (this->Index() & CflowdRawFlow::k_srcAsMask) this->data._srcAs = ntohs(this->data._srcAs); if (this->Index() & CflowdRawFlow::k_dstAsMask) this->data._dstAs = ntohs(this->data._dstAs); this->data._isHostOrder = true; return; } //------------------------------------------------------------------------- // void CflowdRawFlow::ToNetworkByteOrder() //......................................................................... // //------------------------------------------------------------------------- void CflowdRawFlow::ToNetworkByteOrder() { index_type flowIndex; if (! this->data._isHostOrder) return; flowIndex = this->Index(); this->data._index = htonl(this->data._index); if (flowIndex & CflowdRawFlow::k_inputIfIndexMask) this->data._inputIfIndex = htons(this->data._inputIfIndex); if (flowIndex & CflowdRawFlow::k_outputIfIndexMask) this->data._outputIfIndex = htons(this->data._outputIfIndex); if (flowIndex & CflowdRawFlow::k_srcPortMask) this->data._srcPort = htons(this->data._srcPort); if (flowIndex & CflowdRawFlow::k_dstPortMask) this->data._dstPort = htons(this->data._dstPort); if (flowIndex & CflowdRawFlow::k_pktsMask) this->data._pkts = htonl(this->data._pkts); if (flowIndex & CflowdRawFlow::k_bytesMask) this->data._bytes = htonl(this->data._bytes); if (flowIndex & CflowdRawFlow::k_startTimeMask) this->data._startTime = htonl(this->data._startTime); if (flowIndex & CflowdRawFlow::k_endTimeMask) this->data._endTime = htonl(this->data._endTime); if (flowIndex & CflowdRawFlow::k_srcAsMask) this->data._srcAs = htons(this->data._srcAs); if (flowIndex & CflowdRawFlow::k_dstAsMask) this->data._dstAs = htons(this->data._dstAs); this->data._isHostOrder = false; return; } //------------------------------------------------------------------------- // ostream & CflowdRawFlow::Write(ostream & os) const //......................................................................... // //------------------------------------------------------------------------- ostream & CflowdRawFlow::Write(ostream & os) const { CflowdRawFlow rawFlow = *this; const _data_t *dataPtr = rawFlow.DataPtr(); index_type flowIndex = rawFlow.Index(); rawFlow.ToNetworkByteOrder(); os.write((char *)&(dataPtr->_index),sizeof(dataPtr->_index)); if (flowIndex & CflowdRawFlow::k_routerMask) os.write((char *)&(dataPtr->_router),sizeof(dataPtr->_router)); if (flowIndex & CflowdRawFlow::k_srcIpAddrMask) os.write((char *)&(dataPtr->_srcIpAddr),sizeof(dataPtr->_srcIpAddr)); if (flowIndex & CflowdRawFlow::k_dstIpAddrMask) os.write((char *)&(dataPtr->_dstIpAddr),sizeof(dataPtr->_dstIpAddr)); if (flowIndex & CflowdRawFlow::k_inputIfIndexMask) os.write((char *)&(dataPtr->_inputIfIndex),sizeof(dataPtr->_inputIfIndex)); if (flowIndex & CflowdRawFlow::k_outputIfIndexMask) os.write((char *)&(dataPtr->_outputIfIndex),sizeof(dataPtr->_outputIfIndex)); if (flowIndex & CflowdRawFlow::k_srcPortMask) os.write((char *)&(dataPtr->_srcPort),sizeof(dataPtr->_srcPort)); if (flowIndex & CflowdRawFlow::k_dstPortMask) os.write((char *)&(dataPtr->_dstPort),sizeof(dataPtr->_dstPort)); if (flowIndex & CflowdRawFlow::k_pktsMask) os.write((char *)&(dataPtr->_pkts),sizeof(dataPtr->_pkts)); if (flowIndex & CflowdRawFlow::k_bytesMask) os.write((char *)&(dataPtr->_bytes),sizeof(dataPtr->_bytes)); if (flowIndex & CflowdRawFlow::k_ipNextHopMask) os.write((char *)&(dataPtr->_ipNextHop),sizeof(dataPtr->_ipNextHop)); if (flowIndex & CflowdRawFlow::k_startTimeMask) os.write((char *)&(dataPtr->_startTime),sizeof(dataPtr->_startTime)); if (flowIndex & CflowdRawFlow::k_endTimeMask) os.write((char *)&(dataPtr->_endTime),sizeof(dataPtr->_endTime)); if (flowIndex & CflowdRawFlow::k_protocolMask) os.write((char *)&(dataPtr->_protocol),sizeof(dataPtr->_protocol)); if (flowIndex & CflowdRawFlow::k_tosMask) os.write((char *)&(dataPtr->_tos),sizeof(dataPtr->_tos)); if (flowIndex & CflowdRawFlow::k_srcAsMask) os.write((char *)&(dataPtr->_srcAs),sizeof(dataPtr->_srcAs)); if (flowIndex & CflowdRawFlow::k_dstAsMask) os.write((char *)&(dataPtr->_dstAs),sizeof(dataPtr->_dstAs)); if (flowIndex & CflowdRawFlow::k_srcMaskLenMask) os.write((char *)&(dataPtr->_srcMaskLen),sizeof(dataPtr->_srcMaskLen)); if (flowIndex & CflowdRawFlow::k_dstMaskLenMask) os.write((char *)&(dataPtr->_dstMaskLen),sizeof(dataPtr->_dstMaskLen)); if (flowIndex & CflowdRawFlow::k_tcpFlagsMask) os.write((char *)&(dataPtr->_tcpFlags),sizeof(dataPtr->_tcpFlags)); if (flowIndex & CflowdRawFlow::k_inputEncapMask) os.write((char *)&(dataPtr->_inputEncap),sizeof(dataPtr->_inputEncap)); if (flowIndex & CflowdRawFlow::k_outputEncapMask) os.write((char *)&(dataPtr->_outputEncap),sizeof(dataPtr->_outputEncap)); if (flowIndex & CflowdRawFlow::k_peerNextHopMask) os.write((char *)&(dataPtr->_peerNextHop),sizeof(dataPtr->_peerNextHop)); if (flowIndex & CflowdRawFlow::k_engineTypeMask) os.write((char *)&(dataPtr->_engineType),sizeof(dataPtr->_engineType)); if (flowIndex & CflowdRawFlow::k_engineIdMask) os.write((char *)&(dataPtr->_engineId),sizeof(dataPtr->_engineId)); return(os); } #ifndef CANT_GATHER_SCATTER_SOCKIO //------------------------------------------------------------------------- // int CflowdRawFlow::Write(int fd) const //......................................................................... // //------------------------------------------------------------------------- int CflowdRawFlow::Write(int fd) const { struct iovec iov[sizeof(CflowdRawFlow::index_type) * 8 + 1]; int rc; int bytesWritten = 0; int iovecCount = 0; ssize_t writevBytesExpected = 0; index_type flowIndex; CflowdRawFlow rawFlow = *this; const _data_t *dataPtr = rawFlow.DataPtr(); flowIndex = htonl(dataPtr->_index); rc = ::write(fd,&flowIndex,sizeof(flowIndex)); if (rc < (int)sizeof(flowIndex)) { syslog(LOG_ERR,"[E] write(%d,%p,%d) failed: %m {%s:%d}", fd,&flowIndex,sizeof(flowIndex),__FILE__,__LINE__); return(-1); } flowIndex = this->Index(); // now set up my iovec if (flowIndex & CflowdRawFlow::k_routerMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_router); iov[iovecCount].iov_len = sizeof(dataPtr->_router); writevBytesExpected += sizeof(dataPtr->_router); iovecCount++; } if (flowIndex & CflowdRawFlow::k_srcIpAddrMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_srcIpAddr); iov[iovecCount].iov_len = sizeof(dataPtr->_srcIpAddr); writevBytesExpected += sizeof(dataPtr->_srcIpAddr); iovecCount++; } if (flowIndex & CflowdRawFlow::k_dstIpAddrMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_dstIpAddr); iov[iovecCount].iov_len = sizeof(dataPtr->_dstIpAddr); writevBytesExpected += sizeof(dataPtr->_dstIpAddr); iovecCount++; } if (flowIndex & CflowdRawFlow::k_inputIfIndexMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_inputIfIndex); iov[iovecCount].iov_len = sizeof(dataPtr->_inputIfIndex); writevBytesExpected += sizeof(dataPtr->_inputIfIndex); iovecCount++; } if (flowIndex & CflowdRawFlow::k_outputIfIndexMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_outputIfIndex); iov[iovecCount].iov_len = sizeof(dataPtr->_outputIfIndex); writevBytesExpected += sizeof(dataPtr->_outputIfIndex); iovecCount++; } if (flowIndex & CflowdRawFlow::k_srcPortMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_srcPort); iov[iovecCount].iov_len = sizeof(dataPtr->_srcPort); writevBytesExpected += sizeof(dataPtr->_srcPort); iovecCount++; } if (flowIndex & CflowdRawFlow::k_dstPortMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_dstPort); iov[iovecCount].iov_len = sizeof(dataPtr->_dstPort); writevBytesExpected += sizeof(dataPtr->_dstPort); iovecCount++; } if (flowIndex & CflowdRawFlow::k_pktsMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_pkts); iov[iovecCount].iov_len = sizeof(dataPtr->_pkts); writevBytesExpected += sizeof(dataPtr->_pkts); iovecCount++; } if (flowIndex & CflowdRawFlow::k_bytesMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_bytes); iov[iovecCount].iov_len = sizeof(dataPtr->_bytes); writevBytesExpected += sizeof(dataPtr->_bytes); iovecCount++; } if (flowIndex & CflowdRawFlow::k_ipNextHopMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_ipNextHop); iov[iovecCount].iov_len = sizeof(dataPtr->_ipNextHop); writevBytesExpected += sizeof(dataPtr->_ipNextHop); iovecCount++; } if (flowIndex & CflowdRawFlow::k_startTimeMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_startTime); iov[iovecCount].iov_len = sizeof(dataPtr->_startTime); writevBytesExpected += sizeof(dataPtr->_startTime); iovecCount++; } if (flowIndex & CflowdRawFlow::k_endTimeMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_endTime); iov[iovecCount].iov_len = sizeof(dataPtr->_endTime); writevBytesExpected += sizeof(dataPtr->_endTime); iovecCount++; } if (flowIndex & CflowdRawFlow::k_protocolMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_protocol); iov[iovecCount].iov_len = sizeof(dataPtr->_protocol); writevBytesExpected += sizeof(dataPtr->_protocol); iovecCount++; } if (flowIndex & CflowdRawFlow::k_tosMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_tos); iov[iovecCount].iov_len = sizeof(dataPtr->_tos); writevBytesExpected += sizeof(dataPtr->_tos); iovecCount++; } if (flowIndex & CflowdRawFlow::k_srcAsMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_srcAs); iov[iovecCount].iov_len = sizeof(dataPtr->_srcAs); writevBytesExpected += sizeof(dataPtr->_srcAs); iovecCount++; } if (flowIndex & CflowdRawFlow::k_dstAsMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_dstAs); iov[iovecCount].iov_len = sizeof(dataPtr->_dstAs); writevBytesExpected += sizeof(dataPtr->_dstAs); iovecCount++; } if (flowIndex & CflowdRawFlow::k_srcMaskLenMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_srcMaskLen); iov[iovecCount].iov_len = sizeof(dataPtr->_srcMaskLen); writevBytesExpected += sizeof(dataPtr->_srcMaskLen); iovecCount++; } if (flowIndex & CflowdRawFlow::k_dstMaskLenMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_dstMaskLen); iov[iovecCount].iov_len = sizeof(dataPtr->_dstMaskLen); writevBytesExpected += sizeof(dataPtr->_dstMaskLen); iovecCount++; } if (flowIndex & CflowdRawFlow::k_tcpFlagsMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_tcpFlags); iov[iovecCount].iov_len = sizeof(dataPtr->_tcpFlags); writevBytesExpected += sizeof(dataPtr->_tcpFlags); iovecCount++; } if (flowIndex & CflowdRawFlow::k_inputEncapMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_inputEncap); iov[iovecCount].iov_len = sizeof(dataPtr->_inputEncap); writevBytesExpected += sizeof(dataPtr->_inputEncap); iovecCount++; } if (flowIndex & CflowdRawFlow::k_outputEncapMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_outputEncap); iov[iovecCount].iov_len = sizeof(dataPtr->_outputEncap); writevBytesExpected += sizeof(dataPtr->_outputEncap); iovecCount++; } if (flowIndex & CflowdRawFlow::k_peerNextHopMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_peerNextHop); iov[iovecCount].iov_len = sizeof(dataPtr->_peerNextHop); writevBytesExpected += sizeof(dataPtr->_peerNextHop); iovecCount++; } if (flowIndex & CflowdRawFlow::k_engineTypeMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_engineType); iov[iovecCount].iov_len = sizeof(dataPtr->_engineType); writevBytesExpected += sizeof(dataPtr->_engineType); iovecCount++; } if (flowIndex & CflowdRawFlow::k_engineIdMask) { iov[iovecCount].iov_base = (char *)&(dataPtr->_engineId); iov[iovecCount].iov_len = sizeof(dataPtr->_engineId); writevBytesExpected += sizeof(dataPtr->_engineId); iovecCount++; } // Now I'm ready to write and return. int writevRc; rawFlow.ToNetworkByteOrder(); if ((writevRc = writev(fd,iov,iovecCount)) < writevBytesExpected) { syslog(LOG_ERR,"[E] writev(%d,iov,%d) failed: %m {%s:%d}", fd,iovecCount,__FILE__,__LINE__); return(-1); } else { bytesWritten += writevBytesExpected; return(bytesWritten); } } #else // We can't use writev() on a socket. :-( //------------------------------------------------------------------------- // int CflowdRawFlow::Write(int fd) const //......................................................................... // //------------------------------------------------------------------------- int CflowdRawFlow::Write(int fd) const { uint8_t buf[2048]; uint8_t *bufPtr = buf; int rc; int bytesWritten = 0; int iovecCount = 0; ssize_t writeBytesExpected = 0; index_type flowIndex; CflowdRawFlow rawFlow = *this; CflowdRawFlow::_data_t *dataPtr; dataPtr = (CflowdRawFlow::_data_t *)rawFlow.DataPtr(); flowIndex = this->Index(); rawFlow.ToNetworkByteOrder(); memcpy(bufPtr,&dataPtr->_index,sizeof(dataPtr->_index)); bufPtr += sizeof(dataPtr->_index); // now set up my iovec if (flowIndex & CflowdRawFlow::k_routerMask) { memcpy(bufPtr,&dataPtr->_router,sizeof(dataPtr->_router)); bufPtr += sizeof(dataPtr->_router); } if (flowIndex & CflowdRawFlow::k_srcIpAddrMask) { memcpy(bufPtr,&(dataPtr->_srcIpAddr),sizeof(dataPtr->_srcIpAddr)); bufPtr += sizeof(dataPtr->_srcIpAddr); } if (flowIndex & CflowdRawFlow::k_dstIpAddrMask) { memcpy(bufPtr,&(dataPtr->_dstIpAddr),sizeof(dataPtr->_dstIpAddr)); bufPtr += sizeof(dataPtr->_dstIpAddr); } if (flowIndex & CflowdRawFlow::k_inputIfIndexMask) { memcpy(bufPtr,&(dataPtr->_inputIfIndex),sizeof(dataPtr->_inputIfIndex)); bufPtr += sizeof(dataPtr->_inputIfIndex); } if (flowIndex & CflowdRawFlow::k_outputIfIndexMask) { memcpy(bufPtr,&(dataPtr->_outputIfIndex),sizeof(dataPtr->_outputIfIndex)); bufPtr += sizeof(dataPtr->_outputIfIndex); } if (flowIndex & CflowdRawFlow::k_srcPortMask) { memcpy(bufPtr,&(dataPtr->_srcPort),sizeof(dataPtr->_srcPort)); bufPtr += sizeof(dataPtr->_srcPort); } if (flowIndex & CflowdRawFlow::k_dstPortMask) { memcpy(bufPtr,&(dataPtr->_dstPort),sizeof(dataPtr->_dstPort)); bufPtr += sizeof(dataPtr->_dstPort); } if (flowIndex & CflowdRawFlow::k_pktsMask) { memcpy(bufPtr,&(dataPtr->_pkts),sizeof(dataPtr->_pkts)); bufPtr += sizeof(dataPtr->_pkts); } if (flowIndex & CflowdRawFlow::k_bytesMask) { memcpy(bufPtr,&(dataPtr->_bytes),sizeof(dataPtr->_bytes)); bufPtr += sizeof(dataPtr->_bytes); } if (flowIndex & CflowdRawFlow::k_ipNextHopMask) { memcpy(bufPtr,&(dataPtr->_ipNextHop),sizeof(dataPtr->_ipNextHop)); bufPtr += sizeof(dataPtr->_ipNextHop); } if (flowIndex & CflowdRawFlow::k_startTimeMask) { memcpy(bufPtr,&(dataPtr->_startTime),sizeof(dataPtr->_startTime)); bufPtr += sizeof(dataPtr->_startTime); } if (flowIndex & CflowdRawFlow::k_endTimeMask) { memcpy(bufPtr,&(dataPtr->_endTime),sizeof(dataPtr->_endTime)); bufPtr += sizeof(dataPtr->_endTime); } if (flowIndex & CflowdRawFlow::k_protocolMask) { memcpy(bufPtr,&(dataPtr->_protocol),sizeof(dataPtr->_protocol)); bufPtr += sizeof(dataPtr->_protocol); } if (flowIndex & CflowdRawFlow::k_tosMask) { memcpy(bufPtr,&(dataPtr->_tos),sizeof(dataPtr->_tos)); bufPtr += sizeof(dataPtr->_tos); } if (flowIndex & CflowdRawFlow::k_srcAsMask) { memcpy(bufPtr,&(dataPtr->_srcAs),sizeof(dataPtr->_srcAs)); bufPtr += sizeof(dataPtr->_srcAs); } if (flowIndex & CflowdRawFlow::k_dstAsMask) { memcpy(bufPtr,&(dataPtr->_dstAs),sizeof(dataPtr->_dstAs)); bufPtr += sizeof(dataPtr->_dstAs); } if (flowIndex & CflowdRawFlow::k_srcMaskLenMask) { memcpy(bufPtr,&(dataPtr->_srcMaskLen),sizeof(dataPtr->_srcMaskLen)); bufPtr += sizeof(dataPtr->_srcMaskLen); } if (flowIndex & CflowdRawFlow::k_dstMaskLenMask) { memcpy(bufPtr,&(dataPtr->_dstMaskLen),sizeof(dataPtr->_dstMaskLen)); bufPtr += sizeof(dataPtr->_dstMaskLen); } if (flowIndex & CflowdRawFlow::k_tcpFlagsMask) { memcpy(bufPtr,&(dataPtr->_tcpFlags),sizeof(dataPtr->_tcpFlags)); bufPtr += sizeof(dataPtr->_tcpFlags); } if (flowIndex & CflowdRawFlow::k_inputEncapMask) { memcpy(bufPtr,&(dataPtr->_inputEncap),sizeof(dataPtr->_inputEncap)); bufPtr += sizeof(dataPtr->_inputEncap); } if (flowIndex & CflowdRawFlow::k_outputEncapMask) { memcpy(bufPtr,&(dataPtr->_outputEncap),sizeof(dataPtr->_outputEncap)); bufPtr += sizeof(dataPtr->_outputEncap); } if (flowIndex & CflowdRawFlow::k_peerNextHopMask) { memcpy(bufPtr,&(dataPtr->_peerNextHop),sizeof(dataPtr->_peerNextHop)); bufPtr += sizeof(dataPtr->_peerNextHop); } if (flowIndex & CflowdRawFlow::k_engineTypeMask) { memcpy(bufPtr,&(dataPtr->_engineType),sizeof(dataPtr->_engineType)); bufPtr += sizeof(dataPtr->_engineType); } if (flowIndex & CflowdRawFlow::k_engineIdMask) { memcpy(bufPtr,&(dataPtr->_engineId),sizeof(dataPtr->_engineId)); bufPtr += sizeof(dataPtr->_engineId); } // Now I'm ready to write and return. if ((rc = g_CfdArtsPrimitive.FdWrite(fd,buf,bufPtr-buf)) < (bufPtr-buf)) { syslog(LOG_ERR,"[E] FdWrite(%d,%p,%d) failed: %m {%s:%d}", fd,buf,bufPtr - buf,__FILE__,__LINE__); return(-1); } return(bufPtr - buf); } #endif // CANT_GATHER_SCATTER_SOCKIO //------------------------------------------------------------------------- // int CflowdRawFlow::Write(caddr_t & memoryAddr) const //......................................................................... // //------------------------------------------------------------------------- int CflowdRawFlow::Write(caddr_t & memoryAddr) const { index_type flowIndex = this->Index(); caddr_t origMemoryAddr = memoryAddr; CflowdRawFlow rawFlow = *this; const _data_t *dataPtr = rawFlow.DataPtr(); rawFlow.ToNetworkByteOrder(); memcpy(memoryAddr,&(dataPtr->_index),sizeof(dataPtr->_index)); memoryAddr += sizeof(dataPtr->_index); if (flowIndex & CflowdRawFlow::k_routerMask) { memcpy(memoryAddr,&(dataPtr->_router),sizeof(dataPtr->_router)); memoryAddr += sizeof(dataPtr->_router); } if (flowIndex & CflowdRawFlow::k_srcIpAddrMask) { memcpy(memoryAddr,&(dataPtr->_srcIpAddr),sizeof(dataPtr->_srcIpAddr)); memoryAddr += sizeof(dataPtr->_srcIpAddr); } if (flowIndex & CflowdRawFlow::k_dstIpAddrMask) { memcpy(memoryAddr,&(dataPtr->_dstIpAddr),sizeof(dataPtr->_dstIpAddr)); memoryAddr += sizeof(dataPtr->_dstIpAddr); } if (flowIndex & CflowdRawFlow::k_inputIfIndexMask) { memcpy(memoryAddr,&(dataPtr->_inputIfIndex), sizeof(dataPtr->_inputIfIndex)); memoryAddr += sizeof(dataPtr->_inputIfIndex); } if (flowIndex & CflowdRawFlow::k_outputIfIndexMask) { memcpy(memoryAddr,&(dataPtr->_outputIfIndex), sizeof(dataPtr->_outputIfIndex)); memoryAddr += sizeof(dataPtr->_outputIfIndex); } if (flowIndex & CflowdRawFlow::k_srcPortMask) { memcpy(memoryAddr,&(dataPtr->_srcPort),sizeof(dataPtr->_srcPort)); memoryAddr += sizeof(dataPtr->_srcPort); } if (flowIndex & CflowdRawFlow::k_dstPortMask) { memcpy(memoryAddr,&(dataPtr->_dstPort),sizeof(dataPtr->_dstPort)); memoryAddr += sizeof(dataPtr->_dstPort); } if (flowIndex & CflowdRawFlow::k_pktsMask) { memcpy(memoryAddr,&(dataPtr->_pkts),sizeof(dataPtr->_pkts)); memoryAddr += sizeof(dataPtr->_pkts); } if (flowIndex & CflowdRawFlow::k_bytesMask) { memcpy(memoryAddr,&(dataPtr->_bytes),sizeof(dataPtr->_bytes)); memoryAddr += sizeof(dataPtr->_bytes); } if (flowIndex & CflowdRawFlow::k_ipNextHopMask) { memcpy(memoryAddr,&(dataPtr->_ipNextHop),sizeof(dataPtr->_ipNextHop)); memoryAddr += sizeof(dataPtr->_ipNextHop); } if (flowIndex & CflowdRawFlow::k_startTimeMask) { memcpy(memoryAddr,&(dataPtr->_startTime),sizeof(dataPtr->_startTime)); memoryAddr += sizeof(dataPtr->_startTime); } if (flowIndex & CflowdRawFlow::k_endTimeMask) { memcpy(memoryAddr,&(dataPtr->_endTime),sizeof(dataPtr->_endTime)); memoryAddr += sizeof(dataPtr->_endTime); } if (flowIndex & CflowdRawFlow::k_protocolMask) { memcpy(memoryAddr,&(dataPtr->_protocol),sizeof(dataPtr->_protocol)); memoryAddr += sizeof(dataPtr->_protocol); } if (flowIndex & CflowdRawFlow::k_tosMask) { memcpy(memoryAddr,&(dataPtr->_tos),sizeof(dataPtr->_tos)); memoryAddr += sizeof(dataPtr->_tos); } if (flowIndex & CflowdRawFlow::k_srcAsMask) { memcpy(memoryAddr,&(dataPtr->_srcAs),sizeof(dataPtr->_srcAs)); memoryAddr += sizeof(dataPtr->_srcAs); } if (flowIndex & CflowdRawFlow::k_dstAsMask) { memcpy(memoryAddr,&(dataPtr->_dstAs),sizeof(dataPtr->_dstAs)); memoryAddr += sizeof(dataPtr->_dstAs); } if (flowIndex & CflowdRawFlow::k_srcMaskLenMask) { memcpy(memoryAddr,&(dataPtr->_srcMaskLen), sizeof(dataPtr->_srcMaskLen)); memoryAddr += sizeof(dataPtr->_srcMaskLen); } if (flowIndex & CflowdRawFlow::k_dstMaskLenMask) { memcpy(memoryAddr,&(dataPtr->_dstMaskLen), sizeof(dataPtr->_dstMaskLen)); memoryAddr += sizeof(dataPtr->_dstMaskLen); } if (flowIndex & CflowdRawFlow::k_tcpFlagsMask) { memcpy(memoryAddr,&(dataPtr->_tcpFlags),sizeof(dataPtr->_tcpFlags)); memoryAddr += sizeof(dataPtr->_tcpFlags); } if (flowIndex & CflowdRawFlow::k_inputEncapMask) { memcpy(memoryAddr,&(dataPtr->_inputEncap), sizeof(dataPtr->_inputEncap)); memoryAddr += sizeof(dataPtr->_inputEncap); } if (flowIndex & CflowdRawFlow::k_outputEncapMask) { memcpy(memoryAddr,&(dataPtr->_outputEncap), sizeof(dataPtr->_outputEncap)); memoryAddr += sizeof(dataPtr->_outputEncap); } if (flowIndex & CflowdRawFlow::k_peerNextHopMask) { memcpy(memoryAddr,&(dataPtr->_peerNextHop), sizeof(dataPtr->_peerNextHop)); memoryAddr += sizeof(dataPtr->_peerNextHop); } if (flowIndex & CflowdRawFlow::k_engineTypeMask) { memcpy(memoryAddr,&(dataPtr->_engineType), sizeof(dataPtr->_engineType)); memoryAddr += sizeof(dataPtr->_engineType); } if (flowIndex & CflowdRawFlow::k_engineIdMask) { memcpy(memoryAddr,&(dataPtr->_engineId),sizeof(dataPtr->_engineId)); memoryAddr += sizeof(dataPtr->_engineId); } return(memoryAddr - origMemoryAddr); } //------------------------------------------------------------------------- // ostream& operator << (ostream& os, // const CflowdRawFlow & flow) //......................................................................... // //------------------------------------------------------------------------- ostream& operator << (ostream& os, const CflowdRawFlow & flow) { CflowdRawFlow::index_type flowIndex = flow.Index(); struct in_addr inAddr; os << "FLOW" << endl << " index: 0x" << hex << flow.Index() << dec << endl; if (flowIndex & CflowdRawFlow::k_routerMask) { inAddr.s_addr = flow.Router(); os << " router: " << inet_ntoa(inAddr) << endl; } if (flowIndex & CflowdRawFlow::k_srcIpAddrMask) { inAddr.s_addr = flow.SrcIpAddr(); os << " src IP: " << inet_ntoa(inAddr) << endl; } if (flowIndex & CflowdRawFlow::k_dstIpAddrMask) { inAddr.s_addr = flow.DstIpAddr(); os << " dst IP: " << inet_ntoa(inAddr) << endl; } if (flowIndex & CflowdRawFlow::k_inputIfIndexMask) os << " input ifIndex: " << flow.InputIfIndex() << endl; if (flowIndex & CflowdRawFlow::k_outputIfIndexMask) os << " output ifIndex: " << flow.OutputIfIndex() << endl; if (flowIndex & CflowdRawFlow::k_srcPortMask) os << " src port: " << flow.SrcPort() << endl; if (flowIndex & CflowdRawFlow::k_dstPortMask) os << " dst port: " << flow.DstPort() << endl; if (flowIndex & CflowdRawFlow::k_pktsMask) os << " pkts: " << flow.Pkts() << endl; if (flowIndex & CflowdRawFlow::k_bytesMask) os << " bytes: " << flow.Bytes() << endl; if (flowIndex & CflowdRawFlow::k_ipNextHopMask) { inAddr.s_addr = flow.IpNextHop(); os << " IP nexthop: " << inet_ntoa(inAddr) << endl; } if (flowIndex & CflowdRawFlow::k_startTimeMask) { time_t unixTime = flow.StartTime(); os << " start time: " << ctime(&unixTime); } if (flowIndex & CflowdRawFlow::k_endTimeMask) { time_t unixTime = flow.EndTime(); os << " end time: " << ctime(&unixTime); } if (flowIndex & CflowdRawFlow::k_protocolMask) os << " protocol: " << (uint16_t)flow.Protocol() << endl; if (flowIndex & CflowdRawFlow::k_tosMask) os << " tos: " << (uint16_t)flow.Tos() << endl; if (flowIndex & CflowdRawFlow::k_srcAsMask) os << " src AS: " << flow.SrcAs() << endl; if (flowIndex & CflowdRawFlow::k_dstAsMask) os << " dst AS: " << flow.DstAs() << endl; if (flowIndex & CflowdRawFlow::k_srcMaskLenMask) os << " src masklen: " << (uint16_t)flow.SrcMaskLen() << endl; if (flowIndex & CflowdRawFlow::k_dstMaskLenMask) os << " dst masklen: " << (uint16_t)flow.DstMaskLen() << endl; if (flowIndex & CflowdRawFlow::k_tcpFlagsMask) os << " TCP flags: 0x" << hex << (uint16_t)flow.TcpFlags() << dec << endl; if (flowIndex & CflowdRawFlow::k_inputEncapMask) os << " input encaps: " << (uint16_t)flow.InputEncap() << endl; if (flowIndex & CflowdRawFlow::k_outputEncapMask) os << " output encaps: " << (uint16_t)flow.OutputEncap() << endl; if (flowIndex & CflowdRawFlow::k_peerNextHopMask) { inAddr.s_addr = flow.PeerNextHop(); os << " peer nexthop: " << inet_ntoa(inAddr) << endl; } if (flowIndex & CflowdRawFlow::k_engineTypeMask) os << " engine type: " << (uint16_t)flow.EngineType() << endl; if (flowIndex & CflowdRawFlow::k_engineIdMask) os << " engine id: " << (uint16_t)flow.EngineId() << endl; return(os); }