/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "xstd/h/signal.h"
// NOTE(review): the header names of the next two directives were lost in
// SOURCE (angle-bracket text was stripped). The code below uses ofstream and
// ostringstream, so <fstream> and <sstream> are plausible -- confirm against
// the pristine file before building.
#include
#include
#include "xstd/h/new.h"
#include "xstd/h/fcntl.h"
#include "xstd/h/net/if.h"
#include "xstd/h/iostream.h"
#include "xstd/h/iomanip.h"
#include "xstd/h/process.h" /* for _getpid() on W2K */

#include "xstd/Socket.h"
#include "xstd/AlarmClock.h"
#include "xstd/FileScanner.h"
#include "xstd/ResourceUsage.h"
#include "xstd/Rnd.h"
#include "xstd/gadgets.h"
#include "xstd/getIfAddrs.h"
#include "xstd/InetIfReq.h"
#include "xstd/NetIface.h"
#include "xstd/rndDistrs.h"
#include "base/RndPermut.h"
#include "base/UniqId.h"
#include "base/BStream.h"
#include "base/polyLogTags.h"
#include "base/polyLogCats.h"
#include "runtime/AddrMap.h"
#include "runtime/HostMap.h"
#include "runtime/AddrSubsts.h"
#include "runtime/HttpDate.h"
#include "runtime/StatCycle.h"
#include "runtime/StatPhase.h"
#include "runtime/StatPhaseMgr.h"
#include "runtime/StatsSampleMgr.h"
#include "runtime/ErrorMgr.h"
#include "runtime/Notifier.h"
#include "runtime/PolyOLog.h"
#include "runtime/LogComment.h"
#include "runtime/SharedOpts.h"
#include "runtime/Agent.h"
#include "runtime/AgentCfg.h"
#include "runtime/PersistWorkSetMgr.h"
#include "runtime/httpHdrs.h"
#include "runtime/polyErrors.h"
#include "runtime/polyBcastChannels.h"
#include "runtime/globals.h"
#include "csm/ContentSel.h"
#include "pgl/PglPp.h"
#include "pgl/PglParser.h"
#include "pgl/PglStaticSemx.h"
#include "pgl/PglCtx.h"
#include "pgl/PglArraySym.h"
#include "pgl/PglNetAddrSym.h"
#include "pgl/PglNetAddrRange.h"
#include "pgl/AgentSym.h"
#include "pgl/AgentArrIter.h"
#include "pgl/ServerSym.h"
#include "pgl/PhaseSym.h"
#include "pgl/BenchSym.h"
#include "pgl/BenchSideSym.h"
#include "app/DebugSwitch.h"
#include "app/PolyApp.h"
#include "app/BeepDoorman.h"
#include "app/shutdown.h"

// NOTE(review): throughout this file SOURCE shows `Array` without a template
// argument list; the arguments appear to have been stripped together with the
// include names above. They are left exactly as found and flagged at each use.

// statistics cycle; created in configure(), started in startServices()
static StatCycle *TheStatCycle = 0;

#if FIND_MEMORY_LEAKS
static void DebugMemSignal(int s);
#endif


/* PolyApp */

// Starts in the non-idle state with no stored states and subscribes to the
// "phases end" channel (see noteMsgStrEvent()).
PolyApp::PolyApp(): theBeepDoorman(0), isIdle(false), theStateCount(0) {
	theChannels.append(ThePhasesEndChannel);
}

// Destroys all local agents, then cleans up sockets and the beep doorman.
PolyApp::~PolyApp() {
	for (int i = 0; i < theLocals.count(); ++i)
		delete theLocals[i];
	Socket::Clean();
	delete theBeepDoorman;
}

// scans all FDs once
// The scan timeout is the remaining idle time (or the configured idle
// timeout), capped by the time left on the alarm clock when it is on.
// A zero-ready scan starts or continues the "idle state"; once that state
// has lasted until theIdleEnd, wasIdle() is called.
void PolyApp::step() {
	Time tout = isIdle ?
		theIdleEnd - TheClock : (Time)TheOpts.theIdleTout;
	Time *toutp = &tout;

	Clock::Update(false);

	if (!TheAlarmClock.on()) {
		// no alarms: a negative timeout is passed to scan() as a nil pointer
		if (tout < 0)
			toutp = 0;
	} else
	if (tout < 0) {
		tout = TheAlarmClock.timeLeft();
	} else {
		tout = Min(tout, TheAlarmClock.timeLeft());
	}

	// avoid multiple micro iterations when alarm time is very close
	if (tout > 0)
		tout = Max(Time::Msec(1), tout);

	checkTiming(tout);

	const int readyCount = scan(toutp);

	if (ShutdownRequested())
		return;

	// a failed scan is fatal unless the error is EINTR
	if (readyCount < 0) {
		if (const Error err = Error::LastExcept(EINTR))
			FatalError(err);
	}

	// start "idle state" if needed
	if (!readyCount) {
		if (!isIdle && TheOpts.theIdleTout >= 0) {
			isIdle = true;
			theIdleBeg = TheClock;
			theIdleEnd = theIdleBeg + TheOpts.theIdleTout;
		}
	} else {
		isIdle = false;
	}

	Clock::Update();
	checkProgressReport();

	if (!readyCount && isIdle && TheClock.time() >= theIdleEnd)
		wasIdle(TheClock - theIdleBeg);
}

// Runs one file scanner pass and returns its result.
// Every TheOpts.thePrioritySched-th call (counting from the last pass that
// returned zero) is a regular scan with the caller's timeout; the calls in
// between are fsupAsap scans with a zero timeout.
int PolyApp::scan(Time *toutp) {
	static int scanCount = 0;
	static Time zeroTout = Time::Sec(0);

	const int hotCount = scanCount++ % TheOpts.thePrioritySched == 0 ?
		TheFileScanner->scan(toutp) :
		TheFileScanner->scan(fsupAsap, &zeroTout);

	// nothing was ready: make the next call a regular scan
	if (!hotCount)
		scanCount = 0;

	return hotCount;
}

// Refreshes the clock and resets the tick counter used by tick().
void PolyApp::begCycle(int) {
	Clock::Update(false);
	theTickCount = 0;
}

// Nothing to do at the end of a cycle.
void PolyApp::endCycle() {
}

// Refreshes the clock on every 8th call; returns false once a shutdown has
// been requested.
bool PolyApp::tick() {
	if (++theTickCount % 8 == 0)
		Clock::Update(false);
	return !ShutdownRequested();
}

// Reports a timing drift error when a negative drift ("we are behind")
// reaches the current reporting threshold. The threshold starts at 100 msec
// and is raised to 120% of each drift that reaches it.
void PolyApp::checkTiming(Time drift) {
	// make these members
	static Time nextDriftToReport = Time::Msec(100);
	static Time lastCheck = TheClock;

	if (drift < 0) { // we are behind
		drift = -drift;
		if (drift >= nextDriftToReport) {
			if (ReportError(errTimingDrift))
				Comment(1) << "record level of timing drift: " << drift << "; last check was " << (TheClock-lastCheck) << " ago" << endc;
			nextDriftToReport = drift + drift/5;
		}
	}

	lastCheck = TheClock;
}

// Reports resource usage and broadcasts ieReportProgress on TheInfoChannel,
// at most once per five minutes.
void PolyApp::checkProgressReport() const {
	const Time reportGap = Time::Sec(5*60);
	static Time lastReport = TheClock;

	if (lastReport + reportGap <= TheClock) {
		lastReport = TheClock;
		reportRUsage();
		Broadcast(TheInfoChannel, BcastRcver::ieReportProgress);
	}
}

// Handles the "phases end" message; ch must be ThePhasesEndChannel.
void PolyApp::noteMsgStrEvent(BcastChannel *ch, const char *msg) {
	Assert(ch == ThePhasesEndChannel);
	// quit now unless we need to wait for inactivity timeout
	if (TheOpts.theIdleTout < 0)
		ShutdownReason(msg);
}

// Requests a shutdown if the idle timeout is enabled (non-negative) and the
// given idle duration has reached it.
void PolyApp::wasIdle(Time tout) {
	if (TheOpts.theIdleTout >= 0 && tout >= TheOpts.theIdleTout) {
		Comment << "was idle for at least " << tout << endc;
		ShutdownReason("inactivity timeout");
	}
}

// Collects option groups via getOpts(), parses the command line with them,
// and validates each group. Returns false if parsing or validation fails.
bool PolyApp::handleCmdLine(int argc, char *argv[]) {
	// NOTE(review): template argument lost in SOURCE; elements are used as
	// pointers (opts[i]->validate()) -- restore from the pristine file
	Array opts;
	getOpts(opts);

	theCmdLine.configure(opts);
	if (!theCmdLine.parse(argc, argv))
		return false;

	// validate command line params
	for (int i = 0; i < opts.count(); ++i)
		if (!opts[i]->validate())
			return false;

	return true;
}

// Creates (truncating) the binary log file fname, attaches it to log, and
// reconciles the buffer size option with the log capacity. Prints an error
// and applies xexit if the file cannot be created.
void PolyApp::configureBinLog(OLog &log, const String &fname, SizeOpt &bufSz) {
	// A plain new-expression never yields a nil pointer, so testing the
	// pointer alone (as this code used to) cannot detect a failed open;
	// test the stream state as well.
	ostream *os = new ofstream(fname.cstr(), ios::binary|ios::out|ios::trunc);
	if (os && os->good()) {
		log.stream(fname, os);
	} else {
		const Error err = Error::Last(); // capture before any cleanup
		delete os;
		cerr << ThePrgName << ": cannot create binary log; " << fname << ": " << err << endl << xexit;
	}

	// we cannot shrink log size
	if (bufSz > log.capacity())
		log.capacity(bufSz);
	bufSz.set(log.capacity());
}

// Redirects console output if requested, configures the standard streams
// with the given precision, and sets up the general and sample binary logs.
void PolyApp::configureLogs(int prec) {
	// redirect console output
	if (TheOpts.theConsFileName && TheOpts.theConsFileName != "-")
		redirectOutput(TheOpts.theConsFileName.cstr());

	configureStream(cout, prec);
	configureStream(cerr, prec);
	configureStream(clog, prec);

	/* initialize binary log files */

	if (TheOpts.theLogFileName) {
		configureBinLog(TheOLog, TheOpts.theLogFileName, TheOpts.theLogBufSize);
		TheOLog.period(Time::Sec(5)); // flush at most that often
	}

	if (TheOpts.theSmplLogFileName) {
		TheSmplOLog = new PolyOLog();
		configureBinLog(*TheSmplOLog, TheOpts.theSmplLogFileName, TheOpts.theSmplLogBufSize);
		// sample log is not flushed on a periodic basis
	} else {
		// use general purpose log
		TheOpts.theSmplLogFileName.val(TheOpts.theLogFileName);
		TheOpts.theSmplLogBufSize.set(TheOpts.theLogBufSize);
		TheSmplOLog = &TheOLog;
	}

	Comment.TheEchoLevel = TheOpts.theVerbLevel;
}

// Loads persistent seeds, reseeds the global and local permutators, derives
// the default RNG seed, creates the group id, and configures the persistent
// working set manager. The statement order matters (see inline comments).
void PolyApp::configureRnd() {
	ThePersistWorkSetMgr.loadSeeds(); // must be called before seeds are used

	// set random seeds
	GlbPermut().reseed(TheOpts.theGlbRngSeed);
	LclPermut().reseed(TheOpts.theLclRngSeed);
	RndGen::TheDefSeed = LclPermut(TheOpts.theLclRngSeed);

	// use the seed as uid "space" index for non-unique worlds
	if (!TheOpts.useUniqueWorld)
		UniqId::Space(TheOpts.theLclRngSeed);
	TheGroupId = UniqId::Create();

	ThePersistWorkSetMgr.configure(); // must be called after UniqId::Space()
}

// collect server configurations into TheHostMap
// Also creates TheAddrMap and TheAddrSubsts, one content selector per server
// configuration, and records SSL wraps for servers and proxy sides.
void PolyApp::configureHosts() {
	TheAddrMap = new AddrMap();
	TheAddrMap->configure(PglStaticSemx::TheAddrMapsToUse);

	TheAddrSubsts = new AddrSubsts;
	TheAddrSubsts->configure(PglStaticSemx::TheAddrSubstsToUse);

	AgentArrIter i(PglStaticSemx::TheAgentsToUse, 0, "Server");

	// count the number of servers for HostMap
	// include AddrMap names in the count as we add them below
	int srvCount = TheAddrMap->nameCount();
	for (i.includeProxySides(true), i.restart(); i; ++i)
		srvCount++;

	TheHostMap = new HostMap(srvCount);

	// create selectors for each server configuration symbol
	// note: cfgCount returns the number of all agents, not just servers
	theContentSels.stretch(i.cfgCount());
	for (i.includeProxySides(false), i.restart(); i; ++i) {
		// configure a selector only the first time a configuration is seen
		if (theContentSels.count() <= i.cfgIdx()) {
			ContentSel *sel = new ContentSel;
			sel->configure((const ServerSym*)i.agentSym());
			theContentSels.put(sel, i.cfgIdx());
		}
	}
	Assert(theContentSels.count() <= i.cfgCount());

	// assign content selectors to hosts
	for (i.includeProxySides(false), i.restart(); i; ++i) {
		int idx = -1;
		const NetAddr &host = i.host();
		if (host.isDomainName()) {
			cerr << here << "server address must be an IP, got: " << host << endl << xexit;
		}
		if (TheHostMap->find(host, idx)) {
			cerr << here << "server address " << host << " repeated/use()d twice!" << endl << xexit;
		}

		HostCfg *hostCfg = TheHostMap->addAt(idx, host);
		ContentSel *sel = theContentSels[i.cfgIdx()];
		Assert(sel);
		hostCfg->theContent = sel;

		// add host addresses by default
		if (!TheAddrMap->findAddr(host))
			TheAddrMap->add(host);
	}

	// assign SSL wraps to hosts, including proxies
	for (i.includeProxySides(true), i.restart(); i; ++i) {
		int idx = -1;
		const NetAddr &host = i.host();
		HostCfg *hostCfg = 0;
		if (TheHostMap->find(host, idx)) {
			hostCfg = TheHostMap->at(idx);
		} else {
			hostCfg = TheHostMap->addAt(idx, host);
		}

		if (hostCfg->theSslWrap) {
			cerr << i.agentSym()->loc() << "[proxy] server address " << host << " repeated or use()d twice" << endl << xexit;
		}

		// XXX: very inefficient, should drag servers' TheSharedCfgs here??
		AgentCfg agentCfg;
		agentCfg.configure((const ServerSym*)i.agentSym());
		const SslWrap *wrap = 0;
		if (agentCfg.selectSslWrap(wrap)) {
			hostCfg->theSslWrap = wrap;
			Comment(3) << "fyi: expecting SSL proxy or server at " << host << endc;
		}
	}

	// add all missing visible names; needed to get to public worlds
	for (int viserv = 0; viserv < TheAddrMap->nameCount(); ++viserv) {
		const NetAddr &viname = TheAddrMap->nameAt(viserv);
		int idx = -1;
		if (!TheHostMap->find(viname, idx))
			TheHostMap->addAt(idx, viname);
		// we do not create public world by default to save RAM if
		// it happens to be not-needed
	}
}

// get a list of all available interface addresses
// Appends the addresses to theIfaces.
void PolyApp::getIfaces() {
	// manually configured [faked] addresses take priority
	if (TheOpts.theFakeHosts) {
		getFakeIfaces();
		return;
	}

	// NOTE(review): template argument lost in SOURCE; elements are used by
	// value and provide addrN() -- restore from the pristine file
	Array addrs;
	if (!::GetIfAddrs(addrs, String()))
		cerr << ThePrgName << ": cannot get a list of all available network interfaces: " << Error::Last() << endl << xexit;

	theIfaces.stretch(addrs.count());
	for (int i = 0; i < addrs.count(); ++i)
		theIfaces.append(addrs[i].addrN());
}

// Appends the manually configured (fake) host addresses to theIfaces.
void PolyApp::getFakeIfaces() {
	// note each list item may be an IP range
	for (int i = 0; i < TheOpts.theFakeHosts.val().count(); ++i) {
		const String &item = *TheOpts.theFakeHosts.val()[i];
		PglNetAddrRange range;
		if (!range.parse(item))
			cerr << ThePrgName << ": cannot convert fake host `" << item << "' to an IP address or range" << endl << xexit;

		// NOTE(review): template argument lost in SOURCE; elements are
		// pointers that provide addrN() and are deleted below
		Array addrs;
		range.toAddrs(addrs);
		for (int a = 0; a < addrs.count(); ++a)
			theIfaces.append(addrs[a]->addrN());

		// the expanded addresses are deleted once copied
		while (addrs.count()) delete addrs.pop();
	}
}

// creates IP addresses for agents to bind to
// A failure explanation produced by the worker overload is not fatal: it is
// logged as an "fyi" comment and no agent addresses are created.
void PolyApp::makeAddresses() {
	// NOTE(review): template arguments of both arrays lost in SOURCE; the
	// elements are pointers (deleted below); `agents` is passed to
	// parameters of type AddrSyms&
	Array hosts;
	Array agents;
	getHostAddrs(hosts);
	getAgentAddrs(agents);

	ostringstream err;
	makeAddresses(hosts, agents, err);

	// anything written to err explains why no addresses were created
	if (err.tellp()) {
		err << ends;
		Comment(5) << "fyi: " << err.str() << "; will not attempt to create agent addresses" << endc;
		streamFreeze(err, false);
	}

	// cleanup
	while (hosts.count()) delete hosts.pop();
	while (agents.count()) delete agents.pop();
	theIfaces.reset();
	while (theCleanIfaces.count()) delete theCleanIfaces.pop();
}

// Creates interface aliases for the agent addresses that belong to the local
// host(s). Agent addresses are split evenly among the real hosts; only hosts
// whose address matches a local interface get aliases.
// Returns err; a reason is written to it when the preconditions are not met
// (no bench, no hosts, no agents, uneven split, missing ifname or subnet).
// If no host address is local, the addresses are printed to cerr and xexit
// is applied.
// NOTE(review): template argument of `hosts` lost in SOURCE; elements are
// dereferenced as NetAddr below.
ostream &PolyApp::makeAddresses(Array &hosts, AddrSyms &agents, ostream &err) {
	if (!PglStaticSemx::TheBench)
		return err << "no bench selected with use()";

	if (!hosts.count())
		return err << "no real host addresses for " << theAgentType << " side specified";

	if (!agents.count())
		return err << "no " << theAgentType << " agents found";

	const int agentsPerHost = agents.count() / hosts.count();
	if (agentsPerHost * hosts.count() != agents.count())
		return err << "the number of virtual agent addresses (" << agents.count() << ") is not divisible by the number of " << "real host addresess (" << hosts.count() << ')';

	// check that all addresses have ifnames and subnets
	for (int a = 0; a < agents.count(); ++a) {
		const NetAddrSym *as = agents[a];
		if (!as->ifName()) {
			as->print(err << "no interface name for ", "") << " address";
			return err << " of the " << theAgentType << " agent";
		}
		int subnet;
		if (!as->subnet(subnet)) {
			as->print(err << "no subnet for ", "") << " address";
			return err << " of the " << theAgentType << " agent";
		}
	}

	getIfaces();

	// create aliases for each local host address
	int createCount = 0;
	for (int h = 0; h < hosts.count(); ++h) {
		const NetAddr &host = *hosts[h];

		// check if host address is local
		bool found = false;
		for (int l = 0; !found && l < theIfaces.count(); ++l)
			found = theIfaces[l] == host.addrN();

		if (found)
			createCount += makeAddresses(h, agents, agentsPerHost);
	}

	if (!createCount) {
		cerr << ThePrgName << ": no specified host addresses match local addresses" << endl;
		cerr << "host addresses:";
		for (int h = 0; h < hosts.count(); ++h)
			cerr << ' ' << *hosts[h];
		cerr << endl;
		cerr << "local addresses: ";
		for (int l = 0; l < theIfaces.count(); ++l)
			cerr << ' ' << NetAddr(theIfaces[l], -1);
		cerr << endl;
		cerr << xexit;
	}

	Comment(6) << "fyi: created " << createCount << " agent addresses total" << endc;
	return err;
}

// Creates aliases for the agentsPerHost agent addresses assigned to the host
// with index hidx (a contiguous slice of agents). Returns the number of
// aliases created, which is asserted to equal agentsPerHost.
int PolyApp::makeAddresses(int hidx, AddrSyms &agents, int agentsPerHost) {
	int aliasCount = 0;
	for (int i = hidx*agentsPerHost; i < agents.count() && aliasCount < agentsPerHost; ++i, ++aliasCount) {
		int subnet;
		// NOTE(review): the call that sets `subnet` lives inside Assert();
		// this relies on Assert() always evaluating its argument -- confirm
		Assert(agents[i]->subnet(subnet));
		const String &ifName = agents[i]->ifName();
		Assert(ifName);

		deleteAddresses(ifName);

		const NetAddr &addr = agents[i]->val();
		const InAddress netmask = InAddress::NetMask(subnet);
		NetIface iface;
		iface.name(ifName);
		if (!Should(iface.addAlias(addr.addrN(), aliasCount, netmask))) {
			agents[i]->print(cerr << "error: " << ifName << ": failed to create new alias (", "") << ')' << endl << xexit;
		}
	}
	Assert(aliasCount == agentsPerHost);
	return aliasCount;
}

// delete old aliases if needed
// Does nothing unless TheOpts.deleteOldAliases is set; each interface is
// processed at most once (tracked in theCleanIfaces).
void PolyApp::deleteAddresses(const String &ifname) {
	if (!TheOpts.deleteOldAliases)
		return;

	// check if we have done that already
	for (int i = 0; i < theCleanIfaces.count(); ++i) {
		if (*theCleanIfaces[i] == ifname)
			return;
	}

	NetIface iface;
	iface.name(ifname);
	const int delCount = iface.delAliases();
	if (Should(delCount >= 0)) {
		Comment << "fyi: " << ifname << ": deleted " << delCount << " old IP aliases" << endc;
	} else {
		Comment << "error: " << ifname << ": failed to delete old IP aliases" << endc;
	}

	// remember that we tried to clean this iface
	theCleanIfaces.append(new String(ifname));
}

// Fills hosts with the host addresses of our bench side, if a bench was
// selected and it has a side named sideName().
// NOTE(review): template argument of `hosts` lost in SOURCE.
void PolyApp::getHostAddrs(Array &hosts) const {
	if (PglStaticSemx::TheBench && PglStaticSemx::TheBench->side(sideName()))
		PglStaticSemx::TheBench->side(sideName())->hosts(hosts);
}

// Appends a clone of the host address symbol of every agent of our type.
// The clones are deleted by the caller (see makeAddresses()).
void PolyApp::getAgentAddrs(AddrSyms &agents) const {
	for (AgentArrIter i(PglStaticSemx::TheAgentsToUse, 0, theAgentType); i; ++i)
		agents.append(&(NetAddrSym&)i.hostSym()->clone()->cast("addr"));
}

// Returns the address mask symbol of our bench side, or nil if the bench has
// no side named sideName(). A bench must have been selected.
NetAddrSym *PolyApp::getAgentAddrMask() const {
	Assert(PglStaticSemx::TheBench);
	const String name = sideName();
	const BenchSideSym *side = PglStaticSemx::TheBench->side(name);
	return side ? side->addrMaskSym() : 0;
}

// Creates agent addresses and then the agents whose addresses match local
// interfaces, reporting progress every five seconds. If no agent matches,
// prints the agent and local addresses to cerr and applies xexit.
void PolyApp::makeAgents() {
	// PglCtx::RootCtx()->report(cout, "");

	makeAddresses();

	Clock::Update(false);

	// makeAddresses() resets theIfaces when done, so collect them again
	getIfaces();

	// create and configure agents that are assigned to our host
	// configuration process may take a while; inform about the progress
	const Time cfgReportGap = Time::Sec(5); // how often to report
	const Time cfgStart = TheClock;
	Time cfgNextReport = cfgStart + cfgReportGap;
	for (AgentArrIter ai(PglStaticSemx::TheAgentsToUse, &theIfaces, theAgentType); ai; ++ai) {
		makeAgent(*ai.agentSym(), ai.host());

		Clock::Update(false);
		if (TheClock.time() >= cfgNextReport) {
			Comment(5) << "created " << setw(6) << theLocals.count() << " agents so far" << endc;
			cfgNextReport = TheClock + cfgReportGap; // drift is OK
		}
	}
	Clock::Update(false);
	Comment(4) << "created " << theLocals.count() << " agents total" << endc;

	if (!theLocals.count()) {
		cerr << ThePrgName << ": no " << theAgentType << " matches local interface addresses" << endl;

		ArraySym agentAddrs("addr");
		{for (AgentArrIter i(PglStaticSemx::TheAgentsToUse, 0, theAgentType); i; i.nextSym())
			i.agentSym()->addresses(agentAddrs);
		}

		ArraySym localAddrs("addr");
		{for (int i = 0; i < theIfaces.count(); ++i) {
			NetAddrSym s;
			s.val(NetAddr(theIfaces[i], -1));
			localAddrs.add(s);
		}}

		agentAddrs.print(cerr << theAgentType << " addresses:", "");
		cerr << endl;
		localAddrs.print(cerr << "local addresses: ", "");
		cerr << endl;
		cerr << xexit;
	}
}

// Logs a level-5 description of every local agent.
void PolyApp::describeLocals() const {
	for (int i = 0; i < theLocals.count(); ++i) {
		theLocals[i]->describe(Comment(5) << theAgentType << ' ');
		Comment << endc;
	}
}

// Registers a local agent; the agent is deleted by the destructor.
void PolyApp::addAgent(Agent *agent) {
	Assert(agent);
	theLocals.append(agent);
}

// configure StatPhaseMgr with user specified phases
// A default phase named "dflt" is generated (with a warning) if the schedule
// is empty. Each phase is configured with its predecessor.
void PolyApp::buildSchedule() {
	if (!PglStaticSemx::TheSchedule.count()) {
		cerr << ThePrgName << ": warning: no run phases were specified; generating and using default phase" << endl;

		PhaseSym *ps = new PhaseSym();
		ps->loc(TokenLoc("-"));
		ps->name("dflt");
		PglStaticSemx::TheSchedule.append(ps);
	}

	const StatPhase *prevPh = 0;
	for (int i = 0; i < PglStaticSemx::TheSchedule.count(); ++i) {
		StatPhase *p = new StatPhase;
		p->configure(PglStaticSemx::TheSchedule[i], prevPh);
		TheStatPhaseMgr.addPhase(p);
		prevPh = p;
	}
}

// top level configure routine
// Sets up sockets, logs, HTTP headers, signals, FD limits, the file scanner,
// hosts, statistics, the schedule, and notification mechanisms.
void PolyApp::configure() {
	Socket::Configure();
	configureLogs(2);
	ReqHdr::Configure();
	RepHdr::Configure();

	startListen();

	// ignore some signals
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	// set resource limits
	const int rlimit = SetMaxFD(GetMaxFD());
	if (rlimit > FD_SETSIZE)
		cerr << here << "warning: getrlimit(2) system call reports " << rlimit << " file descriptors while FD_SETSIZE #defines only " << FD_SETSIZE << "; using lower value." << endl;
	const int fdLimit = Min(FD_SETSIZE, rlimit);
	if (fdLimit <= 0)
		FatalError(errZeroFDLimit);

	TheFileScanner = TheOpts.theFileScanner.val();
	TheFileScanner->configure(fdLimit);
	TheFileScanner->ticker(this);

	// to be safe, cut about 3% from the actual limit
	Socket::TheMaxLevel = Max(0, fdLimit - 10 - fdLimit/33);

	// honor manually configured limit if it is lower
	if (TheOpts.theFDLimit >= 0 && TheOpts.theFDLimit < Socket::TheMaxLevel)
		Socket::TheMaxLevel = TheOpts.theFDLimit;
	TheOpts.theFDLimit.set(Socket::TheMaxLevel);

	configureHosts();

	/* statistics */

	TheStatCycle = new StatCycle;
	TheStatCycle->period(TheOpts.theStatCycleLen);

	buildSchedule(); // configures TheStatPhaseMgr

	if (PglStaticSemx::TheSmplSchedule.count())
		TheStatsSampleMgr.configure(PglStaticSemx::TheSmplSchedule);

	// configure notification mechanism (old custom format)
	if (TheOpts.theNotifAddr) {
		Notifier *n = new Notifier(TheOpts.theRunLabel, TheOpts.theNotifAddr);
		if (n->active())
			TheStatCycle->notifier(n);
		else
			delete n;
	}

	// configure notification mechanism (beep)
	if (TheOpts.theBeepDoormanListAt || TheOpts.theBeepDoormanSendTo) {
		theBeepDoorman = new BeepDoorman;
		theBeepDoorman->configure(TheOpts.theBeepDoormanListAt, TheOpts.theBeepDoormanSendTo);
	}

	atexit(&ShutdownAtExit);
	Should(xset_new_handler(&ShutdownAtNew));
}

// Appends the option groups to be parsed from the command line; this
// implementation adds only TheOpts.
// NOTE(review): template argument of `opts` lost in SOURCE.
void PolyApp::getOpts(Array &opts) {
	opts.append(&TheOpts);
}

// Preprocesses, parses, and interprets the PGL configuration file and keeps
// the preprocessed image for logCfg(). Prints an error and applies xexit if
// the parser yields no syntax tree.
void PolyApp::parseConfigFile(const String &fname) {
	Assert(fname);

	TheOpts.theCfgDirs.copy(PglPp::TheDirs);
	PglPp pp(fname);
	PglParser parser(&pp);

	if (const SynSym *s = parser.parse()) {
		PglStaticSemx semx;
		semx.interpret(*s);
	} else {
		cerr << here << "internal error: failed to interpret parsed " << fname << endl << xexit;
	}

	// save PGL configuration to log it later
	thePglCfg = pp.image();
}

// Logs the current resource usage as a level-5 comment.
void PolyApp::reportRUsage() const {
	ResourceUsage ru = ResourceUsage::Current();
	Comment(5) << "resource usage: " << endl;
	ru.report(Comment, "\t");
	Comment << endc;
}

// Reports the command line, content distributions, phases, stats samples,
// FD limits, resource usage, group id, pid, and current time; also stores
// the group id in the binary log.
void PolyApp::reportCfg() {
	theCmdLine.reportRaw(Comment(1));
	Comment << endc;
	theCmdLine.reportParsed(Comment(2));
	Comment << endc;

	// report server configs
	Comment(7) << "Server content distributions:" << endl;
	for (int i = 0; i < theContentSels.count(); ++i) {
		if (theContentSels[i])
			theContentSels[i]->reportCfg(Comment);
	}
	Comment << endc;

	TheStatPhaseMgr.reportCfg(Comment(2) << "Phases:" << endl);
	Comment << endc;

	TheStatsSampleMgr.reportCfg(Comment(2) << "StatsSamples:" << endl);
	Comment << endc;

	Comment(2) << "FDs: " << GetCurFD() << " out of " << GetMaxFD() << " FDs can be used; safeguard limit: " << Socket::TheMaxLevel << endc;

	reportRUsage();

	Comment(1) << "group-id: " << TheGroupId << " pid: " << getpid() << endc;
	TheOLog << bege(lgGroupId, lgcAll) << TheGroupId << ende;

	Clock::Update(false);
	Comment << "current time: " << TheClock.time() << " or ";
	HttpDatePrint(Comment) << endc;
}

// Stores the PGL configuration image and the content type kinds in the
// binary log.
void PolyApp::logCfg() {
	// PGL configuration
	TheOLog << bege(lgPglCfg, lgcAll) << thePglCfg << ende;
	Comment(5) << "fyi: PGL configuration stored (" << thePglCfg.len() << "bytes)" << endc;

	TheOLog << bege(lgContTypeKinds, lgcAll);
	ContTypeStat::Store(TheOLog);
	TheOLog << ende;
}

// Stores the next numbered application state in the binary log.
void PolyApp::flushState() {
	theStateCount++;
	logState(TheOLog);
	Comment(5) << "fyi: current state (" << theStateCount << ") stored" << endc;
}

// except for random seeds
// Loads public worlds and, if side state is available, each agent's working
// set. A mismatch between stored and current agent counts only warns.
void PolyApp::loadPersistence() {
	ThePersistWorkSetMgr.loadPubWorlds();

	// load per-agent data like private worlds
	if (IBStream *is = ThePersistWorkSetMgr.loadSideState()) {
		const int storedCount = is->geti();
		ThePersistWorkSetMgr.checkInput();
		if (storedCount != theLocals.count()) {
			Comment << "warning: persistent working set stored in " << TheOpts.doLoadWorkSet << " has information about " << storedCount << ' ' << theAgentType << " agent(s) while " << "the current test has " << theLocals.count() << " agent(s)" << endc;
		}

		// Visit every local agent: the loop used to stop at storedCount,
		// which made the missWorkingSet() branch unreachable and left
		// agents without stored data unnotified.
		for (int i = 0; i < theLocals.count(); ++i) {
			if (i < storedCount)
				theLocals[i]->loadWorkingSet(*is);
			else
				theLocals[i]->missWorkingSet();
		}
		ThePersistWorkSetMgr.checkInput();

		Comment << "fyi: working set loaded from " << is->name() << "; id: " << ThePersistWorkSetMgr.id() << ", version: " << ThePersistWorkSetMgr.version() << endc;
	}
}

// except for random seeds
// Stores public worlds and, if a side state stream is available, the agent
// count followed by each agent's working set.
void PolyApp::storePersistence() {
	ThePersistWorkSetMgr.storePubWorlds();

	// store per-agent data like private worlds
	if (OBStream *os = ThePersistWorkSetMgr.storeSideState()) {
		*os << theLocals.count();
		for (int i = 0; i < theLocals.count(); ++i)
			theLocals[i]->storeWorkingSet(*os);
		ThePersistWorkSetMgr.checkOutput();

		Comment << "fyi: working set stored in " << os->name() << "; id: " << ThePersistWorkSetMgr.id() << ", version: " << ThePersistWorkSetMgr.version() << endc;
	}
}

// Writes the application state number to log and broadcasts log on
// TheLogStateChannel.
void PolyApp::logState(OLog &log) {
	// use the supplied log (not the global TheOLog) so that this record
	// lands in the same log that is handed to the channel receivers
	log << bege(lgAppState, lgcAll) << theStateCount << ende;
	Broadcast(TheLogStateChannel, &log);
}

// Starts the phase manager, the stat cycle, the sample manager, and the beep
// doorman (if configured).
void PolyApp::startServices() {
	TheStatPhaseMgr.start();
	TheStatCycle->start();
	TheStatsSampleMgr.start();
	if (theBeepDoorman)
		theBeepDoorman->start();
}

// Starts all local agents.
void PolyApp::startAgents() {
	Comment(1) << "starting " << theLocals.count() << " HTTP agents..." << endc;
	for (int i = 0; i < theLocals.count(); ++i)
		theLocals[i]->start();
}

// Application entry point: configures everything, runs step() until
// ShutdownNow(), then stores the final state and persistent working set.
// Returns -1 if the command line cannot be handled and 0 otherwise.
int PolyApp::run(int argc, char *argv[]) {
	ThePrgName = argv[0];

	Clock::Update();

	if (!handleCmdLine(argc, argv))
		return -1;

	// must preceed configureRnd() call
	ThePersistWorkSetMgr.openInput(TheOpts.doLoadWorkSet);

	configureRnd(); // must preceed parsing

	parseConfigFile(TheOpts.theCfgFileName);

	Clock::Update();
	configure();

	Clock::Update(false);
	reportCfg();

	Clock::Update(false);
	logCfg();

	Clock::Update(false);
	makeAgents();

	Clock::Update(false);
	loadPersistence();
	ThePersistWorkSetMgr.closeInput();

	describeLocals(); // after persistence (hence, agent IDs) is loaded

	// free some memory
	PglStaticSemx::Destroy();
	thePglCfg = 0;

	flushState();

	// flush logs headers
	TheOLog.flush();
	TheSmplOLog->flush();

	// quit nicely on some signals
	// note: install this handler after configuration is done
	// so a program can be killed if configure takes forever
	signal(SIGINT, (SignalHandler*)&ShutdownSignal);

	// other signal handlers
	signal(SIGUSR1, (SignalHandler*)&DebugSignal);
#if FIND_MEMORY_LEAKS
	signal(SIGUSR2, (SignalHandler*)&DebugMemSignal);
#endif

	Clock::Update();
	startServices();

	Clock::Update();
	startAgents();

	Clock::Update();
	while (!ShutdownNow()) {
		step();
	}

	reportRUsage();
	flushState();

	ThePersistWorkSetMgr.openOutput(TheOpts.doStoreWorkSet);
	ThePersistWorkSetMgr.storeSeeds();
	storePersistence();
	ThePersistWorkSetMgr.close();

	return 0;
}


#if FIND_MEMORY_LEAKS

#warning FIND_MEMORY_LEAKS code is enabled, USR2 signal starts a sample

// NOTE(review): the header names of the next two directives were lost in
// SOURCE. The code below uses malloc/free and std::bad_alloc, so <stdlib.h>
// and <new> are plausible -- confirm against the pristine file.
#include
#include

int MemDebugIgnore = 0; // global
void *TheMainAddress = 0; // the commented-out main() template below sets this to &main
int MemDumpCount = 0; // number of allocations still to be sampled

// Prints up to 25 return addresses of the current call stack to os (GCC
// only). Printing stops once an address is nil or not above TheMainAddress.
static
void dumpStack(ostream &os) {
#ifdef __GNUC__
	void *addr = &dumpStack;
	// __builtin_return_address parameter must be a constant
	// (the trace goes to the supplied stream; it used to go to cerr, apart
	// from the rest of the record that the caller writes to its own stream)
#	define dumpOneAddress(level) \
		if (addr && addr > TheMainAddress) { \
			addr = __builtin_return_address(level); \
			os << '-' << addr; \
		}

	dumpOneAddress(1);
	dumpOneAddress(2);
	dumpOneAddress(3);
	dumpOneAddress(4);
	dumpOneAddress(5);
	dumpOneAddress(6);
	dumpOneAddress(7);
	dumpOneAddress(8);
	dumpOneAddress(9);
	dumpOneAddress(10);
	dumpOneAddress(11);
	dumpOneAddress(12);
	dumpOneAddress(13);
	dumpOneAddress(14);
	dumpOneAddress(15);
	dumpOneAddress(16);
	dumpOneAddress(17);
	dumpOneAddress(18);
	dumpOneAddress(19);
	dumpOneAddress(20);
	dumpOneAddress(21);
	dumpOneAddress(22);
	dumpOneAddress(23);
	dumpOneAddress(24);
	dumpOneAddress(25); // arbitrary limit
#endif
}

// Counts allocations. While a sample is active (MemDumpCount > 0), every
// call decrements MemDumpCount; only 68-byte "item" allocations are logged
// to clog, with a stack trace.
static
void countNew(size_t size, void *data, const char *kind) {
	static unsigned long calls = 0;
	calls++;

	if (MemDumpCount <= 0)
		return;

	/*if (size == 68 && *kind == 'i') // RndBodyIter
		return;
	if (size == 4 && *kind == 'i')
		return;
	if (size == 100 && *kind == 'a')
		return;
	if (size == 608 && *kind == 'i') // SrvXact
		return;
	*/

	if (MemDumpCount > 0) {
		if (size == 68 && *kind == 'i') {
			clog << here << "#new: " << calls << " sz: " << size << " ptr: " << data << ' ' << kind << " at " << MemDumpCount << " trace:";
			dumpStack(clog);
			clog << endl;
		}
		MemDumpCount--;
		//if (!MemDebugIgnore && size == 20 && *kind == 'a')
		//	abort();
	}
}

// Counts deallocations; logs each one to clog while a sample is active.
// Does not decrement MemDumpCount.
static
void countFree(void *data, const char *kind) {
	static unsigned long calls = 0;
	calls++;

	if (MemDumpCount <= 0)
		return;

	clog << here << "#free: " << calls << " sz: ? ptr: " << data << ' ' << kind << " at " << MemDumpCount << endl;
}

// Global allocation functions replaced to count and sample allocations.
// NOTE(review): a nil result from malloc() is returned as is rather than
// reported via std::bad_alloc or the installed new handler -- confirm that
// this is acceptable for a debugging-only build.
void *operator new(size_t size) throw (std::bad_alloc) {
	void *data = malloc(size);
	countNew(size, data, "item");
	return data;
}

void *operator new[](size_t size) throw (std::bad_alloc) {
	void *data = malloc(size);
	countNew(size, data, "arr");
	return data;
}

void operator delete(void *data) throw() {
	countFree(data, "item");
	free(data);
}

void operator delete[](void *data) throw() {
	countFree(data, "arr");
	free(data);
}

// Signal handler (installed for SIGUSR2 in run()): starts a sample covering
// the next 25000 allocations.
static
void DebugMemSignal(int s) {
	clog << "got debug signal (" << s << ')' << endl;
	MemDumpCount = 25000;
}

#endif


#if 0 /* main() template */

int main(int argc, char *argv[]) {
	// extern void *TheMainAddress;
	// TheMainAddress = &main;
	XxxApp app;
	return app.run(argc, argv);
}

#endif