/* Web Polygraph http://www.web-polygraph.org/
* (C) 2003-2006 The Measurement Factory
* Licensed under the Apache License, Version 2.0 */
#include "base/polygraph.h"
#include "runtime/globals.h"
#include "client/CltXact.h"
#include "client/PipelinedCxm.h"
// Constructs a manager by delegating all member setup to reset(),
// which clears theWrSize and theConn and resets the CltXactMgr base.
PipelinedCxm::PipelinedCxm() {
    this->reset();
}
// Puts the manager back into its initial state: forgets the connection,
// zeroes theWrSize, and then lets the CltXactMgr base reset itself.
void PipelinedCxm::reset() {
    theConn = 0;
    theWrSize = 0;

    CltXactMgr::reset();
}
// Reports whether the manager has nothing queued: true only when the
// fillers, writers, and readers lines are all empty.
bool PipelinedCxm::idle() const {
    if (!theFillers.empty())
        return false;

    if (!theWriters.empty())
        return false;

    return theReaders.empty();
}
// Returns the connection this manager currently works with.
// Asserts that theConn is set: reset() clears it and
// assumeReadControl() assigns it.
Connection *PipelinedCxm::conn() {
Assert(theConn);
return theConn;
}
// Takes over the reading side for transaction x from oldMgr.
// x: transaction whose connection this manager adopts.
// oldMgr: manager passed to changeUser() as the user being replaced.
void PipelinedCxm::assumeReadControl(CltXact *x, CltXactMgr *oldMgr) {
// adopt x's connection; conn() asserts that this has been set
theConn = x->conn();
// hand the connection's theRd from oldMgr over to this manager
theConn->theRd.changeUser(oldMgr, this);
// x joins the readers line
theReaders.enqueue(x);
}
// Puts transaction x under this manager's control by appending it to the
// fillers line. If the connection's theWr has no reservation yet, one is
// started for this manager.
void PipelinedCxm::control(CltXact *x) {
    theFillers.enqueue(x);

    // a reservation already exists; nothing more to do
    if (conn()->theWr.theReserv)
        return;

    conn()->theWr.start(this);
}
// Not supported by this manager: unconditionally trips Assert(false).
// The transaction argument is unnamed and unused.
void PipelinedCxm::rewind(CltXact *) {
// CONNECT requests should not be pipelined
Assert(false);
}
// Handles the abort of transaction x: empties the fillers, writers, and
// readers lines, calling controlledAbort() on every queued transaction
// other than x, and then stops this manager on the connection's theWr
// and theRd.
void PipelinedCxm::noteAbort(CltXact *x) {
// x is the cause, so abortLines() dequeues it without calling controlledAbort()
abortLines(x);
conn()->theWr.stop(this);
conn()->theRd.stop(this);
}
// Handles a finished transaction. We assume that done transactions are
// all readers, so x is removed from the readers line. When other readers
// remain, the next one is kicked; otherwise this manager stops on the
// connection's theRd.
void PipelinedCxm::noteDone(CltXact *x) {
    Assert(theReaders.count() > 0);
    theReaders.dequeue(x);

    if (theReaders.count() != 0) {
        kickNextRead(); // kick the next one
        return;
    }

    conn()->theRd.stop(this);
}
// Handles the notification that x is the last transaction: every other
// queued transaction gets controlledAbort(), x is left as the only reader,
// and this manager stops on the connection's theWr.
// Does nothing (beyond whatever Should() itself does on failure) unless
// x is the first transaction in the readers line.
void PipelinedCxm::noteLastXaction(CltXact *x) {
// for simplicity, assume that the current reader found the end
if (!Should(theReaders.count() > 0 && x == theReaders.begin()))
return;
abortLines(x); // dequeues x too (without aborting it), so we reinsert it
theReaders.enqueue(x);
conn()->theWr.stop(this);
}
void PipelinedCxm::noteReadReady(int) {
Assert(theReaders.count() > 0);
// will call us back
theReaders.begin()->controlledMasterRead();
}
// Handles a write-ready notification (the int argument is unused).
// Works in three steps:
// 1) fill: queued fillers call controlledFill() in order until one needs
//    more fill or the connection's theWrBuf is full; each filler that
//    needs no more fill moves to the writers line;
// 2) write: the first writer, if any, does one controlledMasterWrite();
// 3) post-write: while theWrSize is positive, writers call
//    controlledPostWrite() in order; each writer that needs no more
//    writing moves to the readers line via prepReading().
// Returns immediately whenever one of the controlled*() calls returns false.
void PipelinedCxm::noteWriteReady(int) {
// successor of the transaction being processed; shared by both loops
CltXact *next = 0;
for (CltXact *x = theFillers.begin(); x != theFillers.end() && !conn()->theWrBuf.full(); x = next) {
bool needMoreFill = false;
// NOTE(review): a false result presumably means x failed or was aborted -- confirm
if (!x->controlledFill(needMoreFill))
return;
// x stays at the head of the fillers line; go on to the write step
if (needMoreFill)
break;
// remember the successor before x leaves the fillers line
next = theFillers.next(x);
theFillers.dequeue(x);
theWriters.enqueue(x);
}
// write once
if (theWriters.count() > 0) {
if (!theWriters.begin()->controlledMasterWrite(theWrSize))
return;
}
// one or more transactions may move on after a write
for (CltXact *w = theWriters.begin(); w != theWriters.end() && theWrSize > 0; w = next) {
bool needMoreWrite = false;
if (!w->controlledPostWrite(theWrSize, needMoreWrite))
return;
if (needMoreWrite) {
// w is not finished: make sure a theWr reservation exists, then wait
if (!conn()->theWr.theReserv)
conn()->theWr.start(this);
return;
}
// remember the successor before w leaves the writers line
next = theWriters.next(w);
theWriters.dequeue(w);
// nothing left to fill or write: stop this manager on theWr
if (theFillers.count() == 0 && theWriters.count() == 0)
conn()->theWr.stop(this);
prepReading(w);
// w is the only reader, so give it a chance at already buffered content
// NOTE(review): kickNextRead() calls controlledPostRead(), which "will call
// us back"; if such a callback empties the lines (e.g., through noteAbort()),
// next may no longer be queued when the loop continues -- confirm this
// cannot happen.
if (theReaders.count() == 1)
kickNextRead();
}
}
void PipelinedCxm::kickNextRead() {
if (!theReaders.empty() && conn()->theRdBuf.contSize() > 0) {
CltXact *x = theReaders.begin();
x->controlledPostRead(); // will call us back
}
}
// Appends x to the readers line. If the connection's theRd has no
// reservation yet, starts one for this manager and sets the scan priority
// of the connection's socket descriptor to fsupBestEffort.
void PipelinedCxm::prepReading(CltXact *x) {
    theReaders.enqueue(x);

    // a read reservation already exists; leave it alone
    if (conn()->theRd.theReserv)
        return;

    conn()->theRd.start(this);
    TheFileScanner->setPriority(conn()->sock().fd(), fsupBestEffort);
}
// Empties the given line from the front. Every transaction except cause
// gets controlledAbort() before being dequeued; cause itself is dequeued
// without being aborted.
void PipelinedCxm::abortLine(Line &line, CltXact *cause) {
    while (!line.empty()) {
        CltXact *const head = line.begin();

        const bool isCause = (head == cause);
        if (!isCause)
            head->controlledAbort();

        line.dequeue(head); // including cause
    }
}
// Empties the fillers, writers, and readers lines, in that order.
// Every queued transaction other than cause gets controlledAbort();
// cause is dequeued like the rest but is not aborted (see abortLine()).
void PipelinedCxm::abortLines(CltXact *cause) {
abortLine(theFillers, cause);
abortLine(theWriters, cause);
abortLine(theReaders, cause);
}
// syntax highlighted by Code2HTML, v. 0.9.1