// -*- c-basic-offset: 4; tab-width: 8; indent-tabs-mode: t -*-

// Copyright (c) 2001-2007 International Computer Science Institute
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software")
// to deal in the Software without restriction, subject to the conditions
// listed in the XORP LICENSE file. These conditions include: you must
// preserve this copyright notice, and you cannot mention the copyright
// holders in advertising related to the Software without their permission.
// The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
// notice is a summary of the XORP LICENSE file; the license in that file is
// legally binding.

#ident "$XORP: xorp/libxorp/buffered_asyncio.cc,v 1.12 2007/02/16 22:46:15 pavlin Exp $"

#include "libxorp_module.h"
#include "xorp.h"
#include "debug.h"
#include "xlog.h"

#include "buffered_asyncio.hh"

//
// Declared here, defined elsewhere.  io_event() below calls it after a
// failed read: when it returns true the failure is silently ignored,
// otherwise the error is logged and reported to the user as OS_ERROR.
// NOTE(review): presumably this filters transient conditions such as
// interrupted or would-block reads -- confirm against its definition.
//
extern bool is_pseudo_error(const char* name, XorpFd fd, int error_num);

//
// Construct a reader for descriptor fd that buffers up to reserve_bytes
// and reports events through cb.
//
// The buffer is allocated immediately with reserve_bytes elements.  The
// reader starts out empty (head at the start of the buffer, no buffered
// bytes) with a notification threshold of one byte.  No I/O callback is
// registered here; that is done by start().
//
BufferedAsyncReader::BufferedAsyncReader(EventLoop&	 e,
					 XorpFd		 fd,
					 size_t		 reserve_bytes,
					 const Callback& cb)
    : _eventloop(e),
      _fd(fd),
      _cb(cb),
      _buffer(reserve_bytes),
      _last_error(0)
{
    _config.reserve_bytes = reserve_bytes;
    _config.trigger_bytes = 1;
    _config.head_bytes    = 0;
    _config.head          = &_buffer[0];
}

//
// Destructor: detach from the event loop (I/O callbacks and the pending
// ready timer) by way of stop().
//
BufferedAsyncReader::~BufferedAsyncReader()
{
    this->stop();
}

//
// Compact the buffer when needed so that there is room after the
// buffered data for further reads.
//
// The buffered bytes are moved to the front of the buffer (and head is
// reset to the buffer start) if any of the following holds:
//  - the buffered data ends exactly at the end of the buffer,
//  - the trigger threshold is at least as large as the space between
//    head and the end of the buffer,
//  - less than half of the buffer lies between head and its end.
//
inline void
BufferedAsyncReader::provision_trigger_bytes()
{
    uint8_t* const buf_start = &_buffer[0];
    const size_t   buf_size  = _buffer.size();
    const size_t   room_from_head = buf_size - (_config.head - buf_start);

    const bool data_ends_at_buf_end =
	(_config.head + _config.head_bytes == buf_start + buf_size);
    const bool trigger_does_not_fit =
	(_config.trigger_bytes >= room_from_head);
    const bool head_in_back_half =
	(room_from_head < buf_size / 2);

    if (!data_ends_at_buf_end && !trigger_does_not_fit && !head_in_back_half)
	return;

    // memmove because source and destination ranges may overlap.
    memmove(buf_start, _config.head, _config.head_bytes);
    _config.head = buf_start;
}

//
// Set the number of buffered bytes required before the user is notified
// with a DATA event.
//
// @param bytes the new threshold; must not exceed the reserve size.
// @return true if the threshold was accepted, false (with no state
// change) if bytes is larger than the reserve size.
//
bool
BufferedAsyncReader::set_trigger_bytes(size_t bytes)
{
    const bool accepted = (bytes <= _config.reserve_bytes);

    if (accepted) {
	_config.trigger_bytes = bytes;
	// The new threshold may require the buffer to be compacted.
	provision_trigger_bytes();
    }

    return accepted;
}

//
// @return the current DATA notification threshold in bytes.
//
size_t
BufferedAsyncReader::trigger_bytes() const
{
    const size_t threshold = _config.trigger_bytes;
    return threshold;
}

//
// Discard bytes from the front of the buffered data.
//
// @param bytes number of bytes to drop.
// @return true on success, false (with no state change) if bytes
// exceeds the number of bytes currently buffered.
//
// Only the head pointer and byte count are adjusted here; the buffer
// contents are not moved.
//
bool
BufferedAsyncReader::dispose(size_t bytes)
{
    const bool enough_buffered = (bytes <= _config.head_bytes);

    if (enough_buffered) {
	_config.head_bytes -= bytes;
	_config.head += bytes;
    }

    return enough_buffered;
}

//
// Grow the reserve (total buffer) size.
//
// @param bytes the new buffer size; shrinking is not supported.
// @return true if the buffer now holds bytes elements, false (with no
// state change) if bytes is smaller than the current reserve size.
//
// Buffered data and the position of head within the buffer are
// preserved across the resize.
//
bool
BufferedAsyncReader::set_reserve_bytes(size_t bytes)
{
    if (_config.reserve_bytes > bytes)
	return false;

    if (_config.reserve_bytes == bytes)
	return true;		// Nothing to do, buffer is left untouched

    //
    // Growing the vector may reallocate its storage, which would leave
    // _config.head pointing into freed memory.  Remember where head sits
    // relative to the start of the buffer and rebase it afterwards.
    //
    const size_t head_offset =
	_buffer.empty() ? 0 : (_config.head - &_buffer[0]);

    _buffer.resize(bytes);
    _config.head = &_buffer[0] + head_offset;
    _config.reserve_bytes = bytes;

    return true;
}

//
// @return the configured reserve (buffer) size in bytes.
//
size_t
BufferedAsyncReader::reserve_bytes() const
{
    const size_t reserved = _config.reserve_bytes;
    return reserved;
}

//
// @return the number of bytes currently buffered and not yet disposed.
//
size_t
BufferedAsyncReader::available_bytes() const
{
    const size_t buffered = _config.head_bytes;
    return buffered;
}

//
// Start reading: register io_event() with the event loop for read
// events on the descriptor (and, on Windows, for disconnect events).
// A registration failure is logged but otherwise ignored.
//
// If enough data is already buffered to satisfy the trigger threshold,
// a zero-delay one-off timer is scheduled so that the user is notified
// with a DATA event from the event loop rather than from within this
// call.
//
void
BufferedAsyncReader::start()
{
    const bool read_cb_added =
	_eventloop.add_ioevent_cb(_fd, IOT_READ,
				  callback(this,
					   &BufferedAsyncReader::io_event));
    if (!read_cb_added) {
	XLOG_ERROR("BufferedAsyncReader: failed to add I/O event callback.");
    }
#ifdef HOST_OS_WINDOWS
    const bool disconnect_cb_added =
	_eventloop.add_ioevent_cb(_fd, IOT_DISCONNECT,
				  callback(this,
					   &BufferedAsyncReader::io_event));
    if (!disconnect_cb_added) {
	XLOG_ERROR("BufferedAsyncReader: failed to add I/O event callback.");
    }
#endif

    const bool threshold_met = (_config.head_bytes >= _config.trigger_bytes);
    if (threshold_met) {
	_ready_timer = _eventloop.new_oneoff_after_ms(
	    0, callback(this, &BufferedAsyncReader::announce_event, DATA));
    }

    debug_msg("%p start\n", this);
}

//
// Stop reading: remove the I/O callbacks that start() registered with
// the event loop and cancel any pending ready timer.  Buffered data and
// configuration are left untouched.  Also called from the destructor
// and from io_event() before error/disconnect events are announced.
//
void
BufferedAsyncReader::stop()
{
    debug_msg("%p stop\n", this);

#ifdef HOST_OS_WINDOWS
    // Disconnect notifications are only registered on Windows.
    _eventloop.remove_ioevent_cb(_fd, IOT_DISCONNECT);
#endif
    _eventloop.remove_ioevent_cb(_fd, IOT_READ);
    // Make sure a deferred DATA announcement does not fire after stop.
    _ready_timer.unschedule();
}

//
// Event loop callback invoked when the descriptor is readable (or, on
// Windows, when the peer has disconnected).
//
// Reads as much as fits into the free space after the buffered data and
// then:
//  - on success, announces DATA if the trigger threshold has been met;
//  - on a zero-byte read, announces END_OF_FILE;
//  - on failure, returns silently if is_pseudo_error() says the error
//    can be ignored, otherwise logs it, stops the reader and announces
//    OS_ERROR.
//
// @param fd the descriptor the event occurred on; must be our own.
// @param type the event type; IOT_READ, or IOT_DISCONNECT on Windows.
//
// No member state is touched after announce_event() returns, because
// the user callback may have deleted this object.
//
void
BufferedAsyncReader::io_event(XorpFd fd, IoEventType type)
{
    assert(fd == _fd);
#ifndef HOST_OS_WINDOWS
    assert(type == IOT_READ);
#else
    // Explicitly handle disconnection events
    if (type == IOT_DISCONNECT) {
	XLOG_ASSERT(fd.is_socket());
	stop();
	announce_event(END_OF_FILE);
	return;
    }
#endif

    // Free space starts right after the buffered data and runs to the
    // end of the buffer.
    uint8_t* 	tail 	   = _config.head + _config.head_bytes;
    size_t 	tail_bytes = _buffer.size() - (tail - &_buffer[0]);

    assert(tail_bytes >= 1);
    assert(tail + tail_bytes == &_buffer[0] + _buffer.size());

    ssize_t read_bytes = -1;

#ifdef HOST_OS_WINDOWS
    if (fd.is_socket()) {
	read_bytes = ::recvfrom(fd, (char *)tail, tail_bytes, 0,
		       NULL, 0);
	_last_error = WSAGetLastError();
	WSASetLastError(ERROR_SUCCESS);
    } else {
	//
	// ReadFile() stores the byte count through a DWORD pointer.
	// Receive it in a genuine DWORD rather than aliasing read_bytes:
	// where ssize_t is wider than DWORD only the low-order bytes
	// would be overwritten, leaving the -1 initialiser's high-order
	// bits set and turning a successful read into a negative count.
	//
	// NOTE(review): the ReadFile() return value is still ignored, so
	// a failed call yields a zero count and is reported as
	// END_OF_FILE rather than OS_ERROR -- confirm this is intended.
	//
	DWORD nread = 0;
	(void)ReadFile(fd, (LPVOID)tail, (DWORD)tail_bytes,
		       &nread, NULL);
	read_bytes = static_cast<ssize_t>(nread);
	_last_error = GetLastError();
	SetLastError(ERROR_SUCCESS);
    }
#else
    errno = 0;
    _last_error = 0;
    read_bytes = ::read(fd, tail, tail_bytes);
    if (read_bytes < 0)
	_last_error = errno;
    errno = 0;
#endif

    if (read_bytes > 0) {
	_config.head_bytes += read_bytes;
	if (_config.head_bytes >= _config.trigger_bytes) {
	    debug_msg("YES notify - buffered I/O %u / %u\n",
		      XORP_UINT_CAST(_config.head_bytes),
		      XORP_UINT_CAST(_config.trigger_bytes));
	    announce_event(DATA);
	} else {
	    // Below threshold: keep accumulating, do not call the user.
	    debug_msg("NO notify - buffered I/O %u / %u read %d\n",
		      XORP_UINT_CAST(_config.head_bytes),
		      XORP_UINT_CAST(_config.trigger_bytes),
		      XORP_INT_CAST(read_bytes));
	}
    } else if (read_bytes == 0) {
	announce_event(END_OF_FILE);
    } else {
	if (is_pseudo_error("BufferedAsyncReader", fd, _last_error))
	    return;
	XLOG_ERROR("read error %d", _last_error);
	// Stop before announcing so no further I/O events are delivered.
	stop();
	announce_event(OS_ERROR);
    }
}

//
// Deliver event ev to the user callback, passing the current head
// pointer and the number of buffered bytes.
//
// DATA events are suppressed while fewer than trigger_bytes are
// buffered.  After the callback returns (and provided this object still
// exists) the buffer is compacted if necessary and, if the threshold is
// still met, another DATA announcement is scheduled through a
// zero-delay one-off timer.
//
void
BufferedAsyncReader::announce_event(Event ev)
{
    if (ev == DATA && _config.head_bytes < _config.trigger_bytes) {
	//
	// We might get here because a read returns more data than a user
	// wants to process.  They exit the callback with more data in
	// their buffer than the threshold, so we schedule a timer to
	// prod them again, but in the meantime an I/O event occurs
	// and pre-empts the timer callback.
	// Another example could be when a previous callback modifies
	// the threshold value.
	// Basically, we don't want to call the user below threshold.
	//
	debug_msg("announce_event: DATA (head_bytes = %u, trigger_bytes = %u)",
		  XORP_UINT_CAST(_config.head_bytes),
		  XORP_UINT_CAST(_config.trigger_bytes));
	return;
    }

    //
    // Take a second reference to the callback.  _cb must be the sole
    // reference beforehand (asserted below), so while this object is
    // alive there are exactly two: _cb and the local copy.  If the local
    // copy is the only reference left once the dispatch returns, _cb has
    // gone away, i.e. the current instance has been deleted, and we must
    // return without accessing any member state.
    //
    assert(_cb.is_only() == true);
    Callback cb = _cb;

    cb->dispatch(this, ev, _config.head, _config.head_bytes);

    if (cb.is_only() == true)
	return;	// We've been deleted!  Just leave

    // The user may have disposed of data or changed the threshold.
    provision_trigger_bytes();

    // Still at or above threshold: prod the user again from the event
    // loop instead of recursing.
    if (_config.head_bytes >= _config.trigger_bytes) {
	_ready_timer =
	    _eventloop.new_oneoff_after_ms(0,
		callback(this, &BufferedAsyncReader::announce_event, DATA));
    }
}


// syntax highlighted by Code2HTML, v. 0.9.1