/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include <limits.h>
#include <string>

#include "xstd/h/sstream.h"
#include "xstd/Clock.h"
#include "xstd/Rnd.h"
#include "xstd/Checksum.h"
#include "xstd/gadgets.h"
#include "base/RndPermut.h"
#include "base/ObjId.h"
#include "base/StatIntvlRec.h"
#include "base/polyLogCats.h"
#include "runtime/Connection.h"
#include "runtime/AddrMap.h"
#include "runtime/HostMap.h"
#include "runtime/PubWorld.h"
#include "runtime/HttpDate.h"
#include "runtime/ErrorMgr.h"
#include "runtime/SharedOpts.h"
#include "runtime/PopModel.h"
#include "runtime/StatPhaseSync.h"
#include "runtime/LogComment.h"
#include "runtime/httpText.h"
#include "runtime/polyBcastChannels.h"
#include "runtime/polyErrors.h"
#include "runtime/globals.h"
#include "csm/ContentMgr.h"
#include "csm/ContentCfg.h"
#include "csm/BodyIter.h"
#include "csm/ObjLifeCycle.h"
#include "csm/oid2Url.h"
#include "server/Server.h"
#include "server/SrvCfg.h"
#include "server/SrvXact.h"
#include "server/SrvOpts.h"


// clears all per-transaction state (after the generic Xaction state)
void SrvXact::reset() {
	Xaction::reset();
	theOwner = 0;
	theContentCfg = 0;
	theBodyIter = 0;
	theReqHdr.reset();
	theHttpVersion.reset();
	theTimes.reset();
	theRepHdrSize = -1;
	theConsumedSize = 0;
	theLogCat = lgcSrvSide;
}

// the server agent this transaction works for
Agent *SrvXact::owner() {
	return theOwner;
}

// binds the transaction to its server and connection and starts it,
// either right away or after the given delay (sleepFor(); the alarm
// is handled by wakeUp(), which calls doStart())
void SrvXact::exec(Server *anOwner, Connection *aConn, Time delay) {
	Assert(anOwner);
	theOwner = anOwner;
	theConn = aConn;
	theHttpVersion = theOwner->httpVersion(); // may be downgraded later

	if (delay > 0)
		sleepFor(delay);
	else
		doStart();
}

// enters the header-waiting state and processes any already-buffered data
void SrvXact::doStart() {
	start(theConn);
	newState(stHdrWaiting);

	Assert(!theConn->theRd.theReserv); // cleared in conn mgr
	// note: we assume that conn mgr read() on first noteReadReady()
	noteDataReady(); // push state machine forward
}

// ends the transaction: returns the body iterator (if any) to the
// content configuration and notifies the owning server
void SrvXact::finish(Error err) {
	Xaction::finish(err);

	if (theBodyIter)
		theContentCfg->putBodyIter(theBodyIter);

	theOwner->noteXactDone(this);
}

// delayed-start alarm fired (see exec())
void SrvXact::wakeUp(const Alarm &a) {
	AlarmUser::wakeUp(a);
	doStart();
}

// reads from the connection and advances the state machine
void SrvXact::noteReadReady(int) {
	if (abortIo(&Connection::read))
		return;

	if (theConn->bad())
		finish(errOther);
	else
		noteDataReady();
}

// request-side state machine:
//   stHdrWaiting   -> parse and interpret the request header
//   stBodyWaiting  -> consume the expected request body bytes
//   stSpaceWaiting -> wait for the connection to accept the reply
// finishes the transaction on parse errors or premature EOF
void SrvXact::noteDataReady() {
	if (theState == stHdrWaiting) {
		if (theReqHdr.parse(theConn->theRdBuf.content(), theConn->theRdBuf.contSize())) {
			if (const Error err = interpretHeader()) {
				finish(err);
				return;
			}
			newState(stBodyWaiting); // if any
		} else
		if (theConn->theRdBuf.full()) { // header too big
			finish(errHugeHdr);
			return;
		} else
		if (theConn->atEof()) { // premature end of headers
			finish(errPrematureEoh);
			return;
		}
	}

	if (theState == stBodyWaiting) {
		const Size unconsumed = theReqSize - theConsumedSize;
		consume(Min(unconsumed, theConn->theRdBuf.contSize()));
		theConn->theRdBuf.pack();

		if (theConsumedSize == theReqSize) { // got everything expected
			theConn->theRd.stop(this);
			newState(stSpaceWaiting);
		} else
		if (theConn->atEof()) { // premature eof
			if (cfgAbortedReq()) {
				theOid.aborted(true);
				finish(0); // not an error, configuration told client to abort
				return;
			} else {
				finish(errPrematureEof);
				return;
			}
		}
	}

	if (theState == stHdrWaiting || theState == stBodyWaiting) {
		if (!theConn->theRd.theReserv) // need to read more
			theConn->theRd.start(this);
	} else
	if (theState == stSpaceWaiting) {
		if (!theConn->theWr.theReserv) // need to write more
			theConn->theWr.start(this);
	}
}

// marks size bytes of the read buffer as used by this transaction
void SrvXact::consume(Size size) {
	theConn->theRdBuf.consumed(size);
	theConsumedSize += size;
}

// validates the parsed request header, computes the expected request
// size, resolves the object id, and consumes the header bytes;
// returns a non-zero Error if the request must be rejected
// (the caller, noteDataReady(), then finishes the transaction)
Error SrvXact::interpretHeader() {
	// XXX: pxy server cannot support ignoreUrls; see Server::hostIdx
	bool ignoreUrls = theReqHdr.isHealthCheck ||
		(TheSrvOpts.ignoreUrls && theOwner->hostIdx() >= 0);

	if (ignoreUrls)
		overwriteUrl();
	else
	if (const String &url = theReqHdr.theUri.oid.foreignUrl())
		grokForeignUrl(url, ignoreUrls); // may set ignoreUrls

	// XXX: report healthchecks once in a while and collect stats
	if (theReqHdr.isHealthCheck) {
		static bool didOnce = false;
		if (!didOnce) {
			Comment(6) << "fyi: first health check received from " <<
				theConn->raddr() << ':' << endc;
			printMsg(theConn->theRdBuf, theReqHdr.theHdrSize);
			didOnce = true;
		}
	}

	// dump request header: always the very first one, later only if asked
	// (a flag instead of an ever-growing counter that would overflow)
	static bool dumpedFirstReq = false;
	const bool firstReq = !dumpedFirstReq;
	dumpedFirstReq = true;
	if (firstReq || TheOpts.theDumpFlags(dumpReq, dumpHdr))
		printMsg(theConn->theRdBuf, theReqHdr.theHdrSize);

	theOid = theReqHdr.theUri.oid;

	if (theOid.foreignUrl())
		return errForeignUrl;

	// downgrade HTTP version if needed
	if (theReqHdr.theHttpVersion < theHttpVersion)
		theHttpVersion = theReqHdr.theHttpVersion;

	// calculate request size
	if (theReqHdr.expectBody()) {
		if (theReqHdr.theContSize < 0)
			return errReqBodyButNoCLen;
		theReqSize = theReqHdr.theHdrSize + theReqHdr.theContSize;
	} else {
		if (theReqHdr.theContSize >= 0)
			return errUnexpectedCLen;
		theReqSize = theReqHdr.theHdrSize;
	}

	if (!ignoreUrls) {
		const NetAddr &host = theReqHdr.theUri.host;
		if (!host.knownAddr())
			return errNoHostName;

		if (const Error err = setViserv(host))
			return err; // caller will finish() the transaction

		if (theOid.target() < 0) {
			// currently, servers do not use target but we complain
			// to identify prefetch requests and such
			if (!theReqHdr.theTarget && ReportError(errNoTarget) &&
				TheOpts.theDumpFlags(dumpErr, dumpAny))
				printMsg(theConn->theRdBuf, theReqHdr.theHdrSize);

			if (const Error err = setTarget(theReqHdr.theTarget))
				return err; // caller will finish() the transaction
			Assert(theOid.target() >= 0);
		}

		if (const Error err = checkUri()) // must be done before we consume()
			return err;
	}

	// note: clt and srv code assume we consume only after parse!
	consume(theReqHdr.theHdrSize);
	theConn->theRdBuf.pack();

	// update history for this server
	if (theOid.viserv() >= 0 && theReqHdr.theLocWorld)
		updatePubWorld(theReqHdr.theLocWorld);

	// update client group info
	if (theReqHdr.thePhaseSyncPos >= 0 && theReqHdr.theGroupId)
		TheStatPhaseSync.notePhaseSync(theReqHdr.theGroupId, theReqHdr.thePhaseSyncPos);

	// note if the client will close the connection
	theConn->lastUse(!theReqHdr.persistentConnection());

	theOid.reload(!theReqHdr.isCachable);

	// get content specs for this reply
	Assert(theOid.type() > 0);
	theContentCfg = TheContentMgr.get(theOid.type());

	theRepHdrSize = -1; // so that we know that we need to make a reply

	return 0;
}

// replaces the request's object id with a locally generated one
// (type 1, random odd name, targeted at this server)
void SrvXact::overwriteUrl() {
	// XXX: theOid.visName() will not be set!
	// we have to ignore host settings;
	// should we create our own worlds??
	static RndGen rng;
	theReqHdr.theUri.oid.world().create();
	theReqHdr.theUri.oid.target(theOwner->hostIdx());
	theReqHdr.theUri.oid.type(1);
	theReqHdr.theUri.oid.name(rng.ltrial() | 1);
	theReqHdr.theUri.oid.foreignSrc(false);
	theReqHdr.theUri.oid.foreignUrl(0);
}

// writes (part of) the reply; builds the reply header on first call
void SrvXact::noteWriteReady(int) {
	Assert(theState == stSpaceWaiting);
	WrBuf &buf = theConn->theWrBuf;

	if (theRepHdrSize < 0)
		makeRep(buf);

	Assert(theRepSize.actual() > 0 || buf.contSize() > 0);

	// keep the write buffer full
	if (theBodyIter)
		theBodyIter->pour();

	Size sz;
	if (abortIo(&Connection::write, &sz))
		return;

	if (theConn->bad()) {
		finish(errOther);
		return;
	}

	Assert(sz >= 0);
	theRepSize.got(sz);

	if (theRepSize.gotAll()) {
		Should(!theRepSize.expectToGetLess());
		finish(0);
		return;
	}

	Assert(theRepSize.expectToGetMore());

	if (!theConn->theWr.theReserv)
		theConn->theWr.start(this);
}

// decide what kind of reply to build (406, 302, 304, or 200)
// and write its header (and any short body) into buf
void SrvXact::makeRep(WrBuf &buf) {
	Assert(theContentCfg);

	const bool acceptableCoding =
		theContentCfg->calcContentCoding(theOid, theReqHdr);

	// timestamps, cachability, aborts may be used for all types of replies
	theContentCfg->calcTimes(theOid, theTimes);
	theOid.cachable(theContentCfg->calcCachability(theOid));
	theOwner->cfg()->selectAbortCoord(theAbortCoord);

	theOwner->selectRepType(theOid);

	const char *repStart = buf.space();
	ofixedstream os(buf.space(), buf.spaceSize());

	if (!acceptableCoding)
		make406NotAcceptable(os);
	else
	if (shouldMake302Found() && make302Found(os))
		; // nothing to be done here
	else
	if (shouldMake304NotMod())
		make304NotMod(os);
	else
		make200OK(os);

	buf.appended(Size(os.tellp()));

	Assert(theRepHdrSize > 0 && theRepSize.expected() >= theRepHdrSize);
	theAbortSize = theAbortCoord.pos(theRepHdrSize,
		theRepSize.expected() - theRepHdrSize);

	// dump reply header: always the very first one, later only if asked
	// (a flag instead of an ever-growing counter that would overflow)
	static bool dumpedFirstRep = false;
	const bool firstRep = !dumpedFirstRep;
	dumpedFirstRep = true;
	if (firstRep || TheOpts.theDumpFlags(dumpRep, dumpHdr))
		printMsg(repStart, theRepHdrSize);
}

// builds a "406 Not Acceptable" reply with a fixed human-readable body
void SrvXact::make406NotAcceptable(ostream &os) {
	ReportError(errNoAcceptableContentCoding);

	theHttpStatus = RepHdr::sc406_NotAcceptable;
	putResponseLine(os, rls406NotAcceptable);
	putStdFields(os);
	if (theContentCfg->multipleContentCodings())
		os << hfVaryAcceptEncoding;

	const Size clen = text406NotAcceptable.len();
	os << hfpContLength << (int)clen << crlf;

	putXFields(os);
	os << crlf; // end-of-headers
	theRepHdrSize = os.tellp();

	// text for humans
	os << text406NotAcceptable;

	theRepSize.expect(theRepHdrSize + clen);
}

// whether the selected reply type for this object is a redirect
bool SrvXact::shouldMake302Found() const {
	return theOid.repToRedir();
}

// picks a redirect target (in the same public world) and stores it in
// oid; returns false if no suitable target is available
bool SrvXact::canMake302Found(ObjId &oid) const {
	Assert(theOwner->popModel());

	// at this time redirect to the same server only
	if (theOid.viserv() < 0)
		return false; // misconfiguration

	if (!theOwner->popModel())
		return false;

	PubWorld *pubWorld = TheHostMap->findPubWorldAt(theOid.viserv());
	if (!pubWorld || !pubWorld->canRepeat())
		return false;

	oid = theOid; // set viserv and such
	oid.name(-1);
	oid.type(-1); // overwrite name and type
	pubWorld->repeat(oid, theOwner->popModel());
	// NOTE(review): type is derived from theOid, not the new oid; confirm intended
	oid.type(Oid2ContType(theOid));
	return true;
}

// builds a "302 Found" reply pointing to another object; returns false
// (without writing anything) if no redirect target could be selected
bool SrvXact::make302Found(ostream &os) {
	ObjId newOid;

	// make sure we have an object to redirect to
	if (!canMake302Found(newOid)) {
		ReportError(errMake302Found);
		return false;
	}

	theHttpStatus = RepHdr::sc302_Found;
	putResponseLine(os, rls302Found);
	putStdFields(os);

	os << hfpLocation;
	const Size urlStart = Size(os.tellp());
	Oid2Url(newOid, os);
	const Size urlLen = Size(os.tellp()) - urlStart;
	os << crlf;

	// the body is the fixed text followed by the same URL
	const Size clen = urlLen + Size(text302Found.len());
	os << hfpContLength << (int)clen << crlf;

	putXFields(os);
	os << crlf; // end-of-headers
	// we determine the type of IMS request later
	theRepHdrSize = os.tellp();

	// text for humans
	Oid2Url(newOid, os << text302Found);

	theRepSize.expect(theRepHdrSize + clen);
	return true;
}

// whether an IMS request can be answered with "304 Not Modified"
bool SrvXact::shouldMake304NotMod() const {
	return theReqHdr.theIms >= 0 &&
		theTimes.lmt() >= 0 && theTimes.lmt() <= theReqHdr.theIms;
}

// builds a header-only "304 Not Modified" reply
void SrvXact::make304NotMod(ostream &os) {
	theHttpStatus = RepHdr::sc304_NotModified;
	putResponseLine(os, rls304NotModified);
	putStdFields(os);
	os << crlf; // end-of-headers
	theRepHdrSize = os.tellp();
	theRepSize.expect(theRepHdrSize);
	theOid.ims304(true);
}

// build headers for a happy "200 OK" reply and prepare the body iterator
void SrvXact::make200OK(ostream &os) {
	theHttpStatus = RepHdr::sc200_OK;
	theBodyIter = theContentCfg->getBodyIter(theOid);

	putResponseLine(os, rls200Ok);
	put200OkHead(os);
	os << crlf; // end-of-headers
	theRepHdrSize = os.tellp();

	if (theOid.head()) { // do not send message body for HEAD
		theRepSize.expect(theRepHdrSize);
		theContentCfg->putBodyIter(theBodyIter);
		theBodyIter = 0;
	} else {
		theRepSize.expect(theRepHdrSize + theBodyIter->contentSize());
		theBodyIter->start(&theConn->theWrBuf);
	}

	// note: non-200 and non-304 responses to IMS requests not covered yet
	theOid.ims200(theReqHdr.theIms >= 0);
}

// writes the protocol version (1.0 or 1.1) followed by the status suffix
void SrvXact::putResponseLine(ostream &os, const String &suffix) {
	if (theHttpVersion <= HttpVersion(1,0))
		os << protoHttp1p0;
	else
		os << protoHttp1p1;
	os << suffix;
}

// writes all header fields of a "200 OK" reply (without end-of-headers)
void SrvXact::put200OkHead(ostream &os) {
	/* it is "good practice" to send general-header fields first,
	 * followed by request-header or response-header fields, and
	 * ending with the entity-header fields. */

	/* general-header fields */
	os << (theOid.cachable() ? hfCcCachable : hfCcUncachable);
	putStdFields(os);

	/* other entity-header fields */
	if (theOid.cachable() && theTimes.showLmt())
		HttpDatePrint(os << hfpLmt, theTimes.lmt()) << crlf;

	Assert(theBodyIter);
	const Size clen = theBodyIter->contentSize();
	Assert(clen.known());
	os << hfpContLength << (int)clen << crlf;

	if (theContentCfg->theMimeType)
		os << hfpContType << theContentCfg->theMimeType << crlf;

	if (theOid.gzipContent())
		os << hfGzipContentEncoding;

	if (theContentCfg->multipleContentCodings())
		os << hfVaryAcceptEncoding;

	if (theContentCfg->calcChecksumNeed(theOid))
		putChecksum(os);

	if (theOwner->isCookieSender)
		putCookies(os);

	/* extension-header fields (they are entity-header fields as well) */
	putXFields(os);
}

// put well-known fields shared by all reply types built here
// (406, 302, 304, and 200): Date, connection persistence, Expires
void SrvXact::putStdFields(ostream &os) const {
	// general-header fields
	HttpDatePrint(os << hfpDate) << crlf;

	// persistency indication depends on HTTP version
	if (theHttpVersion <= HttpVersion(1,0)) {
		if (theConn->reusable())
			os << hfConnAliveOrg;
	} else {
		if (!theConn->reusable())
			os << hfConnCloseOrg;
	}

	// response-header fields (none)

	// entity-header fields
	if (theOid.cachable() && theTimes.knownExp())
		HttpDatePrint(os << hfpExpires, theTimes.exp()) << crlf;
}

// put Polygraph extension header fields used by 406, 302, and 200 replies
void SrvXact::putXFields(ostream &os) const {
	// put group ids with source/target ids on one line?
	os << hfpXTarget << theOwner->host() << crlf;

	if (theReqHdr.theXactId)
		os << hfpXXact << TheGroupId << ' ' << theReqHdr.theXactId.genMutant() << crlf;

	if (theOid.viserv() >= 0 && theReqHdr.theRemWorld)
		putRemWorld(os, theReqHdr.theRemWorld);

	os << hfpXAbort << ' ' << theAbortCoord.whether() << ' ' <<
		theAbortCoord.where() << crlf;
	os << hfpXPhaseSyncPos << TheStatPhaseSync.phaseSyncPos() << crlf;
}

// build and put checksum header field; the body is generated once into
// a scratch buffer just to compute its checksum
void SrvXact::putChecksum(ostream &os) {
	BodyIter *i = theContentCfg->getBodyIter(theOid);
	WrBuf buf;
	i->start(&buf);
	while (*i) {
		i->pour();
		theCheckAlg.update(buf.content(), buf.contSize());
		buf.reset();
	}
	theCheckAlg.final();
	theContentCfg->putBodyIter(i);

	os << hfpContMd5;
	PrintBase64(os, theCheckAlg.sum().image(), theCheckAlg.sum().size());
	os << crlf;
}

// adds Set-Cookie fields; the decision, count, and sizes are all seeded
// from the object id; stops early once the write buffer space runs out
void SrvXact::putCookies(ostream &os) {
	SrvCfg *cfg = theOwner->cfg();
	const int oidSeed = theOid.hash();

	// check whether we should send (set) cookies with this response
	const int seedSend = GlbPermut(oidSeed, rndCookieSend);
	RndGen rng(seedSend);
	if (!rng.event(cfg->theCookieSendProb))
		return;

	// calculate how many cookies we should send
	RndDistr *distrCount = seedOidDistr(cfg->theCookieCounts, rndCookieCount);
	const int count = (int)MiniMax(1.0, distrCount->trial(), (double)INT_MAX);

	RndDistr *sizeDistr = seedOidDistr(cfg->theCookieSizes, rndCookieSize);

	static const String cookieValuePfx = "sess";
	static const String cookieValueSfx = "\"";
	WrBuf &buf = theConn->theWrBuf;

	Size accSize = 0;
	for (int i = 0; i < count; ++i) {
		os << hfpSetCookie << cookieValuePfx << i << "=\"";

		const Size cookieSize = (int)MiniMax(0.0, sizeDistr->trial(), (double)INT_MAX);
		const Size cookieContentOff = IOBuf::RandomOffset(oidSeed, accSize);

		const Size usedSize = (streamoff)os.tellp();
		const Size spaceRemaining = buf.spaceSize() - usedSize;
		// +2 for the CRLF after the closing quote
		const bool fit = cookieSize + Size(cookieValueSfx.len() + 2) <= spaceRemaining;
		if (!fit && ReportError(errCookiesDontFit)) {
			Comment << "cookie size: " << cookieSize <<
				" cookies buffered: " << i << '/' << count <<
				" or " << accSize << "; space left: " <<
				spaceRemaining << endc;
		}

		// yes, make buffer (ostream) full
		IOBuf::RandomFill(os, cookieContentOff, cookieSize);
		accSize += cookieSize;
		os << cookieValueSfx << crlf;

		if (!fit)
			break;
	}
}

// handles a request for a URL not generated by Polygraph; "/pg/embed/"
// URLs are mapped to a local object, other foreign URLs are replaced
// (if acceptForeignMsgs); sets ignoreUrls when the URL was overwritten
void SrvXact::grokForeignUrl(const String &url, bool &ignoreUrls) {
	TheEmbedStats.foreignUrlRequested++;
	if (url.cmp("/pg/embed/", 10) == 0) {
		const int commaPos = url.find(',');
		ObjId &foid = theReqHdr.theUri.oid;

		// use url trailer (as is, base64 encoded, 24 bytes?) for seeds
		if (commaPos != String::npos) {
			const int seedSize = 3*sizeof(int);
			const Area &worldStr = url.area(commaPos+1, seedSize);
			const Area &nameStr = url.area(commaPos+1 + seedSize, seedSize);
			foid.world() = worldStr.size() >= seedSize ?
				UniqId::FromStr(worldStr) : UniqId::Create();
			static RndGen rng;
			if (nameStr.size() >= seedSize) {
				int seed[4] = { 0, 0, 0, 0 };
				memcpy(&seed, nameStr.data(), Min((int)sizeof(seed), nameStr.size()));
				// seeds come from raw URL bytes; add them as unsigned to
				// avoid signed-overflow UB (same wrapped value in practice)
				const int s1 = (int)((unsigned)seed[0] + (unsigned)seed[2]);
				const int s2 = (int)((unsigned)seed[1] + (unsigned)seed[3]);
				rng.seed(LclPermut(s1, s2));
			}
			foid.name(rng.ltrial() | 1);
		}

		// the category sits between the "/pg/embed/" prefix and the comma;
		// without a comma it runs to the end of the URL (commaPos - 10
		// would otherwise be computed from npos)
		const int categoryLen = (commaPos != String::npos) ?
			commaPos - 10 : url.len() - 10;
		const Area &category = url.area(10, categoryLen);
		if (theOwner->cfg()->setEmbedContType(foid, category)) {
			foid.target(theOwner->hostIdx());
			foid.foreignSrc(false);
			foid.foreignUrl(String());
		} else {
			// report at most the first 100 bad URLs (capped so the
			// counter cannot overflow)
			static int count = 0;
			if (count < 100) {
				++count;
				Comment(5) << "error: bad content type in URL: " << endc;
				printMsg(theConn->theRdBuf, theReqHdr.theHdrSize);
			}
			ignoreUrls = true;
			overwriteUrl(); // or we would not accept it
		}
		// TheEmbedStats.scriptSeen++;
	} else
	if (TheOpts.acceptForeignMsgs) {
		static bool informed = false;
		if (!informed) {
			Comment(5) << "fyi: received first foreign request URL: " << endc;
			printMsg(theConn->theRdBuf, theReqHdr.theHdrSize);
			informed = true;
		}
		ignoreUrls = true;
		overwriteUrl(); // or we would not accept it
	}
}

// maps the requested host name to a visible-server index; falls back to
// this server's own address when the name is unknown; ensures the host
// has a public world; returns errForeignHostName if nothing matched
Error SrvXact::setViserv(const NetAddr &name) {
	int viserv = -1;
	HostCfg *host = TheHostMap->find(name, viserv);
	if (!host) {
		host = TheHostMap->find(theOwner->host(), viserv);
		const bool salvaged = Should(host != 0);
		if (ReportError(errForeignHostName)) {
			Comment << theOwner->host() << " server received request for " <<
				name << " which is not an address of any visible server, " <<
				(salvaged ? "salvaged" : "aborting transaction") << endc;
		}
		if (!salvaged)
			return errForeignHostName;
	}

	if (!host->thePubWorld)
		PubWorld::Add(host, new PubWorld());

	theOid.viserv(viserv);
	return 0;
}

// sets this server as the object's target; the requested target is ignored
Error SrvXact::setTarget(const NetAddr &) {
	// XXX: the check below should know about DNS RR, etc.
	//if (target != theOwner->host())
	//	ReportError(errMisdirRequest);

	theOid.target(theOwner->hostIdx());
	return 0;
}

// whether the client was configured to abort this request at or before
// the amount of request data consumed so far
// merge with CltXact::cfgAbortedReply()?
bool SrvXact::cfgAbortedReq() const {
	if (!theReqHdr.theAbortCoord)
		return false;

	const Size abSz = theReqHdr.theAbortCoord.pos(theReqHdr.theHdrSize,
		theReqSize - theReqHdr.theHdrSize);

	if (!Should(abSz >= 0))
		return false;

	if (abSz > theConsumedSize)
		return false;

	return true;
}

// check that requested URI belongs to our server
Error SrvXact::checkUri() {
	const int pathLen = theReqHdr.theUri.pathLen;
	const bool sane =
		Should(pathLen >= 0) &&
		Should(pathLen <= theConn->theRdBuf.contSize()) &&
		Should(theReqHdr.theUri.pathBuf);

	if (!sane) { // no point in high-level checks
		Comment << "internal error: inconsistent request, salvaged" << endc;
		return errOther;
	}

	// first check if the content type belongs to us
	if (!theOwner->cfg()->hasContType(theOid.type())) {
		if (ReportError(errMisdirRequest)) {
			Comment << "host " << theOwner->host() <<
				" does not have content type implied by request URI: ";
			Comment.write(theReqHdr.theUri.pathBuf, pathLen);
			Comment << endc;
		}
		return errOther;
	}

	// we cannot compare Oid2UrlPath() with pathBuf of special URLs
	if (pathLen >= 4 && strncmp("/pg/", theReqHdr.theUri.pathBuf, 4) == 0)
		return 0;

	ostringstream os; // XXX: expensive mallocs
	Oid2UrlPath(theOid, os);
	os << ends;
	// keep the generated path alive for the whole comparison; a pointer
	// into the temporary returned by os.str() would dangle immediately
	const std::string expected = os.str();
	// tellp() counts the terminating NUL written by "ends"
	const bool res = (pathLen+1 == (int)os.tellp()) &&
		strncmp(expected.c_str(), theReqHdr.theUri.pathBuf, pathLen) == 0;
	streamFreeze(os, false);

	if (!res) {
		if (ReportError(errMisdirRequest)) {
			Comment << "host: " << theOwner->host() << endc;
			(Comment << "received: ").write(theReqHdr.theUri.pathBuf, pathLen);
			Comment << endc;
			Oid2UrlPath(theOid, Comment << "expected: ");
			Comment << endc;
		}
		return errOther;
	}

	return 0;
}

// merges a client-reported world slice into this viserv's public world
// (the public world is created in setViserv())
void SrvXact::updatePubWorld(const ObjWorld &newSlice) {
	PubWorld &world = *TheHostMap->findPubWorldAt(theOid.viserv());
	int sliceIdx;
	if (world.find(newSlice.id(), sliceIdx))
		world.sliceAt(sliceIdx).update(newSlice);
	else
		world.add(newSlice);
}

// reports a newer version of the client's known world slice, if any
void SrvXact::putRemWorld(ostream &os, const ObjWorld &oldSlice) const {
	PubWorld &world = *TheHostMap->findPubWorldAt(theOid.viserv());
	int sliceIdx;
	if (world.find(oldSlice.id(), sliceIdx)) {
		if (const PubWorldSlice *slice = world.newerSlice(oldSlice, sliceIdx))
			os << hfpXRemWorld << *slice << crlf;
	}
	// else client-side sent remote world ID before the same public world ID
}

// appends server-side fields to the generic transaction log record
void SrvXact::logStats(OLog &ol) const {
	Xaction::logStats(ol);
	ol << theOwner->seqvId();
	ol << (int)theReqHdr.theIms.sec();
}