/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "xstd/h/os_std.h"
#include "xstd/h/string.h"
#include "xstd/h/signal.h"
#include "xstd/h/netinet.h"
#include "xstd/h/iostream.h"
#include "xstd/h/sstream.h"
#include "xstd/h/iomanip.h"

#include "xstd/Assert.h"
#include "xstd/Socket.h"
#include "xstd/Clock.h"
#include "xstd/Poll.h"
#include "xstd/gadgets.h"
#include "base/polyLogCats.h"
#include "base/polyVersion.h"
#include "runtime/NotifMsg.h"

// maintains info about a host: one accepted connection plus a fixed-size
// buffer of XML text that is still waiting to be written to it;
// instances delete themselves via selfDistruct()
class Client: public FileScanUser {
	public:
		// stores the socket handle and peer address; switches the socket
		// to non-blocking mode and bumps TheLiveCltCnt
		Client(Socket aFD, const NetAddr &anAddr);
		// drops the scanner reservation, if any, and decrements TheLiveCltCnt
		virtual ~Client();

		// renders msg as an XML <message> element (src is sndAddr) at the
		// end of theBuf and requests write notifications from TheScanner;
		// may delete this object, so callers must not use it afterwards
		void append(const StatusNotifMsg &msg, const NetAddr &sndAddr);

		// writes buffered bytes to theSock and drops the scanner
		// reservation once the buffer is empty; may delete this object
		// when the write fails
		virtual void noteWriteReady(int fd);
		
	protected:
		// each helper writes one <measurement key=... type=... value=... />
		// line to *theMeasBuf, which must be set by the caller (see append)
		void measStr(const char *key, const char *str);
		void measTime(const char *key, Time tm);
		void measDouble(const char *key, const NetDouble &d);
		void measInt(const char *key, int n);

		// clears this client's slot in TheClients and deletes this object
		void selfDistruct();

	protected:
		Socket theSock; // accepted connection the XML is written to
		NetAddr theAddr; // unique host address
		FileScanReserv theReserv; // write reservation; set only while data is pending
		char theBuf[16*1024]; // pending output, always starting at offset 0
		Size theSize; // number of pending bytes in theBuf

		ostream *theMeasBuf; // temporary buffer to build measurement message
};

// Receives StatusNotifMsg datagrams and hands each one to every Client.
// NOTE(review): despite its name, the constructor creates a SOCK_DGRAM
// socket and reports "UDP port" on bind failure; the stream listener in
// this file is XmlServer.
class TcpServer: public FileScanUser {
	public:
		// binds a non-blocking datagram socket to the given port and
		// registers it with TheScanner; exits the process on bind failure
		TcpServer(int port);
		// drops the scanner reservation, if any
		virtual ~TcpServer();

		// reads one datagram; if it has exactly the size of a
		// StatusNotifMsg, appends it to every registered client
		virtual void noteReadReady(int fd);

	protected:
		Socket theSock; // datagram socket the messages arrive on
		FileScanReserv theReserv; // read reservation held for theSock
};

// Listening socket (reported as "TCP" in its error message) that accepts
// connections and creates a Client for each of them.
class XmlServer: public FileScanUser {
	public:
		// binds and listens on the given IPv4 port in non-blocking mode and
		// registers with TheScanner; exits the process on failure
		XmlServer(int port);
		// drops the scanner reservation, if any
		virtual ~XmlServer();

		// accepts one connection and stores the new Client in TheClients
		virtual void noteReadReady(int fd);

	protected:
		Socket theSock; // listening socket
		FileScanReserv theReserv; // read reservation held for theSock
};


/* globals */

// connected clients; a slot becomes null when its client self-destructs
// and is reused for the next accepted connection
static Array<Client*> TheClients;
// scanner that delivers read/write readiness callbacks; created in main()
static FileScanner *TheScanner = 0;

// number of Client objects currently alive (maintained by Client ctor/dtor)
static int TheLiveCltCnt = 0;

/* implementation */


/* Client */

// Stores the accepted socket handle and the peer address, starts with an
// empty output buffer and no measurement stream, and counts the new
// instance in TheLiveCltCnt.
Client::Client(Socket aFD, const NetAddr &anAddr):
	theSock(aFD),
	theAddr(anAddr),
	theSize(0),
	theMeasBuf(0)
{
	++TheLiveCltCnt;
	// switch the connection to non-blocking mode
	theSock.blocking(false);
}

// Releases what the client holds: the scanner reservation (if any) and the
// accepted socket.
// Socket objects are passed around by value in this file, so presumably the
// Socket destructor does not close the descriptor; without the explicit
// close() below every dropped client would leak one file descriptor.
Client::~Client() {
	if (theReserv)
		TheScanner->clearRes(theReserv);
	if (theSock)
		theSock.close();
	TheLiveCltCnt--;
}

void Client::append(const StatusNotifMsg &msg, const NetAddr &sndAddr) {
	const Size spaceSize = SizeOf(theBuf) - theSize;
	if (spaceSize < SizeOf(theBuf) / 2) {
		selfDistruct();
		return;
	}

	ofixedstream buf(theBuf + theSize, spaceSize);
	configureStream(buf, 4);
	theMeasBuf = &buf;
	buf << "<message src='" << sndAddr << "'>" << endl;
	measStr("label", msg.theLabel);
	measTime("start_time", msg.theStartTime);
	measTime("send_time", msg.theSndTime);
	measTime("response_time", msg.theRespTime);
	measDouble("req_rate", msg.theReqRate);
	measDouble("rep_rate", msg.theRepRate);
	measDouble("bwidth", msg.theBwidth);
	measDouble("dhr", msg.theDHR);
	measDouble("conn_use", msg.theConnUse);
	measDouble("error_ratio", msg.theErrRatio);
	measInt("xact_count", msg.theXactTotCnt);
	measInt("error_count", msg.theErrTotCnt);
	measInt("socket_level", msg.theSockInstCnt);
	measStr("side", msg.theCat == lgcCltSide ? "client" : "server");
	buf << "</message>" << endl;
	theMeasBuf = 0;

	if (!buf) {
		selfDistruct();
		return;
	}		

	theSize += Size(buf.tellp());
	if (!theReserv)
		theReserv = TheScanner->setFD(theSock.fd(), dirWrite, this);
}

// Scanner callback: the socket can take more data.
// Writes as much of theBuf as the socket accepts, shifts the remainder to
// the front of the buffer, and drops the write reservation once nothing is
// left. On a failed write the client deletes itself if
// Error::LastExcept(EWOULDBLOCK) is true (presumably: the error is something
// other than EWOULDBLOCK -- confirm against Error).
void Client::noteWriteReady(int) {
	if (!theSize) {
		// nothing pending: stop asking for write notifications
		TheScanner->clearRes(theReserv);
		return;
	}

	const int sent = theSock.write(theBuf, theSize);
	if (sent >= 0) {
		theSize -= sent;
		memmove(theBuf, theBuf + sent, theSize);
		if (!theSize)
			TheScanner->clearRes(theReserv);
		return;
	}

	if (Error::LastExcept(EWOULDBLOCK))
		selfDistruct(); // deletes this; nothing may follow
}

// Removes this client from TheClients (the slot is set to null, not erased)
// and deletes the object. Asserts if the client is not registered.
void Client::selfDistruct() {
	const int total = TheClients.count();

	int slot = 0;
	while (slot < total && TheClients[slot] != this)
		++slot;

	if (slot >= total) {
		Assert(0);
		return;
	}

	TheClients[slot] = 0;
	delete this;
}

void Client::measStr(const char *key, const char *str) {
	*theMeasBuf << "\t<measurement key='" << key << "'"
		<< " type='string' value='" << str << "' />" << endl;
}

void Client::measTime(const char *key, Time tm) {
	*theMeasBuf << "\t<measurement key='" << key << "'"
		<< " type='time' value='" << tm.secd() << "' />" << endl;
}

void Client::measDouble(const char *key, const NetDouble &d) {
	*theMeasBuf << "\t<measurement key='" << key << "'"
		<< " type='double' value='" << (double)d << "' />" << endl;
}

void Client::measInt(const char *key, int n) {
	*theMeasBuf << "\t<measurement key='" << key << "'"
		<< " type='int' value='" << n << "' />" << endl;
}

/* XmlServer */

// Opens a non-blocking IPv4 socket, binds it to the given port on the
// wildcard address, starts listening, and registers the socket with
// TheScanner for read notifications.
// Terminates the process with exit(-2) when bind or listen fails.
XmlServer::XmlServer(int port) {
	// XXX hard-code IPv4
	Must(theSock.create(InAddress::IPvFour().family()));
	Must(theSock.blocking(false));
	theSock.reuseAddr(true);

	// XXX relies on InAddress class to zero address value, making it
	// equivalent to INADDR_ANY
	const NetAddr anyAddr(InAddress::IPvFour(), port);

	// listen() is attempted only after a successful bind()
	const bool listening = theSock.bind(anyAddr) && theSock.listen();
	if (!listening) {
		cerr << "cannot listen on TCP port " << port << ": " << Error::Last() << endl;
		exit(-2);
	}

	theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
}

// Gives the read reservation back to TheScanner if one was made.
XmlServer::~XmlServer() {
	if (!theReserv)
		return;

	TheScanner->clearRes(theReserv);
}

// Scanner callback: a connection is waiting on the listening socket.
// Accepts it and registers a new Client, reusing a vacated (null) slot of
// TheClients when there is one and appending otherwise. Does nothing when
// accept() yields no socket.
void XmlServer::noteReadReady(int) {
	sockaddr_in addr;
	socklen_t addr_len = sizeof(addr);
	if (Socket s = theSock.accept(reinterpret_cast<sockaddr*>(&addr), &addr_len)) {
		// sin_port is in network byte order, while NetAddr is given
		// host-order ports elsewhere in this file (see the bind calls),
		// so convert before storing the peer address
		Client *c = new Client(s,
			NetAddr(addr.sin_addr, ntohs(addr.sin_port)));

		for (int i = 0; i < TheClients.count(); ++i)
			if (!TheClients[i]) {
				TheClients[i] = c;
				return;
			}
		TheClients.append(c);
	}
}


/* TcpServer */

// Creates an AF_INET datagram socket, binds it to the given port on the
// wildcard address, switches it to non-blocking mode, and registers it with
// TheScanner for read notifications.
// Terminates the process with exit(-2) when the bind fails.
TcpServer::TcpServer(int port) {
	Must(theSock.create(AF_INET, SOCK_DGRAM, 0));
	theSock.reuseAddr(true);

	// XXX relies on InAddress class to zero address value, making it
	// equivalent to INADDR_ANY
	const NetAddr anyAddr(InAddress::IPvFour(), port);

	const bool bound = theSock.bind(anyAddr);
	if (!bound) {
		cerr << "cannot listen on UDP port " << port << ": " << Error::Last() << endl;
		exit(-2);
	}

	Must(theSock.blocking(false));

	theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
}

// Gives the read reservation back to TheScanner if one was made.
TcpServer::~TcpServer() {
	if (!theReserv)
		return;

	TheScanner->clearRes(theReserv);
}

// Scanner callback: a datagram is waiting.
// Reads one datagram straight into a StatusNotifMsg; anything that is not
// exactly sizeof(StatusNotifMsg) bytes long (including a failed read) is
// dropped. A full-size message is converted with ntoh() and appended to
// every registered client, tagged with the sender's address.
void TcpServer::noteReadReady(int) {
	StatusNotifMsg msg;
	struct sockaddr_storage from;
	socklen_t fromlen = sizeof(from);

	const ssize_t got = recvfrom(theSock.fd(), (char*)&msg, sizeof(msg), 0,
		(sockaddr *) &from, &fromlen);
	if (static_cast<size_t>(got) != sizeof(msg))
		return;

	msg.ntoh();

	for (int i = 0; i < TheClients.count(); ++i) {
		// empty slots belong to clients that have self-destructed;
		// append() itself may vacate the current slot
		if (!TheClients[i])
			continue;
		TheClients[i]->append(msg, NetAddr(from, -1));
	}
}


int main(int argc, char *argv[]) {
	(void)PolyVersion();

	// note: we assume that TCP and UDP port namespaces are separate
	if (argc > 2 || (argc == 2 && !xatoi(argv[1]))) {
		cerr << "usage: " << argv[0] << " [udp_and_tcp_port]" << endl;
		return -1;
	}

	const int port = xatoi(argv[1], 18256);

	cout << "Starting UDP to TCP dispatcher on port " << port << endl;

	// detach ourselves if possible
#if HAVE_FORK
	const pid_t pid = fork();

	if (pid > 0) // parent
		return 0;

    if (pid < 0 || setsid() < 0) { // error
        cerr << "problems detaching the server process: " << Error::Last() << endl;
		return -1;
	}
#endif

	TheScanner = new Poll();
	TheScanner->configure(FD_SETSIZE);

	XmlServer *tcpSrv = new XmlServer(port);
	TcpServer *udpSrv = new TcpServer(port);

	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	while (!TheScanner->idle()) {
		Clock::Update();
		TheScanner->scan(0);
	}

	delete udpSrv;
	delete tcpSrv;
	delete TheScanner;

	return 0;
}


// syntax highlighted by Code2HTML, v. 0.9.1