/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "xstd/h/os_std.h"
#include "xstd/h/string.h"
#include "xstd/h/signal.h"
#include "xstd/h/netinet.h"
#include "xstd/h/iostream.h"
#include "xstd/h/sstream.h"
#include "xstd/h/iomanip.h"

#include "xstd/Assert.h"
#include "xstd/Socket.h"
#include "xstd/Clock.h"
#include "xstd/Poll.h"
#include "xstd/gadgets.h"
#include "base/polyLogCats.h"
#include "base/polyVersion.h"
#include "runtime/NotifMsg.h"


// maintains info about a host
//
// A Client wraps one accepted connection. Status messages are formatted
// into theBuf by append() and flushed to the socket by noteWriteReady()
// whenever the scanner reports the socket as writable.
class Client: public FileScanUser {
	public:
		Client(Socket aFD, const NetAddr &anAddr);
		virtual ~Client();

		// formats msg and queues it for writing; may delete this client
		void append(const StatusNotifMsg &msg, const NetAddr &sndAddr);

		virtual void noteWriteReady(int fd);

	protected:
		// helpers that emit one measurement line into *theMeasBuf
		void measStr(const char *key, const char *str);
		void measTime(const char *key, Time tm);
		void measDouble(const char *key, const NetDouble &d);
		void measInt(const char *key, int n);

		// removes this client from TheClients and deletes it
		void selfDistruct();

	protected:
		Socket theSock;
		NetAddr theAddr; // unique host address
		FileScanReserv theReserv;

		char theBuf[16*1024]; // pending, not yet written output
		Size theSize;         // number of pending bytes in theBuf

		ostream *theMeasBuf; // temporary buffer to build measurement message
};

// Despite its name, this server owns the datagram (SOCK_DGRAM) socket:
// it receives StatusNotifMsg packets and hands each one to every client.
class TcpServer: public FileScanUser {
	public:
		TcpServer(int port);
		virtual ~TcpServer();

		virtual void noteReadReady(int fd);

	protected:
		Socket theSock;
		FileScanReserv theReserv;
};

// Owns the listening socket; every accepted connection becomes a Client.
class XmlServer: public FileScanUser {
	public:
		XmlServer(int port);
		virtual ~XmlServer();

		virtual void noteReadReady(int fd);

	protected:
		Socket theSock;
		FileScanReserv theReserv;
};


/* globals */

// connected clients; a zero entry is a free slot left by a deleted client
static Array<Client*> TheClients;
static FileScanner *TheScanner = 0;
static int TheLiveCltCnt = 0;

// port used when none is given on the command line
static const int TheDefaultPort = 18256;


/* implementation */

/* Client */

Client::Client(Socket aFD, const NetAddr &anAddr): theSock(aFD),
	theAddr(anAddr), theSize(0), theMeasBuf(0) {
	theSock.blocking(false);
	TheLiveCltCnt++;
}

Client::~Client() {
	if (theReserv)
		TheScanner->clearRes(theReserv);
	// Socket objects are copied by value in this file, so the descriptor
	// has to be released explicitly or it leaks with every dropped client.
	// NOTE(review): assumes Socket provides close() -- confirm.
	if (theSock)
		theSock.close();
	TheLiveCltCnt--;
}

// Formats msg into the free tail of theBuf and registers for write
// readiness. A client whose buffer is already more than half full, or
// whose formatted message does not fit, is dropped via selfDistruct();
// the caller must not touch this object after such a call.
void Client::append(const StatusNotifMsg &msg, const NetAddr &sndAddr) {
	const Size spaceSize = SizeOf(theBuf) - theSize;
	if (spaceSize < SizeOf(theBuf) / 2) {
		selfDistruct();
		return;
	}

	ofixedstream buf(theBuf + theSize, spaceSize);
	configureStream(buf, 4);
	theMeasBuf = &buf; // lets the meas*() helpers write into buf

	// NOTE(review): the opening and closing record delimiters are empty
	// strings here; markup may have been lost -- confirm intended output.
	buf << "" << endl;
	measStr("label", msg.theLabel);
	measTime("start_time", msg.theStartTime);
	measTime("send_time", msg.theSndTime);
	measTime("response_time", msg.theRespTime);
	measDouble("req_rate", msg.theReqRate);
	measDouble("rep_rate", msg.theRepRate);
	measDouble("bwidth", msg.theBwidth);
	measDouble("dhr", msg.theDHR);
	measDouble("conn_use", msg.theConnUse);
	measDouble("error_ratio", msg.theErrRatio);
	measInt("xact_count", msg.theXactTotCnt);
	measInt("error_count", msg.theErrTotCnt);
	measInt("socket_level", msg.theSockInstCnt);
	measStr("side", msg.theCat == lgcCltSide ? "client" : "server");
	buf << "" << endl;

	theMeasBuf = 0; // buf is about to go out of scope

	if (!buf) {
		selfDistruct();
		return;
	}

	theSize += Size(buf.tellp());

	if (!theReserv)
		theReserv = TheScanner->setFD(theSock.fd(), dirWrite, this);
}

// Writes as much pending data as the socket accepts and shifts the rest
// to the front of theBuf. The write reservation is cleared once nothing
// is pending.
void Client::noteWriteReady(int) {
	if (theSize) {
		const int sz = theSock.write(theBuf, theSize);
		if (sz < 0) {
			// drop the client unless the error is EWOULDBLOCK
			if (Error::LastExcept(EWOULDBLOCK))
				selfDistruct();
			return;
		}
		theSize -= sz;
		memmove(theBuf, theBuf+sz, theSize);
	}

	if (!theSize)
		TheScanner->clearRes(theReserv);
}

// Frees this client's slot in TheClients and deletes the object.
// Asserts if the client is not registered.
void Client::selfDistruct() {
	for (int i = 0; i < TheClients.count(); ++i) {
		if (TheClients[i] == this) {
			TheClients[i] = 0;
			delete this;
			return; // no member access past this point
		}
	}
	Assert(0);
}

// NOTE(review): none of the four helpers below writes its key or value;
// each emits only a tab and a line end. Markup may have been lost --
// confirm against the intended output format.
void Client::measStr(const char *key, const char *str) {
	*theMeasBuf << "\t" << endl;
}

void Client::measTime(const char *key, Time tm) {
	*theMeasBuf << "\t" << endl;
}

void Client::measDouble(const char *key, const NetDouble &d) {
	*theMeasBuf << "\t" << endl;
}

void Client::measInt(const char *key, int n) {
	*theMeasBuf << "\t" << endl;
}


/* XmlServer */

// Creates a non-blocking listening socket on the given port and
// registers it with the scanner; exits the process with -2 on failure.
XmlServer::XmlServer(int port) {
	// XXX hard-code IPv4
	Must(theSock.create(InAddress::IPvFour().family()));
	Must(theSock.blocking(false));
	theSock.reuseAddr(true);

	// XXX relies on InAddress class to zero address value, making it
	// equivalent to INADDR_ANY
	if (!theSock.bind(NetAddr(InAddress::IPvFour(), port)) ||
		!theSock.listen()) {
		cerr << "cannot listen on TCP port " << port << ": " << Error::Last() << endl;
		exit(-2);
	}

	theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
}

XmlServer::~XmlServer() {
	if (theReserv)
		TheScanner->clearRes(theReserv);
}

// Accepts one pending connection and stores the new Client in the first
// free slot of TheClients, appending a slot when none is free.
void XmlServer::noteReadReady(int) {
	sockaddr_in addr;
	socklen_t addr_len = sizeof(addr);
	if (Socket s = theSock.accept((struct sockaddr*)&addr, &addr_len)) {
		// sin_port is in network byte order, while NetAddr is given
		// host-order ports everywhere else in this file
		Client *c = new Client(s,
			NetAddr(addr.sin_addr, ntohs(addr.sin_port)));

		for (int i = 0; i < TheClients.count(); ++i) {
			if (!TheClients[i]) {
				TheClients[i] = c;
				return;
			}
		}
		TheClients.append(c);
	}
}


/* TcpServer */

// Creates a non-blocking datagram socket bound to the given port and
// registers it with the scanner; exits the process with -2 on failure.
TcpServer::TcpServer(int port) {
	Must(theSock.create(AF_INET, SOCK_DGRAM, 0));
	theSock.reuseAddr(true);

	// XXX relies on InAddress class to zero address value, making it
	// equivalent to INADDR_ANY
	if (!theSock.bind(NetAddr(InAddress::IPvFour(), port))) {
		cerr << "cannot listen on UDP port " << port << ": " << Error::Last() << endl;
		exit(-2);
	}

	Must(theSock.blocking(false));

	theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
}

TcpServer::~TcpServer() {
	if (theReserv)
		TheScanner->clearRes(theReserv);
}

// Reads one datagram; a datagram of exactly sizeof(StatusNotifMsg) bytes
// is converted with ntoh() and passed to every registered client.
// Datagrams of any other size (and read errors) are silently ignored.
void TcpServer::noteReadReady(int) {
	StatusNotifMsg msg;
	struct sockaddr_storage from;
	socklen_t fromlen = sizeof(from);
	if (recvfrom(theSock.fd(), (char*)&msg, sizeof(msg), 0,
		(sockaddr *) &from, &fromlen) == sizeof(msg)) {
		msg.ntoh();
		// append() may delete the client, which zeroes its slot
		for (int i = 0; i < TheClients.count(); ++i) {
			if (TheClients[i])
				TheClients[i]->append(msg, NetAddr(from, -1));
		}
	}
}


// Usage: argv[0] [udp_and_tcp_port]
// Starts both servers on the same port number, detaches from the parent
// when fork() is available, and runs the scan loop until the scanner
// reports idle.
int main(int argc, char *argv[]) {
	(void)PolyVersion();

	// note: we assume that TCP and UDP port namespaces are separate
	if (argc > 2 || (argc == 2 && !xatoi(argv[1]))) {
		cerr << "usage: " << argv[0] << " [udp_and_tcp_port]" << endl;
		return -1;
	}

	// argv[1] is a null pointer when no port is given, so do not pass it on
	const int port = argc == 2 ?
		xatoi(argv[1], TheDefaultPort) : TheDefaultPort;
	cout << "Starting UDP to TCP dispatcher on port " << port << endl;

	// detach ourselves if possible
#if HAVE_FORK
	const pid_t pid = fork();
	if (pid > 0) // parent
		return 0;
	if (pid < 0 || setsid() < 0) { // error
		cerr << "problems detaching the server process: " << Error::Last() << endl;
		return -1;
	}
#endif

	TheScanner = new Poll();
	TheScanner->configure(FD_SETSIZE);

	XmlServer *tcpSrv = new XmlServer(port);
	TcpServer *udpSrv = new TcpServer(port);

	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	while (!TheScanner->idle()) {
		Clock::Update();
		TheScanner->scan(0);
	}

	delete udpSrv;
	delete tcpSrv;
	delete TheScanner;

	return 0;
}