/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "xstd/h/math.h"
#include "xstd/h/iomanip.h"

#include "xstd/Clock.h"
#include "base/Progress.h"
#include "base/polyLogCats.h"
#include "base/polyLogTags.h"
#include "pgl/PglCodeSym.h"
#include "pgl/StatsSampleCfg.h"
#include "pgl/StatsSampleSym.h"
#include "pgl/GoalSym.h"
#include "pgl/PhaseSym.h"
#include "runtime/Connection.h"
#include "runtime/Goal.h"
#include "runtime/httpHdrs.h"
#include "runtime/Xaction.h"
#include "runtime/IcpXaction.h"
#include "runtime/LogComment.h"
#include "runtime/PageInfo.h"
#include "runtime/SharedOpts.h"
#include "runtime/PolyOLog.h"
#include "runtime/Rptmstat.h"
#include "runtime/Script.h"
#include "runtime/DutWatchdog.h"
#include "runtime/StatsSampleMgr.h"
#include "runtime/StatPhaseMgr.h"
#include "runtime/StatPhaseSync.h"
#include "runtime/StatPhase.h"
#include "runtime/globals.h"
#include "runtime/polyErrors.h"
#include "runtime/polyBcastChannels.h"

class CondCallSym;

// XXX: lots of client-side specific logic; split

static enum { wssIgnore, wssWaitFreeze, wssFrozen } TheWssState = wssIgnore;

StatPhase::StatPhase(): theRecs(lgcEnd), theMgr(0), 
	theRptmstat(0), theScript(0),
	thePopulusFactor(this, "populus"), theLoadFactor(this, "load"), 
	theRecurFactor(this, "recurrence"), 
	theSpecialMsgFactor(this, "special message"),
	wasUsed(false), logStats(true), waitWssFreeze(false), doSynchronize(true),
	readyToStop(false), wasStopped(false), theLockCount(0) {

	for (int i = 0; i < theRecs.capacity(); ++i)
		theRecs.push(new StatPhaseRec);

	theChannels.append(TheInfoChannel);
}

StatPhase::~StatPhase() {
	delete theRptmstat; theRptmstat = 0;
	delete theScript; theScript = 0;
	while (theRecs.count()) delete theRecs.pop();
	while (theDutWatchdogs.count()) delete theDutWatchdogs.pop();
}

void StatPhase::configure(const PhaseSym *cfg, const StatPhase *prevPh) {
	Assert(cfg);
	Assert(cfg->goal());

	name(cfg->name());
	theGoal.configure(*cfg->goal());
	for (int i = 0; i < theRecs.count(); ++i)
		theRecs[i]->theGoal = theGoal;

	cfg->logStats(logStats);
	cfg->waitWssFreeze(waitWssFreeze);
	cfg->synchronize(doSynchronize); // true by default

	double b = -1;
	double e = -1;
	cfg->populusFactorBeg(b);
	cfg->populusFactorEnd(e);
	thePopulusFactor.configure(b, e, prevPh ? &prevPh->thePopulusFactor : 0);
	checkFactor(thePopulusFactor, "populus");

	b = -1;
	e = -1;
	cfg->loadFactorBeg(b);
	cfg->loadFactorEnd(e);
	theLoadFactor.configure(b, e, prevPh ? &prevPh->theLoadFactor : 0);
	checkFactor(theLoadFactor, "load");

	b = -1;
	e = -1;
	cfg->recurFactorBeg(b);
	cfg->recurFactorEnd(e);
	theRecurFactor.configure(b, e, prevPh ? &prevPh->theRecurFactor : 0);
	checkFactor(theRecurFactor, "recurrence");

	b = -1;
	e = -1;
	cfg->specialMsgFactorBeg(b);
	cfg->specialMsgFactorEnd(e);
	theSpecialMsgFactor.configure(b, e, prevPh ? &prevPh->theSpecialMsgFactor : 0);
	checkFactor(theSpecialMsgFactor, "special_req");

	if (RptmstatSym *s = cfg->rptmstat()) {
		theRptmstat = new Rptmstat;
		theRptmstat->configure(*s);
	}

	if (const CodeSym *code = cfg->script())
		theScript = new Script(*code);

	configureSamples(cfg);
}

void StatPhase::configureSamples(const PhaseSym *cfg) {
	Array<const StatsSampleSym*> ssyms;
	if (cfg->statsSamples(ssyms)) {
		for (int i = 0; i < ssyms.count(); ++i) {
			const StatsSampleSym &ssym = *ssyms[i];

			StatsSampleCfg cfg;
			// XXX: append counter to phase names
			cfg.name = ssym.name() ? ssym.name() : name();
			Assert(ssym.capacity(cfg.capacity));
			cfg.start = ssym.start();

			if (cfg.start >= 0) {
				TheStatsSampleMgr.addSample(cfg);
			} else {
				theSamples.append(new StatsSampleCfg(cfg));
				TheStatsSampleMgr.willAddSample();
			}
		}
	}
	// no support for multiple pending samples for now
	Assert(theSamples.count() <= 1); 
}

void StatPhase::addWatchdog(DutWatchdog *dog) {
	theDutWatchdogs.append(dog);
}

void StatPhase::checkFactor(const TransFactor &f, const String &label) const {
	// varying load is possible only if goal is specified
	const bool varyF = f.beg() != f.end();
	if (!theGoal && varyF) {
		cerr << "phase named `" << theName
			<< "' has " << label
			<< " factors but has no positive goal" << endl;
		exit(-2);
	} else
	if (f.flat() && varyF) {
		cerr << "internal error: no change coefficient for " << label
			<< " factors for the phase named '" << theName << "'" << endl;
		exit(-2);
	}
}

void StatPhase::lock() {
	theLockCount++;
}

void StatPhase::unlock() {
	if (locked()) {
		theLockCount--;
		if (!locked())
			checkpoint();
	} else {
		Comment(0) << "internal error: attempt to unlock an unlocked phase (salvaged)" << endc;
	}
}

bool StatPhase::locked() const {
	return theLockCount > 0;
}

Time StatPhase::duration() const {
	return TheClock.time() >= theIntvlStart ?
		TheClock - theIntvlStart : Time(0,0);
}

int StatPhase::xactCnt() const {
	int count = 0;
	for (int i = 0; i < theRecs.count(); ++i)
		count += theRecs[i]->xactCnt();
	return count;
}

BigSize StatPhase::fillSz() const {
	BigSize fill(0);
	for (int i = 0; i < theRecs.count(); ++i)
		fill += theRecs[i]->totFillSize();
	return fill;
}

int StatPhase::fillCnt() const {
	int fill = 0;
	for (int i = 0; i < theRecs.count(); ++i)
		fill += theRecs[i]->totFillCount();
	return fill;
}

int StatPhase::xactErrCnt() const {
	int count = 0;
	for (int i = 0; i < theRecs.count(); ++i)
		count += theRecs[i]->theXactErrCnt;
	return count;
}

void StatPhase::start(StatPhaseMgr *aMgr, const StatPhase *prevPhase) {
	Assert(!theMgr);
	theMgr = aMgr;

	if (prevPhase) {
		for (int i = 0; i < theRecs.count(); ++i)
			theRecs[i]->keepLevels(*prevPhase->theRecs[i]);
	}

	// XXX: will not lock if fill phase finishes before we get ieWssFill
	if (waitWssFreeze && TheWssState == wssWaitFreeze)
		lock();

	if (theRptmstat)
		theRptmstat->start(this);

	if (theScript)
		theScript->run();

	TheOLog << bege(lgStatPhaseBeg, lgcAll) << name() << ende;

	startListen();
	theIntvlStart = TheClock;

	for (int i = 0; i < theSamples.count(); ++i) {
		StatsSampleCfg &cfg = *theSamples[i];
		if (cfg.start < 0) {
			cfg.start = theIntvlStart;
			if (theGoal.duration() > 0)
				cfg.start += theGoal.duration()/2; // middle of the phase
			cfg.start -= Clock::TheStartTime;
		}
		TheStatsSampleMgr.addSample(cfg);
	}

	// manager should be careful with immediate "returns"
	if (!checkpoint()) { 
		// not an immediate exit
		wasUsed = true; 
		if (theGoal.duration() > 0)
			sleepFor(theGoal.duration());
	}
}

void StatPhase::noteConnEvent(BcastChannel *ch, const Connection *c) {
	if (ch == TheConnCloseChannel && !c->bad()) {
		StatPhaseRec &rec = *theRecs[c->logCat()];

		const int depth = c->useLevelMax();
		if (depth > 1)
			rec.theConnPipelineDepths.record(depth);

		const Time ttl = TheClock - c->openTime();
		rec.theConnClose.record(c->closeKind(), ttl, c->useCnt());
	}
	StatIntvl::noteConnEvent(ch, c);
}

void StatPhase::noteXactEvent(BcastChannel *ch, const Xaction *x) {
	if (ch == TheXactEndChannel) {
		StatPhaseRec &rec = *theRecs[x->logCat()];
		const ObjId &oid = x->oid();
		const Time repTime = x->lifeTime();
		const Size repSize = x->repSize().actual();

		// stats must be recorded in only one category for totals to work!
		if (oid.basic()) {
			rec.theBasicXacts.record(repTime, repSize, oid.hit());
			rec.theContType.record(oid.foreignUrl() ? 0 : oid.type(), repSize);
		} else
		if (oid.repToRedir())
			rec.theRepToRedirXacts.record(repTime, repSize);
		else
		if (oid.rediredReq())
			rec.theRediredReqXacts.record(repTime, repSize);
		else
		if (oid.imsAny())
			rec.theImsXacts.record(repTime, repSize, x->httpStatus() == RepHdr::sc200_OK);
		else
		if (oid.reload())
			rec.theReloadXacts.record(repTime, repSize);
		else
		if (oid.head())
			rec.theHeadXacts.record(repTime, repSize);
		else
		if (oid.post())
			rec.thePostXacts.record(repTime, repSize);
		else
		if (oid.put())
			rec.thePutXacts.record(repTime, repSize);
		else
		if (oid.aborted())
			; // do nothing, StatIntvl has enough(?) stats for these
		else {
			Assert(false); // all categories should be accounted for
		}
	}

	StatIntvl::noteXactEvent(ch, x);
}

void StatPhase::noteIcpXactEvent(BcastChannel *ch, const IcpXaction *x) {
	if (ch == TheIcpXactEndChannel) {
		StatPhaseRec &rec = *theRecs[x->logCat()];
		if (!x->timedout())
			rec.theIcpXacts.record(x->lifeTime(), x->repSize(), x->hit());
	} else {
		Assert(false);
	}

	StatIntvl::noteIcpXactEvent(ch, x);
}

void StatPhase::notePageEvent(BcastChannel *ch, const PageInfo *p) {
	if (ch == ThePageEndChannel) {
		StatPhaseRec &rec = *theRecs[lgcCltSide];
		rec.thePageHist.record(p->lifeTime, p->size);
	} else {
		Assert(false);
	}
	StatIntvl::notePageEvent(ch, p);
}

void StatPhase::noteInfoEvent(BcastChannel *ch, InfoEvent ev) {
	Assert(ch == TheInfoChannel);
	if (ev == ieWssFill) {
		if (waitWssFreeze)
			lock();
		TheWssState = wssWaitFreeze;
	} else
	if (ev == ieWssFreeze) {
		if (waitWssFreeze)
			unlock();
		TheWssState = wssFrozen;
	} else
	if (ev == ieReportProgress) {
		Comment(7) << "fyi: phase progress:" << endl;

		theGoal.reportProgress(Comment, *this);

		if (TheWssState == wssWaitFreeze)
			Comment << "\twaiting for WSS to freeze" << endl;
		else
		if (TheWssState == wssFrozen)
			Comment << "\tWSS frozen" << endl;

 		Comment << endc;
	}
}

void StatPhase::name(const String &aName) {
	theName = aName;
	if (!theName)
		theName = "";
	for (int i = 0; i < theRecs.count(); ++i)
		theRecs[i]->theName = theName;
}

OidGenStat &StatPhase::oidGenStat() {
	return theRecs[lgcCltSide]->theOidGen;
}

ErrorStat &StatPhase::errors(int logCat) {
	return theRecs[logCat]->theErrors;
}

void StatPhase::statsLogged(bool are) {
	logStats = are;
}

void StatPhase::wakeUp(const Alarm &alarm) {
	StatIntvl::wakeUp(alarm);
	// checkpoint() may lock the phase
	Assert(locked() || checkpoint() || locked());
}

void StatPhase::flush() {
	stop();

	if (logStats)
		storeAll(TheOLog, lgStatPhaseRec);
}

// the caller should not use the phase if checkpoint returns true
bool StatPhase::checkpoint()  {
	if (!locked() && theGoal.reached(*this)) {

		if (doSynchronize) {
			if (!readyToStop) {
				readyToStop = true; // must be before waitGroupCount()

				Comment(5) << "fyi: local phase `" << name()
					<< "' reached synchronization point" << endc;

				if (int rCount = TheStatPhaseSync.waitGroupCount()) {
					Comment(10) << "waiting for " << rCount 
						<< " more remote sync messages" << endc;
					lock(); // somebody needs to unlock us to move on
					return false;
				}
			}
		}

		stop();
		report();

		Assert(theMgr);
		StatPhaseMgr *mgr = theMgr;
		theMgr = 0;
		mgr->noteDone(this);
		return true;
	}
	return false;
}

void StatPhase::stop() {
	Assert(theIntvlStart >= 0);
	if (wasStopped) // already stopped
		return;
	wasStopped = true;

	if (theRptmstat)
		theRptmstat->stop(this);

	for (int d = 0; d < theDutWatchdogs.count(); ++d)
		theDutWatchdogs[d]->stop();

	setDuration(theIntvlStart);
	stopListen();
	TheAlarmClock.cancelAll(this);

	TheOLog << bege(lgStatPhaseEnd, lgcAll) << name() << ende;
}

StatIntvlRec &StatPhase::getRec(int cat) {
	Assert(0 < cat && cat < theRecs.count());
	StatIntvlRec &stats = *theRecs[cat];
	stats.theDuration = duration();
	return stats;
}

const StatIntvlRec &StatPhase::getRec(int cat) const {
	Assert(0 < cat && cat < theRecs.count());
	return *theRecs[cat];
}

// XXX: we probably should report _all_ active sides
void StatPhase::report() const {
	ostream &repAll = Comment(1) << "p-" << name();
	if (TheReportCat > 0)
		theRecs[TheReportCat]->linePrintAll(repAll, false);
	repAll << endc;

	if (TheReportCat > 0 && theRecs[TheReportCat]->theSslStat.active()) {
		ostream &repSsl = Comment(2) << "p-" << name();
		theRecs[TheReportCat]->linePrintSsl(repSsl, false);
		repSsl << endc;
	}
}

void StatPhase::reportCfg(ostream &os) const {
	os 
		<< setw(10) << name()
		<< ' ' << setw(8) << thePopulusFactor.beg()
		<< ' ' << setw(8) << thePopulusFactor.end()
		<< ' ' << setw(8) << theLoadFactor.beg()
		<< ' ' << setw(8) << theLoadFactor.end()
		<< ' ' << setw(8) << theRecurFactor.beg()
		<< ' ' << setw(8) << theRecurFactor.end()
		<< ' ' << setw(8) << theSpecialMsgFactor.beg()
		<< ' ' << setw(8) << theSpecialMsgFactor.end()
		<< ' ' << setw(5) << theGoal
		<< "\t" 
			<< (logStats ? "" : " !log")
			<< (waitWssFreeze ? " wwss" : "")
			<< (doSynchronize ? "" : " !sync")
		<< endl;
}


syntax highlighted by Code2HTML, v. 0.9.1