/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include <ctype.h>

#include "xstd/Clock.h"
#include "base/ObjId.h"
#include "base/RndPermut.h"
#include "base/Progress.h"
#include "base/polyLogCats.h"
#include "base/polyLogTags.h"
#include "runtime/Agent.h"
#include "runtime/ErrorMgr.h"
#include "runtime/SharedOpts.h"
#include "runtime/PolyOLog.h"
#include "runtime/httpHdrs.h"
#include "runtime/Xaction.h"
#include "runtime/polyBcastChannels.h"
#include "runtime/polyErrors.h"
#include "runtime/globals.h"
#include "csm/oid2Url.h"


int Xaction::TheSampleDebt = 0;
int Xaction::TheCount = 0;


// puts the transaction fields listed below back into their
// "no transaction in progress" values
void Xaction::reset() {
	theConn = 0;
	theId.clear();
	theStartTime = theLifeTime = Time();
	theRepSize.reset();
	theReqSize = -1;
	theAbortSize = -1; // negative means "abort is disabled", see abortAfterIo()
	theAbortCoord.reset();
	theCheckAlg.reset();
	theHttpStatus = RepHdr::scUnknown;
	theOid.reset();
	theLogCat = lgcAll;
	theState = stNone;
}

// binds the transaction to aConn (which must not be nil), assigns a fresh
// transaction id, records the start time reported by the connection, and
// announces the transaction on TheXactBegChannel
void Xaction::start(Connection *aConn) {
	Assert(aConn);
	// we are either not bound yet or are being re-bound to the same connection
	Should(!theConn || theConn == aConn);

	theConn = aConn;
	theId.create();
	theStartTime = theConn->useStart();

	Broadcast(TheXactBegChannel, this);
}

// ends the transaction: stops our reads and writes on the connection,
// records the transaction lifetime, accounts for the outcome, and
// moves to stDone; a false (zero) err means success
void Xaction::finish(Error err) {
	Assert(theConn);
	theConn->theWr.stop(this);
	theConn->theRd.stop(this);

	theLifeTime = TheClock - theStartTime;

	if (err) {
		theConn->bad(true);
		if (needRetry()) {
			// a retry is neither a success nor a failure
			Broadcast(TheXactRetrChannel, this);
		} else {
			// low level errors should be reported when they occur
			// and propagated to us as "-1"; do not count/report them twice
			if (err != errOther) {
				if (ReportError(err) && TheOpts.theDumpFlags(dumpErr, dumpAny))
					printMsg(theConn->theRdBuf);
			}
			countFailure();
		}
	} else {
		countSuccess();

		// log a stats sample if somebody asked for one
		if (TheSampleDebt) {
			TheSampleDebt--;
			logStats();
		}

		if (TheOpts.theDumpFlags(dumpSum, dumpAny)) {
			printXactLogEntry();
		}
	}

	newState(stDone);
}

// updates progress counters (including the SSL-specific one when
// sslActive() is true) and announces the end of the transaction
void Xaction::countSuccess() {
	if (sslActive())
		TheProgress.sslSuccess();
	TheProgress.success();
	Broadcast(TheXactEndChannel, this);
}

// updates the failure counter and announces the transaction error
void Xaction::countFailure() {
	// note: may be called by an Agent before the transaction started
	// if, for example, launch failed
	TheProgress.failure();
	Broadcast(TheXactErrChannel, this);
}

// remembers the new transaction state
void Xaction::newState(State aState) {
	theState = aState;
}

// returns true iff the transaction should abort
bool Xaction::abortBeforeIo() const {
	return theAbortSize == 0;
}

// returns true iff the transaction should abort;
// size is the result of the I/O that has just been performed and
// is subtracted from the remaining theAbortSize when we do not abort
bool Xaction::abortAfterIo(Size size) {
	if (!Should(theAbortSize != 0))
		return false; // internal error, but be robust

	if (theAbortSize < 0)
		return false; // abort is disabled

	if (size <= 0)
		return false; // xactions will handle errors and eof on their own

	if (theAbortSize <= size) {
		theAbortSize = 0;
		return true; // time to abort
	}

	theAbortSize -= size;
	return false; // not yet
}

// performs connection I/O using method m, aborting the transaction
// before or after the I/O if the abort settings say so;
// the I/O result is stored in *size if size is not nil (and I/O was done);
// returns true iff finish() was called
bool Xaction::abortIo(Connection::IoMethod m, Size *size) {
	if (abortBeforeIo()) {
		abortNow();
		return true;
	}

	// when abort is enabled, pass the remaining abort size to the connection
	if (theAbortSize >= 0)
		theConn->decMaxIoSize(theAbortSize);

	const Size sz = (theConn->*m)();
	if (size)
		*size = sz;

	if (abortAfterIo(sz)) {
		abortNow();
		return true;
	}

	return false;
}

// finishes the transaction because the configuration asked for an abort
void Xaction::abortNow() {
	theConn->bad(true); // so that the manager will close
	theOid.aborted(true);
	finish(0); // abort is not an error, config asked for it
}

// seeds the generator of raw (if raw is not nil) with a value derived
// from the hash of our object id and globSeed; returns raw
RndDistr *Xaction::seedOidDistr(RndDistr *raw, int globSeed) {
	if (raw) {
		const int seed = GlbPermut(theOid.hash(), globSeed);
		raw->rndGen()->seed(seed);
	}
	return raw;
}

// writes a complete lgXactStats entry to the sample log,
// which must exist
void Xaction::logStats() {
	Assert(TheSmplOLog);
	(*TheSmplOLog) << bege(lgXactStats, logCat());
	logStats(*TheSmplOLog);
	(*TheSmplOLog) << ende;
}

// writes transaction stats fields to ol;
// times are logged as integer milliseconds
void Xaction::logStats(OLog &ol) const {
	ol
		<< theId
		<< theConn->seqId()
		<< theStartTime
		<< (int)theEnqueTime.msec()
		<< (int)theLifeTime.msec()
		<< theOid
		<< theRepSize
		<< theHttpStatus
		;
}

// dumps all buffered content (subject to the limits of the
// (const char *, Size) overload)
void Xaction::printMsg(const IOBuf &buf) const {
	printMsg(buf, buf.contSize());
}

// dumps at most maxSize characters of buffered content;
// an empty buffer is reported as "no data"
void Xaction::printMsg(const IOBuf &buf, Size maxSize) const {
	const Size contentSize = buf.contSize();
	if (contentSize <= 0) {
		printMsg(0, 0);
		return;
	}

	// never scan past what the buffer actually holds,
	// even if the caller asks for more
	if (maxSize > contentSize)
		maxSize = contentSize;

	printMsg(buf.content(), maxSize);
}

// true for characters that are safe to dump verbatim:
// printable characters plus CR and LF
inline
bool myIsPrint(char c) {
	// isprint() is undefined for negative arguments other than EOF and
	// plain char may be signed, so convert to unsigned char first
	return (c && isprint(static_cast<unsigned char>(c))) ||
		c == '\r' || c == '\n';
}

// print buf content up to the first non-printable character,
// and no more than maxSize characters (also capped by TheOpts.theDumpSize);
// the content is preceded by a header line describing the transaction;
// a nil buf results in a "[no data to dump]" line instead of content
void Xaction::printMsg(const char *buf, Size maxSize) const {
	cout << TheClock << '#';
	if (theOid) {
		Oid2Url(theOid, cout << " obj: ");
		theOid.printFlags(cout << " flags: ");
	}
	cout << " size: " << (int)theRepSize.actual() << '/' << (int)theRepSize.expected();
	if (theId)
		cout << " xact: " << theId;
	cout << endl;

	if (!buf) {
		cout << "[no data to dump]" << endl;
		return;
	}

	if (maxSize > TheOpts.theDumpSize)
		maxSize = TheOpts.theDumpSize;

	// find the end of the printable prefix
	const char *p = buf;
	while ((p-buf) < maxSize && myIsPrint(*p))
		++p;

	if (p > buf) {
		cout.write(buf, p - buf);
		cout << endl;
	}
}

// prints a one-line, space-separated summary of the transaction to cout;
// fields that are not available are printed as "-"
void Xaction::printXactLogEntry() const {
	// XXX: should dump to a file, not cout and use squid or common format+?
	cout << "xact: " << TheClock << ' ';
	cout << ' ' << theLifeTime.msec();
	cout << ' ' << const_cast<Xaction*>(this)->owner()->host(); // XXX: ugly cast
	cout << " -/" << theHttpStatus;
	cout << ' ' << (int)theRepSize.actual();
	cout << " -";

	if (theOid)
		Oid2Url(theOid, cout << ' ');
	else
		cout << " -";

	if (theCheckAlg.sum().set())
		theCheckAlg.sum().print(cout << ' ');
	else
		cout << " -";

	cout << endl;
}