// -*- c-basic-offset: 4; tab-width: 8; indent-tabs-mode: t -*-
// Copyright (c) 2001-2007 International Computer Science Institute
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software")
// to deal in the Software without restriction, subject to the conditions
// listed in the XORP LICENSE file. These conditions include: you must
// preserve this copyright notice, and you cannot mention the copyright
// holders in advertising related to the Software without their permission.
// The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
// notice is a summary of the XORP LICENSE file; the license in that file is
// legally binding.
#ident "$XORP: xorp/libxorp/test_asyncio.cc,v 1.17 2007/02/16 22:46:24 pavlin Exp $"
#include "libxorp_module.h"
#include "libxorp/xorp.h"
#include "libxorp/debug.h"
#include "libxorp/xorpfd.hh"
#include "libxorp/xlog.h"
#include "libxorp/random.h"
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#ifdef HAVE_WINDOWS_H
#include <windows.h>
#endif
#ifdef HAVE_WINSOCK2_H
#include <winsock2.h>
#endif
#ifdef HAVE_WS2TCPIP_H
#include <ws2tcpip.h>
#endif
#include "asyncio.hh"
static const int TIMEOUT_MS = 2000;
static const int MAX_ITERS = 50;
static const int MAX_BUFFERS = 200;
static const int MAX_BUFFER_BYTES = 1000000;
#ifdef DETAILED_DEBUG
static int bytes_read = 0;
static int bytes_written = 0;
#endif
//
// XXX: Below is a copy of few libcomm functions. We include them here,
// because nothing inside the libxorp directory should depend on any
// other XORP library.
//
/**
* local_comm_init:
* @void:
*
* Library initialization. Need be called only once, during startup.
* XXX: Not currently thread-safe.
*
* Return value: %XORP_OK on success, otherwise %XORP_ERROR.
**/
int
local_comm_init(void)
{
static int init_flag = 0;
if (init_flag)
return (XORP_OK);
#ifdef HOST_OS_WINDOWS
{
int result;
WORD version;
WSADATA wsadata;
version = MAKEWORD(2, 2);
result = WSAStartup(version, &wsadata);
if (result != 0) {
return (XORP_ERROR);
}
if (LOBYTE(wsadata.wVersion) != 2 || HIBYTE(wsadata.wVersion) != 2) {
(void)WSACleanup();
return (XORP_ERROR);
}
}
#endif // HOST_OS_WINDOWS
init_flag = 1;
return (XORP_OK);
}
/**
* local_comm_exit:
* @void:
*
* Library termination/cleanup. Must be called at process exit.
* XXX: Not currently thread-safe.
*
**/
void
local_comm_exit(void)
{
#ifdef HOST_OS_WINDOWS
(void)WSACleanup();
#endif
}
/**
* comm_sock_pair:
*
* Create a pair of connected sockets. The sockets will be created in
* the blocking state by default, and with no additional socket options set.
*
* On Windows platforms, a domain of AF_UNIX, AF_INET, or AF_INET6 must
* be specified. For the AF_UNIX case, the sockets created will actually
* be in the AF_INET domain. The protocol field is ignored.
*
* On UNIX, this function simply wraps the socketpair() system call.
*
* XXX: There may be UNIX platforms lacking socketpair() where we
* have to emulate it.
*
* @param domain the domain of the socket (e.g., AF_INET, AF_INET6).
* @param type the type of the socket (e.g., SOCK_STREAM, SOCK_DGRAM).
* @param protocol the particular protocol to be used with the socket.
* @param sv pointer to an array of two xsock_t handles to receive the
* allocated socket pair.
*
* @return XORP_OK if the socket pair was created, otherwise if any error
* is encountered, XORP_ERROR.
**/
static int
local_comm_sock_pair(int domain, int type, int protocol, xsock_t sv[2])
{
#ifndef HOST_OS_WINDOWS
if (socketpair(domain, type, protocol, sv) == -1) {
XLOG_ERROR("socketpair() failed: %s", strerror(errno));
return (XORP_ERROR);
}
return (XORP_OK);
#else // HOST_OS_WINDOWS
struct sockaddr_storage ss;
struct sockaddr_in *psin;
socklen_t sslen;
SOCKET st[3];
u_long optval;
int numtries, error, intdomain;
unsigned short port;
static const int CSP_LOWPORT = 40000;
static const int CSP_HIGHPORT = 65536;
#ifdef HAVE_IPV6
struct sockaddr_in6 *psin6;
#endif
UNUSED(protocol);
if (domain != AF_UNIX && domain != AF_INET
#ifdef HAVE_IPV6
&& domain != AF_INET6
#endif
) {
XLOG_ERROR("Invalid socket domain: %d", domain);
return (XORP_ERROR);
}
intdomain = domain;
if (intdomain == AF_UNIX)
intdomain = AF_INET;
st[0] = st[1] = st[2] = INVALID_SOCKET;
st[2] = socket(intdomain, type, 0);
if (st[2] == INVALID_SOCKET)
goto error;
memset(&ss, 0, sizeof(ss));
psin = (struct sockaddr_in *)&ss;
#ifdef HAVE_IPV6
psin6 = (struct sockaddr_in6 *)&ss;
if (intdomain == AF_INET6) {
sslen = sizeof(struct sockaddr_in6);
ss.ss_family = AF_INET6;
psin6->sin6_addr = in6addr_loopback;
} else
#endif // HAVE_IPV6
{
sslen = sizeof(struct sockaddr_in);
ss.ss_family = AF_INET;
psin->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
}
numtries = 3;
do {
port = htons((rand() % (CSP_LOWPORT - CSP_HIGHPORT)) + CSP_LOWPORT);
#ifdef HAVE_IPV6
if (intdomain == AF_INET6)
psin6->sin6_port = port;
else
#endif
psin->sin_port = port;
error = bind(st[2], (struct sockaddr *)&ss, sslen);
if (error == 0)
break;
if ((error != 0) &&
((WSAGetLastError() != WSAEADDRNOTAVAIL) ||
(WSAGetLastError() != WSAEADDRINUSE)))
break;
} while (--numtries > 0);
if (error != 0)
goto error;
error = listen(st[2], 5);
if (error != 0)
goto error;
st[0] = socket(intdomain, type, 0);
if (st[0] == INVALID_SOCKET)
goto error;
optval = 1L;
error = ioctlsocket(st[0], FIONBIO, &optval);
if (error != 0)
goto error;
error = connect(st[0], (struct sockaddr *)&ss, sslen);
if (error != 0 && WSAGetLastError() != WSAEWOULDBLOCK)
goto error;
numtries = 3;
do {
st[1] = accept(st[2], NULL, NULL);
if (st[1] != INVALID_SOCKET) {
break;
} else {
if (WSAGetLastError() == WSAEWOULDBLOCK) {
SleepEx(100, TRUE);
} else {
break;
}
}
} while (--numtries > 0);
if (st[1] == INVALID_SOCKET)
goto error;
// Squelch inherited socket event mask
(void)WSAEventSelect(st[1], NULL, 0);
//
// XXX: Should use getsockname() here to verify that the client socket
// is connected.
//
optval = 0L;
error = ioctlsocket(st[0], FIONBIO, &optval);
if (error != 0)
goto error;
closesocket(st[2]);
sv[0] = st[0];
sv[1] = st[1];
return (XORP_OK);
error:
if (st[0] != INVALID_SOCKET)
closesocket(st[0]);
if (st[1] != INVALID_SOCKET)
closesocket(st[1]);
if (st[2] != INVALID_SOCKET)
closesocket(st[2]);
return (XORP_ERROR);
#endif // HOST_OS_WINDOWS
}
/**
* local_comm_sock_set_blocking:
*
* Set the blocking or non-blocking mode of an existing socket.
* @sock: The socket whose blocking mode is to be set.
* @is_blocking: If non-zero, then the socket will be blocking, otherwise
* non-blocking.
*
* Return value: XORP_OK if the operation was successful, otherwise
* if any error is encountered, XORP_ERROR.
**/
static int
local_comm_sock_set_blocking(xsock_t sock, int is_blocking)
{
#ifdef HOST_OS_WINDOWS
u_long opt;
int flags;
if (is_blocking)
opt = 0;
else
opt = 1;
flags = ioctlsocket(sock, FIONBIO, &opt);
if (flags != 0) {
XLOG_ERROR("FIONBIO error");
return (XORP_ERROR);
}
#else // !HOST_OS_WINDOWS
int flags;
if ( (flags = fcntl(sock, F_GETFL, 0)) < 0) {
XLOG_ERROR("F_GETFL error");
return (XORP_ERROR);
}
if (is_blocking)
flags &= ~O_NONBLOCK;
else
flags |= O_NONBLOCK;
if (fcntl(sock, F_SETFL, flags) < 0) {
XLOG_ERROR("F_SETFL error");
return (XORP_ERROR);
}
#endif // HOST_OS_WINDOWS
return (XORP_OK);
}
/**
* local_comm_sock_close:
* @sock: The socket to close.
*
* Close a socket.
*
* Return value: %XORP_OK on success, otherwise %XORP_ERROR.
**/
static int
local_comm_sock_close(xsock_t sock)
{
int ret;
#ifndef HOST_OS_WINDOWS
ret = close(sock);
#else
(void)WSAEventSelect(sock, NULL, 0);
ret = closesocket(sock);
#endif
if (ret < 0) {
XLOG_ERROR("Error closing socket (socket = %d)", sock);
return (XORP_ERROR);
}
return (XORP_OK);
}
static void
writer_check(AsyncFileReader::Event ev,
const uint8_t* buf, size_t bytes, size_t offset,
uint8_t* exp_buf,XorpTimer* t)
{
assert(ev == AsyncFileWriter::DATA || ev == AsyncFileWriter::FLUSHING);
assert(buf == exp_buf);
assert(offset <= bytes);
#ifdef DETAILED_DEBUG
bytes_written += bytes;
#endif
// Defer timeout
t->schedule_after_ms(TIMEOUT_MS);
}
static void
reader_check(AsyncFileReader::Event ev,
const uint8_t* buf, size_t bytes, size_t offset,
uint8_t* exp_buf, uint8_t data_value, XorpTimer* t)
{
assert(ev == AsyncFileReader::DATA || ev == AsyncFileReader::FLUSHING);
assert(buf == exp_buf);
assert(offset <= bytes);
// Defer timeout
t->schedule_after_ms(TIMEOUT_MS);
if (offset == bytes) {
#ifdef DETAILED_DEBUG
bytes_read += bytes;
#endif
// Check buffer is filled with expected value (== iteration no)
for (size_t i = 0; i < bytes; i++) {
assert(buf[i] == data_value);
}
}
}
static void
reader_eof_check(AsyncFileReader::Event ev,
const uint8_t* buf, size_t bytes, size_t offset,
uint8_t* exp_buf, bool* done)
{
assert(ev == AsyncFileReader::END_OF_FILE);
assert(buf == exp_buf);
assert(offset <= bytes);
assert(offset == 0);
*done = true;
}
static void
timeout()
{
fprintf(stderr, "Timed out");
exit(1);
}
static void
run_test()
{
EventLoop e;
xsock_t s[2]; // pair of sockets - one for read, one for write
if (local_comm_sock_pair(AF_UNIX, SOCK_STREAM, 0, s) != XORP_OK) {
puts("Failed to open socket pair");
exit(1);
}
if (local_comm_sock_set_blocking(s[0], 0) != XORP_OK) {
puts("Failed to set socket non-blocking");
exit(1);
}
if (local_comm_sock_set_blocking(s[1], 0) != XORP_OK) {
puts("Failed to set socket non-blocking");
exit(1);
}
AsyncFileWriter afw(e, s[0]);
AsyncFileReader afr(e, s[1]);
static uint8_t msg[MAX_BUFFER_BYTES];
const size_t msg_bytes = sizeof(msg) / sizeof(msg[0]);
XorpTimer t = e.new_oneoff_after_ms(TIMEOUT_MS, callback(timeout));
uint32_t bytes_transferred = 0;
for (int i = 0; i <= MAX_ITERS; i++) {
// set value of each bytes in buffer to be test iteration number
// then we can check for corruption
memset(msg, i, msg_bytes);
bool was_started = afr.start();
UNUSED(was_started);
assert(was_started == false); // can't start no buffer
// Choose number of buffers to use
int n = 1 + (random() % MAX_BUFFERS);
printf("%d ", n); fflush(stdout);
while (n >= 0) {
// Size of buffer add must be at least 1
size_t b_bytes = 1 + (rand() % (MAX_BUFFER_BYTES - 1));
afw.add_buffer(msg, b_bytes,
callback(&writer_check, msg, &t));
afr.add_buffer(msg, b_bytes,
callback(&reader_check, msg, (uint8_t)i, &t));
n--;
bytes_transferred += b_bytes;
}
// XXX: Because of the new ioevent semantics, start and stop
// calls must be exactly matched in Win32.
afr.stop();
afw.start(); afw.stop(); afw.start(); // Just walk thru starting and
afr.start(); afr.stop(); afr.start(); // stopping...
while (afw.running() || afr.running()) {
e.run();
#ifdef DETAILED_DEBUG
printf("bytes_read = %d bytes_written = %d\n",
bytes_read, bytes_written);
fflush(stdout);
#endif
}
assert(afw.buffers_remaining() == 0 && afr.buffers_remaining() == 0);
afw.stop(); // utterly redundant call to stop()
afr.stop(); // utterly redundant call to stop()
assert(afw.start() == false); // can't start, no buffers
assert(afr.start() == false); // can't start, no buffers
}
// test END_OF_FILE
local_comm_sock_close(s[0]); // close writer's file descriptor
bool eof_test_done = false;
afr.add_buffer(msg, msg_bytes,
callback(reader_eof_check, msg, &eof_test_done));
afr.start();
while (eof_test_done == false)
e.run();
printf("\nTransfered %u bytes between AsyncFileWriter and "
"AsyncFileReader.\n", XORP_UINT_CAST(bytes_transferred));
}
int
main(int /* argc */, char *argv[])
{
//
// Initialize and start xlog
//
xlog_init(argv[0], NULL);
xlog_set_verbose(XLOG_VERBOSE_LOW); // Least verbose messages
// XXX: verbosity of the error messages temporary increased
xlog_level_set_verbose(XLOG_LEVEL_ERROR, XLOG_VERBOSE_HIGH);
xlog_add_default_output();
xlog_start();
// Some of test generates warnings - under normal circumstances the
// end user wants to know, but here not.
xlog_disable(XLOG_LEVEL_WARNING);
if (local_comm_init() != XORP_OK) {
XLOG_ERROR("Failed to initialization socket communication facility");
return 1;
}
run_test();
local_comm_exit();
//
// Gracefully stop and exit xlog
//
xlog_stop();
xlog_exit();
return 0;
}
syntax highlighted by Code2HTML, v. 0.9.1