/* Web Polygraph http://www.web-polygraph.org/
* (C) 2003-2006 The Measurement Factory
* Licensed under the Apache License, Version 2.0 */
#include "base/polygraph.h"
#include <ctype.h>
#include "xstd/Clock.h"
#include "base/ObjId.h"
#include "base/RndPermut.h"
#include "base/Progress.h"
#include "base/polyLogCats.h"
#include "base/polyLogTags.h"
#include "runtime/Agent.h"
#include "runtime/ErrorMgr.h"
#include "runtime/SharedOpts.h"
#include "runtime/PolyOLog.h"
#include "runtime/httpHdrs.h"
#include "runtime/Xaction.h"
#include "runtime/polyBcastChannels.h"
#include "runtime/polyErrors.h"
#include "runtime/globals.h"
#include "csm/oid2Url.h"
int Xaction::TheSampleDebt = 0;
int Xaction::TheCount = 0;
void Xaction::reset() {
theConn = 0;
theId.clear();
theStartTime = theLifeTime = Time();
theRepSize.reset();
theReqSize = -1;
theAbortSize = -1;
theAbortCoord.reset();
theCheckAlg.reset();
theHttpStatus = RepHdr::scUnknown;
theOid.reset();
theLogCat = lgcAll;
theState = stNone;
}
void Xaction::start(Connection *aConn) {
Assert(aConn);
Should(!theConn || theConn == aConn);
theConn = aConn;
theId.create();
theStartTime = theConn->useStart();
Broadcast(TheXactBegChannel, this);
}
void Xaction::finish(Error err) {
Assert(theConn);
theConn->theWr.stop(this);
theConn->theRd.stop(this);
theLifeTime = TheClock - theStartTime;
if (err) {
theConn->bad(true);
if (needRetry()) {
Broadcast(TheXactRetrChannel, this);
} else {
// low level errors should be reported when they occur
// and propogated to us as "-1"; do not count/report them twice
if (err != errOther) {
if (ReportError(err) && TheOpts.theDumpFlags(dumpErr, dumpAny))
printMsg(theConn->theRdBuf);
}
countFailure();
}
} else {
countSuccess();
if (TheSampleDebt) {
TheSampleDebt--;
logStats();
}
if (TheOpts.theDumpFlags(dumpSum, dumpAny)) {
printXactLogEntry();
}
}
newState(stDone);
}
void Xaction::countSuccess() {
if (sslActive())
TheProgress.sslSuccess();
TheProgress.success();
Broadcast(TheXactEndChannel, this);
}
void Xaction::countFailure() {
// note: may be called by an Agent before the transaction started
// if, for example, launch failed
TheProgress.failure();
Broadcast(TheXactErrChannel, this);
}
void Xaction::newState(State aState) {
theState = aState;
}
// returns true iff the transaction should abort
bool Xaction::abortBeforeIo() const {
return theAbortSize == 0;
}
// returns true iff the transaction should abort
bool Xaction::abortAfterIo(Size size) {
if (!Should(theAbortSize != 0))
return false; // internal error, but be robust
if (theAbortSize < 0)
return false; // abort is disabled
if (size <= 0)
return false; // xactions will handle errors and eof on their own
if (theAbortSize <= size) {
theAbortSize = 0;
return true; // time to abort
}
theAbortSize -= size;
return false; // not yet
}
// returns true iff finish() was called
bool Xaction::abortIo(Connection::IoMethod m, Size *size) {
if (abortBeforeIo()) {
abortNow();
return true;
}
if (theAbortSize >= 0)
theConn->decMaxIoSize(theAbortSize);
const Size sz = (theConn->*m)();
if (size)
*size = sz;
if (abortAfterIo(sz)) {
abortNow();
return true;
}
return false;
}
void Xaction::abortNow() {
theConn->bad(true); // so that the manager will close
theOid.aborted(true);
finish(0); // abort is not an error, config asked for it
}
RndDistr *Xaction::seedOidDistr(RndDistr *raw, int globSeed) {
if (raw) {
const int seed = GlbPermut(theOid.hash(), globSeed);
raw->rndGen()->seed(seed);
}
return raw;
}
void Xaction::logStats() {
Assert(TheSmplOLog);
(*TheSmplOLog) << bege(lgXactStats, logCat());
logStats(*TheSmplOLog);
(*TheSmplOLog) << ende;
}
void Xaction::logStats(OLog &ol) const {
ol
<< theId << theConn->seqId()
<< theStartTime
<< (int)theEnqueTime.msec()
<< (int)theLifeTime.msec()
<< theOid
<< theRepSize
<< theHttpStatus
;
}
void Xaction::printMsg(const IOBuf &buf) const {
printMsg(buf, buf.contSize());
}
void Xaction::printMsg(const IOBuf &buf, Size maxSize) const {
if (buf.contSize() <= 0)
printMsg(0, 0);
else
printMsg(buf.content(), maxSize);
}
inline
bool myIsPrint(char c) {
return (c && isprint(c)) || c == '\r' || c == '\n';
}
// print buf content up to the first non-printable character,
// and no more than maxSize characters
void Xaction::printMsg(const char *buf, Size maxSize) const {
cout << TheClock << '#';
if (theOid) {
Oid2Url(theOid, cout << " obj: ");
theOid.printFlags(cout << " flags: ");
}
cout << " size: " << (int)theRepSize.actual() << '/' << (int)theRepSize.expected();
if (theId)
cout << " xact: " << theId;
cout << endl;
if (!buf) {
cout << "[no data to dump]" << endl;
return;
}
if (maxSize > TheOpts.theDumpSize)
maxSize = TheOpts.theDumpSize;
const char *p = buf;
while ((p-buf) < maxSize && myIsPrint(*p)) ++p;
if (p > buf) {
cout.write(buf, p - buf);
cout << endl;
}
}
void Xaction::printXactLogEntry() const {
// XXX: should dump to a file, not cout and use squid or common format+?
cout << "xact: " << TheClock << ' ';
cout << ' ' << theLifeTime.msec();
cout << ' ' << ((Xaction*)this)->owner()->host(); // XXX: ugly cast
cout << " -/" << theHttpStatus;
cout << ' ' << (int)theRepSize.actual();
cout << " -";
if (theOid)
Oid2Url(theOid, cout << ' ');
else
cout << " -";
if (theCheckAlg.sum().set())
theCheckAlg.sum().print(cout << ' ');
else
cout << " -";
cout << endl;
}
syntax highlighted by Code2HTML, v. 0.9.1