/* Web Polygraph http://www.web-polygraph.org/ * (C) 2003-2006 The Measurement Factory * Licensed under the Apache License, Version 2.0 */ #include "base/polygraph.h" #include "beep/RawBeepMsg.h" #include "beep/BeepSessionMgr.h" #include "xml/XmlSearch.h" #include "runtime/LogComment.h" #include "runtime/globals.h" #include "app/BeepDoorman.h" static const String ThePolyBeepProfileUri = "http://www.web-polygraph.org/beep/profiles/basic/"; static const String ThePolyBeepProfile = String(""; static const String TheGreetMsgText = String("") + ThePolyBeepProfile + ""; static const String TheGreetRpyText = ""; BeepDoorman::BeepDoorman() { } BeepDoorman::~BeepDoorman() { if (theListReserv) TheFileScanner->clearRes(theListReserv); if (theListSock) theListSock.close(); while (theSessions.count()) abortSession(theSessions.last()); } void BeepDoorman::configure(const NetAddr &aListAddr, const NetAddr &aFwdAddr) { Assert(aListAddr || aFwdAddr); Assert(!theListAddr && !theFwdAddr); theListAddr = aListAddr; theFwdAddr = aFwdAddr; } void BeepDoorman::start() { if (theListAddr) { Must(theListSock.create(theListAddr.addrN().family())); Should(theListSock.reuseAddr(true)); Must(theListSock.blocking(false)); Must(theListSock.bind(theListAddr)); Must(theListSock.listen()); theListReserv = TheFileScanner->setFD(theListSock.fd(), dirRead, this); } if (theFwdAddr) { Socket fwd; Must(fwd.create(theFwdAddr.addrN().family())); Must(fwd.blocking(false)); if (Should(fwd.connect(theFwdAddr))) { startSession(fwd, theFwdAddr); } else { Comment << "error: doorman failed to connect to" << " the notification server at " << theFwdAddr << endc; fwd.close(); } } } void BeepDoorman::startSession(Socket &sock, const NetAddr &them) { SessionRec s; s.theSock = sock; s.theRAddr = them; s.theMgr = new BeepSessionMgr(sock.fd()); s.theRdRes = TheFileScanner->setFD(sock.fd(), dirRead, this); s.theIdx = theSessions.count(); s.theMgr->startChannel(0, ThePolyBeepProfileUri); theSessions.append(s); putMsg(theSessions.last(), genGreetingMsg()); } void BeepDoorman::noteReadReady(int fd) { if (theListSock.fd() == fd) accept(); else readFrom(fd); } void BeepDoorman::noteWriteReady(int fd) { for (int idx = 0; idx < theSessions.count(); ++idx) { if (theSessions[idx].theSock.fd() == fd) { writeFor(theSessions[idx]); return; } } Check(false); } void BeepDoorman::accept() { NetAddr them; Socket cl = theListSock.accept(them); if (Should(cl)) { Should(cl.blocking(false)); startSession(cl, them); } } void BeepDoorman::readFrom(int fd) { for (int idx = 0; idx < theSessions.count(); ++idx) { if (theSessions[idx].theSock.fd() == fd) { readFor(theSessions[idx]); return; } } Check(false); } void BeepDoorman::readFor(SessionRec &s) { Assert(s.theMgr); Assert(s.theMgr->hasSpaceIn()); Size spaceSz; char *space = s.theMgr->spaceIn(spaceSz); const Size contSz = s.theSock.read(space, spaceSz); if (contSz < 0) { if (Error::LastExcept(EWOULDBLOCK)) { Should(false); abortSession(s); } return; } if (contSz == 0) { abortSession(s); return; } s.theMgr->spaceInUsed(contSz); if (!s.theMgr->hasSpaceIn() && s.theRdRes) TheFileScanner->clearRes(s.theRdRes); if (s.theMgr->hasContentOut() && !s.theWrRes) s.theWrRes = TheFileScanner->setFD(s.theSock.fd(), dirWrite, this); Assert(s.theRdRes || s.theWrRes); processMsgs(s); } void BeepDoorman::writeFor(SessionRec &s) { Assert(s.theMgr); Assert(s.theMgr->hasContentOut()); Size contSz; const char *cont = s.theMgr->contentOut(contSz); const Size spaceSz = s.theSock.write(cont, contSz); if (spaceSz < 0) { if (Error::LastExcept(EWOULDBLOCK)) { Should(false); abortSession(s); } return; } if (spaceSz == 0) return; s.theMgr->contentOutUsed(spaceSz); if (!s.theMgr->hasContentOut() && s.theWrRes) TheFileScanner->clearRes(s.theWrRes); if (s.theMgr->hasSpaceIn() && !s.theRdRes) s.theRdRes = TheFileScanner->setFD(s.theSock.fd(), dirRead, this); Assert(s.theRdRes || s.theWrRes); } void BeepDoorman::processMsgs(SessionRec &s) { RawBeepMsg msg; while (s.theMgr->getMsg(msg)) { processMsg(s, msg); } } void BeepDoorman::processMsg(SessionRec &s, RawBeepMsg &msg) { //cerr << here << "recv: (" << msg.type() << ") " << msg.image() << endl; if (msg.channel() == 0) { // control channel if (msg.type() == RawBeepMsg::bmtRpy) { if (msg.no() == 0 && msg.image().str("channelCount() == 1; s.theMgr->startChannel(chId, profUri); if (doPushFwd) pushFwd(s); return; } } } } } cerr << here << "error: foreign BEEP message: (" << msg.type() << "): " << msg.image() << endl; Should(false); // XXX: implement } void BeepDoorman::bcastMsg(const String &image) { for (int i = 0; i < theSessions.count(); ++i) { SessionRec &s = theSessions[i]; // queue for forwarding session if no data channels yet if (s.theRAddr == theFwdAddr && s.theMgr->channelCount() == 1) { if (Should(theFwdImage.len() < Size::MB(1))) // use Check() theFwdImage += image; // XXX: not a real queue else theFwdImage = image; // cut if the queue grew too long } // skip control channel for (int c = 1; c < s.theMgr->channelCount(); ++c) { if (theFwdImage && s.theRAddr == theFwdAddr && c == 1) { theFwdImage += image; pushFwd(s); } else { RawBeepMsg msg(RawBeepMsg::bmtAns); msg.channel(s.theMgr->channelIdAt(c)); msg.image(image); putMsg(s, msg); } } } } void BeepDoorman::pushFwd(SessionRec &s) { Assert(s.theMgr->channelCount() > 1); RawBeepMsg msg(RawBeepMsg::bmtAns); msg.channel(s.theMgr->channelIdAt(1)); if (Should(s.theRAddr == theFwdAddr && theFwdImage)) { // forward delayed messages as one message msg.image(theFwdImage); theFwdImage = 0; } putMsg(s, msg); } void BeepDoorman::putMsg(SessionRec &s, const RawBeepMsg &msg) { //cerr << here << "sent: (" << msg.type() << ") " << msg.image() << endl; if (!s.theMgr->putMsg(msg)) cerr << here << "error: failed to buffer a BEEP message" << endl; if (s.theMgr->hasContentOut() && !s.theWrRes) s.theWrRes = TheFileScanner->setFD(s.theSock.fd(), dirWrite, this); } RawBeepMsg BeepDoorman::genGreetingMsg() const { RawBeepMsg m(RawBeepMsg::bmtRpy); m.channel(0); m.image(TheGreetMsgText); return m; } void BeepDoorman::abortSession(SessionRec &s) { Assert(0 <= s.theIdx && s.theIdx < theSessions.count()); s.theSock.close(); delete s.theMgr; if (s.theRdRes) TheFileScanner->clearRes(s.theRdRes); if (s.theWrRes) TheFileScanner->clearRes(s.theWrRes); if (s.theIdx != theSessions.count() - 1) { const int idx = s.theIdx; theSessions[idx] = theSessions.pop(); theSessions[idx].theIdx = idx; } else { theSessions.pop(); } }