/* Web Polygraph http://www.web-polygraph.org/
* (C) 2003-2006 The Measurement Factory
* Licensed under the Apache License, Version 2.0 */
#include "base/polygraph.h"
#include "xstd/h/os_std.h"
#include "xstd/h/string.h"
#include "xstd/h/signal.h"
#include "xstd/h/netinet.h"
#include "xstd/h/iostream.h"
#include "xstd/h/sstream.h"
#include "xstd/h/iomanip.h"
#include "xstd/Assert.h"
#include "xstd/Socket.h"
#include "xstd/Clock.h"
#include "xstd/Poll.h"
#include "xstd/gadgets.h"
#include "base/polyLogCats.h"
#include "base/polyVersion.h"
#include "runtime/NotifMsg.h"
// Maintains info about one connected host: the accepted socket plus a
// fixed-size buffer of XML text that still has to be written to it.
class Client: public FileScanUser {
    public:
        Client(Socket aFD, const NetAddr &anAddr);
        virtual ~Client();

        // formats msg as an XML <message> element and queues it for writing
        void append(const StatusNotifMsg &msg, const NetAddr &sndAddr);

        // writes queued bytes to theSock; the object is registered for
        // write readiness by append()
        virtual void noteWriteReady(int fd);

    protected:
        // each helper emits one <measurement .../> line into *theMeasBuf
        void measStr(const char *key, const char *str);
        void measTime(const char *key, Time tm);
        void measDouble(const char *key, const NetDouble &d);
        void measInt(const char *key, int n);

        // clears this object's slot in TheClients and deletes the object
        void selfDistruct();

        Socket theSock;
        NetAddr theAddr; // unique host address
        FileScanReserv theReserv; // set while there is queued output to write
        char theBuf[16*1024]; // queued output, oldest byte first
        Size theSize; // number of queued bytes in theBuf
        ostream *theMeasBuf; // temporary buffer to build measurement message; null outside append()
};
// Receives StatusNotifMsg datagrams and hands each one to every live Client.
// NOTE: despite the class name, the constructor creates a SOCK_DGRAM socket
// and its error message refers to a UDP port.
class TcpServer: public FileScanUser {
    public:
        TcpServer(int port);
        virtual ~TcpServer();

        // reads one datagram and forwards it to all clients
        virtual void noteReadReady(int fd);

    protected:
        Socket theSock; // bound datagram socket
        FileScanReserv theReserv; // read-readiness registration with TheScanner
};
// Listens for incoming connections (its error message calls this a TCP port)
// and creates a Client for every accepted one.
class XmlServer: public FileScanUser {
    public:
        XmlServer(int port);
        virtual ~XmlServer();

        // accepts one pending connection and stores the new Client in TheClients
        virtual void noteReadReady(int fd);

    protected:
        Socket theSock; // listening socket
        FileScanReserv theReserv; // read-readiness registration with TheScanner
};
/* globals */

// scanner that every FileScanUser in this file registers with; created in main()
static FileScanner *TheScanner = 0;

// connected clients; a null entry is a free slot that a later connection reuses
static Array<Client*> TheClients;

// number of Client objects currently alive (maintained by Client ctor/dtor)
static int TheLiveCltCnt = 0;
/* implementation */
/* Client */
// Takes over an accepted socket, switches it to non-blocking mode, and
// counts the new object as a live client. The output buffer starts empty.
Client::Client(Socket aFD, const NetAddr &anAddr):
    theSock(aFD),
    theAddr(anAddr),
    theSize(0),
    theMeasBuf(0)
{
    ++TheLiveCltCnt;
    theSock.blocking(false);
}
// Releases everything the client holds: its scanner registration (if any)
// and the connection descriptor, then updates the live client count.
Client::~Client() {
    if (theReserv)
        TheScanner->clearRes(theReserv);

    // Socket objects are copied by value in this file (the accepted socket
    // is copied into the Client and the original goes out of scope), so no
    // destructor closes the descriptor for us. Without an explicit close,
    // every dropped client leaks a descriptor in this long-running daemon.
    if (theSock)
        theSock.close();

    TheLiveCltCnt--;
}
void Client::append(const StatusNotifMsg &msg, const NetAddr &sndAddr) {
const Size spaceSize = SizeOf(theBuf) - theSize;
if (spaceSize < SizeOf(theBuf) / 2) {
selfDistruct();
return;
}
ofixedstream buf(theBuf + theSize, spaceSize);
configureStream(buf, 4);
theMeasBuf = &buf;
buf << "<message src='" << sndAddr << "'>" << endl;
measStr("label", msg.theLabel);
measTime("start_time", msg.theStartTime);
measTime("send_time", msg.theSndTime);
measTime("response_time", msg.theRespTime);
measDouble("req_rate", msg.theReqRate);
measDouble("rep_rate", msg.theRepRate);
measDouble("bwidth", msg.theBwidth);
measDouble("dhr", msg.theDHR);
measDouble("conn_use", msg.theConnUse);
measDouble("error_ratio", msg.theErrRatio);
measInt("xact_count", msg.theXactTotCnt);
measInt("error_count", msg.theErrTotCnt);
measInt("socket_level", msg.theSockInstCnt);
measStr("side", msg.theCat == lgcCltSide ? "client" : "server");
buf << "</message>" << endl;
theMeasBuf = 0;
if (!buf) {
selfDistruct();
return;
}
theSize += Size(buf.tellp());
if (!theReserv)
theReserv = TheScanner->setFD(theSock.fd(), dirWrite, this);
}
// Writes as much queued output as the socket accepts and shifts the
// unwritten remainder to the front of theBuf. Once nothing is queued, the
// write registration is cleared. If the write fails with an error for which
// Error::LastExcept(EWOULDBLOCK) is true, the client destroys itself; in any
// failed-write case the function returns without touching the buffer.
void Client::noteWriteReady(int) {
    if (theSize) {
        const int written = theSock.write(theBuf, theSize);
        if (written >= 0) {
            theSize -= written;
            memmove(theBuf, theBuf + written, theSize);
        } else {
            if (Error::LastExcept(EWOULDBLOCK))
                selfDistruct();
            return;
        }
    }

    if (!theSize)
        TheScanner->clearRes(theReserv);
}
// Frees this client's slot in TheClients (by nulling it) and deletes the
// object. The caller must not touch the object after this returns.
// Finding no slot that points to this object is treated as a bug.
void Client::selfDistruct() {
    int slot = -1;
    for (int idx = 0; idx < TheClients.count(); ++idx) {
        if (TheClients[idx] == this) {
            slot = idx;
            break;
        }
    }

    if (slot < 0) {
        Assert(0);
        return;
    }

    TheClients[slot] = 0;
    delete this;
}
void Client::measStr(const char *key, const char *str) {
*theMeasBuf << "\t<measurement key='" << key << "'"
<< " type='string' value='" << str << "' />" << endl;
}
void Client::measTime(const char *key, Time tm) {
*theMeasBuf << "\t<measurement key='" << key << "'"
<< " type='time' value='" << tm.secd() << "' />" << endl;
}
void Client::measDouble(const char *key, const NetDouble &d) {
*theMeasBuf << "\t<measurement key='" << key << "'"
<< " type='double' value='" << (double)d << "' />" << endl;
}
void Client::measInt(const char *key, int n) {
*theMeasBuf << "\t<measurement key='" << key << "'"
<< " type='int' value='" << n << "' />" << endl;
}
/* XmlServer */
// Creates a non-blocking IPv4 socket, binds it to the given port on the
// zero (any) address, starts listening, and registers the socket for read
// readiness. Exits the process with status -2 if bind or listen fails.
XmlServer::XmlServer(int port) {
    // XXX hard-code IPv4
    Must(theSock.create(InAddress::IPvFour().family()));
    Must(theSock.blocking(false));
    theSock.reuseAddr(true);

    // XXX relies on InAddress class to zero address value, making it
    // equivalent to INADDR_ANY
    const bool listening =
        theSock.bind(NetAddr(InAddress::IPvFour(), port)) && theSock.listen();
    if (!listening) {
        cerr << "cannot listen on TCP port " << port << ": " << Error::Last() << endl;
        exit(-2);
    }

    theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
}
// Drops the scanner registration, if one is held.
XmlServer::~XmlServer() {
    if (!theReserv)
        return;
    TheScanner->clearRes(theReserv);
}
// Accepts one pending connection and wraps it in a new Client, which is
// stored in the first free (null) slot of TheClients, or appended if all
// slots are in use. Does nothing if accept() yields no socket.
void XmlServer::noteReadReady(int) {
    sockaddr_in addr;
    socklen_t addr_len = sizeof(addr);
    if (Socket s = theSock.accept((struct sockaddr*)&addr, &addr_len)) {
        // sin_port is in network byte order, while NetAddr is given
        // host-order ports elsewhere in this file (see the bind() calls),
        // so convert before storing the peer address
        Client *c = new Client(s, NetAddr(addr.sin_addr, ntohs(addr.sin_port)));

        // reuse a slot vacated by a destroyed client, if there is one
        for (int i = 0; i < TheClients.count(); ++i) {
            if (!TheClients[i]) {
                TheClients[i] = c;
                return;
            }
        }
        TheClients.append(c);
    }
}
/* TcpServer */
// Creates an AF_INET datagram socket, binds it to the given port on the
// zero (any) address, makes it non-blocking, and registers it for read
// readiness. Exits the process with status -2 if bind fails.
TcpServer::TcpServer(int port) {
    Must(theSock.create(AF_INET, SOCK_DGRAM, 0));
    theSock.reuseAddr(true);

    // XXX relies on InAddress class to zero address value, making it
    // equivalent to INADDR_ANY
    const bool bound = theSock.bind(NetAddr(InAddress::IPvFour(), port));
    if (!bound) {
        cerr << "cannot listen on UDP port " << port << ": " << Error::Last() << endl;
        exit(-2);
    }

    Must(theSock.blocking(false));
    theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
}
// Drops the scanner registration, if one is held.
TcpServer::~TcpServer() {
    if (!theReserv)
        return;
    TheScanner->clearRes(theReserv);
}
void TcpServer::noteReadReady(int) {
StatusNotifMsg msg;
struct sockaddr_storage from;
socklen_t fromlen = sizeof(from);
if (recvfrom(theSock.fd(), (char*)&msg, sizeof(msg), 0, (sockaddr *) &from, &fromlen) == sizeof(msg)) {
msg.ntoh();
for (int i = 0; i < TheClients.count(); ++i)
if (TheClients[i])
TheClients[i]->append(msg, NetAddr(from, -1));
}
}
// Entry point of the dispatcher daemon.
// Usage: <program> [udp_and_tcp_port]; the port defaults to 18256.
// Returns -1 on a usage error or when detaching fails, 0 otherwise (the
// parent of a successful fork also returns 0 immediately).
int main(int argc, char *argv[]) {
    (void)PolyVersion();

    // note: we assume that TCP and UDP port namespaces are separate
    const int defaultPort = 18256;

    // only parse argv[1] when it was actually supplied: argv[1] is a null
    // pointer when no argument is given
    const int port = argc == 2 ? xatoi(argv[1]) : defaultPort;

    // zero means "not a number" (as in the original check); also reject
    // values that cannot be a valid port instead of silently truncating them
    if (argc > 2 || port <= 0 || port > 65535) {
        cerr << "usage: " << argv[0] << " [udp_and_tcp_port]" << endl;
        return -1;
    }

    cout << "Starting UDP to TCP dispatcher on port " << port << endl;

    // detach ourselves if possible
#if HAVE_FORK
    const pid_t pid = fork();
    if (pid > 0) // parent
        return 0;
    if (pid < 0 || setsid() < 0) { // error
        cerr << "problems detaching the server process: " << Error::Last() << endl;
        return -1;
    }
#endif

    TheScanner = new Poll();
    TheScanner->configure(FD_SETSIZE);

    // NOTE: the class names are misleading; XmlServer is the connection
    // listener and TcpServer is the datagram receiver
    XmlServer *tcpSrv = new XmlServer(port);
    TcpServer *udpSrv = new TcpServer(port);

    // a client that disconnects must not kill the daemon on write
    signal(SIGPIPE, SIG_IGN);
    signal(SIGHUP, SIG_IGN);

    while (!TheScanner->idle()) {
        Clock::Update();
        TheScanner->scan(0);
    }

    delete udpSrv;
    delete tcpSrv;
    delete TheScanner;
    return 0;
}
// syntax highlighted by Code2HTML, v. 0.9.1