// -*- c-basic-offset: 4; tab-width: 8; indent-tabs-mode: t -*-
// Copyright (c) 2001-2007 International Computer Science Institute
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software")
// to deal in the Software without restriction, subject to the conditions
// listed in the XORP LICENSE file. These conditions include: you must
// preserve this copyright notice, and you cannot mention the copyright
// holders in advertising related to the Software without their permission.
// The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
// notice is a summary of the XORP LICENSE file; the license in that file is
// legally binding.

#ident "$XORP: xorp/libxorp/test_asyncio.cc,v 1.17 2007/02/16 22:46:24 pavlin Exp $"

#include "libxorp_module.h"

#include "libxorp/xorp.h"
#include "libxorp/debug.h"
#include "libxorp/xorpfd.hh"
#include "libxorp/xlog.h"
#include "libxorp/random.h"

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif
#ifdef HAVE_WINSOCK2_H
#include <winsock2.h>
#endif
#ifdef HAVE_WS2TCPIP_H
#include <ws2tcpip.h>
#endif

#include "asyncio.hh"


static const int TIMEOUT_MS	  = 2000;
static const int MAX_ITERS	  = 50;
static const int MAX_BUFFERS	  = 200;
static const int MAX_BUFFER_BYTES = 1000000;

#ifdef DETAILED_DEBUG
static int bytes_read = 0;
static int bytes_written = 0;
#endif


//
// XXX: Below is a copy of few libcomm functions. We include them here,
// because nothing inside the libxorp directory should depend on any
// other XORP library.
//

/**
 * local_comm_init:
 * @void:
 *
 * Library initialization. Need be called only once, during startup.
 * XXX: Not currently thread-safe.
 *
 * Return value: %XORP_OK on success, otherwise %XORP_ERROR.
 **/
int
local_comm_init(void)
{
    static int init_flag = 0;

    if (init_flag)
	return (XORP_OK);

#ifdef HOST_OS_WINDOWS
    {
	int result;
	WORD version;
	WSADATA wsadata;

	version = MAKEWORD(2, 2);
	result = WSAStartup(version, &wsadata);
	if (result != 0) {
	    return (XORP_ERROR);
	}
	if (LOBYTE(wsadata.wVersion) != 2 || HIBYTE(wsadata.wVersion) != 2) {
	    (void)WSACleanup();
	    return (XORP_ERROR);
	}
    }
#endif // HOST_OS_WINDOWS

    init_flag = 1;

    return (XORP_OK);
}

/**
 * local_comm_exit:
 * @void:
 *
 * Library termination/cleanup. Must be called at process exit.
 * XXX: Not currently thread-safe.
 *
 **/
void
local_comm_exit(void)
{
#ifdef HOST_OS_WINDOWS
    (void)WSACleanup();
#endif
}

/**
 * comm_sock_pair:
 *
 * Create a pair of connected sockets. The sockets will be created in
 * the blocking state by default, and with no additional socket options set.
 *
 * On Windows platforms, a domain of AF_UNIX, AF_INET, or AF_INET6 must
 * be specified. For the AF_UNIX case, the sockets created will actually
 * be in the AF_INET domain. The protocol field is ignored.
 *
 * On UNIX, this function simply wraps the socketpair() system call.
 *
 * XXX: There may be UNIX platforms lacking socketpair() where we
 * have to emulate it.
 *
 * @param domain the domain of the socket (e.g., AF_INET, AF_INET6).
 * @param type the type of the socket (e.g., SOCK_STREAM, SOCK_DGRAM).
 * @param protocol the particular protocol to be used with the socket.
 * @param sv pointer to an array of two xsock_t handles to receive the
 *        allocated socket pair.
 *
 * @return XORP_OK if the socket pair was created, otherwise if any error
 * is encountered, XORP_ERROR.
 **/
static int
local_comm_sock_pair(int domain, int type, int protocol, xsock_t sv[2])
{
#ifndef HOST_OS_WINDOWS
    if (socketpair(domain, type, protocol, sv) == -1) {
	XLOG_ERROR("socketpair() failed: %s", strerror(errno));
	return (XORP_ERROR);
    }
    return (XORP_OK);

#else // HOST_OS_WINDOWS
    struct sockaddr_storage ss;
    struct sockaddr_in	*psin;
    socklen_t		sslen;
    SOCKET		st[3];
    u_long		optval;
    int			numtries, error, intdomain;
    unsigned short	port;
    static const int	CSP_LOWPORT = 40000;
    static const int	CSP_HIGHPORT = 65536;
#ifdef HAVE_IPV6
    struct sockaddr_in6 *psin6;
#endif

    UNUSED(protocol);

    if (domain != AF_UNIX && domain != AF_INET
#ifdef HAVE_IPV6
	&& domain != AF_INET6
#endif
	) {
	XLOG_ERROR("Invalid socket domain: %d", domain);
	return (XORP_ERROR);
    }

    intdomain = domain;
    if (intdomain == AF_UNIX)
	intdomain = AF_INET;

    st[0] = st[1] = st[2] = INVALID_SOCKET;

    st[2] = socket(intdomain, type, 0);
    if (st[2] == INVALID_SOCKET)
	goto error;

    memset(&ss, 0, sizeof(ss));
    psin = (struct sockaddr_in *)&ss;
#ifdef HAVE_IPV6
    psin6 = (struct sockaddr_in6 *)&ss;
    if (intdomain == AF_INET6) {
	sslen = sizeof(struct sockaddr_in6);
	ss.ss_family = AF_INET6;
	psin6->sin6_addr = in6addr_loopback;
    } else
#endif  // HAVE_IPV6
    {
	sslen = sizeof(struct sockaddr_in);
	ss.ss_family = AF_INET;
	psin->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    }

    numtries = 3;
    do {
	port = htons((rand() % (CSP_LOWPORT - CSP_HIGHPORT)) + CSP_LOWPORT);
#ifdef HAVE_IPV6
	if (intdomain == AF_INET6)
	    psin6->sin6_port = port;
	else
#endif
	    psin->sin_port = port;
	error = bind(st[2], (struct sockaddr *)&ss, sslen);
	if (error == 0)
	    break;
	if ((error != 0) &&
	    ((WSAGetLastError() != WSAEADDRNOTAVAIL) ||
	     (WSAGetLastError() != WSAEADDRINUSE)))
	    break;
    } while (--numtries > 0);

    if (error != 0)
	goto error;

    error = listen(st[2], 5);
    if (error != 0)
	goto error;

    st[0] = socket(intdomain, type, 0);
    if (st[0] == INVALID_SOCKET)
	goto error;

    optval = 1L;
    error = ioctlsocket(st[0], FIONBIO, &optval);
    if (error != 0)
	goto error;

    error = connect(st[0], (struct sockaddr *)&ss, sslen);
    if (error != 0 && WSAGetLastError() != WSAEWOULDBLOCK)
	goto error;

    numtries = 3;
    do {
	st[1] = accept(st[2], NULL, NULL);
	if (st[1] != INVALID_SOCKET) {
	    break;
	} else {
	    if (WSAGetLastError() == WSAEWOULDBLOCK) {
		SleepEx(100, TRUE);
	    } else {
		break;
	    }
	}
    } while (--numtries > 0);

    if (st[1] == INVALID_SOCKET)
	goto error;

    // Squelch inherited socket event mask
    (void)WSAEventSelect(st[1], NULL, 0);

    //
    // XXX: Should use getsockname() here to verify that the client socket
    // is connected.
    //
    optval = 0L;
    error = ioctlsocket(st[0], FIONBIO, &optval);
    if (error != 0)
	goto error;

    closesocket(st[2]);
    sv[0] = st[0];
    sv[1] = st[1];
    return (XORP_OK);

error:
    if (st[0] != INVALID_SOCKET)
	closesocket(st[0]);
    if (st[1] != INVALID_SOCKET)
	closesocket(st[1]);
    if (st[2] != INVALID_SOCKET)
	closesocket(st[2]);
    return (XORP_ERROR);
#endif // HOST_OS_WINDOWS
}

/**
 * local_comm_sock_set_blocking:
 *
 * Set the blocking or non-blocking mode of an existing socket.
 * @sock: The socket whose blocking mode is to be set.
 * @is_blocking: If non-zero, then the socket will be blocking, otherwise
 * non-blocking.
 *
 * Return value: XORP_OK if the operation was successful, otherwise
 *               if any error is encountered, XORP_ERROR.
 **/
static int
local_comm_sock_set_blocking(xsock_t sock, int is_blocking)
{
#ifdef HOST_OS_WINDOWS
    u_long opt;
    int flags;

    if (is_blocking)
	opt = 0;
    else
	opt = 1;

    flags = ioctlsocket(sock, FIONBIO, &opt);
    if (flags != 0) {
	XLOG_ERROR("FIONBIO error");
	return (XORP_ERROR);
    }

#else // !HOST_OS_WINDOWS
    int flags;
    if ( (flags = fcntl(sock, F_GETFL, 0)) < 0) {
	XLOG_ERROR("F_GETFL error");
	return (XORP_ERROR);
    }

    if (is_blocking)
	flags &= ~O_NONBLOCK;
    else
	flags |= O_NONBLOCK;

    if (fcntl(sock, F_SETFL, flags) < 0) {
	XLOG_ERROR("F_SETFL error");
	return (XORP_ERROR);
    }
#endif // HOST_OS_WINDOWS

    return (XORP_OK);
}

/**
 * local_comm_sock_close:
 * @sock: The socket to close.
 *
 * Close a socket.
 *
 * Return value: %XORP_OK on success, otherwise %XORP_ERROR.
 **/
static int
local_comm_sock_close(xsock_t sock)
{
    int ret;

#ifndef HOST_OS_WINDOWS
    ret = close(sock);
#else
    (void)WSAEventSelect(sock, NULL, 0);
    ret = closesocket(sock);
#endif

    if (ret < 0) {
	XLOG_ERROR("Error closing socket (socket = %d)", sock);
	return (XORP_ERROR);
    }

    return (XORP_OK);
}


static void
writer_check(AsyncFileReader::Event ev,
	     const uint8_t* buf, size_t bytes, size_t offset,
	     uint8_t* exp_buf,XorpTimer* t)
{
    assert(ev == AsyncFileWriter::DATA || ev == AsyncFileWriter::FLUSHING);
    assert(buf == exp_buf);
    assert(offset <= bytes);

#ifdef DETAILED_DEBUG
    bytes_written += bytes;
#endif

    // Defer timeout
    t->schedule_after_ms(TIMEOUT_MS);
}

static void
reader_check(AsyncFileReader::Event ev,
	     const uint8_t* buf, size_t bytes, size_t offset, 
	     uint8_t* exp_buf, uint8_t data_value, XorpTimer* t)
{
    assert(ev == AsyncFileReader::DATA || ev == AsyncFileReader::FLUSHING);
    assert(buf == exp_buf);
    assert(offset <= bytes);

    // Defer timeout
    t->schedule_after_ms(TIMEOUT_MS);
    
    if (offset == bytes) {
#ifdef DETAILED_DEBUG
    	bytes_read += bytes;
#endif
	// Check buffer is filled with expected value (== iteration no)
	for (size_t i = 0; i < bytes; i++) {
	    assert(buf[i] == data_value);
	}
    }
}

static void
reader_eof_check(AsyncFileReader::Event ev,
		 const uint8_t* buf, size_t bytes, size_t offset,
		 uint8_t* exp_buf, bool* done)
{
    assert(ev == AsyncFileReader::END_OF_FILE);
    assert(buf == exp_buf);
    assert(offset <= bytes);
    assert(offset == 0);
    *done = true;
}

static void
timeout()
{
    fprintf(stderr, "Timed out");
    exit(1);
}

static void
run_test()
{
    EventLoop e;

    xsock_t s[2]; // pair of sockets - one for read, one for write
    if (local_comm_sock_pair(AF_UNIX, SOCK_STREAM, 0, s) != XORP_OK) {
	puts("Failed to open socket pair");
	exit(1);
    }
    if (local_comm_sock_set_blocking(s[0], 0) != XORP_OK) {
	puts("Failed to set socket non-blocking");
	exit(1);
    }
    if (local_comm_sock_set_blocking(s[1], 0) != XORP_OK) {
	puts("Failed to set socket non-blocking");
	exit(1);
    }

    AsyncFileWriter afw(e, s[0]);
    AsyncFileReader afr(e, s[1]);

    static uint8_t msg[MAX_BUFFER_BYTES];
    const size_t msg_bytes = sizeof(msg) / sizeof(msg[0]);

    XorpTimer t = e.new_oneoff_after_ms(TIMEOUT_MS, callback(timeout));

    uint32_t bytes_transferred = 0;
    for (int i = 0; i <= MAX_ITERS; i++) {
	// set value of each bytes in buffer to be test iteration number
	// then we can check for corruption
	memset(msg, i, msg_bytes); 
	bool was_started = afr.start();
	UNUSED(was_started);
	assert(was_started == false); // can't start no buffer
	// Choose number of buffers to use
	int n = 1 + (random() % MAX_BUFFERS);
	printf("%d ", n); fflush(stdout);
	while (n >= 0) {
	    // Size of buffer add must be at least 1
	    size_t b_bytes = 1 + (rand() % (MAX_BUFFER_BYTES - 1)); 
	    afw.add_buffer(msg, b_bytes, 
			   callback(&writer_check, msg, &t));
	    afr.add_buffer(msg, b_bytes,
 			   callback(&reader_check, msg, (uint8_t)i, &t));
	    n--;
	    bytes_transferred += b_bytes;
	}

	// XXX: Because of the new ioevent semantics, start and stop
	// calls must be exactly matched in Win32.
	afr.stop();

	afw.start(); afw.stop(); afw.start(); // Just walk thru starting and
	afr.start(); afr.stop(); afr.start(); // stopping...

	while (afw.running() || afr.running()) {
	    e.run();
#ifdef DETAILED_DEBUG
	    printf("bytes_read = %d bytes_written = %d\n",
			bytes_read, bytes_written);
	    fflush(stdout);
#endif
	}
	assert(afw.buffers_remaining() == 0 && afr.buffers_remaining() == 0);

	afw.stop(); // utterly redundant call to stop()
	afr.stop(); // utterly redundant call to stop()

	assert(afw.start() == false); // can't start, no buffers
	assert(afr.start() == false); // can't start, no buffers
    }

    // test END_OF_FILE
    local_comm_sock_close(s[0]); // close writer's file descriptor

    bool eof_test_done = false;
    afr.add_buffer(msg, msg_bytes,
		   callback(reader_eof_check, msg, &eof_test_done));
    afr.start();
    while (eof_test_done == false)
	e.run();

    printf("\nTransfered %u bytes between AsyncFileWriter and "
	   "AsyncFileReader.\n", XORP_UINT_CAST(bytes_transferred));
}

int
main(int /* argc */, char *argv[]) 
{
    //
    // Initialize and start xlog
    //
    xlog_init(argv[0], NULL);
    xlog_set_verbose(XLOG_VERBOSE_LOW);		// Least verbose messages
    // XXX: verbosity of the error messages temporary increased
    xlog_level_set_verbose(XLOG_LEVEL_ERROR, XLOG_VERBOSE_HIGH);
    
    xlog_add_default_output();
    xlog_start();

    // Some of test generates warnings - under normal circumstances the
    // end user wants to know, but here not.
    xlog_disable(XLOG_LEVEL_WARNING);

    if (local_comm_init() != XORP_OK) {
	XLOG_ERROR("Failed to initialization socket communication facility");
	return 1;
    }

    run_test();

    local_comm_exit();

    //
    // Gracefully stop and exit xlog
    //
    xlog_stop();
    xlog_exit();

    return 0;
}


syntax highlighted by Code2HTML, v. 0.9.1