/* Web Polygraph http://www.web-polygraph.org/
* (C) 2003-2006 The Measurement Factory
* Licensed under the Apache License, Version 2.0 */
#include "base/polygraph.h"
#include "xstd/h/signal.h"
#include <sys/stat.h>
#include <fstream>
#include "xstd/h/new.h"
#include "xstd/h/fcntl.h"
#include "xstd/h/net/if.h"
#include "xstd/h/iostream.h"
#include "xstd/h/iomanip.h"
#include "xstd/h/process.h" /* for _getpid() on W2K */
#include "xstd/Socket.h"
#include "xstd/AlarmClock.h"
#include "xstd/FileScanner.h"
#include "xstd/ResourceUsage.h"
#include "xstd/Rnd.h"
#include "xstd/gadgets.h"
#include "xstd/getIfAddrs.h"
#include "xstd/InetIfReq.h"
#include "xstd/NetIface.h"
#include "xstd/rndDistrs.h"
#include "base/RndPermut.h"
#include "base/UniqId.h"
#include "base/BStream.h"
#include "base/polyLogTags.h"
#include "base/polyLogCats.h"
#include "runtime/AddrMap.h"
#include "runtime/HostMap.h"
#include "runtime/AddrSubsts.h"
#include "runtime/HttpDate.h"
#include "runtime/StatCycle.h"
#include "runtime/StatPhase.h"
#include "runtime/StatPhaseMgr.h"
#include "runtime/StatsSampleMgr.h"
#include "runtime/ErrorMgr.h"
#include "runtime/Notifier.h"
#include "runtime/PolyOLog.h"
#include "runtime/LogComment.h"
#include "runtime/SharedOpts.h"
#include "runtime/Agent.h"
#include "runtime/AgentCfg.h"
#include "runtime/PersistWorkSetMgr.h"
#include "runtime/httpHdrs.h"
#include "runtime/polyErrors.h"
#include "runtime/polyBcastChannels.h"
#include "runtime/globals.h"
#include "csm/ContentSel.h"
#include "pgl/PglPp.h"
#include "pgl/PglParser.h"
#include "pgl/PglStaticSemx.h"
#include "pgl/PglCtx.h"
#include "pgl/PglArraySym.h"
#include "pgl/PglNetAddrSym.h"
#include "pgl/PglNetAddrRange.h"
#include "pgl/AgentSym.h"
#include "pgl/AgentArrIter.h"
#include "pgl/ServerSym.h"
#include "pgl/PhaseSym.h"
#include "pgl/BenchSym.h"
#include "pgl/BenchSideSym.h"
#include "app/DebugSwitch.h"
#include "app/PolyApp.h"
#include "app/BeepDoorman.h"
#include "app/shutdown.h"
static StatCycle *TheStatCycle = 0;
#if FIND_MEMORY_LEAKS
static void DebugMemSignal(int s);
#endif
/* PolyApp */
PolyApp::PolyApp(): theBeepDoorman(0),
isIdle(false), theStateCount(0) {
theChannels.append(ThePhasesEndChannel);
}
PolyApp::~PolyApp() {
for (int i = 0; i < theLocals.count(); ++i)
delete theLocals[i];
Socket::Clean();
delete theBeepDoorman;
}
// scans all FDs once
void PolyApp::step() {
Time tout = isIdle ? theIdleEnd - TheClock : (Time)TheOpts.theIdleTout;
Time *toutp = &tout;
Clock::Update(false);
if (!TheAlarmClock.on()) {
if (tout < 0)
toutp = 0;
} else
if (tout < 0) {
tout = TheAlarmClock.timeLeft();
} else {
tout = Min(tout, TheAlarmClock.timeLeft());
}
// avoid multiple micro iterations when alarm time is very close
if (tout > 0)
tout = Max(Time::Msec(1), tout);
checkTiming(tout);
const int readyCount = scan(toutp);
if (ShutdownRequested())
return;
if (readyCount < 0) {
if (const Error err = Error::LastExcept(EINTR))
FatalError(err);
}
// start "idle state" if needed
if (!readyCount) {
if (!isIdle && TheOpts.theIdleTout >= 0) {
isIdle = true;
theIdleBeg = TheClock;
theIdleEnd = theIdleBeg + TheOpts.theIdleTout;
}
} else {
isIdle = false;
}
Clock::Update();
checkProgressReport();
if (!readyCount && isIdle && TheClock.time() >= theIdleEnd)
wasIdle(TheClock - theIdleBeg);
}
int PolyApp::scan(Time *toutp) {
static int scanCount = 0;
static Time zeroTout = Time::Sec(0);
const int hotCount = scanCount++ % TheOpts.thePrioritySched == 0 ?
TheFileScanner->scan(toutp) :
TheFileScanner->scan(fsupAsap, &zeroTout);
if (!hotCount)
scanCount = 0;
return hotCount;
}
void PolyApp::begCycle(int) {
Clock::Update(false);
theTickCount = 0;
}
void PolyApp::endCycle() {
}
bool PolyApp::tick() {
if (++theTickCount % 8 == 0)
Clock::Update(false);
return !ShutdownRequested();
}
void PolyApp::checkTiming(Time drift) {
// make these members
static Time nextDriftToReport = Time::Msec(100);
static Time lastCheck = TheClock;
if (drift < 0) { // we are behind
drift = -drift;
if (drift >= nextDriftToReport) {
if (ReportError(errTimingDrift))
Comment(1) << "record level of timing drift: " << drift
<< "; last check was " << (TheClock-lastCheck) << " ago" << endc;
nextDriftToReport = drift + drift/5;
}
}
lastCheck = TheClock;
}
void PolyApp::checkProgressReport() const {
const Time reportGap = Time::Sec(5*60);
static Time lastReport = TheClock;
if (lastReport + reportGap <= TheClock) {
lastReport = TheClock;
reportRUsage();
Broadcast(TheInfoChannel, BcastRcver::ieReportProgress);
}
}
void PolyApp::noteMsgStrEvent(BcastChannel *ch, const char *msg) {
Assert(ch == ThePhasesEndChannel);
// quit now unless we need to wait for inactivity timeout
if (TheOpts.theIdleTout < 0)
ShutdownReason(msg);
}
void PolyApp::wasIdle(Time tout) {
if (TheOpts.theIdleTout >= 0 && tout >= TheOpts.theIdleTout) {
Comment << "was idle for at least " << tout << endc;
ShutdownReason("inactivity timeout");
}
}
bool PolyApp::handleCmdLine(int argc, char *argv[]) {
Array<OptGrp*> opts;
getOpts(opts);
theCmdLine.configure(opts);
if (!theCmdLine.parse(argc, argv))
return false;
// validate command line params
for (int i = 0; i < opts.count(); ++i)
if (!opts[i]->validate())
return false;
return true;
}
void PolyApp::configureBinLog(OLog &log, const String &fname, SizeOpt &bufSz) {
if (ostream *os = new ofstream(fname.cstr(), ios::binary|ios::out|ios::trunc)) {
log.stream(fname, os);
} else {
cerr << ThePrgName << ": cannot create binary log; "
<< fname << ": " << Error::Last() << endl << xexit;
}
// we cannot shrink log size
if (bufSz > log.capacity())
log.capacity(bufSz);
bufSz.set(log.capacity());
}
void PolyApp::configureLogs(int prec) {
// redirect console output
if (TheOpts.theConsFileName && TheOpts.theConsFileName != "-")
redirectOutput(TheOpts.theConsFileName.cstr());
configureStream(cout, prec);
configureStream(cerr, prec);
configureStream(clog, prec);
/* initialize binary log files */
if (TheOpts.theLogFileName) {
configureBinLog(TheOLog, TheOpts.theLogFileName, TheOpts.theLogBufSize);
TheOLog.period(Time::Sec(5)); // flush at most that often
}
if (TheOpts.theSmplLogFileName) {
TheSmplOLog = new PolyOLog();
configureBinLog(*TheSmplOLog, TheOpts.theSmplLogFileName, TheOpts.theSmplLogBufSize);
// sample log is not flushed on a periodic basis
} else {
// use general purpose log
TheOpts.theSmplLogFileName.val(TheOpts.theLogFileName);
TheOpts.theSmplLogBufSize.set(TheOpts.theLogBufSize);
TheSmplOLog = &TheOLog;
}
Comment.TheEchoLevel = TheOpts.theVerbLevel;
}
void PolyApp::configureRnd() {
ThePersistWorkSetMgr.loadSeeds(); // must be called before seeds are used
// set random seeds
GlbPermut().reseed(TheOpts.theGlbRngSeed);
LclPermut().reseed(TheOpts.theLclRngSeed);
RndGen::TheDefSeed = LclPermut(TheOpts.theLclRngSeed);
// use the seed as uid "space" index for non-unique worlds
if (!TheOpts.useUniqueWorld)
UniqId::Space(TheOpts.theLclRngSeed);
TheGroupId = UniqId::Create();
ThePersistWorkSetMgr.configure(); // must be called after UniqId::Space()
}
// collect server configurations into TheHostMap
void PolyApp::configureHosts() {
TheAddrMap = new AddrMap();
TheAddrMap->configure(PglStaticSemx::TheAddrMapsToUse);
TheAddrSubsts = new AddrSubsts;
TheAddrSubsts->configure(PglStaticSemx::TheAddrSubstsToUse);
AgentArrIter i(PglStaticSemx::TheAgentsToUse, 0, "Server");
// count the number of servers for HostMap
// include AddrMap names in the count as we add them below
int srvCount = TheAddrMap->nameCount();
for (i.includeProxySides(true), i.restart(); i; ++i)
srvCount++;
TheHostMap = new HostMap(srvCount);
// create selectors for each server configuration symbol
// note: cfgCount returns the number of all agents, not just servers
theContentSels.stretch(i.cfgCount());
for (i.includeProxySides(false), i.restart(); i; ++i) {
if (theContentSels.count() <= i.cfgIdx()) {
ContentSel *sel = new ContentSel;
sel->configure((const ServerSym*)i.agentSym());
theContentSels.put(sel, i.cfgIdx());
}
}
Assert(theContentSels.count() <= i.cfgCount());
// assign content selectors to hosts
for (i.includeProxySides(false), i.restart(); i; ++i) {
int idx = -1;
const NetAddr &host = i.host();
if (host.isDomainName()) {
cerr << here << "server address must be an IP, got: " <<
host << endl << xexit;
}
if (TheHostMap->find(host, idx)) {
cerr << here << "server address " << host <<
" repeated/use()d twice!" << endl << xexit;
}
HostCfg *hostCfg = TheHostMap->addAt(idx, host);
ContentSel *sel = theContentSels[i.cfgIdx()];
Assert(sel);
hostCfg->theContent = sel;
// add host addresses by default
if (!TheAddrMap->findAddr(host))
TheAddrMap->add(host);
}
// assign SSL wraps to hosts, including proxies
for (i.includeProxySides(true), i.restart(); i; ++i) {
int idx = -1;
const NetAddr &host = i.host();
HostCfg *hostCfg = 0;
if (TheHostMap->find(host, idx)) {
hostCfg = TheHostMap->at(idx);
} else {
hostCfg = TheHostMap->addAt(idx, host);
}
if (hostCfg->theSslWrap) {
cerr << i.agentSym()->loc() << "[proxy] server address " <<
host << " repeated or use()d twice" << endl << xexit;
}
// XXX: very inefficient, should drag servers' TheSharedCfgs here??
AgentCfg agentCfg;
agentCfg.configure((const ServerSym*)i.agentSym());
const SslWrap *wrap = 0;
if (agentCfg.selectSslWrap(wrap)) {
hostCfg->theSslWrap = wrap;
Comment(3) << "fyi: expecting SSL proxy or server at " <<
host << endc;
}
}
// add all missing visible names; needed to get to public worlds
for (int viserv = 0; viserv < TheAddrMap->nameCount(); ++viserv) {
const NetAddr &viname = TheAddrMap->nameAt(viserv);
int idx = -1;
if (!TheHostMap->find(viname, idx))
TheHostMap->addAt(idx, viname);
// we do not create public world by default to save RAM if
// it happens to be not-needed
}
}
// get a list of all available interface addresses
void PolyApp::getIfaces() {
// manually configured [faked] addresses take priority
if (TheOpts.theFakeHosts) {
getFakeIfaces();
return;
}
Array<InetIfReq> addrs;
if (!::GetIfAddrs(addrs, String()))
cerr << ThePrgName << ": cannot get a list of all available network interfaces: " << Error::Last() << endl << xexit;
theIfaces.stretch(addrs.count());
for (int i = 0; i < addrs.count(); ++i)
theIfaces.append(addrs[i].addrN());
}
void PolyApp::getFakeIfaces() {
// note each list item may be an IP range
for (int i = 0; i < TheOpts.theFakeHosts.val().count(); ++i) {
const String &item = *TheOpts.theFakeHosts.val()[i];
PglNetAddrRange range;
if (!range.parse(item))
cerr << ThePrgName << ": cannot convert fake host `" << item
<< "' to an IP address or range" << endl << xexit;
Array<NetAddr*> addrs;
range.toAddrs(addrs);
for (int a = 0; a < addrs.count(); ++a)
theIfaces.append(addrs[a]->addrN());
while (addrs.count()) delete addrs.pop();
}
}
// creates IP addresses for agents to bind to
void PolyApp::makeAddresses() {
Array<NetAddr*> hosts;
Array<NetAddrSym*> agents;
getHostAddrs(hosts);
getAgentAddrs(agents);
ostringstream err;
makeAddresses(hosts, agents, err);
if (err.tellp()) {
err << ends;
Comment(5) << "fyi: " << err.str()
<< "; will not attempt to create agent addresses" << endc;
streamFreeze(err, false);
}
// cleanup
while (hosts.count()) delete hosts.pop();
while (agents.count()) delete agents.pop();
theIfaces.reset();
while (theCleanIfaces.count()) delete theCleanIfaces.pop();
}
ostream &PolyApp::makeAddresses(Array<NetAddr*> &hosts, AddrSyms &agents, ostream &err) {
if (!PglStaticSemx::TheBench)
return err << "no bench selected with use()";
if (!hosts.count())
return err << "no real host addresses for " << theAgentType << " side specified";
if (!agents.count())
return err << "no " << theAgentType << " agents found";
const int agentsPerHost = agents.count() / hosts.count();
if (agentsPerHost * hosts.count() != agents.count())
return err
<< "the number of virtual agent addresses (" << agents.count()
<< ") is not divisible by the number of "
<< "real host addresess (" << hosts.count() << ')';
// check that all addresses have ifnames and subnets
for (int a = 0; a < agents.count(); ++a) {
const NetAddrSym *as = agents[a];
if (!as->ifName()) {
as->print(err << "no interface name for ", "") << " address";
return err << " of the " << theAgentType << " agent";
}
int subnet;
if (!as->subnet(subnet)) {
as->print(err << "no subnet for ", "") << " address";
return err << " of the " << theAgentType << " agent";
}
}
getIfaces();
// create aliases for each local host address
int createCount = 0;
for (int h = 0; h < hosts.count(); ++h) {
const NetAddr &host = *hosts[h];
// check if host address is local
bool found = false;
for (int l = 0; !found && l < theIfaces.count(); ++l)
found = theIfaces[l] == host.addrN();
if (found)
createCount += makeAddresses(h, agents, agentsPerHost);
}
if (!createCount) {
cerr << ThePrgName << ": no specified host addresses match local addresses" << endl;
cerr << "host addresses:";
for (int h = 0; h < hosts.count(); ++h)
cerr << ' ' << *hosts[h];
cerr << endl;
cerr << "local addresses: ";
for (int l = 0; l < theIfaces.count(); ++l)
cerr << ' ' << NetAddr(theIfaces[l], -1);
cerr << endl;
cerr << xexit;
}
Comment(6) << "fyi: created " << createCount << " agent addresses total" << endc;
return err;
}
int PolyApp::makeAddresses(int hidx, AddrSyms &agents, int agentsPerHost) {
int aliasCount = 0;
for (int i = hidx*agentsPerHost; i < agents.count() && aliasCount < agentsPerHost; ++i, ++aliasCount) {
int subnet;
Assert(agents[i]->subnet(subnet));
const String &ifName = agents[i]->ifName();
Assert(ifName);
deleteAddresses(ifName);
const NetAddr &addr = agents[i]->val();
const InAddress netmask = InAddress::NetMask(subnet);
NetIface iface;
iface.name(ifName);
if (!Should(iface.addAlias(addr.addrN(), aliasCount, netmask))) {
agents[i]->print(cerr << "error: " << ifName
<< ": failed to create new alias (", "")
<< ')' << endl << xexit;
}
}
Assert(aliasCount == agentsPerHost);
return aliasCount;
}
// delete old aliases if needed
void PolyApp::deleteAddresses(const String &ifname) {
if (!TheOpts.deleteOldAliases)
return;
// check if we have done that already
for (int i = 0; i < theCleanIfaces.count(); ++i) {
if (*theCleanIfaces[i] == ifname)
return;
}
NetIface iface;
iface.name(ifname);
const int delCount = iface.delAliases();
if (Should(delCount >= 0)) {
Comment << "fyi: " << ifname << ": deleted "
<< delCount << " old IP aliases" << endc;
} else {
Comment << "error: " << ifname
<< ": failed to delete old IP aliases" << endc;
}
// remember that we tried to clean this iface
theCleanIfaces.append(new String(ifname));
}
void PolyApp::getHostAddrs(Array<NetAddr*> &hosts) const {
if (PglStaticSemx::TheBench && PglStaticSemx::TheBench->side(sideName()))
PglStaticSemx::TheBench->side(sideName())->hosts(hosts);
}
void PolyApp::getAgentAddrs(AddrSyms &agents) const {
for (AgentArrIter i(PglStaticSemx::TheAgentsToUse, 0, theAgentType); i; ++i)
agents.append(&(NetAddrSym&)i.hostSym()->clone()->cast("addr"));
}
NetAddrSym *PolyApp::getAgentAddrMask() const {
Assert(PglStaticSemx::TheBench);
const String name = sideName();
const BenchSideSym *side = PglStaticSemx::TheBench->side(name);
return side ? side->addrMaskSym() : 0;
}
void PolyApp::makeAgents() {
// PglCtx::RootCtx()->report(cout, "");
makeAddresses(); Clock::Update(false);
getIfaces();
// create and configure agents that are assigned to our host
// configuration process may take a while; inform about the progress
const Time cfgReportGap = Time::Sec(5); // how often to report
const Time cfgStart = TheClock;
Time cfgNextReport = cfgStart + cfgReportGap;
for (AgentArrIter ai(PglStaticSemx::TheAgentsToUse, &theIfaces, theAgentType); ai; ++ai) {
makeAgent(*ai.agentSym(), ai.host());
Clock::Update(false);
if (TheClock.time() >= cfgNextReport) {
Comment(5) << "created "
<< setw(6) << theLocals.count() << " agents so far" << endc;
cfgNextReport = TheClock + cfgReportGap; // drift is OK
}
}
Clock::Update(false);
Comment(4) << "created " << theLocals.count() << " agents total" << endc;
if (!theLocals.count()) {
cerr << ThePrgName << ": no " << theAgentType
<< " matches local interface addresses" << endl;
ArraySym agentAddrs("addr");
{for (AgentArrIter i(PglStaticSemx::TheAgentsToUse, 0, theAgentType); i; i.nextSym())
i.agentSym()->addresses(agentAddrs);
}
ArraySym localAddrs("addr");
{for (int i = 0; i < theIfaces.count(); ++i) {
NetAddrSym s;
s.val(NetAddr(theIfaces[i], -1));
localAddrs.add(s);
}}
agentAddrs.print(cerr << theAgentType << " addresses:", "");
cerr << endl;
localAddrs.print(cerr << "local addresses: ", "");
cerr << endl;
cerr << xexit;
}
}
void PolyApp::describeLocals() const {
for (int i = 0; i < theLocals.count(); ++i) {
theLocals[i]->describe(Comment(5) << theAgentType << ' ');
Comment << endc;
}
}
void PolyApp::addAgent(Agent *agent) {
Assert(agent);
theLocals.append(agent);
}
// configure StatPhaseMgr with user specified phases
void PolyApp::buildSchedule() {
if (!PglStaticSemx::TheSchedule.count()) {
cerr << ThePrgName << ": warning: no run phases were specified; generating and using default phase" << endl;
PhaseSym *ps = new PhaseSym();
ps->loc(TokenLoc("-"));
ps->name("dflt");
PglStaticSemx::TheSchedule.append(ps);
}
const StatPhase *prevPh = 0;
for (int i = 0; i < PglStaticSemx::TheSchedule.count(); ++i) {
StatPhase *p = new StatPhase;
p->configure(PglStaticSemx::TheSchedule[i], prevPh);
TheStatPhaseMgr.addPhase(p);
prevPh = p;
}
}
// top level configure routine
void PolyApp::configure() {
Socket::Configure();
configureLogs(2);
ReqHdr::Configure();
RepHdr::Configure();
startListen();
// ignore some signals
signal(SIGPIPE, SIG_IGN);
signal(SIGHUP, SIG_IGN);
// set resource limits
const int rlimit = SetMaxFD(GetMaxFD());
if (rlimit > FD_SETSIZE)
cerr << here << "warning: getrlimit(2) system call reports " << rlimit << " file descriptors while FD_SETSIZE #defines only " << FD_SETSIZE << "; using lower value." << endl;
const int fdLimit = Min(FD_SETSIZE, rlimit);
if (fdLimit <= 0)
FatalError(errZeroFDLimit);
TheFileScanner = TheOpts.theFileScanner.val();
TheFileScanner->configure(fdLimit);
TheFileScanner->ticker(this);
// to be safe, cut about 3% from the actual limit
Socket::TheMaxLevel = Max(0, fdLimit - 10 - fdLimit/33);
// honor manually configured limit if it is lower
if (TheOpts.theFDLimit >= 0 && TheOpts.theFDLimit < Socket::TheMaxLevel)
Socket::TheMaxLevel = TheOpts.theFDLimit;
TheOpts.theFDLimit.set(Socket::TheMaxLevel);
configureHosts();
/* statistics */
TheStatCycle = new StatCycle;
TheStatCycle->period(TheOpts.theStatCycleLen);
buildSchedule(); // configures TheStatPhaseMgr
if (PglStaticSemx::TheSmplSchedule.count())
TheStatsSampleMgr.configure(PglStaticSemx::TheSmplSchedule);
// configure notification mechanism (old custom format)
if (TheOpts.theNotifAddr) {
Notifier *n = new Notifier(TheOpts.theRunLabel, TheOpts.theNotifAddr);
if (n->active())
TheStatCycle->notifier(n);
else
delete n;
}
// configure notification mechanism (beep)
if (TheOpts.theBeepDoormanListAt || TheOpts.theBeepDoormanSendTo) {
theBeepDoorman = new BeepDoorman;
theBeepDoorman->configure(TheOpts.theBeepDoormanListAt, TheOpts.theBeepDoormanSendTo);
}
atexit(&ShutdownAtExit);
Should(xset_new_handler(&ShutdownAtNew));
}
void PolyApp::getOpts(Array<OptGrp*> &opts) {
opts.append(&TheOpts);
}
void PolyApp::parseConfigFile(const String &fname) {
Assert(fname);
TheOpts.theCfgDirs.copy(PglPp::TheDirs);
PglPp pp(fname);
PglParser parser(&pp);
if (const SynSym *s = parser.parse()) {
PglStaticSemx semx;
semx.interpret(*s);
} else {
cerr << here << "internal error: failed to interpret parsed " <<
fname << endl << xexit;
}
// save PGL configuration to log it later
thePglCfg = pp.image();
}
void PolyApp::reportRUsage() const {
ResourceUsage ru = ResourceUsage::Current();
Comment(5) << "resource usage: " << endl;
ru.report(Comment, "\t");
Comment << endc;
}
void PolyApp::reportCfg() {
theCmdLine.reportRaw(Comment(1)); Comment << endc;
theCmdLine.reportParsed(Comment(2)); Comment << endc;
// report server configs
Comment(7) << "Server content distributions:" << endl;
for (int i = 0; i < theContentSels.count(); ++i) {
if (theContentSels[i])
theContentSels[i]->reportCfg(Comment);
}
Comment << endc;
TheStatPhaseMgr.reportCfg(Comment(2) << "Phases:" << endl);
Comment << endc;
TheStatsSampleMgr.reportCfg(Comment(2) << "StatsSamples:" << endl);
Comment << endc;
Comment(2) << "FDs: "
<< GetCurFD() << " out of " << GetMaxFD()
<< " FDs can be used; safeguard limit: " << Socket::TheMaxLevel
<< endc;
reportRUsage();
Comment(1) << "group-id: " << TheGroupId << " pid: " << getpid() << endc;
TheOLog << bege(lgGroupId, lgcAll) << TheGroupId << ende;
Clock::Update(false);
Comment << "current time: " << TheClock.time() << " or ";
HttpDatePrint(Comment) << endc;
}
void PolyApp::logCfg() {
// PGL configuration
TheOLog << bege(lgPglCfg, lgcAll) << thePglCfg << ende;
Comment(5) << "fyi: PGL configuration stored (" << thePglCfg.len() << "bytes)" << endc;
TheOLog << bege(lgContTypeKinds, lgcAll);
ContTypeStat::Store(TheOLog);
TheOLog << ende;
}
void PolyApp::flushState() {
theStateCount++;
logState(TheOLog);
Comment(5) << "fyi: current state (" << theStateCount << ") stored" << endc;
}
// except for random seeds
void PolyApp::loadPersistence() {
ThePersistWorkSetMgr.loadPubWorlds();
// load per-agent data like private worlds
if (IBStream *is = ThePersistWorkSetMgr.loadSideState()) {
const int storedCount = is->geti();
ThePersistWorkSetMgr.checkInput();
if (storedCount != theLocals.count()) {
Comment << "warning: persistent working set stored in " <<
TheOpts.doLoadWorkSet << " has information about " <<
storedCount << ' ' << theAgentType << " agent(s) while " <<
"the current test has " << theLocals.count() << " agent(s)" <<
endc;
}
for (int i = 0; i < theLocals.count() && i < storedCount; ++i) {
if (i < storedCount)
theLocals[i]->loadWorkingSet(*is);
else
theLocals[i]->missWorkingSet();
}
ThePersistWorkSetMgr.checkInput();
Comment << "fyi: working set loaded from " <<
is->name() << "; id: " << ThePersistWorkSetMgr.id() <<
", version: " << ThePersistWorkSetMgr.version() << endc;
}
}
// except for random seeds
void PolyApp::storePersistence() {
ThePersistWorkSetMgr.storePubWorlds();
// store per-agent data like private worlds
if (OBStream *os = ThePersistWorkSetMgr.storeSideState()) {
*os << theLocals.count();
for (int i = 0; i < theLocals.count(); ++i)
theLocals[i]->storeWorkingSet(*os);
ThePersistWorkSetMgr.checkOutput();
Comment << "fyi: working set stored in " <<
os->name() << "; id: " << ThePersistWorkSetMgr.id() <<
", version: " << ThePersistWorkSetMgr.version() << endc;
}
}
void PolyApp::logState(OLog &log) {
TheOLog << bege(lgAppState, lgcAll) << theStateCount << ende;
Broadcast(TheLogStateChannel, &log);
}
void PolyApp::startServices() {
TheStatPhaseMgr.start();
TheStatCycle->start();
TheStatsSampleMgr.start();
if (theBeepDoorman)
theBeepDoorman->start();
}
void PolyApp::startAgents() {
Comment(1) << "starting " << theLocals.count() << " HTTP agents..." << endc;
for (int i = 0; i < theLocals.count(); ++i)
theLocals[i]->start();
}
int PolyApp::run(int argc, char *argv[]) {
ThePrgName = argv[0];
Clock::Update();
if (!handleCmdLine(argc, argv))
return -1;
// must preceed configureRnd() call
ThePersistWorkSetMgr.openInput(TheOpts.doLoadWorkSet);
configureRnd(); // must preceed parsing
parseConfigFile(TheOpts.theCfgFileName);
Clock::Update();
configure(); Clock::Update(false);
reportCfg(); Clock::Update(false);
logCfg(); Clock::Update(false);
makeAgents(); Clock::Update(false);
loadPersistence();
ThePersistWorkSetMgr.closeInput();
describeLocals(); // after persistence (hence, agent IDs) is loaded
// free some memory
PglStaticSemx::Destroy();
thePglCfg = 0;
flushState();
// flush logs headers
TheOLog.flush();
TheSmplOLog->flush();
// quit nicely on some signals
// note: install this handler after configuration is done
// so a program can be killed if configure takes forever
signal(SIGINT, (SignalHandler*)&ShutdownSignal);
// other signal handlers
signal(SIGUSR1, (SignalHandler*)&DebugSignal);
#if FIND_MEMORY_LEAKS
signal(SIGUSR2, (SignalHandler*)&DebugMemSignal);
#endif
Clock::Update();
startServices();
Clock::Update();
startAgents();
Clock::Update();
while (!ShutdownNow()) {
step();
}
reportRUsage();
flushState();
ThePersistWorkSetMgr.openOutput(TheOpts.doStoreWorkSet);
ThePersistWorkSetMgr.storeSeeds();
storePersistence();
ThePersistWorkSetMgr.close();
return 0;
}
#if FIND_MEMORY_LEAKS
#warning FIND_MEMORY_LEAKS code is enabled, USR2 signal starts a sample
#include <stdlib.h>
#include <new>
int MemDebugIgnore = 0; // global
void *TheMainAddress = 0;
int MemDumpCount = 0;
static
void dumpStack(ostream &os) {
#ifdef __GNUC__
void *addr = &dumpStack;
// __builtin_return_address parameter must be a constant
# define dumpOneAddress(level) \
if (addr && addr > TheMainAddress) { \
addr = __builtin_return_address(level); \
cerr << '-' << addr; \
}
dumpOneAddress(1);
dumpOneAddress(2);
dumpOneAddress(3);
dumpOneAddress(4);
dumpOneAddress(5);
dumpOneAddress(6);
dumpOneAddress(7);
dumpOneAddress(8);
dumpOneAddress(9);
dumpOneAddress(10);
dumpOneAddress(11);
dumpOneAddress(12);
dumpOneAddress(13);
dumpOneAddress(14);
dumpOneAddress(15);
dumpOneAddress(16);
dumpOneAddress(17);
dumpOneAddress(18);
dumpOneAddress(19);
dumpOneAddress(20);
dumpOneAddress(21);
dumpOneAddress(22);
dumpOneAddress(23);
dumpOneAddress(24);
dumpOneAddress(25); // arbitrary limit
#endif
}
static
void countNew(size_t size, void *data, const char *kind) {
static unsigned long calls = 0;
calls++;
if (MemDumpCount <= 0)
return;
/*if (size == 68 && *kind == 'i') // RndBodyIter
return;
if (size == 4 && *kind == 'i')
return;
if (size == 100 && *kind == 'a')
return;
if (size == 608 && *kind == 'i') // SrvXact
return;
*/
if (MemDumpCount > 0) {
if (size == 68 && *kind == 'i') {
clog << here << "#new: " << calls << " sz: " << size << " ptr: " << data << ' ' << kind <<
" at " << MemDumpCount << " trace:";
dumpStack(clog);
clog << endl;
}
MemDumpCount--;
//if (!MemDebugIgnore && size == 20 && *kind == 'a')
// abort();
}
}
static
void countFree(void *data, const char *kind) {
static unsigned long calls = 0;
calls++;
if (MemDumpCount <= 0)
return;
clog << here << "#free: " << calls << " sz: ? ptr: " << data << ' ' << kind << " at " << MemDumpCount << endl;
}
void *operator new(size_t size) throw (std::bad_alloc) {
void *data = malloc(size);
countNew(size, data, "item");
return data;
}
void *operator new[](size_t size) throw (std::bad_alloc) {
void *data = malloc(size);
countNew(size, data, "arr");
return data;
}
void operator delete(void *data) throw() {
countFree(data, "item");
free(data);
}
void operator delete[](void *data) throw() {
countFree(data, "arr");
free(data);
}
static
void DebugMemSignal(int s) {
clog << "got debug signal (" << s << ')' << endl;
MemDumpCount = 25000;
}
#endif
#if 0 /* main() template */
int main(int argc, char *argv[]) {
// extern void *TheMainAddress;
// TheMainAddress = &main;
XxxApp app;
return app.run(argc, argv);
}
#endif
syntax highlighted by Code2HTML, v. 0.9.1