/* Web Polygraph http://www.web-polygraph.org/
* (C) 2003-2006 The Measurement Factory
* Licensed under the Apache License, Version 2.0 */
#include "base/polygraph.h"
#include "xstd/LoadableModule.h"
#include "xstd/gadgets.h"
#include "xstd/rndDistrs.h"
#include "base/OLog.h"
#include "base/polyLogCats.h"
#include "runtime/LogComment.h"
#include "runtime/SharedOpts.h"
#include "runtime/StatIntvl.h"
#include "runtime/StatPhase.h"
#include "runtime/StatPhaseMgr.h"
#include "runtime/Rptmstat.h"
#include "runtime/DutWatchdog.h"
#include "runtime/ErrorMgr.h"
#include "runtime/PersistWorkSetMgr.h"
#include "runtime/polyBcastChannels.h"
#include "runtime/polyErrors.h"
#include "runtime/globals.h"
#include "pgl/PglStaticSemx.h"
#include "pgl/RobotSym.h"
#include "pgl/BenchSym.h"
#include "pgl/BenchSideSym.h"
#include "app/BeepDoorman.h"
#include "client/AsyncClt.h"
#include "client/SyncClt.h"
#include "client/PassClt.h"
#include "client/ServerRep.h"
#include "client/IcpCltXact.h"
#include "client/CltOpts.h"
#include "client/CltXact.h"
#include "client/CltConnMgr.h"
#include "client/SessionWatchRegistry.h"
#include "client/CltDataFilterRegistry.h"
#include "client/wssFreezers.h"
#include "client/PolyClt.h"
typedef XactFarmT<CltXact, CltXact> CltXactFarm;
PolyClt::PolyClt() {
theAgentType = "Robot";
theChannels.append(TheSessionBegChannel);
theChannels.append(TheSessionCntChannel);
theChannels.append(TheSessionEndChannel);
theChannels.append(TheInfoChannel);
}
PolyClt::~PolyClt() {
TheSessionWatchRegistry().clear();
TheCltDataFilterRegistry().clear();
while (theModules.count())
delete theModules.pop(); // will unload
}
Agent *PolyClt::makeAgent(const AgentSym &agent, const NetAddr &address) {
const RobotSym *rcfg = &(const RobotSym&)agent.cast("Robot");
// cerr << here << "address=" << address << endl;
Client *client = 0;
RndDistr *iad = 0;
if (rcfg->reqInterArrival(iad)) {
if (iad)
client = new AsyncClt(iad);
else
client = new PassClt();
} else {
client = new SyncClt();
}
Assert(client);
client->configure(rcfg, address);
addAgent(client);
return client;
}
void PolyClt::configure() {
PolyApp::configure();
loadModules(TheCltOpts.theLoadableModules.val());
TheWssFreezer = 0;
if (PglStaticSemx::TheWorkSetLen >= 0 && PglStaticSemx::TheWorkSetCap >= 0) {
cerr << ThePrgName << ": canot handle more than one WSS freezing conditions" << endl;
exit(-3);
}
if (PglStaticSemx::TheWorkSetLen >= 0)
TheWssFreezer = new TimeWssFreezer(PglStaticSemx::TheWorkSetLen);
if (PglStaticSemx::TheWorkSetCap >= 0)
TheWssFreezer = new FillWssFreezer(PglStaticSemx::TheWorkSetCap);
Rptmstat::IsEnabled = true; // only clinet-side cares about load
DutWatchdog::IsEnabled = true; // only clinet-side cares about load
Client::Farm(new CltXactFarm);
IcpCltXact::TheTimeout = TheCltOpts.theIcpTout;
StatIntvl::ActiveCat(lgcCltSide);
StatIntvl::TheReportCat = lgcCltSide;
}
void PolyClt::getOpts(Array<OptGrp*> &opts) {
PolyApp::getOpts(opts);
opts.append(&TheCltOpts);
}
void PolyClt::reportCfg() {
PolyApp::reportCfg();
TheSessionWatchRegistry().report(Comment);
Comment << endc;
TheCltDataFilterRegistry().report(Comment);
Comment << endc;
}
void PolyClt::loadPersistence() {
PolyApp::loadPersistence();
if (ThePersistWorkSetMgr.loadSideState())
Client::ReportWss(1);
}
void PolyClt::loadModules(const Array<String*> &names) {
for (int i = 0; i < names.count(); ++i) {
const String &name = *names[i];
LoadableModule *module = new LoadableModule(name);
Comment(7) << "loading dynamic module: " << name << endc;
if (!module->load()) {
Comment << "error loading dynamic module '" << name << "': "
<< module->error() << endc;
FatalError(errOther);
}
theModules.append(module);
}
Comment << "dynamic modules loaded: " << theModules.count() << endc;
}
void PolyClt::logState(OLog &log) {
PolyApp::logState(log);
Client::LogState(log);
}
void PolyClt::startAgents() {
Comment(1) << "fyi: max local population size: " << theLocals.count() << " robots" << endc;
// do not start robots here; robots are started using population factor
theAvailClts.stretch(theLocals.count());
for (int i = 0; i < theLocals.count(); ++i)
theAvailClts.append((Client*)theLocals[i]);
// lock the phase; clients will unlock when ready to hit all servers
TheStatPhaseMgr->lock();
}
void PolyClt::startClients(int count) {
const int wasAvailableCnt = theAvailClts.count();
while (theAvailClts.count() && count-- > 0) {
Client *clt = flipCltState(theAvailClts, thePopulus);
clt->start();
}
if (count > 0 && ReportError(errUnderpopulated)) {
Comment << "cannot increase population size;"
<< " current level: " << thePopulus.count()
<< " robots available: " << theAvailClts.count()
<< " debt: " << count << endc;
} else
if (wasAvailableCnt && !theAvailClts.count()) {
Comment(3) << "fyi: reached max local population size: "
<< thePopulus.count() << " robots" << endc;
}
}
void PolyClt::stopClients(int count) {
while (thePopulus.count() && count-- > 0) {
Client *clt = flipCltState(thePopulus, theAvailClts);
clt->stop();
}
// if count > 0, then there were rounding errors in count calculations?
if (thePopulus.count() <= 1) {
Comment(3) << "fyi: reached min local population size: "
<< thePopulus.count() << " robots" << endc;
}
}
Client *PolyClt::flipCltState(Clients &from, Clients &to) {
Assert(from.count());
static RndGen rng;
const int idx = rng(0, from.count());
Client *clt = from[idx];
from.eject(idx);
to.append(clt);
return clt;
}
void PolyClt::startServices() {
PolyApp::startServices();
if (TheWssFreezer)
TheWssFreezer->start();
}
void PolyClt::step() {
/* check active populus levels */
const double pf = TheStatPhaseMgr->populusFactor().current();
Assert(pf >= 0);
const int goal = pf > 0 ? // leave at least one client alive if pf > 0
Max(1, (int)rint(pf*theLocals.count())) : 0;
const int diff = goal - thePopulus.count();
if (diff < 0)
stopClients(-diff);
else
if (diff > 0)
startClients(diff);
PolyApp::step();
}
const String PolyClt::sideName() const {
return "client";
}
int PolyClt::logCat() const {
return lgcCltSide;
}
void PolyClt::noteClientEvent(BcastChannel *ch, const Client *c) {
Assert(c);
String status;
if (ch == TheSessionBegChannel) {
status = "begin";
} else
if (ch == TheSessionCntChannel) {
status = "continue";
} else
if (ch == TheSessionEndChannel) {
status = "end";
} else {
Assert(false);
}
if (theBeepDoorman) {
ostringstream buf;
buf
<< "<session user='" << c->credentials() << "' "
<< "address='" << c->host() << "' "
<< "status='" << status << "' />"
<< ends;
theBeepDoorman->bcastMsg(buf.str().c_str());
streamFreeze(buf, false);
}
}
void PolyClt::noteInfoEvent(BcastChannel *ch, InfoEvent ev) {
Assert(ch == TheInfoChannel);
if (ev == ieReportProgress)
Client::ReportWss(7);
}
int main(int argc, char *argv[]) {
PolyClt app;
return app.run(argc, argv);
}
syntax highlighted by Code2HTML, v. 0.9.1