/* Web Polygraph http://www.web-polygraph.org/ * (C) 2003-2006 The Measurement Factory * Licensed under the Apache License, Version 2.0 */ #include "base/polygraph.h" #include "xstd/h/iostream.h" #include "xstd/h/iomanip.h" #include "xstd/Rnd.h" #include "xstd/rndDistrs.h" #include "xstd/TblDistr.h" #include "xstd/gadgets.h" #include "base/RndPermut.h" #include "base/ObjId.h" #include "base/polyLogCats.h" #include "base/polyLogTags.h" #include "base/rndDistrStat.h" #include "runtime/Connection.h" #include "runtime/HostMap.h" #include "runtime/AddrMap.h" #include "runtime/ErrorMgr.h" #include "runtime/LogComment.h" #include "runtime/StatPhaseMgr.h" #include "runtime/PopModel.h" #include "runtime/StatPhase.h" #include "runtime/polyBcastChannels.h" #include "runtime/polyErrors.h" #include "runtime/globals.h" #include "runtime/SslWrap.h" #include "runtime/SslWraps.h" #include "pgl/ServerSym.h" #include "pgl/SocketSym.h" #include "pgl/PopModelSym.h" #include "server/SrvCfg.h" #include "server/SrvConnMgr.h" #include "server/SrvOpts.h" #include "server/Server.h" XactFarm *Server::TheXacts; SrvSharedCfgs *Server::TheSharedCfgs = new SrvSharedCfgs; void Server::Farm(XactFarm *aFarm) { Assert(!TheXacts && aFarm); TheXacts = aFarm; } void Server::LogState(OLog &) { } Server::Server(): theCfg(0), theHostIdx(-1), theReqCount(0) { theConnMgr = new SrvConnMgr(this); // for now theChannels.append(TheLogCfgChannel); theChannels.append(TheLogStateChannel); startListen(); } Server::~Server() { stop(); delete theConnMgr; } void Server::configure(const ServerSym *cfg, const NetAddr &aHost) { Assert(theConnMgr); SockOpt opt; Agent::configure(cfg, aHost, opt); if (theHost.port() < 0) { cerr << cfg->loc() << "no port specified for the server `" << theKind << "' running on " << theHost << endl; exit(-3); } theCfg = TheSharedCfgs->getConfig(cfg); // XXX: cannot Assert: pxy server does not have a HostMap entry if (!TheHostMap->find(aHost, theHostIdx)) theHostIdx = -1; // setup listen socket (XXX: should be moved to start(), use Comment) Must(theSock.create(theHost.addrN().family())); Must(theSock.blocking(false)); Should(theSock.reuseAddr(true)); Must(theSock.configure(opt)); if (!Should(theSock.bind(theHost)) || !Should(theSock.listen())) { cerr << "error: the server `" << theKind << "' cannot listen on " << theHost << "; " << Error::Last() << endl; exit(-3); } int acceptLmt = -1; cfg->acceptLmt(acceptLmt); theConnMgr->configure(opt, cfg->pconnUseLmt()); theConnMgr->acceptLmt(acceptLmt); theConnMgr->idleTimeout(cfg->idlePconnTimeout()); const SslWrap *sslWrap = 0; if (theCfg->selectSslWrap(sslWrap)) { // sticky selection theSslCtx = sslWrap->makeServerCtx(theHost); theConnMgr->configureSsl(theSslCtx, sslWrap); } if (!TheAddrMap->has(theHost)) // PolyApp adds only if findAddr() fails TheAddrMap->add(theHost); selectHttpVersion(*theCfg); isCookieSender = theCfg->selectCookieSenderStatus(); if (isCookieSender && (!theCfg->theCookieSizes || !theCfg->theCookieCounts)) { cerr << cfg->loc() << "error: cookie-sending server does not have " << "cookie size or count distributions configured" << endl; FatalError2(errOther, lgcSrvSide); } TheXacts->limit(1024); // magic, no good way to estimate } void Server::start() { Assert(theSock); Assert(theConnMgr); Agent::start(); theReserv = TheFileScanner->setFD(theSock.fd(), dirRead, this); } void Server::stop() { deaf(); Agent::stop(); } PopModel *Server::popModel() { return theCfg->thePopModel; } void Server::selectRepType(ObjId &oid) { // XXX: server side does not support global xaction counting yet // and hence cannot suuply all the parameters to tf.current below static RndGen rng; const TransFactor &tf = TheStatPhaseMgr->specialMsgFactor(); const int repType = rng.event(tf.current()) ? (int)theCfg->theRepTypeSel->trial() : rptBasic; oid.repToRedir(repType == rpt302Found); } void Server::noteXactDone(SrvXact *x) { Assert(x); theConnMgr->put(x->conn()); TheXacts->put(x); } void Server::noteReadReady(int) { if (!theConnMgr->accept(theSock)) { // fatal error deaf(); FatalError2(errServerGone, lgcSrvSide); } } void Server::noteConnReady(Connection *conn) { startXact(conn); // that's it?! } void Server::noteLogEvent(BcastChannel *ch, OLog &log) { if (ch == TheLogCfgChannel) { log << bege(lgSrvCfg, lgcSrvSide); // XXX: implement (SrvCfgRec) log << ende; } else if (ch == TheLogStateChannel) { log << bege(lgSrvState, lgcSrvSide); log << theSeqvId; log << ende; } } int Server::logCat() const { return lgcSrvSide; } // SrvConnMgr::User needs this const UniqId &Server::id() const { return Agent::id(); } void Server::deaf() { if (theReserv) TheFileScanner->clearRes(theReserv); if (theSock) { Comment(2) << "server " << theHost << " is closing listen socket " << theSock.fd() << " after " << theReqCount << " xactions" << endc; theSock.close(); } } void Server::startXact(Connection *conn) { theReqCount++; SrvXact *x = TheXacts->get(); x->exec(this, conn, theThinkDistr ? Time::Secd(theThinkDistr->trial()) : Time()); }