// -*- c-basic-offset: 4; tab-width: 8; indent-tabs-mode: t -*-
// Copyright (c) 2001-2007 International Computer Science Institute
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software")
// to deal in the Software without restriction, subject to the conditions
// listed in the XORP LICENSE file. These conditions include: you must
// preserve this copyright notice, and you cannot mention the copyright
// holders in advertising related to the Software without their permission.
// The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
// notice is a summary of the XORP LICENSE file; the license in that file is
// legally binding.
#ident "$XORP: xorp/libxorp/buffered_asyncio.cc,v 1.12 2007/02/16 22:46:15 pavlin Exp $"
#include "libxorp_module.h"
#include "xorp.h"
#include "debug.h"
#include "xlog.h"
#include "buffered_asyncio.hh"
extern bool is_pseudo_error(const char* name, XorpFd fd, int error_num);
BufferedAsyncReader::BufferedAsyncReader(EventLoop& e,
XorpFd fd,
size_t reserve_bytes,
const Callback& cb)
: _eventloop(e), _fd(fd), _cb(cb), _buffer(reserve_bytes),
_last_error(0)
{
_config.head = &_buffer[0];
_config.head_bytes = 0;
_config.trigger_bytes = 1;
_config.reserve_bytes = reserve_bytes;
}
BufferedAsyncReader::~BufferedAsyncReader()
{
stop();
}
inline void
BufferedAsyncReader::provision_trigger_bytes()
{
size_t post_head_bytes = _buffer.size() - (_config.head - &_buffer[0]);
if (_config.head + _config.head_bytes == &_buffer[0] + _buffer.size() ||
_config.trigger_bytes >= post_head_bytes ||
post_head_bytes < _buffer.size() / 2) {
memmove(&_buffer[0], _config.head, _config.head_bytes);
_config.head = &_buffer[0];
}
}
bool
BufferedAsyncReader::set_trigger_bytes(size_t bytes)
{
if (bytes > _config.reserve_bytes)
return false;
_config.trigger_bytes = bytes;
provision_trigger_bytes();
return true;
}
size_t
BufferedAsyncReader::trigger_bytes() const
{
return _config.trigger_bytes;
}
bool
BufferedAsyncReader::dispose(size_t bytes)
{
if (_config.head_bytes < bytes)
return false;
_config.head += bytes;
_config.head_bytes -= bytes;
return true;
}
bool
BufferedAsyncReader::set_reserve_bytes(size_t bytes)
{
if (_config.reserve_bytes > bytes)
return false;
_buffer.resize(bytes);
_config.reserve_bytes = bytes;
return true;
}
size_t
BufferedAsyncReader::reserve_bytes() const
{
return _config.reserve_bytes;
}
size_t
BufferedAsyncReader::available_bytes() const
{
return _config.head_bytes;
}
void
BufferedAsyncReader::start()
{
if (_eventloop.add_ioevent_cb(_fd, IOT_READ,
callback(this,
&BufferedAsyncReader::io_event)) ==
false) {
XLOG_ERROR("BufferedAsyncReader: failed to add I/O event callback.");
}
#ifdef HOST_OS_WINDOWS
if (_eventloop.add_ioevent_cb(_fd, IOT_DISCONNECT,
callback(this,
&BufferedAsyncReader::io_event)) ==
false) {
XLOG_ERROR("BufferedAsyncReader: failed to add I/O event callback.");
}
#endif
if (_config.head_bytes >= _config.trigger_bytes) {
_ready_timer =
_eventloop.new_oneoff_after_ms(0,
callback(this, &BufferedAsyncReader::announce_event, DATA));
}
debug_msg("%p start\n", this);
}
void
BufferedAsyncReader::stop()
{
debug_msg("%p stop\n", this);
#ifdef HOST_OS_WINDOWS
_eventloop.remove_ioevent_cb(_fd, IOT_DISCONNECT);
#endif
_eventloop.remove_ioevent_cb(_fd, IOT_READ);
_ready_timer.unschedule();
}
void
BufferedAsyncReader::io_event(XorpFd fd, IoEventType type)
{
assert(fd == _fd);
#ifndef HOST_OS_WINDOWS
assert(type == IOT_READ);
#else
// Explicitly handle disconnection events
if (type == IOT_DISCONNECT) {
XLOG_ASSERT(fd.is_socket());
stop();
announce_event(END_OF_FILE);
return;
}
#endif
uint8_t* tail = _config.head + _config.head_bytes;
size_t tail_bytes = _buffer.size() - (tail - &_buffer[0]);
assert(tail_bytes >= 1);
assert(tail + tail_bytes == &_buffer[0] + _buffer.size());
ssize_t read_bytes = -1;
#ifdef HOST_OS_WINDOWS
if (fd.is_socket()) {
read_bytes = ::recvfrom(fd, (char *)tail, tail_bytes, 0,
NULL, 0);
_last_error = WSAGetLastError();
WSASetLastError(ERROR_SUCCESS);
} else {
(void)ReadFile(fd, (LPVOID)tail, (DWORD)tail_bytes,
(LPDWORD)&read_bytes, NULL);
_last_error = GetLastError();
SetLastError(ERROR_SUCCESS);
}
#else
errno = 0;
_last_error = 0;
read_bytes = ::read(fd, tail, tail_bytes);
if (read_bytes < 0)
_last_error = errno;
errno = 0;
#endif
if (read_bytes > 0) {
_config.head_bytes += read_bytes;
if (_config.head_bytes >= _config.trigger_bytes) {
debug_msg("YES notify - buffered I/O %u / %u\n",
XORP_UINT_CAST(_config.head_bytes),
XORP_UINT_CAST(_config.trigger_bytes));
announce_event(DATA);
} else {
debug_msg("NO notify - buffered I/O %u / %u read %d\n",
XORP_UINT_CAST(_config.head_bytes),
XORP_UINT_CAST(_config.trigger_bytes),
XORP_INT_CAST(read_bytes));
}
} else if (read_bytes == 0) {
announce_event(END_OF_FILE);
} else {
if (is_pseudo_error("BufferedAsyncReader", fd, _last_error))
return;
XLOG_ERROR("read error %d", _last_error);
stop();
announce_event(OS_ERROR);
}
}
void
BufferedAsyncReader::announce_event(Event ev)
{
if (ev == DATA && _config.head_bytes < _config.trigger_bytes) {
//
// We might get here because a read returns more data than a user
// wants to process. They exit the callback with more data in their
// buffer than the threshold event so we schedule a timer to
// prod them again, but in the meantime an I/O event occurs
// and pre-empts the timer callback.
// Another example could be when a previous callback modifies
// the threshold value.
// Basically, we don't want to call the user below threshold.
//
debug_msg("announce_event: DATA (head_bytes = %u, trigger_bytes = %u)",
XORP_UINT_CAST(_config.head_bytes),
XORP_UINT_CAST(_config.trigger_bytes));
return;
}
//
// Take a reference to callback and a copy of it's count. If it's
// count falls between here and when the callback dispatch returns
// the current instance has been deleted and we should return
// without accessing any member state.
//
assert(_cb.is_only() == true);
Callback cb = _cb;
cb->dispatch(this, ev, _config.head, _config.head_bytes);
if (cb.is_only() == true)
return; // We've been deleted! Just leave
provision_trigger_bytes();
if (_config.head_bytes >= _config.trigger_bytes) {
_ready_timer =
_eventloop.new_oneoff_after_ms(0,
callback(this, &BufferedAsyncReader::announce_event, DATA));
}
}
syntax highlighted by Code2HTML, v. 0.9.1