/* Web Polygraph http://www.web-polygraph.org/
* (C) 2003-2006 The Measurement Factory
* Licensed under the Apache License, Version 2.0 */
#include "base/polygraph.h"
#include <limits.h>
#include "xstd/h/sstream.h"
#include "xstd/Clock.h"
#include "xstd/Rnd.h"
#include "xstd/Checksum.h"
#include "xstd/gadgets.h"
#include "base/RndPermut.h"
#include "base/ObjId.h"
#include "base/StatIntvlRec.h"
#include "base/polyLogCats.h"
#include "runtime/Connection.h"
#include "runtime/AddrMap.h"
#include "runtime/HostMap.h"
#include "runtime/PubWorld.h"
#include "runtime/HttpDate.h"
#include "runtime/ErrorMgr.h"
#include "runtime/SharedOpts.h"
#include "runtime/PopModel.h"
#include "runtime/StatPhaseSync.h"
#include "runtime/LogComment.h"
#include "runtime/httpText.h"
#include "runtime/polyBcastChannels.h"
#include "runtime/polyErrors.h"
#include "runtime/globals.h"
#include "csm/ContentMgr.h"
#include "csm/ContentCfg.h"
#include "csm/BodyIter.h"
#include "csm/ObjLifeCycle.h"
#include "csm/oid2Url.h"
#include "server/Server.h"
#include "server/SrvCfg.h"
#include "server/SrvXact.h"
#include "server/SrvOpts.h"
// Clears per-transaction state so that the object can serve another request.
void SrvXact::reset() {
	Xaction::reset(); // base-class state goes first

	// detach from the server, the content configuration, and the body producer
	theOwner = 0;
	theContentCfg = 0;
	theBodyIter = 0;

	// forget what was learned from the previous request
	theReqHdr.reset();
	theHttpVersion.reset();
	theTimes.reset();

	theRepHdrSize = -1; // negative: the reply has not been built
	theConsumedSize = 0;
	theLogCat = lgcSrvSide;
}
// Returns the server running this transaction, viewed as a generic Agent;
// the pointer is null between reset() and exec().
Agent *SrvXact::owner() {
	Agent *const agent = theOwner;
	return agent;
}
// Binds the transaction to its server and connection and then either
// starts right away or sleeps for a positive delay first.
void SrvXact::exec(Server *anOwner, Connection *aConn, Time delay) {
	Assert(anOwner);

	theOwner = anOwner;
	theConn = aConn;
	theHttpVersion = theOwner->httpVersion(); // may be downgraded later

	if (!(delay > 0)) {
		doStart();
		return;
	}

	sleepFor(delay); // wakeUp() starts the transaction
}
// Starts the transaction on its connection and begins waiting for the
// request header.
void SrvXact::doStart() {
	start(theConn);
	newState(stHdrWaiting);

	// the connection manager is expected to have cleared the reservation
	Assert(!theConn->theRd.theReserv);

	// note: we assume that conn mgr read() on first noteReadReady(),
	// so there may be buffered data to push the state machine forward
	noteDataReady();
}
// Ends the transaction with the given error (zero means success),
// returns the body iterator, if any, to its content configuration,
// and tells the owning server that the transaction is done.
void SrvXact::finish(Error err) {
	Xaction::finish(err);

	if (theBodyIter) {
		theContentCfg->putBodyIter(theBodyIter);
		// the iterator is no longer ours: do not keep a pointer that
		// could be used or returned a second time (make200OK() clears
		// the pointer the same way after putBodyIter())
		theBodyIter = 0;
	}

	theOwner->noteXactDone(this);
}
// Alarm callback: lets the base class process the alarm and then starts
// the transaction (presumably the alarm is the delay requested in exec()).
void SrvXact::wakeUp(const Alarm &a) {
	AlarmUser::wakeUp(a);

	doStart();
}
// Read-readiness callback: performs the read via abortIo() and either
// fails the transaction on a bad connection or processes the new data.
void SrvXact::noteReadReady(int) {
	if (abortIo(&Connection::read))
		return; // abortIo() asked us to stop here

	if (!theConn->bad()) {
		noteDataReady();
		return;
	}

	finish(errOther);
}
void SrvXact::noteDataReady() {
if (theState == stHdrWaiting) {
if (theReqHdr.parse(theConn->theRdBuf.content(), theConn->theRdBuf.contSize())) {
if (const Error err = interpretHeader()) {
finish(err);
return;
}
newState(stBodyWaiting); // if any
} else
if (theConn->theRdBuf.full()) { // header too big
finish(errHugeHdr);
return;
} else
if (theConn->atEof()) { // premature end of headers
finish(errPrematureEoh);
return;
}
}
if (theState == stBodyWaiting) {
const Size unconsumed = theReqSize - theConsumedSize;
consume(Min(unconsumed, theConn->theRdBuf.contSize()));
theConn->theRdBuf.pack();
if (theConsumedSize == theReqSize) { // got everything expected
theConn->theRd.stop(this);
newState(stSpaceWaiting);
} else
if (theConn->atEof()) { // premature eof
if (cfgAbortedReq()) {
theOid.aborted(true);
finish(0); // not an error, configuration told client to abort
return;
} else {
finish(errPrematureEof);
return;
}
}
}
if (theState == stHdrWaiting || theState == stBodyWaiting) {
if (!theConn->theRd.theReserv) // need to read more
theConn->theRd.start(this);
} else
if (theState == stSpaceWaiting) {
if (!theConn->theWr.theReserv) // need to write more
theConn->theWr.start(this);
}
}
// Accounts for size request bytes: adds them to the running total and
// marks them as consumed in the connection read buffer.
void SrvXact::consume(Size size) {
	theConsumedSize += size;
	theConn->theRdBuf.consumed(size);
}
// Interprets a successfully parsed request header: resolves the requested
// object id (overwriting or translating foreign URLs when allowed),
// computes the expected request size, validates host, target, and URI,
// consumes the header bytes, and selects the content configuration for
// the reply. Returns zero on success or the error the caller should
// finish() the transaction with; this method does not call finish().
Error SrvXact::interpretHeader() {
	// XXX: pxy server cannot support ignoreUrls; see Server::hostIdx
	bool ignoreUrls =
		theReqHdr.isHealthCheck ||
		(TheSrvOpts.ignoreUrls && theOwner->hostIdx() >= 0);
	if (ignoreUrls)
		overwriteUrl();
	else
	if (const String &url = theReqHdr.theUri.oid.foreignUrl())
		grokForeignUrl(url, ignoreUrls); // may set ignoreUrls

	// XXX: report healthchecks once in a while and collect stats
	if (theReqHdr.isHealthCheck) {
		static bool didOnce = false;
		if (!didOnce) {
			Comment(6) << "fyi: first health check received from " << theConn->raddr() << ':' << endc;
			printMsg(theConn->theRdBuf, theReqHdr.theHdrSize);
			didOnce = true;
		}
	}

	// dump request header: always for the very first request and then
	// only when configured; a one-shot flag is used instead of an
	// ever-growing signed counter, which would eventually overflow
	static bool sawFirstReq = false;
	const bool firstReq = !sawFirstReq;
	sawFirstReq = true;
	if (firstReq || TheOpts.theDumpFlags(dumpReq, dumpHdr))
		printMsg(theConn->theRdBuf, theReqHdr.theHdrSize);

	theOid = theReqHdr.theUri.oid;
	if (theOid.foreignUrl())
		return errForeignUrl; // still foreign: nobody translated the URL

	// downgrade HTTP version if needed
	if (theReqHdr.theHttpVersion < theHttpVersion)
		theHttpVersion = theReqHdr.theHttpVersion;

	// calculate request size
	if (theReqHdr.expectBody()) {
		if (theReqHdr.theContSize < 0)
			return errReqBodyButNoCLen;
		theReqSize = theReqHdr.theHdrSize + theReqHdr.theContSize;
	} else {
		if (theReqHdr.theContSize >= 0)
			return errUnexpectedCLen;
		theReqSize = theReqHdr.theHdrSize;
	}

	if (!ignoreUrls) {
		const NetAddr &host = theReqHdr.theUri.host;
		if (!host.knownAddr())
			return errNoHostName;

		if (const Error err = setViserv(host))
			return err; // our caller finishes the transaction

		if (theOid.target() < 0) {
			// currently, servers do not use target but we complain
			// do identify prefetch requests and such
			if (!theReqHdr.theTarget && ReportError(errNoTarget) &&
				TheOpts.theDumpFlags(dumpErr, dumpAny))
				printMsg(theConn->theRdBuf, theReqHdr.theHdrSize);

			if (const Error err = setTarget(theReqHdr.theTarget))
				return err; // our caller finishes the transaction
			Assert(theOid.target() >= 0);
		}

		if (const Error err = checkUri()) // must be done before we consume()
			return err;
	}

	// note: clt and srv code assume we consume only after parse!
	consume(theReqHdr.theHdrSize);
	theConn->theRdBuf.pack();

	// update history for this server
	if (theOid.viserv() >= 0 && theReqHdr.theLocWorld)
		updatePubWorld(theReqHdr.theLocWorld);

	// update client group info
	if (theReqHdr.thePhaseSyncPos >= 0 && theReqHdr.theGroupId)
		TheStatPhaseSync.notePhaseSync(theReqHdr.theGroupId, theReqHdr.thePhaseSyncPos);

	// note if the client will close the connection
	theConn->lastUse(!theReqHdr.persistentConnection());

	theOid.reload(!theReqHdr.isCachable);

	// get content specs for this reply
	Assert(theOid.type() > 0);
	theContentCfg = TheContentMgr.get(theOid.type());

	theRepHdrSize = -1; // so that we know that we need to make a reply
	return 0;
}
void SrvXact::overwriteUrl() {
// XXX: theOid.visName() will not be set!
// we have to ignore host settings;
// should we create our own worlds??
static RndGen rng;
theReqHdr.theUri.oid.world().create();
theReqHdr.theUri.oid.target(theOwner->hostIdx());
theReqHdr.theUri.oid.type(1);
theReqHdr.theUri.oid.name(rng.ltrial() | 1);
theReqHdr.theUri.oid.foreignSrc(false);
theReqHdr.theUri.oid.foreignUrl(0);
}
void SrvXact::noteWriteReady(int) {
Assert(theState == stSpaceWaiting);
WrBuf &buf = theConn->theWrBuf;
if (theRepHdrSize < 0)
makeRep(buf);
Assert(theRepSize.actual() > 0 || buf.contSize() > 0);
// keep the write buffer full
if (theBodyIter)
theBodyIter->pour();
Size sz;
if (abortIo(&Connection::write, &sz))
return;
if (theConn->bad()) {
finish(errOther);
return;
}
Assert(sz >= 0);
theRepSize.got(sz);
if (theRepSize.gotAll()) {
Should(!theRepSize.expectToGetLess());
finish(0);
return;
}
Assert(theRepSize.expectToGetMore());
if (!theConn->theWr.theReserv)
theConn->theWr.start(this);
}
// Decides what kind of reply to build (406, 302, 304, or 200) and writes
// it into the free space of buf. On return, theRepHdrSize and the expected
// reply size are set and the abort position is calculated.
void SrvXact::makeRep(WrBuf &buf) {
	Assert(theContentCfg);

	const bool acceptableCoding =
		theContentCfg->calcContentCoding(theOid, theReqHdr);

	// timestamps, cachability, aborts may be used for all types of replies
	theContentCfg->calcTimes(theOid, theTimes);
	theOid.cachable(theContentCfg->calcCachability(theOid));
	theOwner->cfg()->selectAbortCoord(theAbortCoord);
	theOwner->selectRepType(theOid);

	const char *repStart = buf.space(); // remembered for the header dump below
	ofixedstream os(buf.space(), buf.spaceSize());

	if (!acceptableCoding)
		make406NotAcceptable(os);
	else
	if (shouldMake302Found() && make302Found(os))
		; // nothing to be done here
	else
	if (shouldMake304NotMod())
		make304NotMod(os);
	else
		make200OK(os);

	// tell the buffer how much the stream wrote into its space
	buf.appended(Size(os.tellp()));

	Assert(theRepHdrSize > 0 && theRepSize.expected() >= theRepHdrSize);
	theAbortSize = theAbortCoord.pos(theRepHdrSize, theRepSize.expected()-theRepHdrSize);

	// dump reply header: always for the very first reply and then only
	// when configured; a one-shot flag is used instead of an ever-growing
	// signed counter, which would eventually overflow
	static bool sawFirstRep = false;
	const bool firstRep = !sawFirstRep;
	sawFirstRep = true;
	if (firstRep || TheOpts.theDumpFlags(dumpRep, dumpHdr))
		printMsg(repStart, theRepHdrSize);
}
// Builds a complete "406 Not Acceptable" reply (headers and a short
// human-readable body) and reports the content coding error.
void SrvXact::make406NotAcceptable(ostream &os) {
	ReportError(errNoAcceptableContentCoding);
	theHttpStatus = RepHdr::sc406_NotAcceptable;

	const Size bodyLen = text406NotAcceptable.len();

	// headers
	putResponseLine(os, rls406NotAcceptable);
	putStdFields(os);
	if (theContentCfg->multipleContentCodings())
		os << hfVaryAcceptEncoding;
	os << hfpContLength << (int)bodyLen << crlf;
	putXFields(os);
	os << crlf; // end-of-headers
	theRepHdrSize = os.tellp();

	// text for humans
	os << text406NotAcceptable;

	theRepSize.expect(theRepHdrSize + bodyLen);
}
// Tells whether the requested object is marked for a redirect reply.
bool SrvXact::shouldMake302Found() const {
	const bool redirect = theOid.repToRedir();
	return redirect;
}
// Picks an object to redirect to and stores its id in oid.
// Returns false (leaving oid untouched) when no redirect target can be
// selected for the visible server of the current request.
bool SrvXact::canMake302Found(ObjId &oid) const {
	Assert(theOwner->popModel());

	// at this time redirect to the same server only; a missing popularity
	// model is a misconfiguration
	// NOTE(review): the popModel() test is reachable only if the Assert
	// above does not abort -- confirm which behavior is intended
	if (theOid.viserv() < 0 || !theOwner->popModel())
		return false;

	PubWorld *const world = TheHostMap->findPubWorldAt(theOid.viserv());
	const bool repeatable = world && world->canRepeat();
	if (!repeatable)
		return false;

	oid = theOid; // set viserv and such
	oid.name(-1);
	oid.type(-1); // overwrite name and type
	world->repeat(oid, theOwner->popModel());
	// NOTE(review): the type is derived from theOid, not from the new oid;
	// confirm that this is intentional
	oid.type(Oid2ContType(theOid));
	return true;
}
// Builds a complete "302 Found" reply whose Location header and body
// both carry the URL of the redirect target. Returns false, after
// reporting an error and without writing anything, if no target exists.
bool SrvXact::make302Found(ostream &os) {
	ObjId redirOid;

	// make sure we have an object to redirect to
	const bool haveTarget = canMake302Found(redirOid);
	if (!haveTarget) {
		ReportError(errMake302Found);
		return false;
	}

	theHttpStatus = RepHdr::sc302_Found;
	putResponseLine(os, rls302Found);
	putStdFields(os);

	// Location: measure the URL as it is written so that the body
	// length (fixed text plus the same URL) is known before the body
	os << hfpLocation;
	const Size urlBeg = Size(os.tellp());
	Oid2Url(redirOid, os);
	const Size urlEnd = Size(os.tellp());
	os << crlf;

	const Size bodyLen = (urlEnd - urlBeg) + Size(text302Found.len());
	os << hfpContLength << (int)bodyLen << crlf;
	putXFields(os);
	os << crlf; // end-of-headers

	// we determine the type of IMS request later
	theRepHdrSize = os.tellp();

	// text for humans
	os << text302Found;
	Oid2Url(redirOid, os);

	theRepSize.expect(theRepHdrSize + bodyLen);
	return true;
}
// Tells whether the request carries an If-Modified-Since time that is not
// older than the known last modification time of the object.
bool SrvXact::shouldMake304NotMod() const {
	if (!(theReqHdr.theIms >= 0))
		return false; // not an IMS request

	if (!(theTimes.lmt() >= 0))
		return false; // last modification time is not known

	return theTimes.lmt() <= theReqHdr.theIms;
}
// Builds a header-only "304 Not Modified" reply and marks the object id
// accordingly.
void SrvXact::make304NotMod(ostream &os) {
	theHttpStatus = RepHdr::sc304_NotModified;

	putResponseLine(os, rls304NotModified);
	putStdFields(os);
	os << crlf; // end-of-headers

	// the reply has no body: its size is the header size
	theRepHdrSize = os.tellp();
	theRepSize.expect(theRepHdrSize);

	theOid.ims304(true);
}
// Builds the headers of a happy "200 OK" reply and prepares the body
// iterator that will produce the message body (none for HEAD requests).
void SrvXact::make200OK(ostream &os) {
	theHttpStatus = RepHdr::sc200_OK;
	theBodyIter = theContentCfg->getBodyIter(theOid); // put200OkHead() needs it

	putResponseLine(os, rls200Ok);
	put200OkHead(os);
	os << crlf; // end-of-headers
	theRepHdrSize = os.tellp();

	if (!theOid.head()) {
		theRepSize.expect(theRepHdrSize + theBodyIter->contentSize());
		theBodyIter->start(&theConn->theWrBuf);
	} else {
		// do not send message body for HEAD
		theRepSize.expect(theRepHdrSize);
		theContentCfg->putBodyIter(theBodyIter);
		theBodyIter = 0;
	}

	// note: non-200 and non-304 responses to IMS requests not covered yet
	const bool imsRequest = theReqHdr.theIms >= 0;
	theOid.ims200(imsRequest);
}
// Writes the status line: the protocol token matching the (possibly
// downgraded) HTTP version of the transaction, followed by suffix.
void SrvXact::putResponseLine(ostream &os, const String &suffix) {
	const bool oldProto = theHttpVersion <= HttpVersion(1,0);

	if (oldProto)
		os << protoHttp1p0;
	else
		os << protoHttp1p1;

	os << suffix;
}
// Writes all header fields of a "200 OK" reply except for the status
// line and the end-of-headers marker. Requires theBodyIter to be set.
void SrvXact::put200OkHead(ostream &os) {
	/* it is "good practice" to send general-header fields first,
	 * followed by request-header or response-header fields, and
	 * ending with the entity-header fields. */

	/* general-header fields */
	if (theOid.cachable())
		os << hfCcCachable;
	else
		os << hfCcUncachable;
	putStdFields(os);

	/* other entity-header fields */
	const bool showLmt = theOid.cachable() && theTimes.showLmt();
	if (showLmt)
		HttpDatePrint(os << hfpLmt, theTimes.lmt()) << crlf;

	Assert(theBodyIter);
	const Size bodyLen = theBodyIter->contentSize();
	Assert(bodyLen.known());
	os << hfpContLength << (int)bodyLen << crlf;

	if (theContentCfg->theMimeType)
		os << hfpContType << theContentCfg->theMimeType << crlf;

	if (theOid.gzipContent())
		os << hfGzipContentEncoding;

	if (theContentCfg->multipleContentCodings())
		os << hfVaryAcceptEncoding;

	if (theContentCfg->calcChecksumNeed(theOid))
		putChecksum(os);

	if (theOwner->isCookieSender)
		putCookies(os);

	/* extension-header fields (they are entity-header fields as well) */
	putXFields(os);
}
// Writes the well-known fields acceptable for both 304 and 200 replies:
// Date, a connection persistency indication, and Expires.
void SrvXact::putStdFields(ostream &os) const {
	// general-header fields
	HttpDatePrint(os << hfpDate) << crlf;

	// persistency indication depends on HTTP version: only the case
	// that differs from the version default is announced
	const bool oldProto = theHttpVersion <= HttpVersion(1,0);
	if (oldProto && theConn->reusable())
		os << hfConnAliveOrg;
	else if (!oldProto && !theConn->reusable())
		os << hfConnCloseOrg;

	// response-header fields (none)

	// entity-header fields
	const bool showExp = theOid.cachable() && theTimes.knownExp();
	if (showExp)
		HttpDatePrint(os << hfpExpires, theTimes.exp()) << crlf;
}
// Writes the extension header fields acceptable for both 302 and 200
// replies: target, transaction id, remote world, abort coordinates, and
// phase synchronization position.
void SrvXact::putXFields(ostream &os) const {
	// put group ids with source/target ids on one line?
	os << hfpXTarget << theOwner->host() << crlf;

	// echo the transaction id only if the request carried one
	if (theReqHdr.theXactId) {
		os << hfpXXact << TheGroupId;
		os << ' ' << theReqHdr.theXactId.genMutant();
		os << crlf;
	}

	const bool knowWorld = theOid.viserv() >= 0 && theReqHdr.theRemWorld;
	if (knowWorld)
		putRemWorld(os, theReqHdr.theRemWorld);

	os << hfpXAbort << ' ' << theAbortCoord.whether();
	os << ' ' << theAbortCoord.where() << crlf;

	os << hfpXPhaseSyncPos << TheStatPhaseSync.phaseSyncPos() << crlf;
}
// Builds and writes the checksum header field: generates the whole body
// with a separate iterator and a scratch buffer, feeds it to the checksum
// algorithm, and prints the base64-encoded sum.
// NOTE(review): theCheckAlg is not re-initialized here; presumably
// final() or code elsewhere prepares it for the next use -- confirm.
void SrvXact::putChecksum(ostream &os) {
	BodyIter *const body = theContentCfg->getBodyIter(theOid);

	WrBuf scratch;
	body->start(&scratch);
	// the scratch buffer is emptied after each portion is digested
	for (; *body; scratch.reset()) {
		body->pour();
		theCheckAlg.update(scratch.content(), scratch.contSize());
	}
	theCheckAlg.final();

	theContentCfg->putBodyIter(body);

	os << hfpContMd5;
	PrintBase64(os, theCheckAlg.sum().image(), theCheckAlg.sum().size());
	os << crlf;
}
// Writes Set-Cookie header fields. Whether cookies are sent, how many,
// and how large they are is derived from the object id hash and the
// server configuration. Stops after the first cookie that does not fit
// into the remaining write buffer space.
void SrvXact::putCookies(ostream &os) {
	SrvCfg *const srvCfg = theOwner->cfg();
	const int oidSeed = theOid.hash();

	// check whether we should send (set) cookies with this response
	const int sendSeed = GlbPermut(oidSeed, rndCookieSend);
	RndGen sendRng(sendSeed);
	if (!sendRng.event(srvCfg->theCookieSendProb))
		return;

	// calculate how many cookies we should send (at least one)
	RndDistr *const countDistr = seedOidDistr(srvCfg->theCookieCounts, rndCookieCount);
	const int count = (int)MiniMax(1.0, countDistr->trial(), (double)INT_MAX);

	RndDistr *const sizeDistr = seedOidDistr(srvCfg->theCookieSizes, rndCookieSize);

	static const String cookieValuePfx = "sess";
	static const String cookieValueSfx = "\"";
	WrBuf &wrBuf = theConn->theWrBuf;

	Size accSize = 0; // total size of cookie values written so far
	for (int idx = 0; idx < count; ++idx) {
		os << hfpSetCookie << cookieValuePfx << idx << "=\"";

		const Size cookieSize = (int)MiniMax(0.0, sizeDistr->trial(), (double)INT_MAX);
		const Size contentOff = IOBuf::RandomOffset(oidSeed, accSize);

		// the value, its closing quote, and two more bytes must fit
		const Size usedSize = (streamoff)os.tellp();
		const Size spaceLeft = wrBuf.spaceSize() - usedSize;
		const bool fits =
			cookieSize + Size(cookieValueSfx.len() + 2) <= spaceLeft;

		if (!fits && ReportError(errCookiesDontFit)) {
			Comment << "cookie size: " << cookieSize <<
				" cookies buffered: " << idx << '/' << count << " or " <<
				accSize << "; space left: " << spaceLeft << endc;
		}

		// yes, make buffer (ostream) full
		IOBuf::RandomFill(os, contentOff, cookieSize);
		accSize += cookieSize;
		os << cookieValueSfx << crlf;

		if (!fits)
			break;
	}
}
void SrvXact::grokForeignUrl(const String &url, bool &ignoreUrls) {
TheEmbedStats.foreignUrlRequested++;
if (url.cmp("/pg/embed/", 10) == 0) {
const int commaPos = url.find(',');
ObjId &foid = theReqHdr.theUri.oid;
// use url trailer (as is, base64 encoded, 24 bytes?) for seeds
if (commaPos != String::npos) {
const int seedSize = 3*sizeof(int);
const Area &worldStr = url.area(commaPos+1, seedSize);
const Area &nameStr = url.area(commaPos+1 + seedSize, seedSize);
foid.world() = worldStr.size() >= seedSize ?
UniqId::FromStr(worldStr) : UniqId::Create();
static RndGen rng;
if (nameStr.size() >= seedSize) {
int seed[4] = { 0, 0, 0, 0 };
memcpy(&seed, nameStr.data(), Min((int)sizeof(seed), nameStr.size()));
rng.seed(LclPermut(seed[0] + seed[2], seed[1] + seed[3]));
}
foid.name(rng.ltrial() | 1);
}
const Area &category = url.area(10, commaPos - 10);
if (theOwner->cfg()->setEmbedContType(foid, category)) {
foid.target(theOwner->hostIdx());
foid.foreignSrc(false);
foid.foreignUrl(String());
} else {
static int count = 0; if (++count <= 100) { Comment(5) << "error: bad content type in URL: " << endc; printMsg(theConn->theRdBuf, theReqHdr.theHdrSize); }
ignoreUrls = true;
overwriteUrl(); // or we would not accept it
}
// TheEmbedStats.scriptSeen++;
} else
if (TheOpts.acceptForeignMsgs) {
static bool informed = false;
if (!informed) {
Comment(5) << "fyi: received first foreign request URL: " << endc;
printMsg(theConn->theRdBuf, theReqHdr.theHdrSize);
informed = true;
}
ignoreUrls = true;
overwriteUrl(); // or we would not accept it
}
}
// Finds the visible server the request was addressed to and stores its
// index in theOid, falling back to the address of our own server when the
// requested name is unknown. Returns errForeignHostName if even the
// fallback fails and zero otherwise.
Error SrvXact::setViserv(const NetAddr &name) {
	int viserv = -1;
	HostCfg *host = TheHostMap->find(name, viserv);

	if (!host) {
		// unknown name: try to salvage the request using our own address
		host = TheHostMap->find(theOwner->host(), viserv);
		const bool salvaged = Should(host != 0);

		if (ReportError(errForeignHostName)) {
			const char *const outcome =
				salvaged ? "salvaged" : "aborting transaction";
			Comment << theOwner->host() << " server received request for " <<
				name << " which is not an address of any visible server, " <<
				outcome <<
				endc;
		}

		if (!salvaged)
			return errForeignHostName;
	}

	// the host map entry gets its public world on first use
	if (!host->thePubWorld)
		PubWorld::Add(host, new PubWorld());

	theOid.viserv(viserv);
	return 0;
}
// Makes our own server the target of the requested object. The requested
// target address is currently ignored; always returns zero.
Error SrvXact::setTarget(const NetAddr &) {
	// XXX: the check below should know about DNS RR, etc.
	//if (target != theOwner->host())
	//	ReportError(errMisdirRequest);

	const int ownIdx = theOwner->hostIdx();
	theOid.target(ownIdx);
	return 0;
}
// merge with CltXact::cfgAbortedReply()?
bool SrvXact::cfgAbortedReq() const {
if (!theReqHdr.theAbortCoord)
return false;
const Size abSz = theReqHdr.theAbortCoord.pos(theReqHdr.theHdrSize, theReqSize-theReqHdr.theHdrSize);
if (!Should(abSz >= 0))
return false;
if (abSz > theConsumedSize)
return false;
return true;
}
// Checks that the requested URI belongs to our server: the content type
// must be one of ours and, except for special "/pg/" URLs, the request
// path must match the path we would generate for the object id.
// Returns zero if the URI is fine and errOther otherwise.
Error SrvXact::checkUri() {
	const int pathLen = theReqHdr.theUri.pathLen;
	const bool sane = Should(pathLen >= 0) &&
		Should(pathLen <= theConn->theRdBuf.contSize()) &&
		Should(theReqHdr.theUri.pathBuf);
	if (!sane) { // no point in high-level checks
		Comment << "internal error: inconsistent request, salvaged" << endc;
		return errOther;
	}

	// first check if the content type belongs to us
	if (!theOwner->cfg()->hasContType(theOid.type())) {
		if (ReportError(errMisdirRequest)) {
			Comment << "host " << theOwner->host()
				<< " does not have content type implied by request URI: ";
			Comment.write(theReqHdr.theUri.pathBuf, pathLen);
			Comment << endc;
		}
		return errOther;
	}

	// we cannot compare Oid2UrlPath() with pathBuf of special URLs
	if (pathLen >= 4 && strncmp("/pg/", theReqHdr.theUri.pathBuf, 4) == 0)
		return 0;

	ostringstream os; // XXX: expensive mallocs
	Oid2UrlPath(theOid, os);
	os << ends; // the terminator is why pathLen+1 is expected below

	// os.str() yields a temporary string, so its c_str() must be used
	// within the same full expression; storing the pointer in a variable
	// and reading it in a later statement would read freed memory
	const bool res = (pathLen+1 == (int)os.tellp()) &&
		strncmp(os.str().c_str(), theReqHdr.theUri.pathBuf, pathLen) == 0;
	streamFreeze(os, false);

	if (!res) {
		if (ReportError(errMisdirRequest)) {
			Comment << "host: " << theOwner->host() << endc;
			(Comment << "received: ").write(theReqHdr.theUri.pathBuf, pathLen);
			Comment << endc;
			Oid2UrlPath(theOid, Comment << "expected: ");
			Comment << endc;
		}
		return errOther;
	}

	return 0;
}
// Records the world slice reported by the client in the public world of
// the requested visible server: an already known slice is updated and an
// unknown one is added.
void SrvXact::updatePubWorld(const ObjWorld &newSlice) {
	PubWorld &world = *TheHostMap->findPubWorldAt(theOid.viserv());

	int sliceIdx;
	if (!world.find(newSlice.id(), sliceIdx)) {
		world.add(newSlice);
		return;
	}

	world.sliceAt(sliceIdx).update(newSlice);
}
// Writes the remote world header field if the public world of the
// requested visible server has a slice newer than the one the client
// told us about; writes nothing otherwise.
void SrvXact::putRemWorld(ostream &os, const ObjWorld &oldSlice) const {
	PubWorld &world = *TheHostMap->findPubWorldAt(theOid.viserv());

	int sliceIdx;
	if (!world.find(oldSlice.id(), sliceIdx))
		return; // client-side sent remote world ID before the same public world ID

	const PubWorldSlice *const newer = world.newerSlice(oldSlice, sliceIdx);
	if (newer)
		os << hfpXRemWorld << *newer << crlf;
}
// Logs the base transaction statistics followed by the owner's seqvId()
// value and the seconds of the request's If-Modified-Since time.
void SrvXact::logStats(OLog &ol) const {
	Xaction::logStats(ol);

	const int imsSec = (int)theReqHdr.theIms.sec();
	ol << theOwner->seqvId();
	ol << imsSec;
}
// syntax highlighted by Code2HTML, v. 0.9.1