//=========================================================================== // $Name: cflowd-2-1-b1 $ // $Id: cflowdmux.cc,v 1.31 2000/10/24 16:18:54 dwm Exp $ //=========================================================================== // CAIDA Copyright Notice // // By accessing this software, cflowd++, you are duly informed // of and agree to be bound by the conditions described below in this // notice: // // This software product, cflowd++, is developed by Daniel W. McRobb, and // copyrighted(C) 1998 by the University of California, San Diego // (UCSD), with all rights reserved. UCSD administers the CAIDA grant, // NCR-9711092, under which part of this code was developed. // // There is no charge for cflowd++ software. You can redistribute it // and/or modify it under the terms of the GNU General Public License, // v. 2 dated June 1991 which is incorporated by reference herein. // cflowd++ is distributed WITHOUT ANY WARRANTY, IMPLIED OR EXPRESS, OF // MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE or that the use // of it will not infringe on any third party's intellectual property // rights. // // You should have received a copy of the GNU GPL along with cflowd++. // Copies can also be obtained from: // // http://www.gnu.org/copyleft/gpl.html // // or by writing to: // // University of California, San Diego // // SDSC/CAIDA // 9500 Gilman Dr., MS-0505 // La Jolla, CA 92093 - 0505 USA // // Or contact: // // info@caida.org //=========================================================================== extern "C" { #include "aclocal.h" #include #ifdef HAVE_SYS_TIME_H #include #endif #include #include #include #include #include #ifdef HAVE_SYS_FILIO_H #define BSD_COMP 1 #include #endif #ifdef HAVE_SYS_TERMIOS_H #include #endif #include } #include "CflowdConfig.hh" #include "CflowdConfigLex.hh" #include "CflowdPacketQueue.hh" #include "CflowdRawFlowConverter.hh" #include "CflowdRawFlowClientList.hh" #include "CflowdVersion.hh" #include "Signal.hh" #include "cflowdmux.hh" static const string rcsid = "@(#) $Name: cflowd-2-1-b1 $ $Id: cflowdmux.cc,v 1.31 2000/10/24 16:18:54 dwm Exp $"; static const CflowdVersion g_cflowdVersion = CflowdVersion(rcsid); static const int k_maxPacketReads = 64; static CflowdRawFlowClientList g_rawFlowClients; static CflowdConfig g_cflowdConfig; static int g_rawClientListenFd = -1; static Signal g_sigTerm(SIGTERM); static char *g_configFilename = NULL; static bool g_cflowdmuxDebug = false; static CflowdPacketQueue g_packetQueue; //------------------------------------------------------------------------- // static int RecvFlowPacket(int fd, uint8_t *pkt, ipv4addr_t *ciscoIpAddr) //......................................................................... // Accepts and validates a flow-export packet, places it in pkt (which // needs to have enough space to hold a packet; 2048 bytes is // sufficient). Places the source address of the flow packet in // ciscoIpAddr. Returns the length of the packet if successful, -1 // on error. //------------------------------------------------------------------------- static int RecvFlowPacket(int fd, uint8_t *pkt, ipv4addr_t *ciscoIpAddr) { struct sockaddr_in ciscoAddr; socklen_t ciscoAddrLen = sizeof(ciscoAddr); int len; uint16_t exportVersion; uint16_t flowCount; uint16_t expectedLength = 0xffff; CiscoFlowHeaderV8_t *hdrPtrV8; len = recvfrom(fd,(char *)pkt,k_maxFlowPacketSize,0, (struct sockaddr *)&ciscoAddr,&ciscoAddrLen); if (len < 0) return(-1); *ciscoIpAddr = ciscoAddr.sin_addr.s_addr; if (len > sizeof(CiscoFlowHeaderV1_t)) { exportVersion = ntohs(*(uint16_t *)pkt); memcpy(&flowCount,&(pkt[2]),sizeof(flowCount)); flowCount = ntohs(flowCount); switch (exportVersion) { case 1: expectedLength = sizeof(CiscoFlowHeaderV1_t) + flowCount * sizeof(CiscoFlowEntryV1_t); break; case 5: expectedLength = sizeof(CiscoFlowHeaderV5_t) + flowCount * sizeof(CiscoFlowEntryV5_t); break; case 6: expectedLength = sizeof(CiscoFlowHeaderV6_t) + flowCount * sizeof(CiscoFlowEntryV6_t); break; case 8: hdrPtrV8 = (CiscoFlowHeaderV8_t *)(&pkt[0]); switch (hdrPtrV8->agg_method) { case k_CiscoV8FlowExportASAggType: expectedLength = (sizeof(CiscoFlowHeaderV8_t) + (flowCount * sizeof(CiscoFlowEntryV8AsAggV2_t))); break; case k_CiscoV8FlowExportProtocolPortAggType: expectedLength = (sizeof(CiscoFlowHeaderV8_t) + (flowCount * sizeof(CiscoFlowEntryV8ProtocolPortAggV2_t))); break; case k_CiscoV8FlowExportNetMatrixAggType: expectedLength = (sizeof(CiscoFlowHeaderV8_t) + (flowCount * sizeof(CiscoFlowEntryV8NetMatrixAggV2_t))); break; case k_CiscoV8FlowExportSrcNetAggType: expectedLength = (sizeof(CiscoFlowHeaderV8_t) + (flowCount * sizeof(CiscoFlowEntryV8SrcNetAggV2_t))); break; case k_CiscoV8FlowExportDstNetAggType: expectedLength = (sizeof(CiscoFlowHeaderV8_t) + (flowCount * sizeof(CiscoFlowEntryV8DstNetAggV2_t))); break; default: syslog(LOG_ERR, "[E] bogus agg_method (%d) for v8 flow export {%s:%d}", hdrPtrV8->agg_method,__FILE__,__LINE__); exportVersion = 0; expectedLength = 0xFFFF; break; } break; case 7: default: if (g_cflowdmuxDebug) { syslog(LOG_DEBUG, "[D] unrecognized flow-export version: %hu {%s:%d}", exportVersion,__FILE__,__LINE__); } exportVersion = 0; expectedLength = 0xFFFF; break; } if (len < expectedLength) { if (exportVersion) { syslog(LOG_ERR, "[E] short flow-export version %hu packet received from %s:" " %d < %hu {%s:%d}",exportVersion,inet_ntoa(ciscoAddr.sin_addr), len,expectedLength,__FILE__,__LINE__); } len = -1; } } else { if (len > 0) { syslog(LOG_ERR,"[E] received unrecognized packet from %s {%s:%d}", inet_ntoa(ciscoAddr.sin_addr),__FILE__,__LINE__); } len = -1; } return(len); } //------------------------------------------------------------------------- // static void SigHupHandler(int signalNumber) //......................................................................... // Stub signal handler for SIGHUP (we actually make SIGHUP be dealt // with synchronously and hence don't do anything special here). //------------------------------------------------------------------------- static void SigHupHandler(int signalNumber) { return; } //------------------------------------------------------------------------- // static int Reload() //......................................................................... // Reload our configuration. //------------------------------------------------------------------------- static int Reload() { g_cflowdConfig.FlowPortList().CloseAll(); g_cflowdConfig.Clear(); LoadConfigFile(g_configFilename,g_cflowdConfig); g_cflowdConfig.FlowPortList().OpenAll(); syslog(LOG_INFO,"[I] reloaded configuration from %s", g_configFilename); return(0); } //------------------------------------------------------------------------- // int MainLoop() //......................................................................... // Loop forever reading packets and placing them in shared memory. //------------------------------------------------------------------------- int MainLoop() { fd_set udpFdSet; fd_set clientFdSet; fd_set errFdSet; CflowdFlowPortList::iterator udpPortIter; Signal sigHup(SIGHUP); struct in_addr addrIn; CflowdCiscoMap::iterator ciscoMapIter; sigHup.InstallHandler(SigHupHandler); for (;;) { int selectRc; struct timeval mtv; int maxFd; sigHup.Unblock(); if (sigHup.Caught()) { Reload(); sigHup.Catch(); } sigHup.Block(); FD_ZERO(&errFdSet); FD_ZERO(&clientFdSet); FD_ZERO(&udpFdSet); for (udpPortIter = g_cflowdConfig.FlowPortList().begin(); udpPortIter != g_cflowdConfig.FlowPortList().end(); udpPortIter++) { int lowWater; int lowWaterLen = sizeof(lowWater); FD_SET((*udpPortIter).fd,&udpFdSet); } mtv.tv_sec = 2; mtv.tv_usec = 0; maxFd = g_cflowdConfig.FlowPortList().MaxFd(); if ((selectRc = select(maxFd+1,&udpFdSet,NULL,NULL,&mtv)) < 0) { if (errno != EINTR) { syslog(LOG_ERR,"[E] select error: %m {%s:%d}",__FILE__,__LINE__); } continue; } if (selectRc == 0) { // select timed out. Don't look at descriptors, but toggle buffers. uint8_t oldBuffer = g_packetQueue.CurrentBuffer(); g_packetQueue.ToggleBuffers(false); g_packetQueue.NumPackets(0); g_packetQueue.ReleaseLock(oldBuffer); continue; } if (time((time_t *)0) > (g_packetQueue.LastToggle() + 2)) { // 2 seconds have passed since we last toggled buffers, so // toggle them (so cflowd won't be stuck waiting on the // semaphore). uint8_t oldBuffer = g_packetQueue.CurrentBuffer(); g_packetQueue.ToggleBuffers(false); g_packetQueue.ReleaseLock(oldBuffer); g_packetQueue.NumPackets(0); } uint8_t packet[k_maxFlowPacketSize]; int dataReady; ipv4addr_t ciscoAddr; int recvRc; for (udpPortIter = g_cflowdConfig.FlowPortList().begin(); udpPortIter != g_cflowdConfig.FlowPortList().end(); udpPortIter++) { if (FD_ISSET((*udpPortIter).fd,&udpFdSet)) { // check for data ready on socket. // ioctl((*udpPortIter).fd,FIONREAD,(char *)&dataReady); int pktReads = 0; while (pktReads < k_maxPacketReads) { pktReads++; // read a flow packet if ((recvRc = RecvFlowPacket((*udpPortIter).fd,packet,&ciscoAddr)) <= 0) { pktReads = k_maxPacketReads; break; } dataReady -= recvRc; addrIn.s_addr = ciscoAddr; ciscoMapIter = g_cflowdConfig.CiscoMap().find(ciscoAddr); if (ciscoMapIter == g_cflowdConfig.CiscoMap().end()) { if (g_cflowdmuxDebug) syslog(LOG_ERR,"[E] received packet from a source for which" " we're not configured (%s)",inet_ntoa(addrIn)); continue; } g_packetQueue.Enqueue(ciscoAddr,packet,recvRc); } // while (pktReads < k_maxPacketReads) } // if (FD_ISSET((*udpPortIter).fd,&udpFdSet)) } // for (udpPortIter = g_cflowdConfig.FlowPortList().begin(); ...) } // for (;;) return(0); // NOT REACHED } //------------------------------------------------------------------------- // void Daemonize() //......................................................................... // //------------------------------------------------------------------------- void Daemonize() { int fd; // fork to run in background, have parent exit if (fork() != 0) exit(0); // disassociate from process group setpgid(0,getpid()); // ignore terminal I/O signals #ifdef SIGTTOU signal(SIGTTOU, SIG_IGN); #endif // disassociate from control terminal if ((fd = open("/dev/tty",O_RDWR)) >= 0) { ioctl(fd,TIOCNOTTY,(char *)0); close(fd); } return; } //------------------------------------------------------------------------- // static void DoNothing(int sigNum) //......................................................................... // //------------------------------------------------------------------------- static void DoNothing(int sigNum) { return; } //---------------------------------------------------------------------------- // static void DestroyPacketQueue() //............................................................................ // //---------------------------------------------------------------------------- static void DestroyPacketQueue() { g_packetQueue.Destroy(); return; } //------------------------------------------------------------------------- // static void HandleSigTerm(int sigNum) //......................................................................... // //------------------------------------------------------------------------- static void HandleSigTerm(int sigNum) { syslog(LOG_INFO,"[I] Received SIGTERM. Exiting."); DestroyPacketQueue(); exit(0); } //------------------------------------------------------------------------- // int main(int argc, char *argv[]) //......................................................................... // //------------------------------------------------------------------------- int main(int argc, char *argv[]) { extern int optind; extern char *optarg; int optChar; while ((optChar = getopt(argc,argv,"D")) != EOF) { switch (optChar) { case 'D': g_cflowdmuxDebug = true; break; default: break; } } argc -= optind; argv += optind; if (argc >= 1) { g_configFilename = argv[argc-1]; } else { g_configFilename = (char *)strdup(CflowdConfig::k_defaultCflowdConfigFile.c_str()); } g_sigTerm.InstallHandler(HandleSigTerm); Daemonize(); LoadConfigFile(g_configFilename,g_cflowdConfig); openlog("cflowdmux",LOG_PID,g_cflowdConfig.LogFacility()); syslog(LOG_INFO,"[I] cflowdmux (version %s) started.", g_cflowdVersion.Name().c_str()); if (g_packetQueue.Create(g_configFilename,g_cflowdConfig.PacketBufSize()) < 0) { syslog(LOG_ALERT,"[A] g_packetQueue.Create(\"%s\",%d) failed: %m {%s:%d}", g_configFilename,g_cflowdConfig.PacketBufSize(),__FILE__,__LINE__); syslog(LOG_ALERT,"[A] failed to create packet queue! Exiting. {%s:%d}", __FILE__,__LINE__); exit(1); } atexit(DestroyPacketQueue); g_cflowdConfig.FlowPortList().OpenAll(); MainLoop(); exit(0); }