// Copyright (C) 2001 Jean-Marc Valin #ifndef _THREADED_ITERATOR_CC_ #define _THREADED_ITERATOR_CC_ #include "ThreadedIterator.h" #include #include #include using namespace std; namespace FD { const int ThreadedIterator::STATUS_RUNNING = 1; const int ThreadedIterator::STATUS_STOPPED = 0; ThreadedIterator::ThreadedIterator (string nodeName, ParameterSet params) : Iterator(nodeName, params) , m_in_getOutput(false) , internal_pc(0) , thread_status(STATUS_STOPPED) { try { rate_per_second = dereference_cast(parameters.get("RATE_PER_SECOND")); //rate_per_second = 1; cout<<"ThreadedIterator constructor..."<print(); throw e->add(new NodeException (this,string("Error in ThreadedIterator constructor"), __FILE__,__LINE__)); } } ObjectRef ThreadedIterator::getOutput (int output_id, int count) { if (!hasOutput(output_id)) throw new NodeException (this, "Cannot getOutput id",__FILE__,__LINE__); m_in_getOutput = true; iterator_lock(); //if (thread_status == ThreadedIterator::STATUS_STOPPED) { //We must start the thread //start_thread(); //} //cerr<<"getOutput begin!"<setProcessCount(count); } int out_id=0; while (sinkNode->hasOutput(out_id)) { output[out_id] = sinkNode->getOutput(output_id,internal_pc); out_id++; } } catch (BaseException *e) { //Something weird happened //e->print(); throw e->add(new NodeException (this,string("Error in ThreadedIterator::getOutput"), __FILE__,__LINE__)); } processCount = count; } //cerr<<"getOutput end!"<Iterator::reset(); internal_pc = 0; } void ThreadedIterator::start_thread() { cerr<<"start_thread"<Network::initialize(); } void * workloop (void *param) { if (param == NULL) { throw new NodeException (NULL,string("Error in ThreadedIterator::getOutput workloop: NULL param."), __FILE__,__LINE__); } ThreadedIterator *ptr = (ThreadedIterator *) param; cerr<<"Starting the workloop."<thread_status == ThreadedIterator::STATUS_RUNNING) { cerr<<"status : "<thread_status<iterator_lock(); time_t begin = time(NULL); try { // updating all outputs for (int i =0; ;i++) { if (ptr->hasOutput(i)) { ptr->sinkNode->getOutput(i,ptr->internal_pc); } else { //no more outputs break; } } ptr->internal_pc++; } catch (GenericCastException *e) { //We had a problem casting, our inputs are invalid? e->print(); throw new NodeException (ptr,string("Error in ThreadedIterator::getOutput workloop"), __FILE__,__LINE__); } catch (BaseException *e) { //Something weird happened e->print(); throw new NodeException (ptr,string("Error in ThreadedIterator::getOutput workloop"), __FILE__,__LINE__); } ptr->iterator_unlock(); time_t end = time(NULL); //cout<<"got time : "<rate_per_second; if (end - begin < period) { //SLEEP the amount of time needed //to be verified... //usleep ((period - (end - begin)) * 1000); } usleep((int)(1.0 /(float)ptr->rate_per_second * 1000000.0)); } cerr<<"Exiting ThreadedIterator loop"<