/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include <ctype.h>

#include "xstd/Clock.h"
#include "base/ObjId.h"
#include "base/RndPermut.h"
#include "base/Progress.h"
#include "base/polyLogCats.h"
#include "base/polyLogTags.h"
#include "runtime/Agent.h"
#include "runtime/ErrorMgr.h"
#include "runtime/SharedOpts.h"
#include "runtime/PolyOLog.h"
#include "runtime/httpHdrs.h"
#include "runtime/Xaction.h"
#include "runtime/polyBcastChannels.h"
#include "runtime/polyErrors.h"
#include "runtime/globals.h"
#include "csm/oid2Url.h"

// Number of successful transactions that finish() still has to write to
// the sample log; finish() decrements it each time it logs one.
int Xaction::TheSampleDebt = 0;
// NOTE(review): not referenced in this part of the file; presumably a count
// of transaction objects -- confirm against the class declaration.
int Xaction::TheCount = 0;


// Puts the per-transaction fields handled here back to their initial
// values: no connection, cleared id, default-constructed times, reset
// sizes/coordinates/checksum, unknown HTTP status, reset object id,
// the lgcAll log category, and the stNone state.
// NOTE(review): theEnqueTime, which logStats() writes, is not touched
// here; confirm that it is reset elsewhere.
void Xaction::reset() {
	// connection and identity
	theConn = 0;
	theId.clear();

	// timing
	theLifeTime = Time();
	theStartTime = theLifeTime;

	// message sizes and abort settings (a negative abort size keeps
	// abortAfterIo() from ever requesting an abort)
	theRepSize.reset();
	theReqSize = -1;
	theAbortSize = -1;
	theAbortCoord.reset();

	// content and status
	theCheckAlg.reset();
	theHttpStatus = RepHdr::scUnknown;
	theOid.reset();

	// logging and state machine
	theLogCat = lgcAll;
	theState = stNone;
}

// Attaches the transaction to aConn (which must not be null), creates the
// transaction id, takes the start time from the connection's useStart(),
// and broadcasts the transaction on TheXactBegChannel.
void Xaction::start(Connection *aConn) {
	Assert(aConn);

	// a connection that was set earlier must be the one we are given now
	Should(theConn == aConn || !theConn);
	theConn = aConn;

	theId.create();
	theStartTime = theConn->useStart();

	Broadcast(TheXactBegChannel, this);
}

// Ends the transaction: calls stop(this) on the connection's writer and
// reader, computes theLifeTime from the clock, accounts for the outcome,
// and moves to the stDone state. Requires a connection.
// err: a true value marks the connection bad and leads to either a retry
//      broadcast or a counted failure; a false value (abortNow() passes 0)
//      is counted as a success.
void Xaction::finish(Error err) {
	Assert(theConn);
	theConn->theWr.stop(this);
	theConn->theRd.stop(this);

	theLifeTime = TheClock - theStartTime;

	if (err) {
		theConn->bad(true);

		if (needRetry()) {
			Broadcast(TheXactRetrChannel, this);
		} else {
			// low level errors should be reported when they occur and
			// propagated to us as errOther; do not count/report them twice
			const bool dumpReply = err != errOther &&
				ReportError(err) &&
				TheOpts.theDumpFlags(dumpErr, dumpAny);
			if (dumpReply)
				printMsg(theConn->theRdBuf);

			countFailure();
		}
	} else {
		countSuccess();

		// pay off one unit of the sampling debt, if any
		if (TheSampleDebt != 0) {
			--TheSampleDebt;
			logStats();
		}

		if (TheOpts.theDumpFlags(dumpSum, dumpAny))
			printXactLogEntry();
	}

	newState(stDone);
}

// Reports a successful transaction to TheProgress (including its SSL
// success counter when sslActive() is true) and then broadcasts the
// transaction on TheXactEndChannel.
void Xaction::countSuccess() {
	if (sslActive()) {
		TheProgress.sslSuccess();
	}

	TheProgress.success();
	Broadcast(TheXactEndChannel, this);
}

// Reports a failed transaction to TheProgress and then broadcasts the
// transaction on TheXactErrChannel.
void Xaction::countFailure() {
	// note: may be called by an Agent before the transaction started
	// if, for example, launch failed
	TheProgress.failure();
	Broadcast(TheXactErrChannel, this);
}

// Records aState as the current transaction state; nothing else is done
// here.
void Xaction::newState(State aState) {
	theState = aState;
}

// Returns true iff the transaction should abort before doing any I/O,
// which is the case when the remaining abort size (theAbortSize) is zero.
bool Xaction::abortBeforeIo() const {
	return theAbortSize == 0;
}

// Charges `size` bytes of completed I/O against the remaining abort size
// (theAbortSize) and returns true iff the transaction should abort now.
// When the I/O reaches or exceeds the remaining abort size, theAbortSize
// becomes zero; otherwise it shrinks by `size`.
bool Xaction::abortAfterIo(Size size) {
	// a zero abort size is not expected at this point;
	// internal error, but be robust
	if (!Should(theAbortSize != 0))
		return false;

	// a negative abort size means that abort is disabled; a non-positive
	// I/O size is an error or eof that xactions handle on their own
	if (theAbortSize < 0 || size <= 0)
		return false;

	const bool timeToAbort = theAbortSize <= size;
	if (timeToAbort)
		theAbortSize = 0;
	else
		theAbortSize -= size; // not yet
	return timeToAbort;
}

// Performs one I/O on the connection through the member function m,
// honoring the configured abort size. If `size` is not null, it receives
// the value returned by the I/O call (it is left untouched when the
// transaction aborts before the I/O).
// Returns true iff finish() was called (via abortNow()).
bool Xaction::abortIo(Connection::IoMethod m, Size *size) {
	bool aborting = abortBeforeIo();

	if (!aborting) {
		// pass the remaining abort size on to the connection
		if (theAbortSize >= 0)
			theConn->decMaxIoSize(theAbortSize);

		const Size ioSize = (theConn->*m)();
		if (size)
			*size = ioSize;

		aborting = abortAfterIo(ioSize);
	}

	if (aborting)
		abortNow();

	return aborting;
}

// Aborts the transaction on purpose: marks the connection bad, flags the
// object id as aborted, and finishes the transaction without an error.
void Xaction::abortNow() {
	theConn->bad(true); // so that the manager will close
	theOid.aborted(true);
	finish(0); // abort is not an error, config asked for it
}

// Seeds the random generator of the given distribution with the result of
// GlbPermut() applied to this transaction's object id hash and globSeed,
// and returns the same pointer. A null distribution is returned as is.
RndDistr *Xaction::seedOidDistr(RndDistr *raw, int globSeed) {
	if (!raw)
		return raw;

	const int oidSeed = GlbPermut(theOid.hash(), globSeed);
	raw->rndGen()->seed(oidSeed);
	return raw;
}

// Appends one lgXactStats entry for this transaction to the sample log
// (TheSmplOLog), which must exist. The entry is opened with bege() using
// this transaction's log category, filled by logStats(OLog &), and closed
// with ende.
void Xaction::logStats() {
	Assert(TheSmplOLog);
	(*TheSmplOLog) << bege(lgXactStats, logCat());
	logStats(*TheSmplOLog);
	(*TheSmplOLog) << ende;
}

// Writes this transaction's statistics fields to the given log, in this
// order: transaction id, connection sequence id, start time, enqueue time
// and lifetime (both as int milliseconds), object id, reply size, and
// HTTP status.
void Xaction::logStats(OLog &ol) const {
	ol << theId;
	ol << theConn->seqId();
	ol << theStartTime;
	ol << static_cast<int>(theEnqueTime.msec());
	ol << static_cast<int>(theLifeTime.msec());
	ol << theOid;
	ol << theRepSize;
	ol << theHttpStatus;
}

// Dumps the buffer via printMsg(buf, maxSize), passing the buffer's
// content size as the size limit.
void Xaction::printMsg(const IOBuf &buf) const {
	printMsg(buf, buf.contSize());
}

// Dumps at most maxSize characters of the buffer content through the
// (const char *, Size) overload; a buffer without content is dumped as
// "no data" by passing a null pointer.
void Xaction::printMsg(const IOBuf &buf, Size maxSize) const {
	if (buf.contSize() <= 0) {
		printMsg(0, 0);
		return;
	}

	printMsg(buf.content(), maxSize);
}

// Returns true iff c may be included in a message dump: a non-NUL
// character that isprint() accepts, or a CR or LF.
// The character is converted to unsigned char before the isprint() call
// because passing a negative value other than EOF (any byte >= 0x80 where
// plain char is signed) to a <ctype.h> function is undefined behavior.
inline
bool myIsPrint(char c) {
	const unsigned char uc = static_cast<unsigned char>(c);
	return (uc && isprint(uc)) || c == '\r' || c == '\n';
}

// Prints a header line (current time; object URL and flags when the
// object id is set; theRepSize as actual/expected; transaction id when
// set) followed by the leading part of buf: output stops at the first
// character that myIsPrint() rejects and never exceeds maxSize or
// TheOpts.theDumpSize characters. A null buf yields a
// "[no data to dump]" line instead of content.
void Xaction::printMsg(const char *buf, Size maxSize) const {
	// header
	cout << TheClock << '#';

	if (theOid) {
		Oid2Url(theOid, cout << " obj: ");
		theOid.printFlags(cout << " flags: ");
	}

	cout << " size: " << static_cast<int>(theRepSize.actual());
	cout << '/' << static_cast<int>(theRepSize.expected());

	if (theId)
		cout << " xact: " << theId;

	cout << endl;

	// content
	if (!buf) {
		cout << "[no data to dump]" << endl;
		return;
	}

	if (maxSize > TheOpts.theDumpSize)
		maxSize = TheOpts.theDumpSize;

	// find the end of the dumpable prefix
	const char *stop = buf;
	for (; (stop - buf) < maxSize && myIsPrint(*stop); ++stop) {
	}

	if (stop > buf) {
		cout.write(buf, stop - buf);
		cout << endl;
	}
}

void Xaction::printXactLogEntry() const {
	// XXX: should dump to a file, not cout and use squid or common format+?
	cout << "xact: " << TheClock << ' ';
	cout << ' ' << theLifeTime.msec();
	cout << ' ' << ((Xaction*)this)->owner()->host(); // XXX: ugly cast
	cout << " -/" << theHttpStatus;
	cout << ' ' << (int)theRepSize.actual();
	cout << " -";

	if (theOid)
		Oid2Url(theOid, cout << ' ');
	else
		cout << " -";

	if (theCheckAlg.sum().set())
		theCheckAlg.sum().print(cout << ' ');
	else
		cout << " -";

	cout << endl;
}


// syntax highlighted by Code2HTML, v. 0.9.1