/* Web Polygraph http://www.web-polygraph.org/
* (C) 2003-2006 The Measurement Factory
* Licensed under the Apache License, Version 2.0 */
#include "base/polygraph.h"
#include "xstd/h/math.h"
#include "xstd/h/iomanip.h"
#include "xstd/Clock.h"
#include "base/Progress.h"
#include "base/polyLogCats.h"
#include "base/polyLogTags.h"
#include "pgl/PglCodeSym.h"
#include "pgl/StatsSampleCfg.h"
#include "pgl/StatsSampleSym.h"
#include "pgl/GoalSym.h"
#include "pgl/PhaseSym.h"
#include "runtime/Connection.h"
#include "runtime/Goal.h"
#include "runtime/httpHdrs.h"
#include "runtime/Xaction.h"
#include "runtime/IcpXaction.h"
#include "runtime/LogComment.h"
#include "runtime/PageInfo.h"
#include "runtime/SharedOpts.h"
#include "runtime/PolyOLog.h"
#include "runtime/Rptmstat.h"
#include "runtime/Script.h"
#include "runtime/DutWatchdog.h"
#include "runtime/StatsSampleMgr.h"
#include "runtime/StatPhaseMgr.h"
#include "runtime/StatPhaseSync.h"
#include "runtime/StatPhase.h"
#include "runtime/globals.h"
#include "runtime/polyErrors.h"
#include "runtime/polyBcastChannels.h"
class CondCallSym;
// XXX: lots of client-side specific logic; split
static enum { wssIgnore, wssWaitFreeze, wssFrozen } TheWssState = wssIgnore;
StatPhase::StatPhase(): theRecs(lgcEnd), theMgr(0),
theRptmstat(0), theScript(0),
thePopulusFactor(this, "populus"), theLoadFactor(this, "load"),
theRecurFactor(this, "recurrence"),
theSpecialMsgFactor(this, "special message"),
wasUsed(false), logStats(true), waitWssFreeze(false), doSynchronize(true),
readyToStop(false), wasStopped(false), theLockCount(0) {
for (int i = 0; i < theRecs.capacity(); ++i)
theRecs.push(new StatPhaseRec);
theChannels.append(TheInfoChannel);
}
StatPhase::~StatPhase() {
delete theRptmstat; theRptmstat = 0;
delete theScript; theScript = 0;
while (theRecs.count()) delete theRecs.pop();
while (theDutWatchdogs.count()) delete theDutWatchdogs.pop();
}
void StatPhase::configure(const PhaseSym *cfg, const StatPhase *prevPh) {
Assert(cfg);
Assert(cfg->goal());
name(cfg->name());
theGoal.configure(*cfg->goal());
for (int i = 0; i < theRecs.count(); ++i)
theRecs[i]->theGoal = theGoal;
cfg->logStats(logStats);
cfg->waitWssFreeze(waitWssFreeze);
cfg->synchronize(doSynchronize); // true by default
double b = -1;
double e = -1;
cfg->populusFactorBeg(b);
cfg->populusFactorEnd(e);
thePopulusFactor.configure(b, e, prevPh ? &prevPh->thePopulusFactor : 0);
checkFactor(thePopulusFactor, "populus");
b = -1;
e = -1;
cfg->loadFactorBeg(b);
cfg->loadFactorEnd(e);
theLoadFactor.configure(b, e, prevPh ? &prevPh->theLoadFactor : 0);
checkFactor(theLoadFactor, "load");
b = -1;
e = -1;
cfg->recurFactorBeg(b);
cfg->recurFactorEnd(e);
theRecurFactor.configure(b, e, prevPh ? &prevPh->theRecurFactor : 0);
checkFactor(theRecurFactor, "recurrence");
b = -1;
e = -1;
cfg->specialMsgFactorBeg(b);
cfg->specialMsgFactorEnd(e);
theSpecialMsgFactor.configure(b, e, prevPh ? &prevPh->theSpecialMsgFactor : 0);
checkFactor(theSpecialMsgFactor, "special_req");
if (RptmstatSym *s = cfg->rptmstat()) {
theRptmstat = new Rptmstat;
theRptmstat->configure(*s);
}
if (const CodeSym *code = cfg->script())
theScript = new Script(*code);
configureSamples(cfg);
}
void StatPhase::configureSamples(const PhaseSym *cfg) {
Array<const StatsSampleSym*> ssyms;
if (cfg->statsSamples(ssyms)) {
for (int i = 0; i < ssyms.count(); ++i) {
const StatsSampleSym &ssym = *ssyms[i];
StatsSampleCfg cfg;
// XXX: append counter to phase names
cfg.name = ssym.name() ? ssym.name() : name();
Assert(ssym.capacity(cfg.capacity));
cfg.start = ssym.start();
if (cfg.start >= 0) {
TheStatsSampleMgr.addSample(cfg);
} else {
theSamples.append(new StatsSampleCfg(cfg));
TheStatsSampleMgr.willAddSample();
}
}
}
// no support for multiple pending samples for now
Assert(theSamples.count() <= 1);
}
void StatPhase::addWatchdog(DutWatchdog *dog) {
theDutWatchdogs.append(dog);
}
void StatPhase::checkFactor(const TransFactor &f, const String &label) const {
// varying load is possible only if goal is specified
const bool varyF = f.beg() != f.end();
if (!theGoal && varyF) {
cerr << "phase named `" << theName
<< "' has " << label
<< " factors but has no positive goal" << endl;
exit(-2);
} else
if (f.flat() && varyF) {
cerr << "internal error: no change coefficient for " << label
<< " factors for the phase named '" << theName << "'" << endl;
exit(-2);
}
}
void StatPhase::lock() {
theLockCount++;
}
void StatPhase::unlock() {
if (locked()) {
theLockCount--;
if (!locked())
checkpoint();
} else {
Comment(0) << "internal error: attempt to unlock an unlocked phase (salvaged)" << endc;
}
}
bool StatPhase::locked() const {
return theLockCount > 0;
}
Time StatPhase::duration() const {
return TheClock.time() >= theIntvlStart ?
TheClock - theIntvlStart : Time(0,0);
}
int StatPhase::xactCnt() const {
int count = 0;
for (int i = 0; i < theRecs.count(); ++i)
count += theRecs[i]->xactCnt();
return count;
}
BigSize StatPhase::fillSz() const {
BigSize fill(0);
for (int i = 0; i < theRecs.count(); ++i)
fill += theRecs[i]->totFillSize();
return fill;
}
int StatPhase::fillCnt() const {
int fill = 0;
for (int i = 0; i < theRecs.count(); ++i)
fill += theRecs[i]->totFillCount();
return fill;
}
int StatPhase::xactErrCnt() const {
int count = 0;
for (int i = 0; i < theRecs.count(); ++i)
count += theRecs[i]->theXactErrCnt;
return count;
}
void StatPhase::start(StatPhaseMgr *aMgr, const StatPhase *prevPhase) {
Assert(!theMgr);
theMgr = aMgr;
if (prevPhase) {
for (int i = 0; i < theRecs.count(); ++i)
theRecs[i]->keepLevels(*prevPhase->theRecs[i]);
}
// XXX: will not lock if fill phase finishes before we get ieWssFill
if (waitWssFreeze && TheWssState == wssWaitFreeze)
lock();
if (theRptmstat)
theRptmstat->start(this);
if (theScript)
theScript->run();
TheOLog << bege(lgStatPhaseBeg, lgcAll) << name() << ende;
startListen();
theIntvlStart = TheClock;
for (int i = 0; i < theSamples.count(); ++i) {
StatsSampleCfg &cfg = *theSamples[i];
if (cfg.start < 0) {
cfg.start = theIntvlStart;
if (theGoal.duration() > 0)
cfg.start += theGoal.duration()/2; // middle of the phase
cfg.start -= Clock::TheStartTime;
}
TheStatsSampleMgr.addSample(cfg);
}
// manager should be careful with immediate "returns"
if (!checkpoint()) {
// not an immediate exit
wasUsed = true;
if (theGoal.duration() > 0)
sleepFor(theGoal.duration());
}
}
void StatPhase::noteConnEvent(BcastChannel *ch, const Connection *c) {
if (ch == TheConnCloseChannel && !c->bad()) {
StatPhaseRec &rec = *theRecs[c->logCat()];
const int depth = c->useLevelMax();
if (depth > 1)
rec.theConnPipelineDepths.record(depth);
const Time ttl = TheClock - c->openTime();
rec.theConnClose.record(c->closeKind(), ttl, c->useCnt());
}
StatIntvl::noteConnEvent(ch, c);
}
void StatPhase::noteXactEvent(BcastChannel *ch, const Xaction *x) {
if (ch == TheXactEndChannel) {
StatPhaseRec &rec = *theRecs[x->logCat()];
const ObjId &oid = x->oid();
const Time repTime = x->lifeTime();
const Size repSize = x->repSize().actual();
// stats must be recorded in only one category for totals to work!
if (oid.basic()) {
rec.theBasicXacts.record(repTime, repSize, oid.hit());
rec.theContType.record(oid.foreignUrl() ? 0 : oid.type(), repSize);
} else
if (oid.repToRedir())
rec.theRepToRedirXacts.record(repTime, repSize);
else
if (oid.rediredReq())
rec.theRediredReqXacts.record(repTime, repSize);
else
if (oid.imsAny())
rec.theImsXacts.record(repTime, repSize, x->httpStatus() == RepHdr::sc200_OK);
else
if (oid.reload())
rec.theReloadXacts.record(repTime, repSize);
else
if (oid.head())
rec.theHeadXacts.record(repTime, repSize);
else
if (oid.post())
rec.thePostXacts.record(repTime, repSize);
else
if (oid.put())
rec.thePutXacts.record(repTime, repSize);
else
if (oid.aborted())
; // do nothing, StatIntvl has enough(?) stats for these
else {
Assert(false); // all categories should be accounted for
}
}
StatIntvl::noteXactEvent(ch, x);
}
void StatPhase::noteIcpXactEvent(BcastChannel *ch, const IcpXaction *x) {
if (ch == TheIcpXactEndChannel) {
StatPhaseRec &rec = *theRecs[x->logCat()];
if (!x->timedout())
rec.theIcpXacts.record(x->lifeTime(), x->repSize(), x->hit());
} else {
Assert(false);
}
StatIntvl::noteIcpXactEvent(ch, x);
}
void StatPhase::notePageEvent(BcastChannel *ch, const PageInfo *p) {
if (ch == ThePageEndChannel) {
StatPhaseRec &rec = *theRecs[lgcCltSide];
rec.thePageHist.record(p->lifeTime, p->size);
} else {
Assert(false);
}
StatIntvl::notePageEvent(ch, p);
}
void StatPhase::noteInfoEvent(BcastChannel *ch, InfoEvent ev) {
Assert(ch == TheInfoChannel);
if (ev == ieWssFill) {
if (waitWssFreeze)
lock();
TheWssState = wssWaitFreeze;
} else
if (ev == ieWssFreeze) {
if (waitWssFreeze)
unlock();
TheWssState = wssFrozen;
} else
if (ev == ieReportProgress) {
Comment(7) << "fyi: phase progress:" << endl;
theGoal.reportProgress(Comment, *this);
if (TheWssState == wssWaitFreeze)
Comment << "\twaiting for WSS to freeze" << endl;
else
if (TheWssState == wssFrozen)
Comment << "\tWSS frozen" << endl;
Comment << endc;
}
}
void StatPhase::name(const String &aName) {
theName = aName;
if (!theName)
theName = "";
for (int i = 0; i < theRecs.count(); ++i)
theRecs[i]->theName = theName;
}
OidGenStat &StatPhase::oidGenStat() {
return theRecs[lgcCltSide]->theOidGen;
}
ErrorStat &StatPhase::errors(int logCat) {
return theRecs[logCat]->theErrors;
}
void StatPhase::statsLogged(bool are) {
logStats = are;
}
void StatPhase::wakeUp(const Alarm &alarm) {
StatIntvl::wakeUp(alarm);
// checkpoint() may lock the phase
Assert(locked() || checkpoint() || locked());
}
void StatPhase::flush() {
stop();
if (logStats)
storeAll(TheOLog, lgStatPhaseRec);
}
// the caller should not use the phase if checkpoint returns true
bool StatPhase::checkpoint() {
if (!locked() && theGoal.reached(*this)) {
if (doSynchronize) {
if (!readyToStop) {
readyToStop = true; // must be before waitGroupCount()
Comment(5) << "fyi: local phase `" << name()
<< "' reached synchronization point" << endc;
if (int rCount = TheStatPhaseSync.waitGroupCount()) {
Comment(10) << "waiting for " << rCount
<< " more remote sync messages" << endc;
lock(); // somebody needs to unlock us to move on
return false;
}
}
}
stop();
report();
Assert(theMgr);
StatPhaseMgr *mgr = theMgr;
theMgr = 0;
mgr->noteDone(this);
return true;
}
return false;
}
void StatPhase::stop() {
Assert(theIntvlStart >= 0);
if (wasStopped) // already stopped
return;
wasStopped = true;
if (theRptmstat)
theRptmstat->stop(this);
for (int d = 0; d < theDutWatchdogs.count(); ++d)
theDutWatchdogs[d]->stop();
setDuration(theIntvlStart);
stopListen();
TheAlarmClock.cancelAll(this);
TheOLog << bege(lgStatPhaseEnd, lgcAll) << name() << ende;
}
StatIntvlRec &StatPhase::getRec(int cat) {
Assert(0 < cat && cat < theRecs.count());
StatIntvlRec &stats = *theRecs[cat];
stats.theDuration = duration();
return stats;
}
const StatIntvlRec &StatPhase::getRec(int cat) const {
Assert(0 < cat && cat < theRecs.count());
return *theRecs[cat];
}
// XXX: we probably should report _all_ active sides
void StatPhase::report() const {
ostream &repAll = Comment(1) << "p-" << name();
if (TheReportCat > 0)
theRecs[TheReportCat]->linePrintAll(repAll, false);
repAll << endc;
if (TheReportCat > 0 && theRecs[TheReportCat]->theSslStat.active()) {
ostream &repSsl = Comment(2) << "p-" << name();
theRecs[TheReportCat]->linePrintSsl(repSsl, false);
repSsl << endc;
}
}
void StatPhase::reportCfg(ostream &os) const {
os
<< setw(10) << name()
<< ' ' << setw(8) << thePopulusFactor.beg()
<< ' ' << setw(8) << thePopulusFactor.end()
<< ' ' << setw(8) << theLoadFactor.beg()
<< ' ' << setw(8) << theLoadFactor.end()
<< ' ' << setw(8) << theRecurFactor.beg()
<< ' ' << setw(8) << theRecurFactor.end()
<< ' ' << setw(8) << theSpecialMsgFactor.beg()
<< ' ' << setw(8) << theSpecialMsgFactor.end()
<< ' ' << setw(5) << theGoal
<< "\t"
<< (logStats ? "" : " !log")
<< (waitWssFreeze ? " wwss" : "")
<< (doSynchronize ? "" : " !sync")
<< endl;
}
syntax highlighted by Code2HTML, v. 0.9.1