/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "xstd/h/os_std.h"
#include "xstd/h/string.h"
#include "xstd/h/signal.h"
#include "xstd/h/netinet.h"
#include "xstd/h/iostream.h"
#include "xstd/h/iomanip.h"

#include "xstd/Assert.h"
#include "xstd/Socket.h"
#include "xstd/Clock.h"
#include "xstd/Poll.h"
#include "xstd/gadgets.h"
#include "base/polyVersion.h"
#include "runtime/NotifMsg.h"

// maintains info about a host
class Client: public FileScanUser {
	public:
		Client(Socket aFD, const NetAddr &anAddr);
		virtual ~Client();

		void append(const void *buf, int size);

		virtual void noteWriteReady(int fd);
		
	protected:
		void selfDistruct();

	protected:
		Socket theSock;
		NetAddr theAddr; // unique host address
		FileScanReserv theReserv;
		char theBuf[1024];
		int theSize;
};

class TcpServer: public FileScanUser {
	public:
		TcpServer(int port);
		virtual ~TcpServer();

		virtual void noteReadReady(int fd);

	protected:
		Socket theSock;
		FileScanReserv theReserv;
};

class UdpServer: public FileScanUser {
	public:
		UdpServer(int port);
		virtual ~UdpServer();

		virtual void noteReadReady(int fd);

	protected:
		Socket theSock;
		FileScanReserv theReserv;
};


/* globals */

static Array<Client*> TheClients;
static FileScanner *TheScanner = 0;

static int TheLiveCltCnt = 0;

/* implementation */


/* Client */

Client::Client(Socket aFD, const NetAddr &anAddr): 
	theSock(aFD), theAddr(anAddr), theSize(0) {
	theSock.blocking(false);
	TheLiveCltCnt++;
}

Client::~Client() {
	if (theReserv)
		TheScanner->clearRes(theReserv);
	TheLiveCltCnt--;
}

void Client::append(const void *buf, int size) {
	if (size + theSize > (int)sizeof(theBuf)) {
		selfDistruct();
		return;
	}
	memcpy(theBuf + theSize, buf, size);
	theSize += size;
	if (!theReserv)
		theReserv = TheScanner->setFD(theSock.fd(), dirWrite, this);
}

void Client::noteWriteReady(int) {
	if (theSize) {
		const int sz = theSock.write(theBuf, theSize);
		if (sz < 0) {
			if (Error::LastExcept(EWOULDBLOCK))
				selfDistruct();
			return;
		}
		theSize -= sz;
		memmove(theBuf, theBuf+sz, theSize);
	}

	if (!theSize)
		TheScanner->clearRes(theReserv);
}

void Client::selfDistruct() {
	for (int i = 0; i < TheClients.count(); ++i)
		if (TheClients[i] == this) {
			TheClients[i] = 0;
			delete this;
			return;
		}
	Assert(0);
}


/* TcpServer */

TcpServer::TcpServer(int port) {
	Must(theSock.create(InAddress::IPvFour().family()));
	Must(theSock.blocking(false));
	theSock.reuseAddr(true);
	// XXX relies on InAddress class to zero address value, making it
	// equivalent to INADDR_ANY
	if (!theSock.bind(NetAddr(InAddress::IPvFour(), port)) || !theSock.listen()) {
		cerr << "cannot listen on TCP port " << port << ": " << Error::Last() << endl;
		exit(-2);
	}

	theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
}

TcpServer::~TcpServer() {
	if (theReserv)
		TheScanner->clearRes(theReserv);
}

void TcpServer::noteReadReady(int) {
	sockaddr_in addr;
	socklen_t addr_len = sizeof(addr);
	if (Socket s = theSock.accept((struct sockaddr*)&addr, &addr_len)) {
		Client *c = new Client(s, NetAddr(
			((sockaddr_in &)addr).sin_addr,
			((sockaddr_in &)addr).sin_port));

		for (int i = 0; i < TheClients.count(); ++i)
			if (!TheClients[i]) {
				TheClients[i] = c;
				return;
			}
		TheClients.append(c);
	}
}


/* UdpServer */

UdpServer::UdpServer(int port) {
	// XXX hard-code IPv4
	Must(theSock.create(PF_INET, SOCK_DGRAM, 0));
	theSock.reuseAddr(true);
	// XXX relies on InAddress class to zero address value, making it
	// equivalent to INADDR_ANY
	if (!theSock.bind(NetAddr(InAddress::IPvFour(), port))) {
		cerr << "cannot listen on UDP port " << port << ": " << Error::Last() << endl;
		exit(-2);
	}
	Must(theSock.blocking(false));

	theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
}

UdpServer::~UdpServer() {
	if (theReserv)
		TheScanner->clearRes(theReserv);
}

void UdpServer::noteReadReady(int) {
	StatusNotifMsg msg;
	struct sockaddr_storage from;
	socklen_t fromlen = sizeof(from);
	if (recvfrom(theSock.fd(), (char*)&msg, sizeof(msg), 0, (sockaddr *) &from, &fromlen) == sizeof(msg)) {
		msg.ntoh();
		StatusFwdMsg fwd(msg, TheClock, NetAddr(from, -1));
		fwd.theCopyCnt = TheLiveCltCnt;
		fwd.hton();
		for (int i = 0; i < TheClients.count(); ++i)
			if (TheClients[i])
				TheClients[i]->append(&fwd, sizeof(fwd));
	}
}


int main(int argc, char *argv[]) {
	(void)PolyVersion();

	// note: we assume that TCP and UDP port namespaces are separate
	if (argc > 2 || (argc == 2 && !xatoi(argv[1]))) {
		cerr << "usage: " << argv[0] << " [udp_and_tcp_port]" << endl;
		return -1;
	}

	const int port = xatoi(argv[1], 18256);

	cout << "Starting UDP to TCP dispatcher on port " << port << endl;

	// detach ourselves if possible
#if HAVE_FORK
	const pid_t pid = fork();

	if (pid > 0) // parent
		return 0;

    if (pid < 0 || setsid() < 0) { // error
        cerr << "problems detaching the server process: " << Error::Last() << endl;
		return -1;
	}
#endif

	TheScanner = new Poll();
	TheScanner->configure(FD_SETSIZE);

	TcpServer *tcpSrv = new TcpServer(port);
	UdpServer *udpSrv = new UdpServer(port);

	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	while (!TheScanner->idle()) {
		Clock::Update();
		TheScanner->scan(0);
	}

	delete udpSrv;
	delete tcpSrv;
	delete TheScanner;

	return 0;
}


syntax highlighted by Code2HTML, v. 0.9.1