/* Web Polygraph http://www.web-polygraph.org/ * (C) 2003-2006 The Measurement Factory * Licensed under the Apache License, Version 2.0 */ #include "base/polygraph.h" #include "xstd/h/iostream.h" #include "xstd/h/sstream.h" #include "xstd/h/iomanip.h" #include "base/polyVersion.h" // do not try to compile polymon if curses are not supported #if defined(HAVE_CONFIG_H) && (!defined(HAVE_NCURSES_H) || !defined(HAVE_LIBNCURSES)) int main() { cerr << "polymon could not be built without " << "a properly installed ncurses library" << endl; return -1; } #else #include "xstd/h/math.h" #include "xstd/h/os_std.h" #include "xstd/h/string.h" #include "xstd/h/signal.h" #include "xstd/h/netinet.h" #include #if defined(sun) #define bool BOOL #endif #include // cleanup after ncurses.h #if defined(bool) #undef bool #endif #if defined(clear) #undef clear #endif #if defined(timeout) #undef timeout #endif #include "xstd/Assert.h" #include "xstd/Time.h" #include "xstd/BitMask.h" #include "xstd/Socket.h" #include "xstd/Poll.h" #include "xstd/AlarmClock.h" #include "xstd/History.h" #include "base/AggrStat.h" #include "base/polyLogCats.h" #include "runtime/NotifMsg.h" // takes care of the keyboard input class KbdMonitor: public FileScanUser { public: KbdMonitor(int aFD); virtual ~KbdMonitor(); virtual void noteReadReady(int fd); public: int theFD; FileScanReserv theReserv; }; // handles incoming messages class MsgMonitor: public FileScanUser { public: MsgMonitor(Socket aSock); virtual ~MsgMonitor(); virtual void noteReadReady(int fd); public: Socket theSock; FileScanReserv theReserv; char theBuf[sizeof(StatusFwdMsg)*64]; int theSize; }; // handles periodic operations class Ticker: public AlarmUser { public: Ticker(Time aPeriod); virtual ~Ticker(); virtual void wakeUp(const Alarm &alarm); public: Time thePeriod; }; class HostFilter; // maintains info about a host class Host { public: typedef History Log; public: Host(const NetAddr &anAddr); const NetAddr &addr() const { return theAddr; } // XXX pre-IPv6 code returned 's_addr' as an int int id() const { return theAddr.addrN().octet(0); } int logCat() const; // XXX pre-IPv6 code returned 's_addr' as an int int lna() const { return theAddr.addrN().octet(0); } const Log &log() const { return theLog; } const char *runLabel() const; bool busy() const; bool selected() const { return isSelected; } bool isClient() const { return logCat() == lgcCltSide; } bool isServer() const { return logCat() == lgcSrvSide; } void noteMsg(const StatusFwdMsg &msg); // { theLog.insert(msg); } void selected(bool be) { isSelected = be; } bool matches(const HostFilter &sel) const; protected: NetAddr theAddr; // unique host address Log theLog; // notification messasge history bool isSelected; }; // used to select groups of hosts struct HostFilter { struct { int pos; } lbl; // experiment label based selection struct { int pos; } logCat; // category based selection (aka "side") HostFilter() { memset(this, 0, sizeof(*this)); } }; // used in some "matrix" windows to remember screen content class WinMatrix { public: WinMatrix(int aMaxY, int aMaxX); double operator ()(int y, int x) const { return theImage[safePos(y,x)]; } double &operator ()(int y, int x) { return theImage[safePos(y,x)]; } protected: int safePos(int y, int x) const; protected: Array theImage; int theMaxY; int theMaxX; }; // WINDOW wrapper with a title and event handlers class Window: public WINDOW { public: typedef void (Window::*EventHandler)(const Host &); public: Window(const char *aTitle); virtual ~Window(); virtual void noteAdd(const Host &) {} virtual void noteDel(const Host &) {} virtual void noteUpd(const Host &) {} public: const char *theTitle; }; // general monitor information class InfoWindow: public Window { public: InfoWindow(const char *aTitle); virtual void noteAdd(const Host &); virtual void noteDel(const Host &); virtual void noteUpd(const Host &); protected: void update(); protected: int theMsgCnt; // msgs seen so far int theListnCnt; // derived from CopyCnt in the fwded msg }; // summary information about SmxWindows class MsgSum; class SumWindow: public Window { public: SumWindow(const char *aTitle); virtual void noteAdd(const Host &); virtual void noteDel(const Host &); virtual void noteUpd(const Host &); protected: void update(); void displaySide(int &y, int x, const MsgSum &sum); void displayLine(int y, int x, const char *label, const AggrStat &stats, const char *meas, double scale = 1); }; // single measurement matrix window class SmxWindow: public Window { public: SmxWindow(const char *aTitle); virtual void noteAdd(const Host &); virtual void noteDel(const Host &); virtual void noteUpd(const Host &); protected: virtual void displayMsg(int y, int x, const StatusFwdMsg &msg) = 0; virtual void eraseSlot(int y, int x); protected: static int Host2X(const Host &host); static int Host2Y(const Host &host); static int Id2X(int id); static int Id2Y(int id); }; // number matrix window class NumxWindow: public SmxWindow { public: NumxWindow(const char *aTitle, const char *aFmt); virtual void noteAdd(const Host &); virtual void noteDel(const Host &); virtual void noteUpd(const Host &); protected: virtual void displayMsg(int y, int x, const StatusFwdMsg &msg); virtual void eraseSlot(int y, int x); virtual double msg2num(const StatusFwdMsg &msg) = 0; void update(); protected: const char *theFmt; // format to use for output WinMatrix theMatrix; // currently displayed values double theNumSum; // accumulator to compute averages and sums int theNumCnt; // number of hosts contributed to sum }; class MsgSum { public: MsgSum(); int hostCount() const { return theReqRate.count(); } MsgSum &operator +=(const StatusFwdMsg &msg); public: String theLabels; AggrStat theReqRate; AggrStat theRepRate; AggrStat theBwidth; AggrStat theRespTime; AggrStat theDHR; AggrStat theConnUse; AggrStat theErrRatio; AggrStat theXactTotCnt; AggrStat theTotErrRatio; AggrStat theSockInstCnt; }; /* smx windows for all attributes */ struct RunLabelSmxWin: public SmxWindow { RunLabelSmxWin(const char *aTitle): SmxWindow(aTitle) {} virtual void displayMsg(int y, int x, const StatusFwdMsg &msg) { mvwprintw(this, y, x, (char*)"%7.6s", msg.theLabel); } }; struct RunTimeSmxWin: public SmxWindow { RunTimeSmxWin(const char *aTitle): SmxWindow(aTitle) {} virtual void displayMsg(int y, int x, const StatusFwdMsg &msg); }; struct ReqRateSmxWin: public NumxWindow { ReqRateSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.1f") {} virtual double msg2num(const StatusFwdMsg &msg) { return (double)msg.theReqRate; } }; struct RepRateSmxWin: public NumxWindow { RepRateSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.1f") {} virtual double msg2num(const StatusFwdMsg &msg) { return (double)msg.theRepRate; } }; struct BwidthSmxWin: public NumxWindow { BwidthSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.1f") {} virtual double msg2num(const StatusFwdMsg &msg) { return (double)msg.theBwidth/(1024.*1024/8); } }; struct RespTimeSmxWin: public NumxWindow { RespTimeSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.0f") {} virtual double msg2num(const StatusFwdMsg &msg) { return msg.theRespTime.msec(); } }; struct DHRSmxWin: public NumxWindow { DHRSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {} virtual double msg2num(const StatusFwdMsg &msg) { return Max(100*(double)msg.theDHR, -1.); } }; struct ConnUseSmxWin: public NumxWindow { ConnUseSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {} virtual double msg2num(const StatusFwdMsg &msg) { return msg.theConnUse; } }; struct ErrRatioSmxWin: public NumxWindow { ErrRatioSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {} virtual double msg2num(const StatusFwdMsg &msg) { return Max(100*(double)msg.theErrRatio, -1.); } }; struct XactTotCntSmxWin: public NumxWindow { XactTotCntSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.0f") {} virtual double msg2num(const StatusFwdMsg &msg) { return msg.theXactTotCnt/1000.; } }; struct ErrTotCntSmxWin: public NumxWindow { ErrTotCntSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {} virtual double msg2num(const StatusFwdMsg &msg) { return msg.theErrTotCnt/1000.; } }; struct ErrTotRatioSmxWin: public NumxWindow { ErrTotRatioSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.2f") {} virtual double msg2num(const StatusFwdMsg &msg) { return Percent(msg.theErrTotCnt, msg.theXactTotCnt); } }; struct SockInstCntSmxWin: public NumxWindow { SockInstCntSmxWin(const char *aTitle): NumxWindow(aTitle, "%7.0f") {} virtual double msg2num(const StatusFwdMsg &msg) { return msg.theSockInstCnt; } }; class MsgGapSmxWin: public SmxWindow { public: MsgGapSmxWin(const char *aTitle): SmxWindow(aTitle) {} virtual void noteUpd(const Host &host); protected: virtual void displayMsg(int y, int x, const StatusFwdMsg &msg); protected: StatusFwdMsg theLastMsg; }; /* globals */ static NetAddr TheDisp; static FileScanner *TheScanner = 0; static bool DoShutdown = false; static Array TheHosts; static Array TheHostIdx; // address -> host map static int TheBusyHostCnt = 0; static Array TheLabels; static int TheUniqLblCnt = 0; static Array TheWins; static int TheWinPos = 0; // current window static Window *TheInfoWin = 0; // special static HostFilter TheFilter; static char TheFiltWarn[80] = ""; static int TheSelHostCnt = 0; static const int TheXMargin = 5; static const int TheYMargin = 4; static const int TheXCellWidth = 7; static const int TheYCellWidth = 2; static void NoteMsg(const StatusFwdMsg &msg); static void Broadcast(Window::EventHandler weh, const Host &host); static void DeleteIdleHosts(); static bool AddFirstLabel(const Host *skip, const String &l); static bool DelLastLabel(const Host *skip, const String &l); static void SelectHost(Host *host); static int SwitchWin(int idx); static int SwitchLblFilter(int idx); static int SwitchCatFilter(int idx); static void BuildFiltWarn(ostream &os); inline int Dbl2Int(double v) { return (int)rint(v); } /* KbdMonitor */ KbdMonitor::KbdMonitor(int aFD): theFD(aFD) { theReserv = TheScanner->setFD(theFD, dirRead, this); } KbdMonitor::~KbdMonitor() { if (theReserv) TheScanner->clearRes(theReserv); } void KbdMonitor::noteReadReady(int) { switch (getch()) { case '0': SwitchWin(0); break; case KEY_LEFT: SwitchWin(TheWinPos-1); break; case KEY_RIGHT: SwitchWin(TheWinPos+1); break; case KEY_DOWN: SwitchLblFilter(TheFilter.lbl.pos - 1); break; case KEY_UP: SwitchLblFilter(TheFilter.lbl.pos + 1); break; case 'S': SwitchCatFilter(TheFilter.logCat.pos - 1); break; case 's': SwitchCatFilter(TheFilter.logCat.pos + 1); break; case 'R': SwitchLblFilter(0); SwitchCatFilter(0); DeleteIdleHosts(); // fall through case 'r': clearok(curscr, true); SwitchWin(TheWinPos); break; case 'q': case 'Q': DoShutdown = true; break; default: return; } redrawwin(TheWins[TheWinPos]); } /* MsgMonitor */ MsgMonitor::MsgMonitor(Socket aSock): theSock(aSock), theSize(0) { theReserv = TheScanner->setFD(theSock.fd(), dirRead, this); } MsgMonitor::~MsgMonitor() { if (theReserv) TheScanner->clearRes(theReserv); if (theSock) theSock.close(); } void MsgMonitor::noteReadReady(int) { static const int msgsz = sizeof(StatusFwdMsg); const int sz = theSock.read(theBuf + theSize, sizeof(theBuf) - theSize); if (sz < 0) { if (const Error err = Error::LastExcept(EWOULDBLOCK)) { cerr << "failed to read from dispatcher at " << TheDisp << ": " << err << endl; DoShutdown = true; } return; } if (sz == 0) { cerr << "dispatcher at " << TheDisp << " quit." << endl; DoShutdown = true; return; } theSize += sz; while (theSize >= msgsz) { StatusFwdMsg msg; memcpy(&msg, theBuf, msgsz); theSize -= msgsz; memmove(theBuf, theBuf + msgsz, theSize); // handle the message msg.ntoh(); msg.theRcvTime = TheClock; NoteMsg(msg); } Assert(theSize >= 0); wrefresh(TheWins[TheWinPos]); } /* Ticker */ Ticker::Ticker(Time aPeriod): thePeriod(aPeriod) { sleepFor(thePeriod); } Ticker::~Ticker() { } void Ticker::wakeUp(const Alarm &alarm) { AlarmUser::wakeUp(alarm); DeleteIdleHosts(); sleepFor(thePeriod); } /* Host */ Host::Host(const NetAddr &anAddr): theAddr(anAddr), theLog(2), isSelected(false) { } int Host::logCat() const { return theLog.depth() ? theLog[0].theCat : lgcAll; } const char *Host::runLabel() const { return theLog.depth() ? theLog[0].theLabel : 0; } bool Host::busy() const { return theLog.depth() && theLog[0].theRcvTime > TheClock.time() - Time::Sec(90); } void Host::noteMsg(const StatusFwdMsg &msg) { theLog.insert(msg); } bool Host::matches(const HostFilter &filter) const { if (filter.logCat.pos != lgcAll && filter.logCat.pos != logCat()) return false; if (const int l = filter.lbl.pos) return theLog.depth() && *TheLabels[l] == theLog[0].theLabel; return true; } /* WinMatrix */ WinMatrix::WinMatrix(int aMaxY, int aMaxX): theImage((aMaxY+1) * (aMaxX+1)), theMaxY(aMaxY), theMaxX(aMaxX) { theImage.count(theImage.capacity()); theImage.memset(0); } int WinMatrix::safePos(int y, int x) const { Assert(y <= theMaxY && x <= theMaxX); const int p = y*(theMaxX+1) + x; Assert(p < theImage.count()); return p; } /* Window */ Window::Window(const char *aTitle): WINDOW(*newwin(0,0,0,0)), theTitle(aTitle) { leaveok(this, true); // do not bother about cursor pos int maxx, maxy; getmaxyx(this, maxy, maxx); werase(this); // print window title wattron(this, A_BOLD); wattron(this, A_UNDERLINE); mvwaddstr(this, 0, (maxx - strlen(theTitle))/2, (char*)theTitle); wattroff(this, A_UNDERLINE); wattroff(this, A_BOLD); } Window::~Window() { delwin(this); } /* InfoWindow */ InfoWindow::InfoWindow(char const *aTitle): Window(aTitle), theMsgCnt(0), theListnCnt(0) { update(); } void InfoWindow::update() { const int x = 2; int y = 3; mvwprintw(this, y++, x, (char*)"%20s %s:%d", "dispatcher:", TheDisp.addrA().cstr(), TheDisp.port()); mvwprintw(this, y++, x, (char*)"%20s %7d", "listeners:", theListnCnt); y++; mvwprintw(this, y++, x, (char*)"%20s %7d", "messages received:", theMsgCnt); mvwprintw(this, y++, x, (char*)"%20s %7d", "mean message size:", sizeof(StatusNotifMsg)); mvwprintw(this, y++, x, (char*)"%20s %7d", "selected hosts:", TheSelHostCnt); mvwprintw(this, y++, x, (char*)"%20s %7d", "busy hosts:", TheBusyHostCnt); mvwprintw(this, y++, x, (char*)"%20s %7d", "experiments:", TheUniqLblCnt); } void InfoWindow::noteAdd(const Host &host) { Window::noteAdd(host); update(); } void InfoWindow::noteDel(const Host &host) { Window::noteDel(host); update(); } void InfoWindow::noteUpd(const Host &host) { Window::noteUpd(host); if (host.log().depth()) { theMsgCnt++; theListnCnt = host.log()[0].theCopyCnt; } update(); } /* SumWindow */ SumWindow::SumWindow(char const *aTitle): Window(aTitle) { update(); } void SumWindow::update() { // collect summary info from hosts MsgSum cltSum, srvSum; for (int i = 0; i < TheHosts.count(); ++i) { if (const Host *host = TheHosts[i]) { if (host->log().depth()) { const StatusFwdMsg &msg = host->log()[0]; if (host->isClient()) cltSum += msg; else srvSum += msg; } } } const int x = 2; int y = 2; mvwprintw(this, y++, x, (char*)"%s (%d hosts)", "Client side:", cltSum.hostCount()); displaySide(y, x+2, cltSum); y += 1; mvwprintw(this, y++, x, (char*)"%s (%d hosts)", "Server side:", srvSum.hostCount()); displaySide(y, x+2, srvSum); y += 1; mvwprintw(this, y++, x+2, (char*)"%20s %7s %8s %9s %9s %s", "measurement:", "min", "mean", "max", "sum", "unit"); } void SumWindow::displaySide(int &y, int x, const MsgSum &sum) { //mvwprintw(this, y++, x, (char*)"%20s %s", "labels:", // (const char*)sum.theLabels); displayLine(y++, x, "load:", sum.theReqRate, "req/sec"); displayLine(y++, x, "throughput:", sum.theRepRate, "rep/sec"); //displayLine(y++, x, "bandwidth:", sum.theBwidth, "Mbit/sec", 1024.*1024/8); displayLine(y++, x, "response time:", sum.theRespTime, "msec"); displayLine(y++, x, "DHR:", sum.theDHR, "%"); //displayLine(y++, x, "connection use:", sum.theConnUse, "xact/conn"); displayLine(y++, x, "errors total:", sum.theTotErrRatio, "%"); //displayLine(y++, x, "errors now:", sum.theErrRatio, "%"); displayLine(y++, x, "xactions total:", sum.theXactTotCnt, "x10^6", 1e6); displayLine(y++, x, "open sockets now:", sum.theSockInstCnt, ""); } void SumWindow::displayLine(int y, int x, const char *label, const AggrStat &stats, const char *meas, double scale) { mvwprintw(this, y, x, (char*)"%20s %7d %8d %9d %9d %s", label, Dbl2Int(stats.min()/scale), Dbl2Int(stats.mean()/scale), Dbl2Int(stats.max()/scale), Dbl2Int(stats.sum()/scale), meas); } void SumWindow::noteAdd(const Host &host) { Window::noteAdd(host); update(); } void SumWindow::noteDel(const Host &host) { Window::noteDel(host); update(); } void SumWindow::noteUpd(const Host &host) { Window::noteUpd(host); update(); } /* SmxWindow */ SmxWindow::SmxWindow(char const *aTitle): Window(aTitle) { wattron(this, A_BOLD); // horizontal lables for (int i = 0; i < 10; ++i) mvwprintw(this, 2, Id2X(i), (char*)"%7d", i+1); // vertical labels for (int i = 0; i < 100; i += 10) mvwprintw(this, Id2Y(i), 0, (char*)"%2d", i); wattroff(this, A_BOLD); } void SmxWindow::noteAdd(const Host &host) { Window::noteAdd(host); displayMsg(Host2Y(host), Host2X(host), host.log()[0]); } void SmxWindow::noteDel(const Host &host) { Window::noteDel(host); eraseSlot(Host2Y(host), Host2X(host)); } void SmxWindow::noteUpd(const Host &host) { Window::noteUpd(host); displayMsg(Host2Y(host), Host2X(host), host.log()[0]); } void SmxWindow::eraseSlot(int y, int x) { mvwprintw(this, y, x, (char*)"%7s ", ""); } int SmxWindow::Host2Y(const Host &host) { return Id2Y((host.lna() & 255) - 1); } int SmxWindow::Host2X(const Host &host) { return Id2X((host.lna() & 255) - 1); } int SmxWindow::Id2Y(int host_id) { return TheYMargin + ((host_id % 100) / 10) * TheYCellWidth; } int SmxWindow::Id2X(int host_id) { return TheXMargin + (host_id % 10) * TheXCellWidth; } /* NumxWindow */ NumxWindow::NumxWindow(const char *aTitle, const char *aFmt): SmxWindow(aTitle), theFmt(aFmt), theMatrix(Id2Y(99),Id2X(99)), theNumSum(0), theNumCnt(0) { update(); } void NumxWindow::noteAdd(const Host &host) { theNumCnt++; SmxWindow::noteAdd(host); } void NumxWindow::noteDel(const Host &host) { theNumCnt--; SmxWindow::noteDel(host); } void NumxWindow::noteUpd(const Host &host) { SmxWindow::noteUpd(host); } void NumxWindow::displayMsg(int y, int x, const StatusFwdMsg &msg) { const double n = msg2num(msg); theNumSum -= theMatrix(y, x); mvwprintw(this, y, x, (char*)theFmt, n); theMatrix(y, x) = n; theNumSum += n; update(); } void NumxWindow::eraseSlot(int y, int x) { theNumSum -= theMatrix(y, x); SmxWindow::eraseSlot(y, x); theMatrix(y, x) = 0; update(); } void NumxWindow::update() { int maxx, maxy; getmaxyx(this, maxy, maxx); int y = maxy-1; wattron(this, A_BOLD); mvwaddstr(this, y, Id2X(2) + TheXCellWidth-4, (char*)"cnt:"); mvwaddstr(this, y, Id2X(4) + TheXCellWidth-4, (char*)"avg:"); mvwaddstr(this, y, Id2X(6) + TheXCellWidth-4, (char*)"sum:"); wattroff(this, A_BOLD); mvwprintw(this, y, Id2X(3), (char*)"%7d", theNumCnt); mvwprintw(this, y, Id2X(5), (char*)theFmt, Ratio(theNumSum,theNumCnt)); mvwprintw(this, y, Id2X(7), (char*)theFmt, theNumSum); } /* RunTimeSmxWin */ void RunTimeSmxWin::displayMsg(int y, int x, const StatusFwdMsg &msg) { const int sec = (msg.theSndTime - msg.theStartTime).sec(); const int min = (sec/60) % 60; const int hour = sec/3600; if (hour > 0) mvwprintw(this, y, x, (char*)"%4d:%02d", hour, min); else mvwprintw(this, y, x, (char*)"%4d.%02d", min, sec%60); } /* MsgGapSmxWin */ void MsgGapSmxWin::noteUpd(const Host &host) { if (host.log().depth() > 1) theLastMsg = host.log()[1]; SmxWindow::noteUpd(host); } void MsgGapSmxWin::displayMsg(int y, int x, const StatusFwdMsg &msg) { if (theLastMsg.theRcvTime >= 0) mvwprintw(this, y, x, (char*)"%7d", (msg.theRcvTime - theLastMsg.theRcvTime).sec()); else mvwprintw(this, y, x, (char*)"%7s", "?"); } MsgSum::MsgSum(): theLabels("") { } MsgSum &MsgSum::operator +=(const StatusFwdMsg &msg) { /*if (msg.theLabel && !theLabels.str(msg.theLabel)) { // XXX: not sufficient if (theLabels) theLabels += ","; theLabels += msg.theLabel; }*/ theReqRate.record(Dbl2Int(msg.theReqRate)); theRepRate.record(Dbl2Int(msg.theRepRate)); theBwidth.record(Dbl2Int(msg.theBwidth)); theRespTime.record(msg.theRespTime.msec()); theDHR.record(Dbl2Int(msg.theDHR*100)); theConnUse.record(Dbl2Int(msg.theConnUse)); theErrRatio.record(Dbl2Int(msg.theErrRatio*100)); theXactTotCnt.record(msg.theXactTotCnt); theTotErrRatio.record( Dbl2Int(Percent(msg.theErrTotCnt, msg.theErrTotCnt+msg.theXactTotCnt))); theSockInstCnt.record(msg.theSockInstCnt); return *this; } /* local routines */ static Host *AddHost(const NetAddr &addr) { Host *host = new Host(addr); Host *foundPos = 0; for (int i = 0; !foundPos && i < TheHosts.count(); ++i) { if (!TheHosts[i]) foundPos = TheHosts[i] = host; } if (!foundPos) TheHosts.append(host); TheBusyHostCnt++; return host; } static void NoteMsg(const StatusFwdMsg &msg) { //clog << "from " << inet_ntoa(from.sin_addr) << ": " << TheMsg.buf << endl; NetAddr from(msg.theSndAddr.addr, msg.theSndAddr.port); // find corresponding host // XXX pre-IPv6 code used 's_addr' as an int const int hidx = from.addrN().octet(0) % TheHostIdx.count(); Host *host = TheHostIdx[hidx]; if (host && host->addr() != from) { // collision host = 0; // find using linear search for (int i = 0; !host && i < TheHosts.count(); ++i) { if (TheHosts[i] && TheHosts[i]->addr() == from) host = TheHosts[i]; } } if (!host) { host = AddHost(from); if (!TheHostIdx[hidx]) TheHostIdx[hidx] = host; AddFirstLabel(host, msg.theLabel); } host->noteMsg(msg); SelectHost(host); // check select status Broadcast(&Window::noteUpd, *host); } static void Broadcast(Window::EventHandler weh, const Host &host) { if (!host.selected()) (TheInfoWin->*weh)(host); // info window gets all events else for (int w = 0; w < TheWins.count(); ++w) (TheWins[w]->*weh)(host); } #if 0 static void Broadcast(Window::EventHandler weh) { for (int h = 0; h < TheHosts.count(); ++h) { if (TheHosts[h]) Broadcast(weh, *TheHosts[h]); } } #endif static void DeleteIdleHosts() { // we will these from scratch: TheHostIdx.memset(0); TheSelHostCnt = 0; for (int h = 0; h < TheHosts.count(); ++h) { if (Host *host = TheHosts[h]) { if (host->busy()) { // XXX pre-IPv6 code used 's_addr' as an int const int idx = host->addr().addrN().octet(0) % TheHostIdx.count(); TheHostIdx[idx] = host; if (host->selected()) TheSelHostCnt++; } else { TheBusyHostCnt--; Broadcast(&Window::noteDel, *host); DelLastLabel(host, host->runLabel()); delete host; TheHosts[h] = 0; } } } } static bool AddFirstLabel(const Host *skip, const String &l) { for (int h = 0; h < TheHosts.count(); ++h) { if (TheHosts[h] == skip) continue; if (TheHosts[h] && l == TheHosts[h]->runLabel()) return false; // not first label } TheUniqLblCnt++; for (int i = 1; i < TheLabels.count(); ++i) { if (!*TheLabels[i]) { *TheLabels[i] = l; return true; } } TheLabels.append(new String(l)); return false; } static bool DelLastLabel(const Host *skip, const String &l) { for (int h = 0; h < TheHosts.count(); ++h) { if (TheHosts[h] == skip) continue; if (TheHosts[h] && l == TheHosts[h]->runLabel()) return false; // not last label } TheUniqLblCnt--; for (int i = 1; i < TheLabels.count(); ++i) { if (*TheLabels[i] == l) { *TheLabels[i] = 0; if (i == TheLabels.count()-1) delete TheLabels.pop(); return true; } } Assert(0); return false; } static int SwitchWin(int idx) { TheWinPos = (idx + TheWins.count()) % TheWins.count(); touchwin(TheWins[TheWinPos]); return TheWinPos; } static void SelectHost(Host *host) { const bool wasSel = host->selected(); const bool isSel = host->matches(TheFilter); if (!wasSel && isSel) { host->selected(isSel); TheSelHostCnt++; Broadcast(&Window::noteAdd, *host); } else if (wasSel && !isSel) { TheSelHostCnt--; Broadcast(&Window::noteDel, *host); host->selected(isSel); } } static void SelectHosts() { for (int h = 0; h < TheHosts.count(); ++h) { if (Host *host = TheHosts[h]) SelectHost(host); } // update selection warnings ofixedstream s(TheFiltWarn, sizeof(TheFiltWarn)); BuildFiltWarn(s); for (int w = 0; w < TheWins.count(); ++w) { if (strlen(TheFiltWarn)) { wattron(TheWins[w], A_BOLD); mvwaddstr(TheWins[w], 1, 0, (char*)"Filters:"); wattroff(TheWins[w], A_BOLD); mvwprintw(TheWins[w], 1, 10, (char*)"%-20s", TheFiltWarn); } else { wattroff(TheWins[w], A_BOLD); mvwprintw(TheWins[w], 1, 0, (char*)"%-30s", ""); } } touchwin(TheWins[TheWinPos]); } static int SwitchLblFilter(int idx) { TheFilter.lbl.pos = (idx + TheLabels.count()) % TheLabels.count(); if (TheFilter.lbl.pos) for (int i = 0; !*TheLabels[TheFilter.lbl.pos] && i < TheLabels.count(); ++i) { TheFilter.lbl.pos++; TheFilter.lbl.pos %= TheLabels.count(); } SelectHosts(); return TheFilter.lbl.pos; } static int SwitchCatFilter(int idx) { TheFilter.logCat.pos = (idx + lgcEnd) % lgcEnd; SelectHosts(); return TheFilter.logCat.pos; } static void BuildFiltWarn(ostream &os) { if (TheFilter.lbl.pos) { Assert(*TheLabels[TheFilter.lbl.pos]); os << *TheLabels[TheFilter.lbl.pos]; } if (TheFilter.logCat.pos) { if (TheFilter.lbl.pos) os << ','; os << (TheFilter.logCat.pos == lgcCltSide ? "clt" : "srv"); } os << ends; } int main(int argc, char *argv[]) { (void)PolyVersion(); const char *disph = argc >= 2 ? argv[1] : "127.0.0.1"; const int dispp = argc >= 3 ? atoi(argv[2]) : 18256; TheDisp = NetAddr(disph, dispp); Socket sock; Must(sock.create(TheDisp.addrN().family())); if (!Should(sock.connect(TheDisp))) { cerr << "failed to connect to udp2tcp dispatcher at " << TheDisp << endl; return -2; } Must(sock.blocking(false)); Must(initscr()); cbreak(); noecho(); nonl(); intrflush(stdscr,false); keypad(stdscr,true); curs_set(0); wtimeout(stdscr, 0); // delay for cusrses input, msec; zero == nonblocking TheHostIdx.stretch(256); TheHostIdx.count(TheHostIdx.capacity()); TheLabels.append(new String); TheWins.append(TheInfoWin = new InfoWindow("Monitor Info")); TheWins.append(new SumWindow("Summary Information")); TheWins.append(new RunLabelSmxWin("Experiment Label")); TheWins.append(new RunTimeSmxWin("Run time [h:m.s]")); TheWins.append(new ReqRateSmxWin("Load [requests/sec]")); TheWins.append(new RepRateSmxWin("Throughput [replies/sec]")); TheWins.append(new BwidthSmxWin("Network Bandwidth [Mbps,replies]")); TheWins.append(new RespTimeSmxWin("Response Time [msec]")); TheWins.append(new DHRSmxWin("DHR [%]")); TheWins.append(new ConnUseSmxWin("Connection Use [xact/conn]")); TheWins.append(new ErrTotRatioSmxWin("Errors Total [%]")); TheWins.append(new ErrTotCntSmxWin("Errors Total Count x 1000")); TheWins.append(new XactTotCntSmxWin("Total Xaction Count x 1000")); TheWins.append(new ErrRatioSmxWin("Errors Now [%]")); TheWins.append(new SockInstCntSmxWin("Open Sockets Now")); TheWins.append(new MsgGapSmxWin("Message Gap [sec]")); signal(SIGPIPE, SIG_IGN); Clock::Update(); TheScanner = new Poll(); TheScanner->configure(FD_SETSIZE); MsgMonitor *msgSrv = new MsgMonitor(sock); KbdMonitor *kbdSrv = new KbdMonitor(0); // XXX: how to get stdin fd? Ticker *ticker = new Ticker(Time::Sec(60)); SwitchWin(0); redrawwin(TheWins[TheWinPos]); while (!DoShutdown) { Clock::Update(); Time tout = TheAlarmClock.on() ? TheAlarmClock.timeLeft() : Time(); TheScanner->scan(TheAlarmClock.on() ? 0 : &tout); } delete ticker; delete msgSrv; delete kbdSrv; delete TheScanner; endwin(); return 0; } #endif /* HAVE_NCURSES_H */