/*
* Copyright (c) 1998-2005, Index Data.
* See the file LICENSE for details.
*
* $Id: yaz-socket-manager.cpp,v 1.36 2006/03/29 13:14:17 adam Exp $
*/
#ifdef WIN32
#include <winsock.h>
#endif
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#if HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#include <string.h>
#include <assert.h>
#include <yaz/log.h>
#include <yazpp/socket-manager.h>
using namespace yazpp_1;
SocketManager::SocketEntry **SocketManager::lookupObserver(
ISocketObserver *observer)
{
SocketEntry **se;
for (se = &m_observers; *se; se = &(*se)->next)
if ((*se)->observer == observer)
break;
return se;
}
void SocketManager::addObserver(int fd, ISocketObserver *observer)
{
SocketEntry *se;
se = *lookupObserver(observer);
if (!se)
{
se = new SocketEntry;
se->next= m_observers;
m_observers = se;
se->observer = observer;
}
se->fd = fd;
se->mask = 0;
se->last_activity = 0;
se->timeout = -1;
}
void SocketManager::deleteObserver(ISocketObserver *observer)
{
SocketEntry **se = lookupObserver(observer);
if (*se)
{
removeEvent (observer);
SocketEntry *se_tmp = *se;
*se = (*se)->next;
delete se_tmp;
}
}
void SocketManager::deleteObservers()
{
SocketEntry *se = m_observers;
while (se)
{
SocketEntry *se_next = se->next;
delete se;
se = se_next;
}
m_observers = 0;
}
void SocketManager::maskObserver(ISocketObserver *observer, int mask)
{
SocketEntry *se;
yaz_log(m_log, "obs=%p read=%d write=%d except=%d", observer,
mask & SOCKET_OBSERVE_READ,
mask & SOCKET_OBSERVE_WRITE,
mask & SOCKET_OBSERVE_EXCEPT);
se = *lookupObserver(observer);
if (se)
se->mask = mask;
}
void SocketManager::timeoutObserver(ISocketObserver *observer,
int timeout)
{
SocketEntry *se;
se = *lookupObserver(observer);
if (se)
se->timeout = timeout;
}
int SocketManager::processEvent()
{
SocketEntry *p;
SocketEvent *event = getEvent();
int timeout = -1;
yaz_log (m_log, "SocketManager::processEvent manager=%p", this);
if (event)
{
event->observer->socketNotify(event->event);
delete event;
return 1;
}
fd_set in, out, except;
int res;
int max = 0;
int no = 0;
FD_ZERO(&in);
FD_ZERO(&out);
FD_ZERO(&except);
time_t now = time(0);
for (p = m_observers; p; p = p->next)
{
int fd = p->fd;
if (p->mask)
no++;
if (p->mask & SOCKET_OBSERVE_READ)
{
yaz_log (m_log, "SocketManager::select fd=%d read", fd);
FD_SET(fd, &in);
}
if (p->mask & SOCKET_OBSERVE_WRITE)
{
yaz_log (m_log, "SocketManager::select fd=%d write", fd);
FD_SET(fd, &out);
}
if (p->mask & SOCKET_OBSERVE_EXCEPT)
{
yaz_log (m_log, "SocketManager::select fd=%d except", fd);
FD_SET(fd, &except);
}
if (fd > max)
max = fd;
if (p->timeout > 0 ||
(p->timeout == 0 && (p->mask & SOCKET_OBSERVE_WRITE) == 0))
{
int timeout_this;
timeout_this = p->timeout;
if (p->last_activity)
timeout_this -= now - p->last_activity;
else
p->last_activity = now;
if (timeout_this < 0 || timeout_this > 2147483646)
timeout_this = 0;
if (timeout == -1 || timeout_this < timeout)
timeout = timeout_this;
p->timeout_this = timeout_this;
yaz_log (m_log, "SocketManager::select timeout_this=%d",
p->timeout_this);
}
else
p->timeout_this = -1;
}
if (!no)
{
yaz_log (m_log, "no pending events return 0");
if (!m_observers)
yaz_log (m_log, "no observers");
return 0;
}
struct timeval to;
to.tv_sec = timeout;
to.tv_usec = 0;
yaz_log (m_log, "SocketManager::select begin no=%d timeout=%d",
no, timeout);
int pass = 0;
while ((res = select(max + 1, &in, &out, &except,
timeout== -1 ? 0 : &to)) < 0)
if (errno != EINTR)
{
yaz_log(YLOG_ERRNO|YLOG_WARN, "select");
yaz_log(YLOG_WARN, "errno=%d max=%d timeout=%d",
errno, max, timeout);
if (++pass > 10)
return -1;
}
yaz_log(m_log, "select returned res=%d", res);
now = time(0);
for (p = m_observers; p; p = p->next)
{
int fd = p->fd;
int mask = 0;
if (FD_ISSET(fd, &in))
mask |= SOCKET_OBSERVE_READ;
if (FD_ISSET(fd, &out))
mask |= SOCKET_OBSERVE_WRITE;
if (FD_ISSET(fd, &except))
mask |= SOCKET_OBSERVE_EXCEPT;
if (mask)
{
SocketEvent *event = new SocketEvent;
p->last_activity = now;
event->observer = p->observer;
event->event = mask;
putEvent (event);
yaz_log (m_log, "putEvent I/O mask=%d", mask);
}
else if (res == 0 && p->timeout_this == timeout)
{
SocketEvent *event = new SocketEvent;
assert (p->last_activity);
yaz_log (m_log, "putEvent timeout fd=%d, now = %ld last_activity=%ld timeout=%d",
p->fd, now, p->last_activity, p->timeout);
p->last_activity = now;
event->observer = p->observer;
event->event = SOCKET_OBSERVE_TIMEOUT;
putEvent (event);
}
}
if ((event = getEvent()))
{
event->observer->socketNotify(event->event);
delete event;
return 1;
}
yaz_log(YLOG_WARN, "unhandled event in processEvent res=%d", res);
return 1;
}
// n p n p ...... n p n p
// front back
void SocketManager::putEvent(SocketEvent *event)
{
// put in back of queue
if (m_queue_back)
{
m_queue_back->prev = event;
assert (m_queue_front);
}
else
{
assert (!m_queue_front);
m_queue_front = event;
}
event->next = m_queue_back;
event->prev = 0;
m_queue_back = event;
}
SocketManager::SocketEvent *SocketManager::getEvent()
{
// get from front of queue
SocketEvent *event = m_queue_front;
if (!event)
return 0;
assert (m_queue_back);
m_queue_front = event->prev;
if (m_queue_front)
{
assert (m_queue_back);
m_queue_front->next = 0;
}
else
m_queue_back = 0;
return event;
}
void SocketManager::removeEvent(ISocketObserver *observer)
{
SocketEvent *ev = m_queue_back;
while (ev)
{
SocketEvent *ev_next = ev->next;
if (observer == ev->observer)
{
if (ev->prev)
ev->prev->next = ev->next;
else
m_queue_back = ev->next;
if (ev->next)
ev->next->prev = ev->prev;
else
m_queue_front = ev->prev;
delete ev;
}
ev = ev_next;
}
}
SocketManager::SocketManager()
{
m_observers = 0;
m_queue_front = 0;
m_queue_back = 0;
m_log = YLOG_DEBUG;
}
SocketManager::~SocketManager()
{
deleteObservers();
}
/*
* Local variables:
* c-basic-offset: 4
* indent-tabs-mode: nil
* End:
* vim: shiftwidth=4 tabstop=8 expandtab
*/
syntax highlighted by Code2HTML, v. 0.9.1