/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "xstd/h/iostream.h"
#include "xstd/h/sstream.h"
#include "xstd/h/iomanip.h"
#include "xstd/h/process.h"  /* for _getpid() on W2K */

#include "xstd/h/math.h"
#include "xstd/h/os_std.h"
#include "xstd/h/string.h"
#include "xstd/h/signal.h"
#include "xstd/h/netinet.h"

#include <fstream>

#include "xstd/Assert.h"
#include "xstd/Time.h"
#include "xstd/BitMask.h"
#include "xstd/Socket.h"
#include "xstd/Select.h"
#include "xstd/Poll.h"
#include "xstd/AlarmClock.h"
#include "xstd/History.h"
#include "base/AggrStat.h"
#include "base/polyLogCats.h"
#include "base/polyVersion.h"
#include "runtime/NotifMsg.h"


// handles incoming messages
class MsgMonitor: public FileScanUser {
	public:
		MsgMonitor(Socket aSock);
		virtual ~MsgMonitor();

		virtual void noteReadReady(int fd);

	public:
		Socket theSock;
		FileScanReserv theReserv;

		char theBuf[sizeof(StatusFwdMsg)*64];
		int theSize;
};

// handles periodic operations
class Ticker: public AlarmUser {
	public:
		Ticker(Time aPeriod);
		virtual ~Ticker();

		virtual void wakeUp(const Alarm &alarm);

	public:
		Time thePeriod;
};

struct HostFilter;

// maintains info about a host
class Host {
	public:
		typedef History<StatusFwdMsg> Log;

	public:
		Host(const NetAddr &anAddr);

		const NetAddr &addr() const { return theAddr; }
		// XXX pre-IPv6 code used 's_addr' as an int
		int id() const { return theAddr.addrN().octet(0); }
		int logCat() const;
		// XXX pre-IPv6 code used 's_addr' as an int
		int lna() const { return theAddr.addrN().octet(0); }
		const Log &log() const { return theLog; }
		const char *runLabel() const;

		bool busy() const;
		bool selected() const { return isSelected; }
		bool isClient() const { return logCat() == lgcCltSide; }
		bool isServer() const { return logCat() == lgcSrvSide; }
	
		void noteMsg(const StatusFwdMsg &msg); // { theLog.insert(msg); }
		void selected(bool be) { isSelected = be; }

		bool matches(const HostFilter &sel) const;

	protected:
		NetAddr theAddr; // unique host address
		Log theLog;      // notification messasge history
		bool isSelected;
};

// used to select groups of hosts
struct HostFilter {
	struct {
		int pos;
	} lbl;          // experiment label based selection
	struct {
		int pos;
	} logCat;           // category based selection (aka "side")

	HostFilter() { memset(this, 0, sizeof(*this)); }
};

class MsgSum {
	public:
		MsgSum();

		int hostCount() const { return theReqRate.count(); }

		MsgSum &operator +=(const StatusFwdMsg &msg);

	public:
		String theLabels;
		AggrStat theReqRate;
		AggrStat theRepRate;
		AggrStat theBwidth;
		AggrStat theRespTime;
		AggrStat theDHR;
		AggrStat theConnUse;
		AggrStat theErrRatio;
		AggrStat theErrTotCnt;
		AggrStat theXactTotCnt;
		AggrStat theTotErrRatio;
		AggrStat theSockInstCnt;
};


/* globals */

static NetAddr TheDisp;

static FileScanner *TheScanner = 0;
static bool DoShutdown = false;

static Array<Host*> TheHosts;
static Array<Host*> TheHostIdx; // address -> host map
static int TheBusyHostCnt = 0;
static Array<String*> TheLabels;
static int TheUniqLblCnt = 0;

static void NoteMsg(const StatusFwdMsg &msg);
static void DeleteIdleHosts();
static void RrdUpdate();
static bool AddFirstLabel(const Host *skip, const String &l);
static bool DelLastLabel(const Host *skip, const String &l);

static String DbaseName = "polygraph.rrd";

inline int Dbl2Int(double v) { return (int)rint(v); }


/* MsgMonitor */

MsgMonitor::MsgMonitor(Socket aSock): theSock(aSock), theSize(0) {
	theReserv = TheScanner->setFD(theSock.fd(), dirRead, this);
}

MsgMonitor::~MsgMonitor() {
	if (theReserv)
		TheScanner->clearRes(theReserv);
	if (theSock)
		theSock.close();
}

void MsgMonitor::noteReadReady(int) {
	static const int msgsz = sizeof(StatusFwdMsg);

	const int sz = theSock.read(theBuf + theSize, sizeof(theBuf) - theSize);

	if (sz < 0) {
		if (Error::Last() != EWOULDBLOCK) {
			cerr << "failed to read from dispatcher at " << TheDisp 
				<< ": " << Error::Last() << endl;
			DoShutdown = true;
		}
		return;
	}
 
	if (sz == 0) {
		cerr << "dispatcher at " << TheDisp << " quit." << endl;
		DoShutdown = true;
		return;
	}

	theSize += sz;

	while (theSize >= msgsz) {
		StatusFwdMsg msg;
		memcpy(&msg, theBuf, msgsz);
		theSize -= msgsz;
		memmove(theBuf, theBuf + msgsz, theSize);

		// handle the message
		msg.ntoh();
		msg.theRcvTime = TheClock;
		NoteMsg(msg);
	}
	Assert(theSize >= 0);
}


/* Ticker */

Ticker::Ticker(Time aPeriod): thePeriod(aPeriod) {
	sleepFor(thePeriod);
}

Ticker::~Ticker() {
}

void Ticker::wakeUp(const Alarm &alarm) {
	AlarmUser::wakeUp(alarm);

	DeleteIdleHosts();
	RrdUpdate();

	sleepFor(thePeriod);
}


/* Host */

Host::Host(const NetAddr &anAddr): theAddr(anAddr), theLog(60), isSelected(false) {
}

int Host::logCat() const {
	return theLog.depth() ? theLog[0].theCat : lgcAll;
}

const char *Host::runLabel() const {
	return theLog.depth() ? theLog[0].theLabel : 0;
}

bool Host::busy() const {
	return theLog.depth() && theLog[0].theRcvTime > TheClock.time() - Time::Sec(90);
}

void Host::noteMsg(const StatusFwdMsg &msg) {
	theLog.insert(msg);
}

bool Host::matches(const HostFilter &filter) const {
	if (filter.logCat.pos != lgcAll && filter.logCat.pos != logCat())
		return false;
	if (const int l = filter.lbl.pos)
		return theLog.depth() && *TheLabels[l] == theLog[0].theLabel;
	return true;
}


/* MsgSum */

MsgSum::MsgSum(): theLabels("") {
}

MsgSum &MsgSum::operator +=(const StatusFwdMsg &msg) {
	// XXX: theLabels is not updated
	theReqRate.record(Dbl2Int(msg.theReqRate));
	theRepRate.record(Dbl2Int(msg.theRepRate));
	theBwidth.record(Dbl2Int(msg.theBwidth));
	theRespTime.record(msg.theRespTime.msec());
	theDHR.record(Dbl2Int(msg.theDHR*100));
	theConnUse.record(Dbl2Int(msg.theConnUse));
	theErrRatio.record(Dbl2Int(msg.theErrRatio*100));
	theErrTotCnt.record(msg.theErrTotCnt);
	theXactTotCnt.record(msg.theXactTotCnt);
	theTotErrRatio.record(
		Dbl2Int(Percent(msg.theErrTotCnt, msg.theErrTotCnt+msg.theXactTotCnt)));
	theSockInstCnt.record(msg.theSockInstCnt);
	return *this;
}

/* local routines */

static
Host *AddHost(const NetAddr &addr) {
	Host *host = new Host(addr);
	Host *foundPos = 0;
	for (int i = 0; !foundPos && i < TheHosts.count(); ++i) {
		if (!TheHosts[i])
			foundPos = TheHosts[i] = host;
	}
	if (!foundPos)
		TheHosts.append(host);
	TheBusyHostCnt++;
	return host;
}

static
void NoteMsg(const StatusFwdMsg &msg) {
	//clog << "from " << inet_ntoa(from.sin_addr) << ": " << TheMsg.buf << endl;

	NetAddr from(msg.theSndAddr.addr, msg.theSndAddr.port);

	// find corresponding host
	// XXX pre-IPv6 code used 's_addr' as an int
	const int hidx = from.addrN().octet(0) % TheHostIdx.count();
	Host *host = TheHostIdx[hidx];

	if (host && host->addr() != from) { // collision
		host = 0;
		// find using linear search
		for (int i = 0; !host && i < TheHosts.count(); ++i) {
			if (TheHosts[i] && TheHosts[i]->addr() == from)
				host = TheHosts[i];
		}
	}

	if (!host) {
		host = AddHost(from);
		if (!TheHostIdx[hidx])
			TheHostIdx[hidx] = host;
		AddFirstLabel(host, msg.theLabel);
	}

	host->noteMsg(msg);
}


static
void DeleteIdleHosts() {
	// we will build these from scratch:
	TheHostIdx.memset(0);

	for (int h = 0; h < TheHosts.count(); ++h) {
		if (Host *host = TheHosts[h]) {
			if (host->busy()) {
				// XXX pre-IPv6 code used 's_addr' as an int
				const int idx = host->addr().addrN().octet(0) % TheHostIdx.count();
				TheHostIdx[idx] = host;
			} else {
				TheBusyHostCnt--;
				DelLastLabel(host, host->runLabel());
				delete host;
				TheHosts[h] = 0;
			}
		}
	}
}

static
void RrdUpdate() {
	const Time freshCutOff = TheClock - Time::Sec(60);
	MsgSum snapshot; // last values available
	MsgSum window;   // all fresh values in the logs
	for (int h = 0; h < TheHosts.count(); ++h) {
		Host *host = TheHosts[h];
		if (!host)
			continue;
		if (host->logCat() != lgcCltSide)
			continue;

		// should be subject to freshness rule as well?
		snapshot += host->log()[0];

		for (int l = 0; l < host->log().depth(); ++l) {
			const StatusFwdMsg &msg = host->log()[l];
			if (freshCutOff <= msg.theRcvTime)
				window += msg;
		}
	}

	if (snapshot.hostCount()) {
		ostringstream cmdbuf;
		configureStream(cmdbuf, 2);
		cmdbuf
			<< "rrdtool update " << DbaseName
			<< " --template xact:err:rt:sock:dhr N"
			<< ':' << snapshot.theXactTotCnt.sum()
			<< ':' << snapshot.theErrTotCnt.sum()
			<< ':' << window.theRespTime.mean()
			<< ':' << snapshot.theSockInstCnt.sum()
			<< ':' << window.theDHR.mean()
			<< ends;

		const String cmd = cmdbuf.str().c_str();
		streamFreeze(cmdbuf, false);

		cout << cmd << endl;
		Should(system(cmd.cstr()) == 0);
	}
}

static
bool AddFirstLabel(const Host *skip, const String &l) {
	for (int h = 0; h < TheHosts.count(); ++h) {
		if (TheHosts[h] == skip)
			continue;
		if (TheHosts[h] && l == TheHosts[h]->runLabel())
			return false; // not first label
	}
	TheUniqLblCnt++;
	for (int i = 1; i < TheLabels.count(); ++i) {
		if (!*TheLabels[i]) {
			*TheLabels[i] = l;
			return true;
		}
	}
	TheLabels.append(new String(l));
	return false;
}

static
bool DelLastLabel(const Host *skip, const String &l) {
	for (int h = 0; h < TheHosts.count(); ++h) {
		if (TheHosts[h] == skip)
			continue;
		if (TheHosts[h] && l == TheHosts[h]->runLabel())
			return false; // not last label
	}
	TheUniqLblCnt--;
	for (int i = 1; i < TheLabels.count(); ++i) {
		if (*TheLabels[i] == l) {
			*TheLabels[i] = 0;
			if (i == TheLabels.count()-1)
				delete TheLabels.pop();
			return true;
		}
	}
	Assert(0);
	return false;
}

int main(int argc, char *argv[]) {
	(void)PolyVersion();

	int argOff = 0;
	if (argc > 1 && String(argv[1]) == "--help") {
		cerr << "usage: " << argv[0]
			<< " [--help] [--database <filename>]"
			<< " [udp2tcpd_ip [udp2tcpd_port]]"
			<< endl;
		return 0;
	} else
	if (argc > 1 && String(argv[1]) == "--database") {
		DbaseName = argv[2];
		argOff += 2;
		argc -= 2;
	}

	const char *disph = argc >= 2 ? argv[argOff+1] : "127.0.0.1";
	const int dispp = argc >= 3 ? atoi(argv[argOff+2]) : 18256;
	TheDisp = NetAddr(disph, dispp);

	Socket sock;
	Must(sock.create(TheDisp.addrN().family()));
	if (!sock.connect(TheDisp)) {
		cerr << "failed to connect to udp2tcp dispatcher at " << TheDisp
			<< ": " << Error::Last() << endl;
		return -2;
	}
	Should(sock.blocking(false));

	TheHostIdx.stretch(256);
	TheHostIdx.count(TheHostIdx.capacity());
	TheLabels.append(new String);

	signal(SIGPIPE, SIG_IGN);
	Clock::Update();

	TheScanner = new PG_PREFFERED_FILE_SCANNER;
	TheScanner->configure(FD_SETSIZE);

	MsgMonitor *msgSrv = new MsgMonitor(sock);
	Ticker *ticker = new Ticker(Time::Sec(60));

	const String cmd = String("test -f ") + DbaseName +
		" || rrdtool create " + DbaseName +
		" --step 60 "
		"DS:xact:DERIVE:90:0:U "
		"DS:err:DERIVE:90:0:U "
		"DS:rt:GAUGE:90:0:10000 "	// require 0-10,000 msec
		"DS:sock:GAUGE:90:0:U "
		"DS:dhr:GAUGE:90:0:100 "
		"RRA:AVERAGE:0.50:1:1440 "	// 60 mins per hour
		"RRA:AVERAGE:0.50:5:864 "	// 864 5mins per 3days
		"RRA:AVERAGE:0.50:60:168"	// 168 hours per week
		;

	clog << "executing: " << cmd << endl;
	Must(system(cmd.cstr()) == 0);

	// store our pid
	const String pidFname = "polyrrd.pid";
	{
		ofstream pidFile(pidFname.cstr());
		pidFile << getpid() << endl;
	}

	while (!DoShutdown) {
		Clock::Update();
		Time tout = TheAlarmClock.timeLeft();
		TheScanner->scan(TheAlarmClock.on() ? 0 : &tout);
	}

	delete ticker;
	delete msgSrv;
	delete TheScanner;

	unlink(pidFname.cstr());

	return 0;
}


syntax highlighted by Code2HTML, v. 0.9.1