// ---------------------------------------------------------------------------
// - Selector.cpp -
// - afnix:sio module - i/o select class implementation -
// ---------------------------------------------------------------------------
// - This program is free software; you can redistribute it and/or modify -
// - it provided that this copyright notice is kept intact. -
// - -
// - This program is distributed in the hope that it will be useful, but -
// - without any warranty; without even the implied warranty of -
// - merchantability or fitness for a particular purpose. In no event shall -
// - the copyright holder be liable for any direct, indirect, incidental or -
// - special damages arising in any way out of the use of this software. -
// ---------------------------------------------------------------------------
// - copyright (c) 1999-2007 amaury darsch -
// ---------------------------------------------------------------------------
#include "Interp.hpp"
#include "Integer.hpp"
#include "QuarkZone.hpp"
#include "Selector.hpp"
#include "csio.hpp"
#include "cerr.hpp"
namespace afnix {
// -------------------------------------------------------------------------
// - class section -
// -------------------------------------------------------------------------
// build a selector with no registered stream
// the select handle is obtained from c_shnew at construction
Selector::Selector (void) : p_handle (c_shnew ()) {
}
// destroy this selector
// the select handle obtained at construction is handed back to c_shfree
Selector::~Selector (void) {
  c_shfree (this->p_handle);
}
// report the name of this class as a string
String Selector::repr (void) const {
  return String ("Selector");
}
// register an input stream with this selector
// a nil stream is ignored, and so is a stream already in the input vector
void Selector::add (Input* is) {
  // nothing to register without a stream
  if (is == nilp) return;
  wrlock ();
  try {
    // record the stream and its descriptor only once
    const bool fresh = (d_isv.exists (is) == false);
    if (fresh == true) {
      d_isv.append (is);
      c_shiadd (p_handle, is->getsid ());
    }
    unlock ();
  } catch (...) {
    // release the selector before propagating the error
    unlock ();
    throw;
  }
}
// register an output stream with this selector
// a nil stream is ignored, and so is a stream already in the output vector
void Selector::add (Output* os) {
  // nothing to register without a stream
  if (os == nilp) return;
  wrlock ();
  try {
    // record the stream and its descriptor only once
    const bool fresh = (d_osv.exists (os) == false);
    if (fresh == true) {
      d_osv.append (os);
      c_shoadd (p_handle, os->getsid ());
    }
    unlock ();
  } catch (...) {
    // release the selector before propagating the error
    unlock ();
    throw;
  }
}
// report how many input streams are registered
// the input vector length is sampled under the read lock
long Selector::ilength (void) const {
  rdlock ();
  const long count = d_isv.length ();
  unlock ();
  return count;
}
// report how many output streams are registered
// the output vector length is sampled under the read lock
long Selector::olength (void) const {
  rdlock ();
  const long count = d_osv.length ();
  unlock ();
  return count;
}
// fetch a registered input stream by position
// the index is handed unchanged to the input vector; the result is nil
// when the stored object is not an input stream
Input* Selector::iget (const long index) const {
  rdlock ();
  try {
    Object* obj = d_isv.get (index);
    Input* is = dynamic_cast <Input*> (obj);
    unlock ();
    return is;
  } catch (...) {
    // release the selector before propagating the error
    unlock ();
    throw;
  }
}
// fetch a registered output stream by position
// the index is handed unchanged to the output vector; the result is nil
// when the stored object is not an output stream
Output* Selector::oget (const long index) const {
  rdlock ();
  try {
    Object* obj = d_osv.get (index);
    Output* os = dynamic_cast <Output*> (obj);
    unlock ();
    return os;
  } catch (...) {
    // release the selector before propagating the error
    unlock ();
    throw;
  }
}
// wait for one stream to be ready
Object* Selector::wait (const long tout) const {
wrlock ();
// look into the input pushback buffer - the pushback is locked
long len = d_isv.length ();
for (long i = 0; i < len; i++) {
Input* is = dynamic_cast <Input*> (d_isv.get (i));
if (is == nilp) continue;
is->wrlock ();
if (is->buflen () != 0) {
long max = i + 1;
for (long j = 0; j < max; j++) {
Input* iis = dynamic_cast <Input*> (d_isv.get (j));
if (iis == nilp) continue;
iis->unlock ();
}
unlock ();
return is;
}
}
// now wait for a descriptor to be ready
long nsid = c_shwait (p_handle, tout);
// unlock the input stream pushback buffers
for (long i = 0; i < len; i++) {
Input* is = dynamic_cast <Input*> (d_isv.get (i));
if (is == nilp) continue;
is->unlock ();
}
// check for error first
if (nsid < 0) {
unlock ();
throw Exception ("selector-error", c_errmsg (nsid));
}
// check the input descriptors first
for (long i = 0; i < len; i++) {
Input* is = dynamic_cast <Input*> (d_isv.get (i));
if (is == nilp) continue;
if (c_shitst (p_handle, is->getsid ()) == true) {
unlock ();
return is;
}
}
// check the output descriptors
len = d_osv.length ();
for (long i = 0; i < len; i++) {
Output* os = dynamic_cast <Output*> (d_osv.get (i));
if (os == nilp) continue;
if (c_shotst (p_handle, os->getsid ()) == true) {
unlock ();
return os;
}
}
unlock ();
return nilp;
}
// get all ready streams
Vector* Selector::waitall (const long tout) const {
wrlock ();
Vector* result = new Vector;
// look into the input pushback buffer - the pushback is locked
long len = d_isv.length ();
for (long i = 0; i < len; i++) {
Input* is = dynamic_cast <Input*> (d_isv.get (i));
if (is == nilp) continue;
is->wrlock ();
if (is->buflen () != 0) result->append (is);
}
// if we have something we unlock and return
if (result->length () != 0) {
for (long i = 0; i < len; i++) {
Input* is = dynamic_cast <Input*> (d_isv.get (i));
if (is == nilp) continue;
is->unlock ();
}
unlock ();
return result;
}
// now wait for a descriptor to be ready
long nsid = c_shwait (p_handle, tout);
// unlock the input stream pushback buffers
for (long i = 0; i < len; i++) {
Input* is = dynamic_cast <Input*> (d_isv.get (i));
if (is == nilp) continue;
is->unlock ();
}
// check for error first
if (nsid < 0) {
delete result;
unlock ();
throw Exception ("selector-error", c_errmsg (nsid));
}
// check the input descriptors first
for (long i = 0; i < len; i++) {
Input* is = dynamic_cast <Input*> (d_isv.get (i));
if (is == nilp) continue;
if (c_shitst (p_handle, is->getsid ()) == true) result->append (is);
}
// check the output descriptors
len = d_osv.length ();
for (long i = 0; i < len; i++) {
Output* os = dynamic_cast <Output*> (d_osv.get (i));
if (os == nilp) continue;
if (c_shotst (p_handle, os->getsid ()) == true) result->append (os);
}
unlock ();
return result;
}
// -------------------------------------------------------------------------
// - object section -
// -------------------------------------------------------------------------
// the quark zone
// the zone length equals the number of quarks interned below (7); keep
// both in sync when a method name is added or removed
static const long QUARK_ZONE_LENGTH = 7;
static QuarkZone zone (QUARK_ZONE_LENGTH);
// the object supported quarks
// each constant is the value returned by interning a method name in the
// zone; isquark tests membership in the zone and apply compares the
// incoming quark against these constants
static const long QUARK_ADD = zone.intern ("add");
static const long QUARK_WAIT = zone.intern ("wait");
static const long QUARK_IGET = zone.intern ("input-get");
static const long QUARK_OGET = zone.intern ("output-get");
static const long QUARK_ILENGTH = zone.intern ("input-length");
static const long QUARK_OLENGTH = zone.intern ("output-length");
static const long QUARK_WAITALL = zone.intern ("wait-all");
// create a new object in a generic way
// - argv may be nil or empty, in which case an empty selector is returned
// - every argument must be an input or an output stream; an object that
//   is an input stream is registered as input only
// - a "type-error" exception is thrown for any other argument
// the selector under construction is deleted whenever an exception
// leaves this function, so that it cannot leak
Object* Selector::mknew (Vector* argv) {
  const long argc = (argv == nilp) ? 0 : argv->length ();
  Selector* result = new Selector;
  try {
    // register each argument - the loop is skipped without argument
    for (long i = 0; i < argc; i++) {
      Object* obj = argv->get (i);
      Input* is = dynamic_cast <Input*> (obj);
      if (is != nilp) {
        result->add (is);
        continue;
      }
      Output* os = dynamic_cast <Output*> (obj);
      if (os != nilp) {
        result->add (os);
        continue;
      }
      throw Exception ("type-error", "input or output stream expected");
    }
  } catch (...) {
    // covers the type error as well as any failure while registering
    delete result;
    throw;
  }
  return result;
}
// tell whether a quark is defined for this object
// the local quark zone is checked first; the base object is consulted
// only when the quark is not local and hflg is set
bool Selector::isquark (const long quark, const bool hflg) const {
  rdlock ();
  bool status = (zone.exists (quark) == true);
  if ((status == false) && (hflg == true)) {
    status = Object::isquark (quark, hflg);
  }
  unlock ();
  return status;
}
// evaluate a method of this object, selected by quark, with arguments
// - without argument: wait and wait-all (with a -1 timeout),
//   input-length and output-length
// - with one argument: wait and wait-all (integer timeout), add (input or
//   output stream), input-get and output-get (integer index)
// - anything else is delegated to the base object
// a "type-error" exception is thrown when add does not receive a stream
Object* Selector::apply (Runnable* robj, Nameset* nset, const long quark,
                         Vector* argv) {
  // a nil argument vector counts as an empty one
  const long argc = (argv == nilp) ? 0 : argv->length ();
  switch (argc) {
  case 0:
    // methods without argument
    if (quark == QUARK_WAIT) return wait (-1);
    if (quark == QUARK_WAITALL) return waitall (-1);
    if (quark == QUARK_ILENGTH) return new Integer (ilength ());
    if (quark == QUARK_OLENGTH) return new Integer (olength ());
    break;
  case 1:
    // methods with one argument - results are posted to the runnable
    if (quark == QUARK_WAIT) {
      const long tout = argv->getint (0);
      Object* ready = wait (tout);
      robj->post (ready);
      return ready;
    }
    if (quark == QUARK_WAITALL) {
      const long tout = argv->getint (0);
      Object* ready = waitall (tout);
      robj->post (ready);
      return ready;
    }
    if (quark == QUARK_ADD) {
      Object* obj = argv->get (0);
      // an input stream takes precedence over an output stream
      Input* is = dynamic_cast <Input*> (obj);
      if (is != nilp) {
        add (is);
        return nilp;
      }
      Output* os = dynamic_cast <Output*> (obj);
      if (os != nilp) {
        add (os);
        return nilp;
      }
      throw Exception ("type-error", "input or output stream expected");
    }
    if (quark == QUARK_IGET) {
      const long index = argv->getint (0);
      Object* stream = iget (index);
      robj->post (stream);
      return stream;
    }
    if (quark == QUARK_OGET) {
      const long index = argv->getint (0);
      Object* stream = oget (index);
      robj->post (stream);
      return stream;
    }
    break;
  default:
    break;
  }
  // call the object method
  return Object::apply (robj, nset, quark, argv);
}
}
// syntax highlighted by Code2HTML, v. 0.9.1