/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "runtime/globals.h"
#include "client/CltXact.h"
#include "client/PipelinedCxm.h"


// A freshly built manager starts in exactly the state that reset() establishes.
PipelinedCxm::PipelinedCxm() {
	this->reset();
}

// Forgets the connection, zeroes theWrSize, and then lets the base
// CltXactMgr reset whatever state it keeps.
void PipelinedCxm::reset() {
	theConn = 0;
	theWrSize = 0;
	CltXactMgr::reset();
}

// True only when none of the three lines (fillers, writers, readers)
// holds a transaction.
bool PipelinedCxm::idle() const {
	if (!theFillers.empty())
		return false;
	if (!theWriters.empty())
		return false;
	return theReaders.empty();
}

// Returns the connection this manager is attached to.
// Asserts that a connection has been set.
Connection *PipelinedCxm::conn() {
	Assert(theConn);
	Connection *const current = theConn;
	return current;
}

// Adopts the connection of transaction x, calls theRd.changeUser() so that
// this manager replaces oldMgr there, and appends x to the readers line.
void PipelinedCxm::assumeReadControl(CltXact *x, CltXactMgr *oldMgr) {
	Connection *const adopted = x->conn();
	theConn = adopted;
	adopted->theRd.changeUser(oldMgr, this);
	theReaders.enqueue(x);
}

// Appends x to the fillers line and starts the connection's write side
// for this manager unless theWr.theReserv is already set.
void PipelinedCxm::control(CltXact *x) {
	theFillers.enqueue(x);
	if (conn()->theWr.theReserv)
		return; // write side already has a reservation
	conn()->theWr.start(this);
}

// Rewinding is not supported by the pipelining manager: any call
// fails the assertion below. The transaction argument is ignored.
void PipelinedCxm::rewind(CltXact *) {
	// CONNECT requests should not be pipelined
	Assert(false);
}

// Handles the abort of transaction x: every other queued transaction is
// aborted, all three lines are emptied, and both the write and the read
// sides of the connection are stopped for this manager.
void PipelinedCxm::noteAbort(CltXact *x) {
	abortLines(x);

	Connection *const c = conn();
	c->theWr.stop(this);
	c->theRd.stop(this);
}

// Removes a finished transaction from the readers line; finished
// transactions are assumed to always be readers. If other readers remain,
// the next one is kicked; otherwise reading on the connection is stopped.
void PipelinedCxm::noteDone(CltXact *x) {
	Assert(theReaders.count() > 0);
	theReaders.dequeue(x);

	if (theReaders.count() != 0) {
		kickNextRead(); // let the next reader proceed
		return;
	}

	conn()->theRd.stop(this);
}

// Handles the case where x turned out to be the last transaction on the
// connection. For simplicity, x is expected to be the current (first)
// reader; if that expectation fails, nothing else is done.
void PipelinedCxm::noteLastXaction(CltXact *x) {
	if (Should(theReaders.count() > 0 && x == theReaders.begin())) {
		// abortLines() aborts everybody except x but still removes x
		// from its line, so x has to be put back among the readers
		abortLines(x);
		theReaders.enqueue(x);
		conn()->theWr.stop(this);
	}
}



void PipelinedCxm::noteReadReady(int) {
	Assert(theReaders.count() > 0);
	// will call us back
	theReaders.begin()->controlledMasterRead();
}

// Write-readiness callback. Works in three stages:
//   1. moves transactions that finished filling from theFillers to theWriters,
//   2. asks the first writer to perform a single write,
//   3. moves writers that need no more writing to theReaders.
// Returns immediately whenever a controlled*() call returns false; what a
// false result means is decided by CltXact and is not visible here.
// The int argument is unused.
void PipelinedCxm::noteWriteReady(int) {
	// successor of the current transaction, saved before that one is dequeued
	CltXact *next = 0;
	// stage 1: fill while fillers remain and the write buffer is not full
	for (CltXact *x = theFillers.begin(); x != theFillers.end() && !conn()->theWrBuf.full(); x = next) {
		bool needMoreFill = false;
		if (!x->controlledFill(needMoreFill))
			return;
		// x is not done filling: leave it and everything behind it in theFillers
		if (needMoreFill)
			break;
		next = theFillers.next(x);
		theFillers.dequeue(x);
		theWriters.enqueue(x);
	}

	// stage 2: write once, via the first writer
	// (presumably controlledMasterWrite() stores the written amount in
	// theWrSize -- TODO confirm against CltXact)
	if (theWriters.count() > 0) {
		if (!theWriters.begin()->controlledMasterWrite(theWrSize))
			return;
	}

	// stage 3: one or more transactions may move on after a write;
	// keep going while writers remain and theWrSize is still positive
	for (CltXact *w = theWriters.begin(); w != theWriters.end() && theWrSize > 0; w = next) {
		bool needMoreWrite = false;
		if (!w->controlledPostWrite(theWrSize, needMoreWrite))
			return;

		// w still has data to write: make sure writing is started and
		// wait for the next write-ready notification
		if (needMoreWrite) {
			if (!conn()->theWr.theReserv)
				conn()->theWr.start(this);
			return;
		}

		next = theWriters.next(w);

		theWriters.dequeue(w);
		// nothing left to fill or write: stop the write side
		if (theFillers.count() == 0 && theWriters.count() == 0)
			conn()->theWr.stop(this);

		prepReading(w);
		// w is the only reader, so nobody else will kick it
		// NOTE(review): kickNextRead() may call back into this manager; if
		// such a callback empties theWriters, `next` may be stale -- confirm
		if (theReaders.count() == 1)
			kickNextRead();
	}
}

void PipelinedCxm::kickNextRead() {
	if (!theReaders.empty() && conn()->theRdBuf.contSize() > 0) {
		CltXact *x = theReaders.begin();
		x->controlledPostRead(); // will call us back
	}
}

// Appends x to the readers line. If theRd.theReserv is not set yet, also
// starts the connection's read side for this manager and gives the socket
// descriptor the fsupBestEffort priority in the file scanner.
void PipelinedCxm::prepReading(CltXact *x) {
	theReaders.enqueue(x);

	if (conn()->theRd.theReserv)
		return; // read side already has a reservation

	conn()->theRd.start(this);
	TheFileScanner->setPriority(conn()->sock().fd(), fsupBestEffort);
}

// Empties the given line from the front. Every transaction except cause
// gets controlledAbort(); cause itself is only removed from the line.
void PipelinedCxm::abortLine(Line &line, CltXact *cause) {
	// the step expression dequeues the head on every pass, including cause
	for (CltXact *head = 0; !line.empty(); line.dequeue(head)) {
		head = line.begin();
		if (head == cause)
			continue;
		head->controlledAbort();
	}
}

// Applies abortLine() to the fillers, the writers, and the readers,
// in that order, sparing cause from the abort call itself.
void PipelinedCxm::abortLines(CltXact *cause) {
	Line *const lines[] = { &theFillers, &theWriters, &theReaders };
	const unsigned lineCount = sizeof(lines) / sizeof(*lines);
	for (unsigned i = 0; i < lineCount; ++i)
		abortLine(*lines[i], cause);
}


// syntax highlighted by Code2HTML, v. 0.9.1