// Copyright (C) 2001 Jean-Marc Valin
#ifndef _THREADED_ITERATOR_CC_
#define _THREADED_ITERATOR_CC_
#include "ThreadedIterator.h"
#include <pthread.h>
#include <time.h>
#include <unistd.h>
using namespace std;
namespace FD {
const int ThreadedIterator::STATUS_RUNNING = 1;
const int ThreadedIterator::STATUS_STOPPED = 0;
ThreadedIterator::ThreadedIterator (string nodeName, ParameterSet params)
: Iterator(nodeName, params)
, m_in_getOutput(false)
, internal_pc(0)
, thread_status(STATUS_STOPPED)
{
try {
rate_per_second = dereference_cast<int>(parameters.get("RATE_PER_SECOND"));
//rate_per_second = 1;
cout<<"ThreadedIterator constructor..."<<endl;
if (rate_per_second <=0 ) {
throw new NodeException (this, "RATE_PER_SECOND IN THREADED ITERATOR MUST BE GREATER THAN ZERO.",__FILE__,__LINE__);
}
pthread_mutex_init(& mutex, NULL);
}
catch (BaseException *e) {
//Something weird happened
//e->print();
throw e->add(new NodeException (this,string("Error in ThreadedIterator constructor"), __FILE__,__LINE__));
}
}
ObjectRef ThreadedIterator::getOutput (int output_id, int count) {
if (!hasOutput(output_id)) throw new NodeException (this, "Cannot getOutput id",__FILE__,__LINE__);
m_in_getOutput = true;
iterator_lock();
//if (thread_status == ThreadedIterator::STATUS_STOPPED) {
//We must start the thread
//start_thread();
//}
//cerr<<"getOutput begin!"<<endl;
//ObjectRef output;
if (processCount != count) {
try {
//We are doing a little trick for the translator (real inputNode)
//if it exists
if (translator) {
translator->setProcessCount(count);
}
int out_id=0;
while (sinkNode->hasOutput(out_id))
{
output[out_id] = sinkNode->getOutput(output_id,internal_pc);
out_id++;
}
}
catch (BaseException *e) {
//Something weird happened
//e->print();
throw e->add(new NodeException (this,string("Error in ThreadedIterator::getOutput"), __FILE__,__LINE__));
}
processCount = count;
}
//cerr<<"getOutput end!"<<endl;
iterator_unlock();
m_in_getOutput = false;
return output[output_id];
}
void ThreadedIterator::reset() {
stop_thread();
//resetting every nodes
this->Iterator::reset();
internal_pc = 0;
}
void ThreadedIterator::start_thread() {
cerr<<"start_thread"<<endl;
internal_pc = 0;
thread_status = ThreadedIterator::STATUS_RUNNING;
pthread_create(&work_thread, NULL, &workloop, this);
}
void ThreadedIterator::stop_thread() {
cerr<<"stop_thread"<<endl;
if (thread_status == STATUS_RUNNING) {
cerr<<"Setting the stop status"<<endl;
thread_status = ThreadedIterator::STATUS_STOPPED;
//to avoid any deadlocks
iterator_unlock();
cerr<<"Threaded iterator should stop thread here... but it doesn't."<<endl;
}
else {
thread_status = ThreadedIterator::STATUS_STOPPED;
}
cerr<<"end stop thread."<<endl;
}
void ThreadedIterator::initialize() {
start_thread();
this->Network::initialize();
}
void * workloop (void *param) {
if (param == NULL) {
throw new NodeException (NULL,string("Error in ThreadedIterator::getOutput workloop: NULL param."), __FILE__,__LINE__);
}
ThreadedIterator *ptr = (ThreadedIterator *) param;
cerr<<"Starting the workloop."<<endl;
while (ptr->thread_status == ThreadedIterator::STATUS_RUNNING) {
cerr<<"status : "<<ptr->thread_status<<endl;
ptr->iterator_lock();
time_t begin = time(NULL);
try {
// updating all outputs
for (int i =0; ;i++) {
if (ptr->hasOutput(i)) {
ptr->sinkNode->getOutput(i,ptr->internal_pc);
}
else {
//no more outputs
break;
}
}
ptr->internal_pc++;
}
catch (GenericCastException *e) {
//We had a problem casting, our inputs are invalid?
e->print();
throw new NodeException (ptr,string("Error in ThreadedIterator::getOutput workloop"), __FILE__,__LINE__);
}
catch (BaseException *e) {
//Something weird happened
e->print();
throw new NodeException (ptr,string("Error in ThreadedIterator::getOutput workloop"), __FILE__,__LINE__);
}
ptr->iterator_unlock();
time_t end = time(NULL);
//cout<<"got time : "<<end - begin <<endl;
//period in ms
int period = 1000 / ptr->rate_per_second;
if (end - begin < period) {
//SLEEP the amount of time needed
//to be verified...
//usleep ((period - (end - begin)) * 1000);
}
usleep((int)(1.0 /(float)ptr->rate_per_second * 1000000.0));
}
cerr<<"Exiting ThreadedIterator loop"<<endl;
return NULL;
}
}//namespace FD
#endif
syntax highlighted by Code2HTML, v. 0.9.1