// --------------------------------------------------------------------------- // - Selector.cpp - // - afnix:sio module - i/o select class implementation - // --------------------------------------------------------------------------- // - This program is free software; you can redistribute it and/or modify - // - it provided that this copyright notice is kept intact. - // - - // - This program is distributed in the hope that it will be useful, but - // - without any warranty; without even the implied warranty of - // - merchantability or fitness for a particular purpose. In no event shall - // - the copyright holder be liable for any direct, indirect, incidental or - // - special damages arising in any way out of the use of this software. - // --------------------------------------------------------------------------- // - copyright (c) 1999-2007 amaury darsch - // --------------------------------------------------------------------------- #include "Interp.hpp" #include "Integer.hpp" #include "QuarkZone.hpp" #include "Selector.hpp" #include "csio.hpp" #include "cerr.hpp" namespace afnix { // ------------------------------------------------------------------------- // - class section - // ------------------------------------------------------------------------- // create an empty selector Selector::Selector (void) { p_handle = c_shnew (); } // destroy this selector Selector::~Selector (void) { c_shfree (p_handle); } // return the class name String Selector::repr (void) const{ return "Selector"; } // add a new input stream void Selector::add (Input* is) { if (is == nilp) return; wrlock (); try { if (d_isv.exists (is) == false) { d_isv.append (is); c_shiadd (p_handle, is->getsid ()); } unlock (); } catch (...) { unlock (); throw; } } // add a new output stream void Selector::add (Output* os) { if (os == nilp) return; wrlock (); try { if (d_osv.exists (os) == false) { d_osv.append (os); c_shoadd (p_handle, os->getsid ()); } unlock (); } catch (...) { unlock (); throw; } } // return the number of input streams long Selector::ilength (void) const { rdlock (); long result = d_isv.length (); unlock (); return result; } // return the number of output streams long Selector::olength (void) const { rdlock (); long result = d_osv.length (); unlock (); return result; } // return an input stream by index Input* Selector::iget (const long index) const { rdlock (); try { Input* result = dynamic_cast (d_isv.get (index)); unlock (); return result; } catch (...) { unlock (); throw; } } // return an output stream by index Output* Selector::oget (const long index) const { rdlock (); try { Output* result = dynamic_cast (d_osv.get (index)); unlock (); return result; } catch (...) { unlock (); throw; } } // wait for one stream to be ready Object* Selector::wait (const long tout) const { wrlock (); // look into the input pushback buffer - the pushback is locked long len = d_isv.length (); for (long i = 0; i < len; i++) { Input* is = dynamic_cast (d_isv.get (i)); if (is == nilp) continue; is->wrlock (); if (is->buflen () != 0) { long max = i + 1; for (long j = 0; j < max; j++) { Input* iis = dynamic_cast (d_isv.get (j)); if (iis == nilp) continue; iis->unlock (); } unlock (); return is; } } // now wait for a descriptor to be ready long nsid = c_shwait (p_handle, tout); // unlock the input stream pushback buffers for (long i = 0; i < len; i++) { Input* is = dynamic_cast (d_isv.get (i)); if (is == nilp) continue; is->unlock (); } // check for error first if (nsid < 0) { unlock (); throw Exception ("selector-error", c_errmsg (nsid)); } // check the input descriptors first for (long i = 0; i < len; i++) { Input* is = dynamic_cast (d_isv.get (i)); if (is == nilp) continue; if (c_shitst (p_handle, is->getsid ()) == true) { unlock (); return is; } } // check the output descriptors len = d_osv.length (); for (long i = 0; i < len; i++) { Output* os = dynamic_cast (d_osv.get (i)); if (os == nilp) continue; if (c_shotst (p_handle, os->getsid ()) == true) { unlock (); return os; } } unlock (); return nilp; } // get all ready streams Vector* Selector::waitall (const long tout) const { wrlock (); Vector* result = new Vector; // look into the input pushback buffer - the pushback is locked long len = d_isv.length (); for (long i = 0; i < len; i++) { Input* is = dynamic_cast (d_isv.get (i)); if (is == nilp) continue; is->wrlock (); if (is->buflen () != 0) result->append (is); } // if we have something we unlock and return if (result->length () != 0) { for (long i = 0; i < len; i++) { Input* is = dynamic_cast (d_isv.get (i)); if (is == nilp) continue; is->unlock (); } unlock (); return result; } // now wait for a descriptor to be ready long nsid = c_shwait (p_handle, tout); // unlock the input stream pushback buffers for (long i = 0; i < len; i++) { Input* is = dynamic_cast (d_isv.get (i)); if (is == nilp) continue; is->unlock (); } // check for error first if (nsid < 0) { delete result; unlock (); throw Exception ("selector-error", c_errmsg (nsid)); } // check the input descriptors first for (long i = 0; i < len; i++) { Input* is = dynamic_cast (d_isv.get (i)); if (is == nilp) continue; if (c_shitst (p_handle, is->getsid ()) == true) result->append (is); } // check the output descriptors len = d_osv.length (); for (long i = 0; i < len; i++) { Output* os = dynamic_cast (d_osv.get (i)); if (os == nilp) continue; if (c_shotst (p_handle, os->getsid ()) == true) result->append (os); } unlock (); return result; } // ------------------------------------------------------------------------- // - object section - // ------------------------------------------------------------------------- // the quark zone static const long QUARK_ZONE_LENGTH = 7; static QuarkZone zone (QUARK_ZONE_LENGTH); // the object supported quarks static const long QUARK_ADD = zone.intern ("add"); static const long QUARK_WAIT = zone.intern ("wait"); static const long QUARK_IGET = zone.intern ("input-get"); static const long QUARK_OGET = zone.intern ("output-get"); static const long QUARK_ILENGTH = zone.intern ("input-length"); static const long QUARK_OLENGTH = zone.intern ("output-length"); static const long QUARK_WAITALL = zone.intern ("wait-all"); // create a new object in a generic way Object* Selector::mknew (Vector* argv) { long argc = (argv == nilp) ? 0 : argv->length (); // check for 0 argument if (argc == 0) return new Selector; // try to get a selector with streams Selector* result = new Selector; for (long i = 0; i < argc; i++) { Object* obj = argv->get (i); Input* is = dynamic_cast (obj); if (is != nilp) { result->add (is); continue; } Output* os = dynamic_cast (obj); if (os != nilp) { result->add (os); continue; } delete result; throw Exception ("type-error", "input or output stream expected"); } return result; } // return true if the given quark is defined bool Selector::isquark (const long quark, const bool hflg) const { rdlock (); if (zone.exists (quark) == true) { unlock (); return true; } bool result = hflg ? Object::isquark (quark, hflg) : false; unlock (); return result; } // apply this object with a set of arguments and a quark Object* Selector::apply (Runnable* robj, Nameset* nset, const long quark, Vector* argv) { // get the number of arguments long argc = (argv == nilp) ? 0 : argv->length (); // check for 0 argument if (argc == 0) { if (quark == QUARK_WAIT) return wait (-1); if (quark == QUARK_WAITALL) return waitall (-1); if (quark == QUARK_ILENGTH) return new Integer (ilength ()); if (quark == QUARK_OLENGTH) return new Integer (olength ()); } // check for 1 argument if (argc == 1) { if (quark == QUARK_WAIT) { long tout = argv->getint (0); Object* result = wait (tout); robj->post (result); return result; } if (quark == QUARK_WAITALL) { long tout = argv->getint (0); Object* result = waitall (tout); robj->post (result); return result; } if (quark == QUARK_ADD) { Object* obj = argv->get (0); Input* is = dynamic_cast (obj); if (is != nilp) { add (is); return nilp; } Output* os = dynamic_cast (obj); if (os != nilp) { add (os); return nilp; } throw Exception ("type-error", "input or output stream expected"); } if (quark == QUARK_IGET) { long index = argv->getint (0); Object* result = iget (index); robj->post (result); return result; } if (quark == QUARK_OGET) { long index = argv->getint (0); Object* result = oget (index); robj->post (result); return result; } } // call the object method return Object::apply (robj, nset, quark, argv); } }