/* Web Polygraph http://www.web-polygraph.org/ * (C) 2003-2006 The Measurement Factory * Licensed under the Apache License, Version 2.0 */ #include "base/polygraph.h" #include "xstd/LoadableModule.h" #include "xstd/gadgets.h" #include "xstd/rndDistrs.h" #include "base/OLog.h" #include "base/polyLogCats.h" #include "runtime/LogComment.h" #include "runtime/SharedOpts.h" #include "runtime/StatIntvl.h" #include "runtime/StatPhase.h" #include "runtime/StatPhaseMgr.h" #include "runtime/Rptmstat.h" #include "runtime/DutWatchdog.h" #include "runtime/ErrorMgr.h" #include "runtime/PersistWorkSetMgr.h" #include "runtime/polyBcastChannels.h" #include "runtime/polyErrors.h" #include "runtime/globals.h" #include "pgl/PglStaticSemx.h" #include "pgl/RobotSym.h" #include "pgl/BenchSym.h" #include "pgl/BenchSideSym.h" #include "app/BeepDoorman.h" #include "client/AsyncClt.h" #include "client/SyncClt.h" #include "client/PassClt.h" #include "client/ServerRep.h" #include "client/IcpCltXact.h" #include "client/CltOpts.h" #include "client/CltXact.h" #include "client/CltConnMgr.h" #include "client/SessionWatchRegistry.h" #include "client/CltDataFilterRegistry.h" #include "client/wssFreezers.h" #include "client/PolyClt.h" typedef XactFarmT CltXactFarm; PolyClt::PolyClt() { theAgentType = "Robot"; theChannels.append(TheSessionBegChannel); theChannels.append(TheSessionCntChannel); theChannels.append(TheSessionEndChannel); theChannels.append(TheInfoChannel); } PolyClt::~PolyClt() { TheSessionWatchRegistry().clear(); TheCltDataFilterRegistry().clear(); while (theModules.count()) delete theModules.pop(); // will unload } Agent *PolyClt::makeAgent(const AgentSym &agent, const NetAddr &address) { const RobotSym *rcfg = &(const RobotSym&)agent.cast("Robot"); // cerr << here << "address=" << address << endl; Client *client = 0; RndDistr *iad = 0; if (rcfg->reqInterArrival(iad)) { if (iad) client = new AsyncClt(iad); else client = new PassClt(); } else { client = new SyncClt(); } Assert(client); client->configure(rcfg, address); addAgent(client); return client; } void PolyClt::configure() { PolyApp::configure(); loadModules(TheCltOpts.theLoadableModules.val()); TheWssFreezer = 0; if (PglStaticSemx::TheWorkSetLen >= 0 && PglStaticSemx::TheWorkSetCap >= 0) { cerr << ThePrgName << ": canot handle more than one WSS freezing conditions" << endl; exit(-3); } if (PglStaticSemx::TheWorkSetLen >= 0) TheWssFreezer = new TimeWssFreezer(PglStaticSemx::TheWorkSetLen); if (PglStaticSemx::TheWorkSetCap >= 0) TheWssFreezer = new FillWssFreezer(PglStaticSemx::TheWorkSetCap); Rptmstat::IsEnabled = true; // only clinet-side cares about load DutWatchdog::IsEnabled = true; // only clinet-side cares about load Client::Farm(new CltXactFarm); IcpCltXact::TheTimeout = TheCltOpts.theIcpTout; StatIntvl::ActiveCat(lgcCltSide); StatIntvl::TheReportCat = lgcCltSide; } void PolyClt::getOpts(Array &opts) { PolyApp::getOpts(opts); opts.append(&TheCltOpts); } void PolyClt::reportCfg() { PolyApp::reportCfg(); TheSessionWatchRegistry().report(Comment); Comment << endc; TheCltDataFilterRegistry().report(Comment); Comment << endc; } void PolyClt::loadPersistence() { PolyApp::loadPersistence(); if (ThePersistWorkSetMgr.loadSideState()) Client::ReportWss(1); } void PolyClt::loadModules(const Array &names) { for (int i = 0; i < names.count(); ++i) { const String &name = *names[i]; LoadableModule *module = new LoadableModule(name); Comment(7) << "loading dynamic module: " << name << endc; if (!module->load()) { Comment << "error loading dynamic module '" << name << "': " << module->error() << endc; FatalError(errOther); } theModules.append(module); } Comment << "dynamic modules loaded: " << theModules.count() << endc; } void PolyClt::logState(OLog &log) { PolyApp::logState(log); Client::LogState(log); } void PolyClt::startAgents() { Comment(1) << "fyi: max local population size: " << theLocals.count() << " robots" << endc; // do not start robots here; robots are started using population factor theAvailClts.stretch(theLocals.count()); for (int i = 0; i < theLocals.count(); ++i) theAvailClts.append((Client*)theLocals[i]); // lock the phase; clients will unlock when ready to hit all servers TheStatPhaseMgr->lock(); } void PolyClt::startClients(int count) { const int wasAvailableCnt = theAvailClts.count(); while (theAvailClts.count() && count-- > 0) { Client *clt = flipCltState(theAvailClts, thePopulus); clt->start(); } if (count > 0 && ReportError(errUnderpopulated)) { Comment << "cannot increase population size;" << " current level: " << thePopulus.count() << " robots available: " << theAvailClts.count() << " debt: " << count << endc; } else if (wasAvailableCnt && !theAvailClts.count()) { Comment(3) << "fyi: reached max local population size: " << thePopulus.count() << " robots" << endc; } } void PolyClt::stopClients(int count) { while (thePopulus.count() && count-- > 0) { Client *clt = flipCltState(thePopulus, theAvailClts); clt->stop(); } // if count > 0, then there were rounding errors in count calculations? if (thePopulus.count() <= 1) { Comment(3) << "fyi: reached min local population size: " << thePopulus.count() << " robots" << endc; } } Client *PolyClt::flipCltState(Clients &from, Clients &to) { Assert(from.count()); static RndGen rng; const int idx = rng(0, from.count()); Client *clt = from[idx]; from.eject(idx); to.append(clt); return clt; } void PolyClt::startServices() { PolyApp::startServices(); if (TheWssFreezer) TheWssFreezer->start(); } void PolyClt::step() { /* check active populus levels */ const double pf = TheStatPhaseMgr->populusFactor().current(); Assert(pf >= 0); const int goal = pf > 0 ? // leave at least one client alive if pf > 0 Max(1, (int)rint(pf*theLocals.count())) : 0; const int diff = goal - thePopulus.count(); if (diff < 0) stopClients(-diff); else if (diff > 0) startClients(diff); PolyApp::step(); } const String PolyClt::sideName() const { return "client"; } int PolyClt::logCat() const { return lgcCltSide; } void PolyClt::noteClientEvent(BcastChannel *ch, const Client *c) { Assert(c); String status; if (ch == TheSessionBegChannel) { status = "begin"; } else if (ch == TheSessionCntChannel) { status = "continue"; } else if (ch == TheSessionEndChannel) { status = "end"; } else { Assert(false); } if (theBeepDoorman) { ostringstream buf; buf << "" << ends; theBeepDoorman->bcastMsg(buf.str().c_str()); streamFreeze(buf, false); } } void PolyClt::noteInfoEvent(BcastChannel *ch, InfoEvent ev) { Assert(ch == TheInfoChannel); if (ev == ieReportProgress) Client::ReportWss(7); } int main(int argc, char *argv[]) { PolyClt app; return app.run(argc, argv); }