/*--------------------------------------------------------------------------
FTP client server :-)
A single program can control one or more of these via CORBA to implement
a distributed load generator.
--------------------------------------------------------------------------*/
#include "CHECK.h"
#include "dprint.h"
#include "getifaddrs.h"
#include "Platoon.h"
#include "CorbaPlatoon_srv.hh"
/* FIXME: orbit-specific */
#include "ORBitservices/CosNaming.hh"
#include "Poller_devpoll.h"
#include "Poller.h"
#include "Poller_kqueue.h"
#include "Poller_poll.h"
#include "Poller_select.h"
#include "Poller_sigfd.h"
#include "Poller_sigio.h"
#include "robouser.h"
#include "Sked.h"
#include <pthread.h>
#include <stdlib.h>
#include <arpa/inet.h>
/// handle fatal error; should do better than this...
static void die(int status, const char *error)
{
cerr << error << "exit status: " << status << endl;
exit(status);
}
/// What kind of Poller to use
char g_arg_selector = 'p';
/**--------------------------------------------------------------------------
Retrieve an array of local IP addresses from the operating system.
Caller may free the array using cfree().
@param local_addrs - where to put array of addresses.
@return number of addresses in array
--------------------------------------------------------------------------*/
static int getLocalAddrs(struct sockaddr_in **local_addrs)
{
struct ifaddrs *addrs;
struct ifaddrs *p;
/* ask the OS for the addresses */
getifaddrs(&addrs);
/* Allocate an array to hold 'em */
int i = 0;
for (p = addrs; p; p = p->ifa_next) {
if (!p->ifa_addr)
continue;
i++;
}
*local_addrs = (struct sockaddr_in *) calloc(i, sizeof(struct sockaddr_in));
/* Copy any we like into the array */
i = 0;
in_addr_t localhost = inet_addr("127.0.0.1");
for (p = addrs; p; p = p->ifa_next) {
if (!p->ifa_addr)
continue;
if (((struct sockaddr_in *) p->ifa_addr)->sin_addr.s_addr == localhost)
continue;
(*local_addrs)[i].sin_family = AF_INET;
(*local_addrs)[i].sin_port = 0; /* ephemeral */
(*local_addrs)[i].sin_addr = ((struct sockaddr_in *) p->ifa_addr)->sin_addr;
i++;
}
return i;
}
/**--------------------------------------------------------------------------
CORBA wrapper around existing C++ class "Platoon".
--------------------------------------------------------------------------*/
class CorbaPlatoon_impl : public POA_CorbaPlatoon {
public:
CorbaPlatoon_impl();
virtual void init(
const char* filename,
CORBA::ULong maxBytesPerSec,
CORBA::ULong minBytesPerSec,
CORBA::ULong bytesPerRead,
const char* servername,
CORBA::UShort port,
const char* username,
const char* passwd,
CORBA::Boolean useAllLocalInterfaces) throw ();
virtual void reset() throw ();
virtual CORBA::Long getStatus(CORBA::ULong& nconnecting,
CORBA::ULong& nalive,
CORBA::ULong& ndead) throw ();
virtual void set_verbosity(CORBA::Short verbosity) throw ()
{
lock();
m_platoon.setVerbosity((int)verbosity);
unlock();
}
virtual void set_nuserTarget(CORBA::ULong utarget) throw ()
{
lock();
m_platoon.set_nuserTarget((int)utarget);
unlock();
}
virtual void set_nconnectingTarget(CORBA::ULong ctarget) throw ()
{
lock();
m_platoon.set_nconnectingTarget((int)ctarget);
unlock();
}
virtual void set_lastConnectingState(CorbaPlatoon::sessionState_t lstate) throw ()
{
robouser_t::state_t rstate = robouser_t::UNINIT;
switch (lstate) {
case CorbaPlatoon::CONNECT_START: rstate = robouser_t::CONNECTING; break;
case CorbaPlatoon::CONNECT_FINISH: rstate = robouser_t::CONNECT; break;
}
lock();
m_platoon.set_lastConnectingState(rstate);
unlock();
}
/// Call this from main(); it's the body of the Platoon event loop
void perform_work();
~CorbaPlatoon_impl() throw ();
private:
Platoon m_platoon;
Poller * m_poller;
Sked m_sked;
pthread_mutex_t m_mutex;
int m_tix_per_second;
/// Local copy of filename (Platoon expects a stable pointer)
CORBA::String_var m_filename;
/// Local copy of servername (Platoon expects a stable pointer)
CORBA::String_var m_servername;
/// Local copy of username (Platoon expects a stable pointer)
CORBA::String_var m_username;
/// Local copy of password (Platoon expects a stable pointer)
CORBA::String_var m_passwd;
void lock();
void unlock();
};
// Public Method Implementations:
CorbaPlatoon_impl::CorbaPlatoon_impl()
{
pthread_mutex_init(&m_mutex, NULL);
if (m_sked.init()) {
die(1, "Can't init scheduler.\n");
}
m_tix_per_second = eclock_hertz();
m_poller = NULL;
}
void CorbaPlatoon_impl::init(const char *filename,
CORBA::ULong maxBytesPerSec,
CORBA::ULong minBytesPerSec,
CORBA::ULong bytesPerRead,
const char *servername,
CORBA::UShort port,
const char *username,
const char *passwd,
CORBA::Boolean useAllLocalInterfaces) throw ()
{
struct sockaddr_in *local_addrs = 0;
int n_local_addrs = 0;
if (useAllLocalInterfaces)
n_local_addrs = getLocalAddrs(&local_addrs);
lock();
switch (g_arg_selector) {
case 'p':
DPRINT(("Using poll()\n"));
m_poller = new Poller_poll();
break;
case 's':
DPRINT(("Using select()\n"));
m_poller = new Poller_select();
break;
#if HAVE_DEVPOLL
case 'd':
DPRINT(("Using Solaris /dev/poll\n"));
m_poller = new Poller_devpoll();
break;
#endif
#if HAVE_KQUEUE
case 'k':
DPRINT(("Using BSD kqueue()\n"));
m_poller = new Poller_kqueue();
break;
#endif
#if HAVE_F_SETSIG
case 'r':
DPRINT(("Using Linux rtsignals / F_SETSIG / O_ASYNC\n"));
m_poller = new Poller_sigio();
break;
#endif
#if HAVE_F_SETAUXFL
case 'f':
DPRINT(("Using Linux rtsignals / O_SIGPERFD\n"));
poller = new Poller_sigfd();
break;
#endif
default:
printf("Selector %c unsupported on this platform.\n", g_arg_selector);
exit(1);
}
// Initialize the poller
int err = m_poller->init();
CHECK(err, 0);
#ifdef SIGRTMIN
/* Tell it which signal number to use. (Only need to do this for case 'r',
* really.) */
err = poller->setSignum(SIGRTMIN);
CHECK(err, 0);
#endif
m_filename = filename;
m_servername = servername;
m_username = username;
m_passwd = passwd;
m_platoon.init(m_poller, &m_sked, m_filename,
(int)maxBytesPerSec, (int)minBytesPerSec,
(int)bytesPerRead, m_servername, (int)port, m_username, m_passwd,
local_addrs, n_local_addrs);
m_platoon.setVerbosity(3);
unlock();
}
void CorbaPlatoon_impl::reset() throw()
{
lock();
m_platoon.reset();
m_poller->shutdown();
delete m_poller;
m_poller = NULL;
unlock();
}
CORBA::Long CorbaPlatoon_impl::getStatus(CORBA::ULong & nconnecting,
CORBA::ULong & nalive, CORBA::ULong & ndead) throw()
{
lock();
int nc, na, nd;
long returnStatus = m_platoon.getStatus(&nc, &na, &nd);
unlock();
nconnecting = nc;
nalive = na;
ndead = nd;
return returnStatus;
}
void CorbaPlatoon_impl::perform_work()
{
lock();
if (!m_poller) {
unlock();
sleep(1);
return;
}
clock_t now = eclock();
/* Let the scheduler run the robots that need it */
m_sked.runAll(now);
/* Service any clients that might be ready. */
for (;;) {
Poller::PollEvent event;
int err;
err = m_poller->getNextEvent(&event);
if (err == EWOULDBLOCK)
break;
CHECK(0, err);
err = event.client->notifyPollEvent(&event);
CHECK(0, err);
m_platoon.reap();
}
/* Call poller->waitForEvents() to find out what handles are ready
* for read or write.
* Don't sleep too long here, or you'll interfere with robouser's
* bandwidth throttling.
*/
now = eclock();
clock_t tixUntilNextEvent = m_sked.nextTime(now + m_tix_per_second) - now;
int msUntilNextEvent = (tixUntilNextEvent * 1000) / m_tix_per_second + 1;
int err = m_poller->waitForEvents(msUntilNextEvent);
if (err && (err != EINTR) && (err != EWOULDBLOCK)) {
errno = err;
die(err, "poll");
}
unlock();
}
CorbaPlatoon_impl::~CorbaPlatoon_impl() throw()
{
lock();
m_poller->shutdown();
delete m_poller;
unlock();
int status = pthread_mutex_destroy(&m_mutex);
if (status != 0)
die(status, "Failed to properly destroy mutex");
}
// Private Method Implementations
void CorbaPlatoon_impl::lock()
{
int status = pthread_mutex_lock(&m_mutex);
if (status != 0)
die(status, "Unable to obtain lock on mutex");
}
void CorbaPlatoon_impl::unlock()
{
int status = pthread_mutex_unlock(&m_mutex);
if (status != 0)
die(status, "Unable to unlock mutex.");
}
int main(int argc, char **argv)
{
(void) argc;
(void) argv;
DPRINT_ENABLE(1);
try {
CORBA::ORB_var orb = CORBA::ORB_init(argc, argv, "omniORB3");
CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
PortableServer::POA_var poa = PortableServer::POA::_narrow(obj);
CorbaPlatoon_impl * myPlatoon = new CorbaPlatoon_impl ();
PortableServer::ObjectId_var myPlatoonId = poa->activate_object(myPlatoon);
// Obtain a reference to the object, and print it out as a
// stringified IOR.
CORBA::Object_var myobj = myPlatoon->_this();
CORBA::String_var sior(orb->object_to_string(myobj));
cerr << "'" << (const char*)sior << "'" << endl;
// Get the root naming context
CORBA::Object_var nsov=orb->resolve_initial_references("NameService");
CosNaming::NamingContext_var nsv = CosNaming::NamingContext::_narrow(nsov);
if (!CORBA::is_nil(nsv)) {
// Bind the object reference in naming
CosNaming::Name name;
name.length(1);
char hostname[256];
gethostname(hostname, sizeof(hostname));
name[0].id = CORBA::string_dup(hostname);
name[0].kind = CORBA::string_dup("CorbaPlatoon");
nsv->rebind(name, myobj);
printf("Binding to name service as %s\n", hostname);
} else {
printf("No name service to bind to.\n");
}
myPlatoon->_remove_ref();
PortableServer::POAManager_var pman = poa->the_POAManager();
pman->activate();
for (;;) {
if (orb->work_pending())
orb->perform_work();
myPlatoon->perform_work();
}
#if 0
/* doesn't compile with orbitcpp */
orb->destroy();
#endif
} catch(CORBA::SystemException &) {
cerr << "Caught CORBA::SystemException. " << endl;
} catch(CORBA::Exception &) {
cerr << "Caught CORBA::Exception." << endl;
#ifdef OMNIORB
} catch(omniORB::fatalException & fe) {
cerr << "Caught omniORB::fatalException:" << endl;
cerr << " file: " << fe.file() << endl;
cerr << " line: " << fe.line() << endl;
cerr << " mesg: " << fe.errmsg() << endl;
#endif
} catch(...) {
cerr << "Caught unknown exception." << endl;
}
return 0;
}
syntax highlighted by Code2HTML, v. 0.9.1