/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "runtime/globals.h"
#include "client/CltXact.h"
#include "client/PipelinedCxm.h"

/*
 * PipelinedCxm keeps three lines (queues) of client transactions that
 * share one connection:
 *   theFillers -- transactions still being filled via controlledFill()
 *   theWriters -- filled transactions waiting for their post-write step
 *   theReaders -- written transactions waiting for their turn to read
 * A transaction moves fillers -> writers -> readers as I/O progresses.
 */

// Starts from the same state that reset() establishes.
PipelinedCxm::PipelinedCxm() {
    reset();
}

// Forgets the connection and the pending write size, then resets the base.
void PipelinedCxm::reset() {
    theConn = 0;
    theWrSize = 0;
    CltXactMgr::reset();
}

// True only when none of the three lines holds a transaction.
bool PipelinedCxm::idle() const {
    if (!theFillers.empty())
        return false;
    if (!theWriters.empty())
        return false;
    return theReaders.empty();
}

// Returns the managed connection; asserts that one has been set.
Connection *PipelinedCxm::conn() {
    Assert(theConn);
    return theConn;
}

// Adopts x's connection, replaces oldMgr with this manager as the user of
// the connection read side, and queues x as a reader.
void PipelinedCxm::assumeReadControl(CltXact *x, CltXactMgr *oldMgr) {
    theConn = x->conn();
    theConn->theRd.changeUser(oldMgr, this);
    theReaders.enqueue(x);
}

// Queues x for filling and starts the connection write side unless it
// already has a reservation.
void PipelinedCxm::control(CltXact *x) {
    theFillers.enqueue(x);

    if (conn()->theWr.theReserv)
        return;

    conn()->theWr.start(this);
}

void PipelinedCxm::rewind(CltXact *) {
    // CONNECT requests should not be pipelined
    Assert(false);
}

// Aborts every queued transaction other than x, empties all lines
// (including x itself), and stops both directions of the connection.
void PipelinedCxm::noteAbort(CltXact *x) {
    abortLines(x);
    conn()->theWr.stop(this);
    conn()->theRd.stop(this);
}

// we assume that done transactions are all readers
void PipelinedCxm::noteDone(CltXact *x) {
    Assert(theReaders.count() > 0);
    theReaders.dequeue(x);

    // nobody left to read for: stop reading
    if (theReaders.count() == 0) {
        conn()->theRd.stop(this);
        return;
    }

    kickNextRead(); // kick the next one
}

// Called when x turns out to be the last transaction: all other queued
// transactions are aborted, x stays queued as the only reader, and
// writing stops.
void PipelinedCxm::noteLastXaction(CltXact *x) {
    // for simplicity, assume that the current reader found the end
    if (!Should(theReaders.count() > 0 && x == theReaders.begin()))
        return;

    abortLines(x); // also removes x from its line, so we reinsert it below
    theReaders.enqueue(x);
    conn()->theWr.stop(this);
}

// Hands the read event to the transaction at the head of the readers line.
void PipelinedCxm::noteReadReady(int) {
    Assert(theReaders.count() > 0);
    // will call us back
    theReaders.begin()->controlledMasterRead();
}

// Handles a write event in three phases: fill, write once, post-write.
// Returns early whenever a transaction step reports failure (false).
void PipelinedCxm::noteWriteReady(int) {
    // Phase 1: fill while the write buffer has room; every transaction
    // that needs no more filling moves from fillers to writers.
    CltXact *filler = theFillers.begin();
    while (filler != theFillers.end() && !conn()->theWrBuf.full()) {
        bool needMoreFill = false;
        if (!filler->controlledFill(needMoreFill))
            return;
        if (needMoreFill)
            break;

        // remember the successor before taking filler out of its line
        CltXact *const following = theFillers.next(filler);
        theFillers.dequeue(filler);
        theWriters.enqueue(filler);
        filler = following;
    }

    // Phase 2: write once, via the transaction at the head of the writers
    if (theWriters.count() > 0 &&
        !theWriters.begin()->controlledMasterWrite(theWrSize))
        return;

    // Phase 3: one or more transactions may move on after a write
    CltXact *writer = theWriters.begin();
    while (writer != theWriters.end() && theWrSize > 0) {
        bool needMoreWrite = false;
        if (!writer->controlledPostWrite(theWrSize, needMoreWrite))
            return;

        if (needMoreWrite) {
            // this writer is not finished; make sure we get called again
            if (!conn()->theWr.theReserv)
                conn()->theWr.start(this);
            return;
        }

        // remember the successor before taking writer out of its line
        CltXact *const following = theWriters.next(writer);
        theWriters.dequeue(writer);

        // nothing left to fill or write: stop writing
        if (theFillers.count() == 0 && theWriters.count() == 0)
            conn()->theWr.stop(this);

        prepReading(writer);
        if (theReaders.count() == 1)
            kickNextRead();

        writer = following;
    }
}

// Lets the head reader process input when the read buffer already holds
// content; does nothing if there are no readers or no buffered content.
void PipelinedCxm::kickNextRead() {
    if (theReaders.empty())
        return;
    if (!(conn()->theRdBuf.contSize() > 0))
        return;

    CltXact *const reader = theReaders.begin();
    reader->controlledPostRead(); // will call us back
}

// Queues x as a reader; if the connection read side has no reservation,
// starts reading and sets the descriptor priority to fsupBestEffort.
void PipelinedCxm::prepReading(CltXact *x) {
    theReaders.enqueue(x);

    if (conn()->theRd.theReserv)
        return;

    conn()->theRd.start(this);
    TheFileScanner->setPriority(conn()->sock().fd(), fsupBestEffort);
}

// Empties the given line from its head, calling controlledAbort() on
// every transaction except cause; cause is removed without being aborted.
void PipelinedCxm::abortLine(Line &line, CltXact *cause) {
    while (!line.empty()) {
        CltXact *const head = line.begin();
        if (head != cause)
            head->controlledAbort();
        line.dequeue(head); // including cause
    }
}

// Empties all three lines in order: fillers, writers, readers.
void PipelinedCxm::abortLines(CltXact *cause) {
    abortLine(theFillers, cause);
    abortLine(theWriters, cause);
    abortLine(theReaders, cause);
}