/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "xstd/h/math.h"
#include "xstd/h/iomanip.h"

#include "xstd/Clock.h"
#include "base/Progress.h"
#include "base/polyLogCats.h"
#include "base/polyLogTags.h"
#include "pgl/PglCodeSym.h"
#include "pgl/StatsSampleCfg.h"
#include "pgl/StatsSampleSym.h"
#include "pgl/GoalSym.h"
#include "pgl/PhaseSym.h"
#include "runtime/Connection.h"
#include "runtime/Goal.h"
#include "runtime/httpHdrs.h"
#include "runtime/Xaction.h"
#include "runtime/IcpXaction.h"
#include "runtime/LogComment.h"
#include "runtime/PageInfo.h"
#include "runtime/SharedOpts.h"
#include "runtime/PolyOLog.h"
#include "runtime/Rptmstat.h"
#include "runtime/Script.h"
#include "runtime/DutWatchdog.h"
#include "runtime/StatsSampleMgr.h"
#include "runtime/StatPhaseMgr.h"
#include "runtime/StatPhaseSync.h"
#include "runtime/StatPhase.h"
#include "runtime/globals.h"
#include "runtime/polyErrors.h"
#include "runtime/polyBcastChannels.h"

class CondCallSym;

// XXX: lots of client-side specific logic; split

// File-wide working-set-size (WSS) state shared by all phases; updated in
// noteInfoEvent() on ieWssFill/ieWssFreeze and consulted in start().
static enum { wssIgnore, wssWaitFreeze, wssFrozen } TheWssState = wssIgnore;


// Creates one StatPhaseRec per slot of theRecs (sized with lgcEnd) and
// subscribes the phase to TheInfoChannel.
StatPhase::StatPhase():
	theRecs(lgcEnd),
	theMgr(0),
	theRptmstat(0),
	theScript(0),
	thePopulusFactor(this, "populus"),
	theLoadFactor(this, "load"),
	theRecurFactor(this, "recurrence"),
	theSpecialMsgFactor(this, "special message"),
	wasUsed(false),
	logStats(true),
	waitWssFreeze(false),
	doSynchronize(true),
	readyToStop(false),
	wasStopped(false),
	theLockCount(0) {

	for (int i = 0; i < theRecs.capacity(); ++i)
		theRecs.push(new StatPhaseRec);

	theChannels.append(TheInfoChannel);
}

// Releases everything this phase allocated with new.
StatPhase::~StatPhase() {
	delete theRptmstat;
	theRptmstat = 0;

	delete theScript;
	theScript = 0;

	while (theRecs.count())
		delete theRecs.pop();

	while (theDutWatchdogs.count())
		delete theDutWatchdogs.pop();

	// Pending sample configs are allocated with new in configureSamples()
	// and were previously never freed.
	// NOTE(review): assumes TheStatsSampleMgr.addSample() copies its argument
	// (configureSamples() passes it a loop-local config) -- confirm.
	while (theSamples.count())
		delete theSamples.pop();
}

// Configures the phase from its PGL symbol: name, goal, flags, the four
// transition factors, optional rptmstat and script, and stats samples.
// prevPh, when not null, supplies the previous phase's factors to each
// factor's configure() call.
void StatPhase::configure(const PhaseSym *cfg, const StatPhase *prevPh) {
	Assert(cfg);
	Assert(cfg->goal());

	name(cfg->name());
	theGoal.configure(*cfg->goal());
	for (int i = 0; i < theRecs.count(); ++i)
		theRecs[i]->theGoal = theGoal;

	cfg->logStats(logStats);
	cfg->waitWssFreeze(waitWssFreeze);
	cfg->synchronize(doSynchronize); // true by default

	// b and e are reset to -1 before each pair of getters
	double b = -1;
	double e = -1;
	cfg->populusFactorBeg(b);
	cfg->populusFactorEnd(e);
	thePopulusFactor.configure(b, e, prevPh ? &prevPh->thePopulusFactor : 0);
	checkFactor(thePopulusFactor, "populus");

	b = -1;
	e = -1;
	cfg->loadFactorBeg(b);
	cfg->loadFactorEnd(e);
	theLoadFactor.configure(b, e, prevPh ? &prevPh->theLoadFactor : 0);
	checkFactor(theLoadFactor, "load");

	b = -1;
	e = -1;
	cfg->recurFactorBeg(b);
	cfg->recurFactorEnd(e);
	theRecurFactor.configure(b, e, prevPh ? &prevPh->theRecurFactor : 0);
	checkFactor(theRecurFactor, "recurrence");

	b = -1;
	e = -1;
	cfg->specialMsgFactorBeg(b);
	cfg->specialMsgFactorEnd(e);
	theSpecialMsgFactor.configure(b, e, prevPh ? &prevPh->theSpecialMsgFactor : 0);
	// NOTE(review): this label differs from the "special message" name given
	// to the same factor in the constructor -- confirm which one is intended.
	checkFactor(theSpecialMsgFactor, "special_req");

	if (RptmstatSym *s = cfg->rptmstat()) {
		theRptmstat = new Rptmstat;
		theRptmstat->configure(*s);
	}

	if (const CodeSym *code = cfg->script())
		theScript = new Script(*code);

	configureSamples(cfg);
}

// Registers the stats samples listed in the phase symbol. Samples with a
// non-negative start are handed to TheStatsSampleMgr right away; the others
// are kept in theSamples until start() computes their start value.
void StatPhase::configureSamples(const PhaseSym *cfg) {
	// NOTE(review): the element type of Array is not visible in this copy of
	// the file -- confirm the declaration against the repository.
	Array ssyms;
	if (cfg->statsSamples(ssyms)) {
		for (int i = 0; i < ssyms.count(); ++i) {
			const StatsSampleSym &ssym = *ssyms[i];
			StatsSampleCfg sampleCfg;
			// XXX: append counter to phase names
			sampleCfg.name = ssym.name() ? ssym.name() : name();
			Assert(ssym.capacity(sampleCfg.capacity));
			sampleCfg.start = ssym.start();
			if (sampleCfg.start >= 0) {
				TheStatsSampleMgr.addSample(sampleCfg);
			} else {
				// owned by this phase; released in the destructor
				theSamples.append(new StatsSampleCfg(sampleCfg));
				TheStatsSampleMgr.willAddSample();
			}
		}
	}

	// no support for multiple pending samples for now
	Assert(theSamples.count() <= 1);
}

// Appends dog to theDutWatchdogs; the destructor deletes every entry.
void StatPhase::addWatchdog(DutWatchdog *dog) {
	theDutWatchdogs.append(dog);
}

// Validates a configured factor; prints a message to cerr and calls exit(-2)
// if the factor varies without a goal or varies while being flat.
void StatPhase::checkFactor(const TransFactor &f, const String &label) const {
	// varying load is possible only if goal is specified
	const bool varyF = f.beg() != f.end();
	if (!theGoal && varyF) {
		cerr << "phase named `" << theName << "' has " << label
			<< " factors but has no positive goal" << endl;
		exit(-2);
	} else
	if (f.flat() && varyF) {
		cerr << "internal error: no change coefficient for " << label
			<< " factors for the phase named '" << theName << "'" << endl;
		exit(-2);
	}
}

// Increments the lock count; checkpoint() does not finish a locked phase.
void StatPhase::lock() {
	theLockCount++;
}

// Decrements the lock count and runs checkpoint() when the last lock is
// released. Unlocking an unlocked phase is reported and otherwise ignored.
void StatPhase::unlock() {
	if (locked()) {
		theLockCount--;
		if (!locked())
			checkpoint();
	} else {
		Comment(0) << "internal error: attempt to unlock an unlocked phase (salvaged)" << endc;
	}
}

bool StatPhase::locked() const {
	return theLockCount > 0;
}

// Time elapsed since theIntvlStart, or Time(0,0) if the clock is before it.
Time StatPhase::duration() const {
	return TheClock.time() >= theIntvlStart ?
		TheClock - theIntvlStart : Time(0,0);
}

// Sum of xactCnt() over all records.
int StatPhase::xactCnt() const {
	int count = 0;
	for (int i = 0; i < theRecs.count(); ++i)
		count += theRecs[i]->xactCnt();
	return count;
}

// Sum of totFillSize() over all records.
BigSize StatPhase::fillSz() const {
	BigSize fill(0);
	for (int i = 0; i < theRecs.count(); ++i)
		fill += theRecs[i]->totFillSize();
	return fill;
}

// Sum of totFillCount() over all records.
int StatPhase::fillCnt() const {
	int fill = 0;
	for (int i = 0; i < theRecs.count(); ++i)
		fill += theRecs[i]->totFillCount();
	return fill;
}

// Sum of theXactErrCnt over all records.
int StatPhase::xactErrCnt() const {
	int count = 0;
	for (int i = 0; i < theRecs.count(); ++i)
		count += theRecs[i]->theXactErrCnt;
	return count;
}

// Starts the phase under aMgr (the phase must not already have a manager).
// Carries levels over from prevPhase if given, starts rptmstat and the
// script, logs the phase beginning, registers pending samples, and then
// either finishes immediately via checkpoint() or sleeps for the goal
// duration.
void StatPhase::start(StatPhaseMgr *aMgr, const StatPhase *prevPhase) {
	Assert(!theMgr);
	theMgr = aMgr;

	if (prevPhase) {
		for (int i = 0; i < theRecs.count(); ++i)
			theRecs[i]->keepLevels(*prevPhase->theRecs[i]);
	}

	// XXX: will not lock if fill phase finishes before we get ieWssFill
	if (waitWssFreeze && TheWssState == wssWaitFreeze)
		lock();

	if (theRptmstat)
		theRptmstat->start(this);

	if (theScript)
		theScript->run();

	TheOLog << bege(lgStatPhaseBeg, lgcAll) << name() << ende;

	startListen();
	theIntvlStart = TheClock;

	for (int i = 0; i < theSamples.count(); ++i) {
		StatsSampleCfg &cfg = *theSamples[i];
		if (cfg.start < 0) {
			cfg.start = theIntvlStart;
			if (theGoal.duration() > 0)
				cfg.start += theGoal.duration()/2; // middle of the phase
			cfg.start -= Clock::TheStartTime;
		}
		TheStatsSampleMgr.addSample(cfg);
	}

	// manager should be careful with immediate "returns"
	if (!checkpoint()) { // not an immediate exit
		wasUsed = true;
		if (theGoal.duration() > 0)
			sleepFor(theGoal.duration());
	}
}

// Records pipeline depth and close statistics for connections that closed
// without being bad(), then forwards the event to StatIntvl.
void StatPhase::noteConnEvent(BcastChannel *ch, const Connection *c) {
	if (ch == TheConnCloseChannel && !c->bad()) {
		StatPhaseRec &rec = *theRecs[c->logCat()];

		const int depth = c->useLevelMax();
		if (depth > 1)
			rec.theConnPipelineDepths.record(depth);

		const Time ttl = TheClock - c->openTime();
		rec.theConnClose.record(c->closeKind(), ttl, c->useCnt());
	}
	StatIntvl::noteConnEvent(ch, c);
}

// Records a finished transaction in exactly one category of the record
// selected by x->logCat(), then forwards the event to StatIntvl.
void StatPhase::noteXactEvent(BcastChannel *ch, const Xaction *x) {
	if (ch == TheXactEndChannel) {
		StatPhaseRec &rec = *theRecs[x->logCat()];
		const ObjId &oid = x->oid();

		const Time repTime = x->lifeTime();
		const Size repSize = x->repSize().actual();

		// stats must be recorded in only one category for totals to work!
		if (oid.basic()) {
			rec.theBasicXacts.record(repTime, repSize, oid.hit());
			rec.theContType.record(oid.foreignUrl() ? 0 : oid.type(), repSize);
		} else
		if (oid.repToRedir())
			rec.theRepToRedirXacts.record(repTime, repSize);
		else
		if (oid.rediredReq())
			rec.theRediredReqXacts.record(repTime, repSize);
		else
		if (oid.imsAny())
			rec.theImsXacts.record(repTime, repSize,
				x->httpStatus() == RepHdr::sc200_OK);
		else
		if (oid.reload())
			rec.theReloadXacts.record(repTime, repSize);
		else
		if (oid.head())
			rec.theHeadXacts.record(repTime, repSize);
		else
		if (oid.post())
			rec.thePostXacts.record(repTime, repSize);
		else
		if (oid.put())
			rec.thePutXacts.record(repTime, repSize);
		else
		if (oid.aborted())
			; // do nothing, StatIntvl has enough(?) stats for these
		else {
			Assert(false); // all categories should be accounted for
		}
	}
	StatIntvl::noteXactEvent(ch, x);
}

// Records ICP transactions that did not time out; any channel other than
// TheIcpXactEndChannel fails an assertion.
void StatPhase::noteIcpXactEvent(BcastChannel *ch, const IcpXaction *x) {
	if (ch == TheIcpXactEndChannel) {
		StatPhaseRec &rec = *theRecs[x->logCat()];
		if (!x->timedout())
			rec.theIcpXacts.record(x->lifeTime(), x->repSize(), x->hit());
	} else {
		Assert(false);
	}

	StatIntvl::noteIcpXactEvent(ch, x);
}

// Records page lifetime and size in the lgcCltSide record; any channel other
// than ThePageEndChannel fails an assertion.
void StatPhase::notePageEvent(BcastChannel *ch, const PageInfo *p) {
	if (ch == ThePageEndChannel) {
		StatPhaseRec &rec = *theRecs[lgcCltSide];
		rec.thePageHist.record(p->lifeTime, p->size);
	} else {
		Assert(false);
	}

	StatIntvl::notePageEvent(ch, p);
}

// Handles info broadcasts: WSS fill/freeze events update TheWssState (and
// lock/unlock the phase when waitWssFreeze is set); ieReportProgress prints
// the goal progress and the current WSS state.
void StatPhase::noteInfoEvent(BcastChannel *ch, InfoEvent ev) {
	Assert(ch == TheInfoChannel);

	if (ev == ieWssFill) {
		if (waitWssFreeze)
			lock();
		TheWssState = wssWaitFreeze;
	} else
	if (ev == ieWssFreeze) {
		if (waitWssFreeze)
			unlock();
		TheWssState = wssFrozen;
	} else
	if (ev == ieReportProgress) {
		Comment(7) << "fyi: phase progress:" << endl;
		theGoal.reportProgress(Comment, *this);
		if (TheWssState == wssWaitFreeze)
			Comment << "\twaiting for WSS to freeze" << endl;
		else
		if (TheWssState == wssFrozen)
			Comment << "\tWSS frozen" << endl;
		Comment << endc;
	}
}

// Sets the phase name (a false-valued name is replaced with "") and copies
// it into every record.
void StatPhase::name(const String &aName) {
	theName = aName;
	if (!theName)
		theName = "";

	for (int i = 0; i < theRecs.count(); ++i)
		theRecs[i]->theName = theName;
}

OidGenStat &StatPhase::oidGenStat() {
	return theRecs[lgcCltSide]->theOidGen;
}

ErrorStat &StatPhase::errors(int logCat) {
	return theRecs[logCat]->theErrors;
}

void StatPhase::statsLogged(bool are) {
	logStats = are;
}

// Alarm callback. The assertion has side effects: checkpoint() runs only if
// the phase is not locked, and the check passes if the phase finished or is
// locked afterwards.
void StatPhase::wakeUp(const Alarm &alarm) {
	StatIntvl::wakeUp(alarm);
	// checkpoint() may lock the phase
	Assert(locked() || checkpoint() || locked());
}

// Stops the phase and, if logStats is set, stores all stats in TheOLog.
void StatPhase::flush() {
	stop();
	if (logStats)
		storeAll(TheOLog, lgStatPhaseRec);
}

// the caller should not use the phase if checkpoint returns true
// Returns true after stopping, reporting, and notifying the manager; returns
// false if the phase is locked, the goal is not reached, or the phase just
// locked itself to wait for remote sync messages.
bool StatPhase::checkpoint() {
	if (!locked() && theGoal.reached(*this)) {
		if (doSynchronize) {
			if (!readyToStop) {
				readyToStop = true; // must be before waitGroupCount()
				Comment(5) << "fyi: local phase `" << name()
					<< "' reached synchronization point" << endc;
				if (int rCount = TheStatPhaseSync.waitGroupCount()) {
					Comment(10) << "waiting for " << rCount
						<< " more remote sync messages" << endc;
					lock(); // somebody needs to unlock us to move on
					return false;
				}
			}
		}

		stop();
		report();

		// clear theMgr before the callback
		Assert(theMgr);
		StatPhaseMgr *mgr = theMgr;
		theMgr = 0;
		mgr->noteDone(this);
		return true;
	}
	return false;
}

// Stops rptmstat, watchdogs, listening and alarms, and logs the phase end.
// Calls after the first one return early.
void StatPhase::stop() {
	Assert(theIntvlStart >= 0);

	if (wasStopped) // already stopped
		return;

	wasStopped = true;

	if (theRptmstat)
		theRptmstat->stop(this);

	for (int d = 0; d < theDutWatchdogs.count(); ++d)
		theDutWatchdogs[d]->stop();

	setDuration(theIntvlStart);
	stopListen();
	TheAlarmClock.cancelAll(this);

	TheOLog << bege(lgStatPhaseEnd, lgcAll) << name() << ende;
}

// Returns the record for cat (which must be positive and below
// theRecs.count()) after refreshing its theDuration.
StatIntvlRec &StatPhase::getRec(int cat) {
	Assert(0 < cat && cat < theRecs.count());
	StatIntvlRec &stats = *theRecs[cat];
	stats.theDuration = duration();
	return stats;
}

// Const overload: same bounds check, but the record is returned unmodified.
const StatIntvlRec &StatPhase::getRec(int cat) const {
	Assert(0 < cat && cat < theRecs.count());
	return *theRecs[cat];
}

// XXX: we probably should report _all_ active sides
// Prints a "p-<name>" summary line for the TheReportCat record, plus an SSL
// line when that record's theSslStat is active.
void StatPhase::report() const {
	ostream &repAll = Comment(1) << "p-" << name();
	if (TheReportCat > 0)
		theRecs[TheReportCat]->linePrintAll(repAll, false);
	repAll << endc;

	if (TheReportCat > 0 && theRecs[TheReportCat]->theSslStat.active()) {
		ostream &repSsl = Comment(2) << "p-" << name();
		theRecs[TheReportCat]->linePrintSsl(repSsl, false);
		repSsl << endc;
	}
}

// Writes one line describing the phase configuration: name, begin/end of
// each factor, the goal, and flag markers.
void StatPhase::reportCfg(ostream &os) const {
	os
		<< setw(10) << name()
		<< ' ' << setw(8) << thePopulusFactor.beg()
		<< ' ' << setw(8) << thePopulusFactor.end()
		<< ' ' << setw(8) << theLoadFactor.beg()
		<< ' ' << setw(8) << theLoadFactor.end()
		<< ' ' << setw(8) << theRecurFactor.beg()
		<< ' ' << setw(8) << theRecurFactor.end()
		<< ' ' << setw(8) << theSpecialMsgFactor.beg()
		<< ' ' << setw(8) << theSpecialMsgFactor.end()
		<< ' ' << setw(5) << theGoal
		<< "\t"
		<< (logStats ? "" : " !log")
		<< (waitWssFreeze ? " wwss" : "")
		<< (doSynchronize ? "" : " !sync")
		<< endl;
}