/* Web Polygraph http://www.web-polygraph.org/ * (C) 2003-2006 The Measurement Factory * Licensed under the Apache License, Version 2.0 */ #include "base/polygraph.h" #include "xstd/Clock.h" #include "xstd/Ssl.h" #include "base/polyLogCats.h" #include "runtime/PortMgr.h" #include "runtime/ErrorMgr.h" #include "runtime/StatPhase.h" #include "runtime/StatPhaseMgr.h" #include "runtime/LogComment.h" #include "runtime/Connection.h" #include "runtime/polyBcastChannels.h" #include "runtime/globals.h" #include "runtime/SslWrap.h" int Connection::TheLastSeqId = 0; /* Connection::HalfPipe */ void Connection::HalfPipe::reset() { Assert(!theReserv); theBuf.reset(); theIOCnt = 0; isReady = false; } void Connection::HalfPipe::start(FileScanUser *u) { Should(!theReserv); theReserv = TheFileScanner->setFD(theConn.theSock.fd(), theDir, u); } void Connection::HalfPipe::start(FileScanUser *u, Time timeout) { start(u); TheFileScanner->setTimeout(theReserv.fd(), timeout); } void Connection::HalfPipe::stop(FileScanUser *u) { if (!u || TheFileScanner->user(theReserv, u)) { if (theReserv) TheFileScanner->clearRes(theReserv); theBuf.pack(); } } void Connection::HalfPipe::changeUser(FileScanUser *uOld, FileScanUser *uNew) { if (theReserv) TheFileScanner->changeUser(theReserv, uOld, uNew); } /* Connection */ Connection::Connection(): theRd(*this, theRdBuf, dirRead), theWr(*this, theWrBuf, dirWrite), theSsl(0) { reset(); } void Connection::reset() { if (theSock || theSsl) closeNow(); theAddr = NetAddr(); theMgr = 0; thePortMgr = 0; theSslCtx = 0; theSslSession = 0; theSsl = 0; theRd.reset(); theWr.reset(); theCloseKind = ConnCloseStat::ckNone; theLogCat = lgcAll; isBad = isAtEof = isLastUse = false; theTunnel = NetAddr(); theLocPort = theRemPort = -1; theUseCountLmt = theUseLevelLmt = -1; // unlimited use by default theUseCnt = theUseLvl = theUseLvlMax = 0; theOpenTime = theUseStart = Time(); theMaxIoSize = -1; theSeqId = ++TheLastSeqId; } void Connection::useSsl(const SslCtx *aCtx, SslSession *aSession) { theSslCtx = aCtx; theSslSession = aSession; } NetAddr Connection::laddr() const { NetAddr addr; if (theLocPort >= 0 && thePortMgr) { addr = thePortMgr->addr(); addr.port(theLocPort); } return addr; } int Connection::rport() const { if (!theSock) return -1; else if (theRemPort >= 0) return theRemPort; else return theRemPort = theSock.rport(); // cache the result } bool Connection::connect(const NetAddr &addr, const SockOpt &opt, PortMgr *aPortMgr) { theAddr = addr; Assert(aPortMgr); Assert(!theSock); if (!theSock.create(addr.addrN().family())) { ReportError(Error::Last()); return false; } if (!setSockOpt(opt)) return false; thePortMgr = aPortMgr; theLocPort = thePortMgr->bind(theSock); if (theLocPort < 0) return false; theOpenTime = TheClock; Broadcast(TheConnOpenChannel, this); if (!theSock.connect(addr)) { ReportError(Error::Last()); return false; } // SSL is not activated implicitly, call sslActivate() when needed return true; } bool Connection::sslActivate() { if (Should(theSslCtx) && sslConnect()) { if (Should(sslActive())) Broadcast(TheConnSslActiveChannel, this); return true; } return false; } bool Connection::accept(Socket &s, const SockOpt &opt, bool &fatal) { Assert(!theSock); theSock = s.accept(theAddr); if (!theSock) { const Error err = Error::Last(); fatal = false; if (err != EWOULDBLOCK && err != EAGAIN && err != EMFILE) ReportError(err); return false; } if (!setSockOpt(opt)) return false; theLocPort = -1; // unknown, do not care; theOpenTime = TheClock; // now that we are connected, maybe associate the socket with SSL state if (theSslCtx && !sslAccept()) return false; Broadcast(TheConnOpenChannel, this); return true; } // zero read does not mean EOF because SSL may buffer // check bad() and atEof() Size Connection::read() { if (!theRdBuf.spaceSize() || !theMaxIoSize) { theRd.isReady = true; return 0; } theRd.isReady = false; const Size ioSz = theMaxIoSize >= 0 ? Min(theRdBuf.spaceSize(), theMaxIoSize) : theRdBuf.spaceSize(); Size sz; if (!preIo(theRd, "read after accept")) sz = 0; else if (theSsl) sz = sslRead(ioSz); else sz = rawRead(ioSz); theMaxIoSize = -1; //cerr << here << "read " << sz << " : {" << theRdBuf.space() << "}" << endl; if (sz > 0) { theRdBuf.appended(sz); TheStatPhaseMgr->noteSockRead(sz, logCat()); return sz; } return 0; } Size Connection::write() { if (!theWrBuf.contSize() || !theMaxIoSize) { theWr.isReady = true; return 0; } theWr.isReady = false; const Size ioSz = theMaxIoSize >= 0 ? Min(theWrBuf.contSize(), theMaxIoSize) : theWrBuf.contSize(); Size sz; if (!preIo(theWr, "write after connect")) sz = 0; else if (theSsl) sz = sslWrite(ioSz); else sz = rawWrite(ioSz); theMaxIoSize = -1; //cerr << here << "wrote " << sz << " : {" << theWrBuf.content() << "}" << endl; if (sz >= 0) { if (sz > 0) { theWrBuf.consumed(sz); theWrBuf.pack(); return sz; } TheStatPhaseMgr->noteSockWrite(sz, logCat()); } return 0; } bool Connection::closeNow() { bool res = true; if (theSsl) res = sslCloseNow() && res; return rawCloseNow() && res; } // does not close raw socket bool Connection::closeAsync(FileScanUser *u, bool &fatal) { if (theSsl) return sslCloseAsync(u, fatal); else return true; } SslSession *Connection::sslSession() { return theSslSession; } void Connection::sslSessionForget() { theSslSession = 0; } void Connection::sslStart(int role) { Should(!theSsl); theSsl = theSslCtx->makeConnection(); theSsl->playRole(role); // // must set mode before setting fd if (theSslSession) { if (!theSsl->resumeSession(theSslSession)) { Comment << here << "warning: failed to resume an SSL session " << "with " << theAddr << ", continuing anyway" << endc; SslWrap::ReportErrors(); } } // XXX: should not these be set for SslCtx instead? Should(theSsl->enablePartialWrite()); Should(theSsl->acceptMovingWriteBuffer()); // just in case Should(theSsl->setFd(theSock.fd())); } bool Connection::sslConnect() { sslStart(Ssl::rlClient); return true; } bool Connection::sslAccept() { sslStart(Ssl::rlServer); return true; } // zero read does not mean EOF Size Connection::sslRead(Size ioSz) { Should(!isAtEof); const Size sz = theSsl->read(theRdBuf.space(), ioSz); //clog << here << "ssl read: " << sz << " ? " << theSsl->dataPending() << endl; if (sz > 0) { if (theSsl->dataPending()) TheFileScanner->forceReady(theSock.fd()); return sz; } //clog << here << this << "ssl err fd: " << theSock.fd() << ' ' << theSsl->getErrorString(sz) << endl; switch (theSsl->getError(sz)) { case SSL_ERROR_ZERO_RETURN: isAtEof = true; break; case SSL_ERROR_WANT_READ: // OK, will try again later break; case SSL_ERROR_SYSCALL: if (sz == 0) { isAtEof = true; sslError(sz, "protocol-violating EOF on read"); } else { rawError("read on SSL connection"); } break; case SSL_ERROR_WANT_WRITE: TheFileScanner->setReadNeedsWrite(theSock.fd()); break; default: sslError(sz, "read"); } return 0; } Size Connection::sslWrite(Size ioSz) { const Size sz = theSsl->write(theWrBuf.content(), ioSz); //clog << here << "ssl wrote: " << sz << endl; if (sz > 0) return sz; //clog << here << this << "ssl err fd: " << theSock.fd() << ' ' << theSsl->getErrorString(sz) << endl; switch (theSsl->getError(sz)) { case SSL_ERROR_ZERO_RETURN: // not a connection-level error; XXX: caller must handle! isAtEof = true; break; case SSL_ERROR_WANT_WRITE: // OK, will try again later break; case SSL_ERROR_SYSCALL: if (sz == 0) sslError(sz, "protocol-violating EOF on write"); else rawError("write on SSL connection"); break; case SSL_ERROR_WANT_READ: TheFileScanner->setWriteNeedsRead(theSock.fd()); break; default: sslError(sz, "write"); } return 0; } bool Connection::sslCloseNow() { // assume bi-directional shutdown is not needed int err; const bool res = theSsl->shutdown(err) || err == 0; if (!res) sslError(err, "close"); sslForget(); return res; } bool Connection::sslCloseAsync(FileScanUser *u, bool &fatal) { int err; // assume bi-directional shutdown is not needed if (theSsl->shutdown(err) || err == 0) { sslForget(); return true; } Should(err < 0); switch (theSsl->getError(err)) { case SSL_ERROR_WANT_WRITE: theRd.stop(0); if (!theWr.theReserv) theWr.start(u); fatal = false; break; case SSL_ERROR_WANT_READ: theWr.stop(0); if (!theRd.theReserv) theRd.start(u); fatal = false; break; case SSL_ERROR_SYSCALL: rawError("close on SSL connection"); fatal = true; break; default: sslError(err, "close"); fatal = true; } if (fatal) sslForget(); return false; } void Connection::sslForget() { delete theSsl; theSsl = 0; } void Connection::sslError(int err, const char *operation) { Comment(3) << "error: SSL " << operation << " failure with err=" << err << "/" << theSsl->getErrorString(err) << "/" << Error::Last().no() << endc; SslWrap::ReportErrors(); reportErrorLoc(); isBad = true; } // zero read means EOF Size Connection::rawRead(Size ioSz) { const Size sz = theSock.read(theRdBuf.space(), ioSz); if (sz > 0) return sz; if (sz == 0) isAtEof = true; else if (sz < 0 && Error::Last() != EWOULDBLOCK) rawError("read"); return 0; } Size Connection::rawWrite(Size ioSz) { const Size sz = theSock.write(theWrBuf.content(), ioSz); if (sz > 0) return sz; if (sz < 0 && Error::Last() != EWOULDBLOCK) rawError("write"); return 0; } bool Connection::rawCloseNow() { Should(!theSsl); // must be close by now if (theLocPort >= 0) { thePortMgr->release(theLocPort, true); theLocPort = -1; } bool res = true; if (theSock) { theRd.stop(0); theWr.stop(0); Broadcast(TheConnCloseChannel, this); if (!theSock.close()) { res = false; rawError("close"); } } return res; } void Connection::rawError(const char *operation) { if (ReportError(Error::Last())) { Comment(3) << "error: raw " << operation << " failed" << endc; reportErrorLoc(); } isBad = true; } void Connection::reportErrorLoc() { Comment << "connection between " << laddr() << " and " << raddr() << " failed at " << theRd.theIOCnt << " reads, " << theWr.theIOCnt << " writes, and " << useCnt() << " xacts" << endc; } void Connection::decMaxIoSize(Size aMax) { if (theMaxIoSize < 0) theMaxIoSize = aMax; else theMaxIoSize = Min(theMaxIoSize, aMax); } // this has to be done for accepted sockets as well // FreeBSD accept(2) man page misleads bool Connection::setSockOpt(const SockOpt &opt) { return Should(theSock.blocking(false) && theSock.configure(opt)); } bool Connection::preIo(HalfPipe &ioPipe, const char *operation) { if (ioCnt() == 0) { // check for connect/accept errors if (theSock.error()) { rawError(operation); return false; } Broadcast(TheConnEstChannel, this); } ioPipe.theIOCnt++; return true; } bool Connection::reusable() const { if (exhausted() || isLastUse || isBad) return false; return theUseCountLmt < 0 || theUseCnt < theUseCountLmt; } bool Connection::pipelineable() const { if (!reusable()) return false; return theUseLevelLmt < 0 || theUseLvl < theUseLevelLmt; } // prepare for next use if possible // note: the notion of ``use'' is caller defined void Connection::startUse() { Assert(reusable()); // first user takes the hit of the opening delay theUseStart = theUseCnt++ ? (Time)TheClock : theOpenTime; ++theUseLvl; if (theUseLvl > theUseLvlMax) theUseLvlMax = theUseLvl; } void Connection::finishUse() { Assert(theUseLvl > 0); --theUseLvl; }