///////////////////////////////////////////////////////////////////////////////

// MQ4CPP - Message queuing for C++

// Copyright (C) 2004-2007  Riccardo Pompeo (Italy)

//

// This library is free software; you can redistribute it and/or

// modify it under the terms of the GNU Lesser General Public

// License as published by the Free Software Foundation; either

// version 2.1 of the License, or (at your option) any later version.

//

// This library is distributed in the hope that it will be useful,

// but WITHOUT ANY WARRANTY; without even the implied warranty of

// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU

// Lesser General Public License for more details.

//

// You should have received a copy of the GNU Lesser General Public

// License along with this library; if not, write to the Free Software

// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

//


#define SILENT

#include "Trace.h"

#include "MessageQueue.h"

#include "Logger.h"


Registry::Registry(const char* theName) : Thread(theName) 
{
	TRACE("Registry::Registry - start")
	itsNextHandleAvailable=1; //v1.5

	start();
	setPriority(Thread::P_LOWEST);
	TRACE("Registry::Registry - end")
}
		
Registry::~Registry()
{
	TRACE("Registry::~Registry - start")
	stop(false);	
	free();
	TRACE("Registry::~Registry - end")
}

void Registry::run() 
{
	TRACE("Registry::run - start")
	
	sleep(1000); // wait startup completion


	while(true)
	{		
		try 
		{												
			wait();
			TRACE("Registry::run - start garbage collection")
			itsAction=Registry::GARBAGE_COLLECTION;
			forEach(); // Iterate all elements in queue

			TRACE("Registry::run - garbage collection done")
			release();	

			if(itsRunningFlag==false)
				break;			
				
			sleep(500); // wait 500 ms

		}
		catch(Exception& ex) 
		{
			release();
			TRACE(ex.getMessage().c_str())
		}
		catch(...)
		{
			release();
			TRACE("Registry::run : Unhandled exception")
		}
	}
	
	TRACE("Registry::run - end")		
}

// ++ v1.1

MQHANDLE Registry::findID()
{
	for(int cnt=1;cnt <= 0xFFFF; cnt++, itsNextHandleAvailable++)
	{
		if(itsNextHandleAvailable==0) // ++ v1.5

			itsNextHandleAvailable=1;
		
		if(at(itsNextHandleAvailable)==NULL)
		{
			MQHANDLE anHandle=itsNextHandleAvailable++;			
			return anHandle;
		} 		
	}	
	
	throw ThreadException("Registry::findID - no more handles available");
	return 0;
}

void Registry::add(MessageQueue* theQueue)
{
	TRACE("Registry::add - start")
	TRACE("Queue name=" << theQueue->getName())
	if(isShuttingDown())
	{
		TRACE("Registry::add: Action aborted on shutdown")
		return;
	}

	wait(); //++v1.4

	MQHANDLE aNewID=findID(); // ++ v1.1

	theQueue->setID(aNewID); // ++ v1.1

	set(aNewID,theQueue); // ++ v1.1

	push(theQueue);
	release(); //++v1.4 

	TRACE("Registry::add - end")
}

bool Registry::lookup(const char* theName,MQHANDLE& theID)
{
	TRACE("Registry::lookup - start")
	TRACE("Search=" << theName)
	if(isShuttingDown())
	{
		TRACE("Registry::lookup: Action aborted on shutdown")
		return false;
	}

	itsFoundID=0;
	itsAction=Registry::LOOKUP;
	itsFoundFlag=false;
	itsQueueToLookup=theName;
	wait();  //++v1.4

	forEach();
	release();  //++v1.4

	theID=itsFoundID;
	TRACE("Returned handle=" << theID)
	TRACE(((itsFoundFlag) ? "Found" : "Not found"))
	TRACE("Registry::lookup - end")
	return itsFoundFlag;
}

MessageQueue* Registry::lookup(MQHANDLE theID)
{
	TRACE("Registry::lookup - start")
	TRACE("Search=" << theID)
	if(isShuttingDown())
	{
		TRACE("Registry::lookup: Action aborted on shutdown")
		return NULL;
	}
	
	itsAction=Registry::LOOKUP1;
	itsFoundFlag=false;
	itsIDToFind=theID;	
	itsMessageQueue=NULL;

	wait();  //++v1.4

	forEach();
	release();  //++v1.4


	TRACE(((itsFoundFlag) ? "Found" : "Not found"))
	TRACE("Registry::lookup - end")
	return itsMessageQueue;
}

bool Registry::isStillAvailable(MQHANDLE theTarget)
{
	TRACE("Registry::isAvailable - start")
	TRACE("Handle=" << theTarget)
	if(isShuttingDown())
	{
		TRACE("Registry::isStillAvailable: Action aborted on shutdown")
		return false;
	}

	bool ret=false;
	wait();  //++v1.4

	MessageQueue* aQueue=(MessageQueue*)at(theTarget);
	release();  //++v1.4

	if(aQueue!=0)
	{
		TRACE("MessageQueue found with name=" << aQueue->getName())
		if(aQueue->isRunning())
		{
			TRACE("State=Running")
			ret=true;
		}
		else
		{
			TRACE("State=Not running")
		}	
	}		
	TRACE(((ret) ? "Available" : "Not available"))
	TRACE("Registry::isAvailable - end")
	return ret;
}

void Registry::remove(MessageQueue* theQueue)
{
	TRACE("Registry::remove - start")
	TRACE("Queue name=" << theQueue->getName())
	if(isShuttingDown())
	{
		TRACE("Registry::remove: Action aborted on shutdown")
		return;
	}

	itsAction=Registry::REMOVE;
	itsMessageQueue=theQueue;
	wait();  //++v1.4

	forEach();
	release();  //++v1.4

	TRACE("Registry::remove - end")
}

void Registry::post(MQHANDLE theTarget,Message* theMessage)
{
	TRACE("Registry::post - start")
	if(isShuttingDown())
	{
		TRACE("Registry::post: Action aborted on shutdown")
		return;
	}
	
	wait();  //++v1.4

	MessageQueue* aQueue=(MessageQueue*)at(theTarget);
	release();  //++v1.4

	if(aQueue!=0)
		aQueue->post(theMessage);
	TRACE("Registry::post - end")
}

void Registry::broadcast(Message* theMessage)
{
	TRACE("Registry::broadcast - start")
	if(isShuttingDown())
	{
		TRACE("Registry::broadcast: Action aborted on shutdown")
		return;
	}

	itsAction=Registry::BROADCAST;
	itsMessage=theMessage;
	wait();  //++v1.4

	forEach();
	release();  //++v1.4

	delete theMessage;
	TRACE("Registry::broadcast - end")
}

void Registry::dump()
{
	TRACE("Registry::dump - start")
	LOG("Start of registry dump:")
	itsAction=Registry::DUMP;
	wait();
	forEach();
	release();
	LOG("End of dump")
	TRACE("Registry::dump - end")
}

bool Registry::onIteration(LinkedElement* theElement)
{
	TRACE("Registry::onIteration - start")
	bool ret=true;
	MessageQueue* aQueue=(MessageQueue*)theElement->getObject();
	TRACE("Queue name=" << aQueue->getName())

	switch(itsAction)
	{
		case Registry::REMOVE:
			if(itsMessageQueue==aQueue)
			{
				unset(aQueue->getID());
				TRACE(aQueue->getName() << " removed from registry")
				theElement->remove();
				delete theElement;
				itsElementCount--;
				ret=false;
			}
			break;

		case Registry::BROADCAST:
			{
				Message* aMessage=itsMessage->clone();
				if(aMessage!=NULL && aQueue->getID()!=aMessage->getSender()) // v1.5

					aQueue->post(aMessage);
				TRACE("Message sent to" << aQueue->getName())
			}
			break;

		case Registry::LOOKUP:
			if(aQueue->is(itsQueueToLookup.c_str(),itsFoundID))
			{
				itsFoundFlag=true;
				ret=false;
				TRACE(aQueue->getName() << " found with ID=" << aQueue->getID())
			}
			break;

		case Registry::LOOKUP1:
			if(aQueue->getID()==itsIDToFind)
			{
				itsFoundFlag=true;
				ret=false;
				itsMessageQueue=aQueue;
				TRACE(aQueue->getName() << " found with ID=" << aQueue->getID())
			}
			break;


		case Registry::GARBAGE_COLLECTION:
			if(!aQueue->isRunning())
			{
				string msg=string("Thread ")+aQueue->getName()+string(" not running. Removed from registry.");
				WARNING(msg.c_str())
				TRACE(aQueue->getName() << " not running. Removed from registry")
				unset(aQueue->getID());
				theElement->remove();
				delete theElement;
				itsElementCount--;
			}
			break;

		case Registry::DUMP:
			LOG(aQueue->getName())
			break;
	}
	
	TRACE(((ret) ? "Continue" : "Break loop"))
	TRACE("Registry::onIteration - end")
	return ret;
}				

void Registry::deleteObject(void* theObject)  //++ v1.5

{
	delete (MessageQueue*)theObject; 
}




syntax highlighted by Code2HTML, v. 0.9.1