/*
* Copyright (C) 1999, 2000, 2001, 2002, 2003, 2004, 2005 Yokogawa Electric Corporation,
* YDC Corporation, IPA (Information-technology Promotion Agency, Japan).
* All rights reserved.
*
* Redistribution and use of this software in source and binary forms, with
* or without modification, are permitted provided that the following
* conditions and disclaimer are agreed and accepted by the user:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the names of the copyrighters, the name of the project which
* is related to this software (hereinafter referred to as "project") nor
* the names of the contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* 4. No merchantable use may be permitted without prior written
* notification to the copyrighters. However, using this software for the
* purpose of testing or evaluating any products including merchantable
* products may be permitted without any notification to the copyrighters.
*
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHTERS, THE PROJECT AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING
* BUT NOT LIMITED THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE, ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHTERS, THE PROJECT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT,STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*
* $TAHI: v6eval/lib/Cm/CmDispatch.cc,v 1.8 2001/10/12 04:56:13 tanaka Exp $
*/
#include "CmDispatch.h"
//IMPLEMENTATION
// CmDispatch provides an interface to the "select" system call.
#include "CmReceiver.h"
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/param.h>
#include "CmFdSet.h"
#include "Timer.h"
#include "CmFdMasks.h"
#include "PerfCollect.h"
CmDispatch::CmDispatch():nfds_(0),
onRead_(new CmFdMasks(NOFILE)),
onWrite_(new CmFdMasks(NOFILE)),
onExcept_(new CmFdMasks(NOFILE)),
queue_(new TimerQueue),
collect_(new PerformanceCollector(-1,NOFILE)) {
}
CmDispatch::~CmDispatch() {
delete collect_, collect_=0;
delete onRead_, onRead_=0;
delete onWrite_, onWrite_=0;
delete onExcept_, onExcept_=0;
delete queue_, queue_=0;}
CmDispatch& CmDispatch::instance() {
if(instance_ == 0) {
instance_ = new CmDispatch;}
return *instance_;}
void CmDispatch::instance(CmDispatch* d) {
if(instance_) delete instance_;
instance_=d;}
CmFdMasks* CmDispatch::maskHandler(CmDispatchMask mask) const {
CmFdMasks* m=0;
switch(mask) {
case ReadMask: {m=onRead_; break;}
case WriteMask: {m=onWrite_; break;}
case ExceptMask:{m=onExcept_; break;}
default: abort();}
return m;}
CmReceiver* CmDispatch::handler(int fd, CmDispatchMask mask) const {
if(fd<0||fd>=NOFILE) {abort();}
return maskHandler(mask)->handler(fd);}
void CmDispatch::attach(int fd, CmDispatchMask mask, CmReceiver* handler) {
maskHandler(mask)->attach(fd,handler);
if(nfds_<fd+1) {nfds_=fd+1;}}
void CmDispatch::link(int fd, CmDispatchMask mask, CmReceiver* handler) {
if(fd<0||fd>=NOFILE) {abort();}
attach(fd, mask, handler);}
void CmDispatch::reduce_nfds(int fd){
if(nfds_!=fd+1) {return;}
for(;nfds_>0;nfds_--) {
int i=nfds_-1;
if( onRead_->handler(i)!=0||
onWrite_->handler(i)!=0||
onWrite_->handler(i)!=0) {
break;}}}
void CmDispatch::detach(int fd, CmDispatchMask mask){
maskHandler(mask)->detach(fd);}
void CmDispatch::detach(int fd){ //rewote
detach(fd, ReadMask);
detach(fd, WriteMask);
detach(fd, ExceptMask);
reduce_nfds(fd);}
void CmDispatch::unlink(int fd, CmDispatchMask mask) {
if(fd<0||fd>=NOFILE) {abort();}
detach(fd, mask);
reduce_nfds(fd);}
void CmDispatch::unlink(int fd) {
if(fd<0||fd>=NOFILE) {abort();}
detach(fd);}
void CmDispatch::startTimer(time_t sec, uint32_t usec, CmReceiver* handler) {
timeval deltaTime;
deltaTime.tv_sec = sec; deltaTime.tv_usec = usec;
queue_->deltaInsert(deltaTime,handler);}
void CmDispatch::startTimerByAbsoluteTime(time_t sec, CmReceiver* handler) {
timeval unixTime;
unixTime.tv_sec=sec; unixTime.tv_usec=0;
queue_->insert(unixTime, handler);}
void CmDispatch::startTimerByAbsoluteTime(timeval unixTime, CmReceiver* handler) {
queue_->insert(unixTime, handler);}
void CmDispatch::stopTimer(CmReceiver* handler) {
queue_->remove(handler);}
void CmDispatch::expire(CmReceiver *h,timeval& t) {
if(h==0) return;
startTransaction();
h->timerExpired(t.tv_sec,t.tv_usec);
recordTransaction(-1);}
int CmDispatch::notify(int n,CmReceiver *h,ioNotify func) {
if(h==0) return 0;
startTransaction();
int status=(h->*func)(n);
recordTransaction(n);
return status;}
void CmDispatch::notify(int n,CmFdMasks *m,ioNotify func) {
int status=notify(n,m->notify(n),func);
/**/ if(status<0 ) {detach(n);}
else if(status==0) {m->clearReady(n);}
else if(status> 0) {m->setReady(n);}}
void CmDispatch::notify() {
for(int i=0;i<nfds_;i++){
notify(i,onRead_,&CmReceiver::inputReady);
notify(i,onWrite_,&CmReceiver::outputReady);
notify(i,onExcept_,&CmReceiver::exceptionRaised);}
timeval current=TimerQueue::currentTime();
CmReceiver* h;
for(;(h=queue_->expire(current));) {
expire(h,current);}}
int CmDispatch::select(CmFdSet& r,CmFdSet& w,CmFdSet& e,timeval* howlong) {
return ::select(nfds_,r,w,e,howlong);}
int CmDispatch::waitFor(timeval* howlong) {
int nfound=-1;
for(;nfound<0;) {
CmFdSet& r=onRead_->onSelects();
CmFdSet& w=onWrite_->onSelects();
CmFdSet& e=onExcept_->onSelects();
howlong = queue_->calculateTimeout(howlong);
nfound=select(r,w,e,howlong);
if(nfound<0) {handleError();}}
return nfound;}
bool CmDispatch::dispatch(timeval* howlong) {
int nfound;
static timeval timeout;
if(anyReady()){howlong=&timeout;}
nfound=waitFor(howlong);
notify();
return nfound>0;}
bool CmDispatch::dispatch(time_t& sec, uint32_t& usec) {
timeval howlong;
timeval prevTime;
howlong.tv_sec = sec; howlong.tv_usec = usec;
prevTime = TimerQueue::currentTime();
bool success = dispatch(&howlong);
howlong = TimerQueue::spendTime(prevTime,howlong);
sec = howlong.tv_sec; usec = howlong.tv_usec;
return success;}
void CmDispatch::dispatch() {
dispatch(0);}
bool CmDispatch::anyReady() const {
bool r=onRead_->anySet(), w=onWrite_->anySet(),e=onExcept_->anySet();
return r||w||e;}
void CmDispatch::handleError() {
if(errno == EINTR) {return;}
if(errno == EBADF) {
checkConnections(); return;}
eerr("CmDispatch: select");
abort();}
void CmDispatch::checkConnections() {
timeval zero=TimerQueue::zeroTime();
onRead_->checkConnections(nfds_,zero);}
void CmDispatch::startTransaction() {
collect_->startTransaction();}
void CmDispatch::recordTransaction(int fd) {
collect_->recordTransaction(fd);
if(transactionVector_) (*transactionVector_)(fd);}
void CmDispatch::reportPerformance() {
collect_->report();}
void CmDispatch::clearPerformance() {
collect_->clear();}
CmDispatch* CmDispatch::instance_=0;
CmDispatch::vector CmDispatch::transactionVector_=0;
syntax highlighted by Code2HTML, v. 0.9.1