/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include <fstream>

#include "xstd/h/signal.h"
#include "xstd/h/sstream.h"
#include "xstd/h/net/if.h"
#include "xstd/h/iostream.h"
#include "xstd/h/iomanip.h"

#include "xstd/Assert.h"
#include "xstd/FileScanner.h"
#include "xstd/Error.h"
#include "xstd/getIfAddrs.h"
#include "xstd/gadgets.h"
#include "xstd/InetIfReq.h"
#include "base/CmdLine.h"
#include "base/opts.h"
#include "base/polyLogTags.h"
#include "pgl/PglNetAddrRange.h"
#include "runtime/polyBcastChannels.h"
#include "probe/ProbeClt.h"
#include "probe/ProbeOpts.h"
#include "probe/ProbeStatMgr.h"
#include "probe/PolyProbe.h"

PolyProbe *ThePolyProbe = 0;
FileScanner *TheFileScanner = 0;

static SchArray<InAddress> TheLocalHosts;

// XXX: should not be needed! remove magic labels/versions from OLog
class MyOLog: public OLog {
	public:
		MyOLog() {}

	protected:
		virtual void putHeader();
};

void MyOLog::putHeader() {
	puti(lgMagic1); puti(lgMagic2); puti(0); // magic
	OLog::putHeader();
}


PolyProbe::PolyProbe(): exchangingStats(false), mustStop(false) {
	Assert(!ThePolyProbe);
	ThePolyProbe = this;
}

PolyProbe::~PolyProbe() {
	while (theAgents.count()) delete theAgents.pop();
}

bool PolyProbe::handleCmdLine(int argc, char *argv[]) {

	Array<OptGrp*> opts;
	opts.append(&TheProbeOpts);

	theCmdLine.configure(opts);
	if (!theCmdLine.parse(argc, argv))
		return false;

	// validate command line params
	for (int i = 0; i < opts.count(); ++i)
		if (!opts[i]->validate())
			return false;

	return true;
}

void PolyProbe::configureLogs(int prec) {
	// redirect console output
	if (TheProbeOpts.theConsFileName && TheProbeOpts.theConsFileName != "-")
		redirectOutput(TheProbeOpts.theConsFileName.cstr());

	configureStream(cout, prec);
	configureStream(cerr, prec);
	configureStream(clog, prec);
}

void PolyProbe::configureHosts() {
	Array<NetAddr*> lclClients;
	configureHosts(TheProbeOpts.theCltHosts.val(), theAllClients, lclClients);

	Array<NetAddr*> lclServers;
	configureHosts(TheProbeOpts.theSrvHosts.val(), theAllServers, lclServers);

	if (!lclClients.count() && !lclServers.count()) {
		cerr << thePrgName << ": no specified host addresses match local addresses" << endl;
		dumpHostSpace(lclClients, lclServers);
		exit(-3);
	}

	// all servers must have ports
	for (int i = 0; i < theAllServers.count(); ++i) {
		if (theAllServers[i]->port() <= 0)
			cerr << *theAllServers[i] << ": server address lacks port number" 
				<< endl << xexit;
	}

	// add servers first so that they are up and running before local clients
	for (int s = 0; s < lclServers.count(); ++s) {
		theServers.append(new ProbeSrv(*lclServers[s]));
		theAgents.append(theServers.last());
	}

	for (int c = 0; c < lclClients.count(); ++c) {
		for (int s = 0; s < theAllServers.count(); ++s) {
			theClients.append(new ProbeClt(*lclClients[c], *theAllServers[s]));
			theAgents.append(theClients.last());
		}
	}

	if (lclServers.count())
		TheProbeStatMgr.incConfigure(theAllClients, lclServers);

	if (lclClients.count())
		TheProbeStatMgr.incConfigure(lclClients, theAllServers);
}

void PolyProbe::configure() {
	configureLogs(2);

	// ignore some signals
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	setResourceLimits();
	configureHosts();
}

void PolyProbe::start() {
	if (TheProbeOpts.theDuration >= 0)
		sleepFor(TheProbeOpts.theDuration);

	for (int a = 0; a < theAgents.count(); ++a)
		theAgents[a]->exec();
}

// XXX: merge with common stuff in PolyApp::step()
void PolyProbe::step(Time tout) {
	Clock::Update(false);

	Time *toutp = &tout;

	if (!TheAlarmClock.on()) {
		if (tout < 0)
			toutp = 0;
	} else
	if (tout < 0) {
		tout = TheAlarmClock.timeLeft();
	} else {
		tout = Min(tout, TheAlarmClock.timeLeft());
	}

	// avoid multiple micro iterations when alarm time is very close
	if (tout > 0)
		tout = Max(Time::Msec(1), tout);

	const int readyCount = TheFileScanner->scan(toutp);

	if (mustStop)
		return;

	if (readyCount < 0) {
		if (const Error err = Error::LastExcept(EINTR))
			cerr << thePrgName << ": fatal error: " << err << endl << xexit;
	}

	Clock::Update();
}

void PolyProbe::wakeUp(const Alarm &a) {
	AlarmUser::wakeUp(a);
	if (!exchangingStats) {
		if (theAllClients.count()) {
			pullStats();
			sleepFor(TheProbeOpts.theStatExchTout);
		} else {
			// servers do not pull stats
			exchangingStats = true;
			mustStop = true;
		}
	} else {
		// exchange stats timeout
		clog << "stats exchange timeout" << endl;
		mustStop = true;
	}
}

void PolyProbe::setResourceLimits() {
	// set resource limits
	const int rlimit = SetMaxFD(GetMaxFD());
	if (rlimit > FD_SETSIZE)
		cerr << here << "warning: getrlimit(2) system call reports "
			<< rlimit << " file descriptors while FD_SETSIZE #defines only " 
			<< FD_SETSIZE << "; using lower value." << endl;
	const int fdLimit = Min(FD_SETSIZE, rlimit);
	Must(fdLimit > 0);

	TheFileScanner = TheProbeOpts.theFileScanner.val();
	TheFileScanner->configure(fdLimit);
	TheFileScanner->ticker(0);

	// to be safe, cut about 3% from the actual limit
	Socket::TheMaxLevel = Max(0, fdLimit - 10 - fdLimit/33);
}

void PolyProbe::configureHosts(const Array<String*> &ranges, Array<NetAddr*> &allHosts, Array<NetAddr*> &lclHosts) {
	// put command line addresses to allHosts array
	for (int i = 0; i < ranges.count(); ++i) {
		const String &host = *ranges[i];
		PglNetAddrRange hostParser;
		if (!hostParser.parse(host))
			cerr << thePrgName << ": malformed host address: `" << host << endl << xexit;
		hostParser.toAddrs(allHosts);
	}

	// find all local addresses (once)
	Array<InetIfReq> ifaces;
	if (!TheLocalHosts.count() && !GetIfAddrs(ifaces, String()))
		cerr << thePrgName << ": cannot get a list of all available addresses: " << Error::Last() << endl << xexit;

	TheLocalHosts.stretch(ifaces.count());
	for (int l = 0; l < ifaces.count(); ++l)
		TheLocalHosts.append(ifaces[l].addrN());

	// match local and configured addresses
	Array<int> matches;
	for (int h = 0; h < allHosts.count(); ++h) {
		const NetAddr &host = *allHosts[h];

		// check if the host address is local
		bool found = false;
		for (int l = 0; !found && l < TheLocalHosts.count(); ++l)
			found = TheLocalHosts[l] == host.addrN();

		if (found)
			matches.append(h);
	}

	for (int m = 0; m < matches.count(); ++m)
		lclHosts.append(allHosts[matches[m]]);
}

void PolyProbe::dumpHostSpace(const Array<NetAddr*> &clients, const Array<NetAddr*> &servers) {
	TheProbeOpts.theCltHosts.report(cerr << "all client host addresses:");
	cerr << endl;
	dumpHosts(cerr << "local client addresses: ", clients);
	cerr << endl;

	cerr << endl;

	TheProbeOpts.theSrvHosts.report(cerr << "all server host addresses:");
	cerr << endl;
	dumpHosts(cerr << "local server addresses: ", servers);
	cerr << endl;

	cerr << endl;

	dumpHosts(cerr << "all local addresses: ", TheLocalHosts);
	cerr << endl;
}

void PolyProbe::dumpHosts(ostream &os, const Array<InAddress> &hosts) {
	for (int h = 0; h < hosts.count(); ++h)
		os << ' ' << hosts[h].image();
}

void PolyProbe::dumpHosts(ostream &os, const Array<NetAddr*> &hosts) {
	for (int h = 0; h < hosts.count(); ++h)
		os << ' ' << *hosts[h];
}

void PolyProbe::reportCfg() const {
	theCmdLine.reportRaw(clog << thePrgName << ": "); clog << endl;
	theCmdLine.reportParsed(clog << thePrgName << ": "); clog << endl;
}

// called when somebody wants our stats
void PolyProbe::sendStats(Socket &s, const NetAddr &to) {
	Assert(s.fd() >= 0);

	cerr << here << "sending stats to " << to << endl;

	char buf[5*1024*1024];
	ofixedstream *os = new ofixedstream(buf, sizeof(buf));
	MyOLog log;
	log.stream(to.addrA(), os);
	TheProbeStatMgr.exportStats(log);
	log.flush();
	const Size sz = Size(os->tellp()); // XXX: wrong, close() adds more!
	log.close();

	s.blocking(true);
	Should(s.write(buf, sz) == sz && sz < SizeOf(buf));
	s.close();
}

void PolyProbe::pullStats() {
	exchangingStats = true;
	// XXX: should broadcast only once
	Broadcast(ThePhasesEndChannel, BcastRcver::ieNone);

	// pull stats from for all servers
	for (int i = 0; i < theAllServers.count(); step(Time(0,0)), ++i) {
		NetAddr &rhost = *theAllServers[i];

		// blocking ops OK here? no deadlock danger?
		Socket s;
		Must(s.create(rhost.addrN().family()));
		Must(s.blocking(true));
		Must(s.connect(rhost));

		// ask for stats
		{
			const int cmd = htonl(0xFFFFFFFF);
			const Size sz = s.write(&cmd, sizeof(cmd));
			if (!Should(sz == sizeof(cmd)))
				continue;
		}
		
		// read stats
		{
			char buf[5*1024*1024];
			bool err = false;
			Size pos = 0;
			while (!err && pos < SizeOf(buf)) {
				const Size sz = s.read(buf + pos, SizeOf(buf) - pos);
				err = !Should(sz >= 0);
				pos += sz;
				if (!sz)
					break;
			}
			err = err || !Should(pos < SizeOf(buf));
			if (err)
				continue;

			istringstream is(string(buf, pos));
			ILog log;
			log.stream(rhost.addrA(), &is);
			TheProbeStatMgr.importStats(log);
		}
	}
	mustStop = true;
}

void PolyProbe::reportStats() const {
	TheProbeStatMgr.report(cout);
}

int PolyProbe::run(int argc, char *argv[]) {

	thePrgName = argv[0];

	if (!handleCmdLine(argc, argv))
		return -1;

	Clock::Update();

	configure();

	reportCfg();

	start();

	while (!mustStop) {
		step(Time());
	}

	reportStats();

	return 0;
}


int main(int argc, char *argv[]) {
	PolyProbe app;
	return app.run(argc, argv);
}


syntax highlighted by Code2HTML, v. 0.9.1