/* Web Polygraph http://www.web-polygraph.org/
* (C) 2003-2006 The Measurement Factory
* Licensed under the Apache License, Version 2.0 */
#include "base/polygraph.h"
#include "runtime/Connection.h"
#include "base/ObjId.h"
#include "runtime/HttpDate.h"
#include "runtime/httpText.h"
#include "cache/DistrPoint.h"
#include "csm/oid2Url.h"
#include "client/Client.h"
#include "proxy/PxyCltXact.h"
void PxyCltXact::reset() {
CltXact::reset();
CacheWriter::reset();
}
// Returns the cache reported by the owner of this transaction.
Cache *PxyCltXact::cache() {
    Cache *const ownersCache = theOwner->cache();
    return ownersCache;
}
// Attaches this transaction to the given distribution point and
// registers it there as a writer. A distribution point must not
// already be set (asserted).
void PxyCltXact::cacheDistrPoint(DistrPoint *dp) {
    Assert(!theDistrPoint);

    // remember the point first, then register with it
    theDistrPoint = dp;
    dp->addWriter(this);
}
// Returns a pointer to the reply headers kept by this transaction.
const RepHdr *PxyCltXact::origRepHdrs() const {
    const RepHdr &hdrs = theRepHdr;
    return &hdrs;
}
// Returns the transaction id of the original request when a reader is
// attached to our distribution point; otherwise returns the id
// reported by CltXact::reqId().
const UniqId &PxyCltXact::reqId() const {
    const bool haveReader = theDistrPoint && theDistrPoint->reader();
    if (!haveReader)
        return CltXact::reqId(); // no proxying, standard client request?

    // the reader must have the original request headers (asserted)
    const ReqHdr *const origReq = theDistrPoint->reader()->origReqHdrs();
    Assert(origReq);
    return origReq->theXactId;
}
// Handles an abort: removes this writer from the distribution point,
// if one is set, and then lets CltXact process the abort.
void PxyCltXact::noteAbort() {
    if (theDistrPoint != 0) {
        theDistrPoint->delWriter(this);
    }

    CltXact::noteAbort();
}
// Writes the end-to-end part of the request to os, repeating the
// original request headers of the reader attached to our distribution
// point. Without a distribution point or a reader, the work is
// delegated to CltXact::makeEndToEndHdrs().
void PxyCltXact::makeEndToEndHdrs(ostream &os) {
    const bool haveReader = theDistrPoint && theDistrPoint->reader();
    if (!haveReader) {
        CltXact::makeEndToEndHdrs(os); // no proxying, standard client request?
        return;
    }

    // the reader must have the original request headers (asserted)
    Assert(theDistrPoint->reader()->origReqHdrs());
    const ReqHdr &orig = *theDistrPoint->reader()->origReqHdrs();

    Oid2UrlPath(theOid, os);
    os << rlsHttp1p1; // end-of-request-line

    /* request-header fields */
    os << hfAccept;
    os << hfpHost << orig.theUri.host.addrA() << ':' << orig.theUri.host.port() << crlf;

    if (orig.theIms >= 0) { // check: httpHdrs should set oid.ims()
        HttpDatePrint(os << hfpIMS, orig.theIms) << crlf;
    }

    if (orig.theUri.oid.reload()) {
        os << hfReload;
    }

    /* entity-header fields */
    os << hfpXXact << orig.theGroupId << ' ' << orig.theXactId << crlf;

    // send public world info
    if (orig.theLocWorld) {
        os << hfpXLocWorld << orig.theLocWorld << crlf;
    }
    if (orig.theRemWorld) {
        os << hfpXRemWorld << orig.theRemWorld << crlf;
    }

    os << hfpXTarget << orig.theTarget << crlf;

    // report our readiness to change phase if needed
    if (Should(orig.thePhaseSyncPos >= 0)) { // XXX: remove Should!
        os << hfpXPhaseSyncPos << orig.thePhaseSyncPos << crlf;
    }
}
// Consumes size bytes of the response. On the first non-empty call
// (nothing consumed so far), selected reply header fields are copied
// into the distribution point's reply header. After CltXact consumes
// the data, the distribution point is told how much content (consumed
// bytes minus header size, but not less than zero) is ready.
// also called from noteCacheReady
void PxyCltXact::consume(Size size) {
    Assert(theDistrPoint);

    const bool justParsedHdrs = !theConsumedSize && size;
    if (justParsedHdrs) {
        RepHdr &shared = theDistrPoint->repHdr();
        Assert(theRepHdr.theContSize >= 0);

        // XXX: gross, should move to RepHdr?
        shared.theDate = theRepHdr.theDate;
        shared.theExpires = theRepHdr.theExpires;
        shared.theLMT = theRepHdr.theLMT;
        shared.theContSize = theRepHdr.theContSize;
        shared.theTarget = theRepHdr.theTarget;
        shared.theGroupId = theRepHdr.theGroupId;
        shared.theXactId = theRepHdr.theXactId;
        shared.thePhaseSyncPos = theRepHdr.thePhaseSyncPos;
    }

    CltXact::consume(size);

    if (!theConsumedSize)
        return;

    //cerr << here << theDistrPoint << ": ready: " << theConsumedSize - theRepHdr.theHdrSize << endl;
    const Size readyBody = Max(Size(0), theConsumedSize - theRepHdr.theHdrSize);
    theDistrPoint->noteDataReady(readyBody);
}
// Currently a no-op: the body is empty.
void PxyCltXact::firstHandSync() {
}
// Currently a no-op: nothing is done when this notification arrives.
void PxyCltXact::noteReaderLeft() {
// here is the place to implement quick_abort
}
// Switches to the given state via CltXact::newState() and then does
// cache bookkeeping based on the resulting theState:
//  - stHdrWaiting: if no distribution point is set yet, purges any
//    cached entry for theOid and registers this transaction as a
//    writer with the cache;
//  - stDone: if a distribution point is set, either purges the entry
//    (reply size unknown or more data expected) or records the object
//    size and caches the entry when theOid is cachable; then removes
//    this writer from the distribution point.
void PxyCltXact::newState(State aState) {
    CltXact::newState(aState);

    if (theState == stHdrWaiting) {
        // on a miss, dp could be already set in cacheDistrPoint
        if (theDistrPoint)
            return;

        if (CacheEntry *stale = cache()->cached(theOid))
            cache()->purgeEntry(stale); // we are going to update it

        theDistrPoint = cache()->addWriter(theOid, this);
        Assert(theDistrPoint);
        return;
    }

    if (theState != stDone || !theDistrPoint)
        return;

    CacheEntry *entry = theDistrPoint->entry();
    Assert(entry->objSize() < 0);

    // XXX: also need to purge on error
    // (need a single point where errors are delivered)
    const bool gotItAll =
        theRepSize.expected().known() && !theRepSize.expectToGetMore();

    if (gotItAll) {
        // or should we use .expected() here?
        const Size bodySize = theRepSize.actual() - theRepHdr.theHdrSize;
        Assert(bodySize >= 0);
        entry->objSize(bodySize);

        if (!entry->cached() && theOid.cachable())
            cache()->cacheEntry(entry);
    } else {
        if (entry->cached())
            cache()->purgeEntry(entry);
    }

    theDistrPoint->delWriter(this);
}
// syntax highlighted by Code2HTML, v. 0.9.1