/* Web Polygraph http://www.web-polygraph.org/ * (C) 2003-2006 The Measurement Factory * Licensed under the Apache License, Version 2.0 */ #include "base/polygraph.h" #include "runtime/Connection.h" #include "base/ObjId.h" #include "runtime/HttpDate.h" #include "runtime/httpText.h" #include "cache/DistrPoint.h" #include "csm/oid2Url.h" #include "client/Client.h" #include "proxy/PxyCltXact.h" void PxyCltXact::reset() { CltXact::reset(); CacheWriter::reset(); } Cache *PxyCltXact::cache() { return theOwner->cache(); } void PxyCltXact::cacheDistrPoint(DistrPoint *dp) { Assert(!theDistrPoint); theDistrPoint = dp; theDistrPoint->addWriter(this); } const RepHdr *PxyCltXact::origRepHdrs() const { return &theRepHdr; } const UniqId &PxyCltXact::reqId() const { if (!theDistrPoint || !theDistrPoint->reader()) return CltXact::reqId(); // no proxying, standard client request? Assert(theDistrPoint->reader()->origReqHdrs()); return theDistrPoint->reader()->origReqHdrs()->theXactId; } void PxyCltXact::noteAbort() { if (theDistrPoint) theDistrPoint->delWriter(this); CltXact::noteAbort(); } // repeats original headers void PxyCltXact::makeEndToEndHdrs(ostream &os) { if (!theDistrPoint || !theDistrPoint->reader()) { CltXact::makeEndToEndHdrs(os); // no proxying, standard client request? return; } Assert(theDistrPoint->reader()->origReqHdrs()); const ReqHdr &req = *theDistrPoint->reader()->origReqHdrs(); Oid2UrlPath(theOid, os); os << rlsHttp1p1; // end-of-request-line /* request-header fields */ os << hfAccept << hfpHost << req.theUri.host.addrA() << ':' << req.theUri.host.port() << crlf ; if (req.theIms >= 0) // check: httpHdrs should set oid.ims() HttpDatePrint(os << hfpIMS, req.theIms) << crlf; if (req.theUri.oid.reload()) os << hfReload; /* entity-header fields */ os << hfpXXact << req.theGroupId << ' ' << req.theXactId << crlf; // send public world info if (req.theLocWorld) os << hfpXLocWorld << req.theLocWorld << crlf; if (req.theRemWorld) os << hfpXRemWorld << req.theRemWorld << crlf; os << hfpXTarget << req.theTarget << crlf; // report our readiness to change phase if needed if (Should(req.thePhaseSyncPos >= 0)) // XXX: remove Should! os << hfpXPhaseSyncPos << req.thePhaseSyncPos << crlf; } // also called from noteCacheReady void PxyCltXact::consume(Size size) { Assert(theDistrPoint); if (!theConsumedSize && size) { // just parsed the headers RepHdr &rep = theDistrPoint->repHdr(); Assert(theRepHdr.theContSize >= 0); // XXX: gross, should move to RepHdr? rep.theDate = theRepHdr.theDate; rep.theExpires = theRepHdr.theExpires; rep.theLMT = theRepHdr.theLMT; rep.theContSize = theRepHdr.theContSize; rep.theTarget = theRepHdr.theTarget; rep.theGroupId = theRepHdr.theGroupId; rep.theXactId = theRepHdr.theXactId; rep.thePhaseSyncPos = theRepHdr.thePhaseSyncPos; } CltXact::consume(size); if (theConsumedSize) { //cerr << here << theDistrPoint << ": ready: " << theConsumedSize - theRepHdr.theHdrSize << endl; theDistrPoint->noteDataReady( Max(Size(0), theConsumedSize - theRepHdr.theHdrSize)); } } void PxyCltXact::firstHandSync() { } void PxyCltXact::noteReaderLeft() { // here is the place to implement quick_abort } void PxyCltXact::newState(State aState) { CltXact::newState(aState); if (theState == stHdrWaiting) { // on a miss, dp could be already set in cacheDistrPoint if (!theDistrPoint) { if (CacheEntry *e = cache()->cached(theOid)) cache()->purgeEntry(e); // we are going to update it theDistrPoint = cache()->addWriter(theOid, this); Assert(theDistrPoint); } } else if (theState == stDone) { if (theDistrPoint) { CacheEntry *e = theDistrPoint->entry(); Assert(e->objSize() < 0); // XXX: also need to purge on error // (need a single point where errors are delivered) if (!theRepSize.expected().known() || theRepSize.expectToGetMore()) { if (e->cached()) cache()->purgeEntry(e); } else { // or should we use .expected() here? const Size sz = theRepSize.actual() - theRepHdr.theHdrSize; Assert(sz >= 0); e->objSize(sz); if (!e->cached() && theOid.cachable()) cache()->cacheEntry(e); } theDistrPoint->delWriter(this); } } }