///////////////////////////////////////////////////////////////////////////////

// MQ4CPP - Message queuing for C++

// Copyright (C) 2004-2007  Riccardo Pompeo (Italy)

//

// This library is free software; you can redistribute it and/or

// modify it under the terms of the GNU Lesser General Public

// License as published by the Free Software Foundation; either

// version 2.1 of the License, or (at your option) any later version.

//

// This library is distributed in the hope that it will be useful,

// but WITHOUT ANY WARRANTY; without even the implied warranty of

// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU

// Lesser General Public License for more details.

//

// You should have received a copy of the GNU Lesser General Public

// License along with this library; if not, write to the Free Software

// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

//


#define SILENT

#include "Trace.h"

#include "MessageQueue.h"


// Registry shared by every MessageQueue. It is created on the first call to
// MessageQueue::add() and deleted by MessageQueue::waitForCompletion().
Registry* MessageQueue::itsRegistry=NULL;
// Decoupler used by Decoupler::deferredPost()/deferredBroadcast(); it is
// created the first time one of those functions is called.
Decoupler* Decoupler::itsDefaultDecoupler=NULL;	

bool Message::is(const char* theName)
{
	TRACE("Message::is - start")
	TRACE("Name=" << theName)
	TRACE("Current message class=" << getClass())
	bool ret=false;
	if(itsClassName==theName)
	{
		ret=true;
		TRACE("Match")
	}	
	TRACE("Message::is - end")
	return ret;
}

// Registers theQueue in the shared registry.
// The registry ("DefaultRegistry") is created here the first time a queue is
// added.
void MessageQueue::add(MessageQueue* theQueue)
{
	TRACE("MessageQueue::add(static) - start")

	if(!itsRegistry)
	{
		// First registration: build the registry on demand.
		itsRegistry=new Registry("DefaultRegistry");
	}

	itsRegistry->add(theQueue);

	TRACE(theQueue->getName() << " added to registry with ID=" << theQueue->getID())	
	TRACE("MessageQueue::add(static) - end")
}

// Looks a queue up by name in the shared registry.
//
// theName : name passed to Registry::lookup().
// theID   : output; reset to -1 first, then handed to Registry::lookup()
//           when a registry exists.
// returns : the registry's answer, or false when no registry exists.
bool MessageQueue::lookup(const char* theName,MQHANDLE& theID)
{
	TRACE("MessageQueue::lookup(static) - start")

	theID=-1;
	// Short-circuit keeps the registry untouched while it does not exist.
	const bool found=(itsRegistry!=NULL) && itsRegistry->lookup(theName,theID);

	TRACE("Returned handle=" << theID)
	TRACE(((found) ? "Found" : "Not found"))
	TRACE("MessageQueue::lookup(static) - end")
	return found;
}

// Resolves a queue handle through the shared registry.
//
// theID   : handle passed to Registry::lookup().
// returns : whatever the registry returns, or NULL when no registry exists.
MessageQueue* MessageQueue::lookup(MQHANDLE theID)
{
	TRACE("MessageQueue::lookup(static) - start")

	if(itsRegistry==NULL)
	{
		TRACE("MessageQueue::lookup(static) - end")
		return NULL;
	}

	MessageQueue* aQueue=itsRegistry->lookup(theID);

	TRACE("MessageQueue::lookup(static) - end")
	return aQueue;
}

// Asks the shared registry whether theTarget is still available.
//
// returns : the registry's answer, or false when no registry exists.
bool MessageQueue::isStillAvailable(MQHANDLE theTarget)
{
	TRACE("MessageQueue::isStillAvailable(static) - start")

	// Short-circuit keeps the registry untouched while it does not exist.
	const bool available=(itsRegistry!=NULL) && itsRegistry->isStillAvailable(theTarget);

	TRACE(((available) ? "Available" : "Not available"))
	TRACE("MessageQueue::isStillAvailable(static) - end")
	return available;
}

// Removes theQueue from the shared registry; does nothing when no registry
// exists.
void MessageQueue::remove(MessageQueue* theQueue)
{
	TRACE("MessageQueue::remove(static) - start")
	TRACE("Target=" << theQueue->getName());

	if(itsRegistry==NULL)
	{
		TRACE("MessageQueue::remove(static) - end")
		return;
	}

	itsRegistry->remove(theQueue);

	TRACE("MessageQueue::remove(static) - end")
}

// Forwards theMessage to the queue identified by theTarget through the shared
// registry.
//
// NOTE(review): when no registry exists the message is neither delivered nor
// deleted here - confirm who owns it in that case.
void MessageQueue::post(MQHANDLE theTarget,Message* theMessage)
{
	TRACE("MessageQueue::post(static) - start")
	TRACE("Target=" << theTarget);

	if(itsRegistry==NULL)
	{
		TRACE("MessageQueue::post(static) - end")
		return;
	}

	itsRegistry->post(theTarget,theMessage);

	TRACE("MessageQueue::post(static) - end")
}

// Hands theMessage to Registry::broadcast() on the shared registry; does
// nothing when no registry exists.
void MessageQueue::broadcast(Message* theMessage)
{
	TRACE("MessageQueue::broadcast(static) - start")

	if(itsRegistry==NULL)
	{
		TRACE("MessageQueue::broadcast(static) - end")
		return;
	}

	itsRegistry->broadcast(theMessage);

	TRACE("MessageQueue::broadcast(static) - end")
}
		
void MessageQueue::dump()
{
	TRACE("MessageQueue::dump(static) - start")
	if(itsRegistry!=NULL)
		itsRegistry->dump();	
	TRACE("MessageQueue::dump(static) - end")
}

void MessageQueue::waitForCompletion()
{ 
	TRACE("MessageQueue::waitForCompletion - start")
	if(itsRegistry!=NULL)
	{
		delete itsRegistry;
		itsRegistry=NULL;
	}
	TRACE("MessageQueue::waitForCompletion - end")
}

// Builds a queue: passes theThreadName to the Thread base class, starts the
// thread and then adds this queue to the shared registry.
//
// NOTE(review): start() is called from inside the constructor and before
// add(this); presumably run() may begin before a derived class is fully
// constructed and before the queue is registered - confirm this is intended.
MessageQueue::MessageQueue(const char* theThreadName) : Thread(theThreadName)
{
	TRACE("MessageQueue constructor - start")
	TRACE("Thread name=" << theThreadName)
	start();
	add(this);
	TRACE("MessageQueue constructor - end")
}
	
// Tears the queue down: stops the thread, removes the queue from the shared
// registry and finally calls free().
//
// NOTE(review): free() presumably discards the messages still queued, and the
// false passed to stop() presumably selects a non-blocking/forced stop -
// confirm both in the base classes.
MessageQueue::~MessageQueue() 
{
	TRACE("MessageQueue destructor - start")
	stop(false); // ++ v1.5

	remove(this);
	free();	
	TRACE("MessageQueue destructor - end")
}

// Checks whether this queue answers to theName (as decided by Thread::is()).
//
// theName : name to test.
// theID   : output; itsID on a match, -1 otherwise.
// returns : true on a match.
bool MessageQueue::is(const char* theName,MQHANDLE& theID)
{
	TRACE("MessageQueue::is - start")
	TRACE("Name=" << theName)
	TRACE("Current queue=" << getName())

	theID=-1;
	const bool matched=Thread::is(theName);
	if(matched)
	{
		theID=itsID;
	}

	TRACE(((matched) ? "Found" : "Not found"))
	TRACE("MessageQueue::is - end")
	return matched;
}

// Calls free() bracketed by wait() and release().
//
// NOTE(review): presumably wait()/release() acquire and release the queue
// lock and free() drops every pending message - confirm in the base classes.
// If free() throws, release() is not reached.
void MessageQueue::flush()
{
	TRACE("MessageQueue::flush - start")
	wait();
	free();
	release();
	TRACE("MessageQueue::flush - end")
}

// Enqueues theMessage on this queue.
//
// While shutting down the message is deleted instead of being queued.
// Otherwise it is pushed between wait() and release(), and the thread is
// resumed when it is currently suspended. An Exception raised on the way is
// reported through onException() after calling release().
void MessageQueue::post(Message* theMessage)
{
	TRACE("MessageQueue::post - start")
	
	if(!isShuttingDown())
	{
		try 
		{
			wait();

			push(theMessage);

			// Wake the consumer only if it went to sleep on an empty queue.
			if(isSuspended()==true)
			{
				resume();
			}

			release();
		}
		catch(Exception& ex) 
		{
			release();
			onException(ex);
		}

		TRACE("MessageQueue::post - end")		
		return;
	}

	// Shutdown in progress: the message will never be dispatched.
	delete theMessage;
	TRACE("MessageQueue::post: Action aborted on shutdown")
}

// Clears itsRunningFlag and resumes the thread when it is currently
// suspended.
void MessageQueue::shutdown()
{
	TRACE("MessageQueue::shutdown - start")		

	itsRunningFlag=false;

	if(isSuspended()==true)
	{
		resume();
	}

	TRACE("MessageQueue::shutdown - end")		
}

// Thread body of the queue.
//
// Loops forever. Each pass of the outer loop drains the queue: every message
// is popped between wait() and release(), passed to onMessage() unless a
// shutdown is in progress, and then deleted. Once the queue is empty the
// thread calls suspend(). Exceptions are reported and the loop continues.
//
// The outer loop contains no break or return, so the final TRACE is not
// reached through normal control flow.
//
// NOTE(review): TESTCANCEL is a project macro; presumably a thread
// cancellation point - confirm in the Thread header.
void MessageQueue::run() 
{
	TRACE("MessageQueue::run - start")
	TRACE("Thread name=" << getName())

	while(true)
	{
		TESTCANCEL					

		// When m_hThread is 0 the body is skipped and the loop iterates again
		// immediately (only TESTCANCEL runs).
		if(m_hThread!=0)
		{		
			try 
			{									
				// Drain loop: one message per iteration until the queue is empty.
				while(true)
				{
					TESTCANCEL					
					wait();
					TESTCANCEL					

					if(isEmpty())
					{
						release();
						break;
					}
									
					TRACE("MessageQueue::run - dequeue a new message")
					TRACE("Thread name=" << getName())
					Message* aMessage=(Message*)pop();
					// release() is called before dispatching, so onMessage()
					// runs outside the wait()/release() section.
					release();	
					
					TESTCANCEL
										
					// During shutdown messages are dropped without dispatch.
					if(!isShuttingDown())
						onMessage(aMessage);
						
					// NOTE(review): skipped if onMessage() throws; the handlers
					// below do not delete the message.
					delete aMessage;
				}		
	
				TESTCANCEL							
				// NOTE(review): post() only calls resume() when isSuspended() is
				// true; a message pushed between the release() above and this
				// suspend() may wait until the next post() - confirm how
				// suspend()/resume() handle that window.
				suspend();		
			}
			catch(Exception& ex) 
			{
				// NOTE(review): release() is called here even when the
				// exception was raised after release() had already run.
				release();
				onException(ex);
			}
			catch(...)
			{
				release();
				DISPLAY("MessageQueue::run(" << getName() << ") : Unhandled exception")
			}			
		}	
	}
	
	TRACE("MessageQueue::run - end")		
}

// Exception hook called by post() and run() when an Exception is caught.
// This implementation reports the queue name and the exception message
// through DISPLAY.
void MessageQueue::onException(Exception& ex)
{
	DISPLAY("MessageQueue::run(" << getName() << ") : " << ex.getMessage().c_str())
}

// Posts theMessage to theTarget through the default Decoupler, which is
// created ("DefaultDecoupler") on first use.
//
// theTarget  : handle of the destination queue.
// theMessage : message to deliver; ownership passes to this function.
//
// While Thread::isShuttingDown() is true the default decoupler is not used.
// The message is then deleted here, the same way MessageQueue::post(Message*)
// disposes of messages it refuses during shutdown, instead of being leaked.
void Decoupler::deferredPost(MQHANDLE theTarget,Message* theMessage)
{
	TRACE("Decoupler::deferredPost(static) - start")	
	if(itsDefaultDecoupler==NULL)
		itsDefaultDecoupler=new Decoupler("DefaultDecoupler");

	if(!Thread::isShuttingDown())
	{
		// Avoid use of itsDefaultDecoupler during shutdown cleanup
		itsDefaultDecoupler->post(theTarget,theMessage);
	}
	else
	{
		// Nobody else will ever see this message: release it now.
		delete theMessage;
	}
	TRACE("Decoupler::deferredPost(static) - end")	
}

// Broadcasts theMessage through the default Decoupler, which is created
// ("DefaultDecoupler") on first use. The request is queued with target 0,
// which Decoupler::onMessage() treats as "broadcast".
//
// theMessage : message to deliver; ownership passes to this function.
//
// While Thread::isShuttingDown() is true the default decoupler is not used.
// The message is then deleted here, the same way MessageQueue::post(Message*)
// disposes of messages it refuses during shutdown, instead of being leaked.
void Decoupler::deferredBroadcast(Message* theMessage)
{
	TRACE("Decoupler::deferredBroadcast(static) - start")	
	if(itsDefaultDecoupler==NULL)
		itsDefaultDecoupler=new Decoupler("DefaultDecoupler");

	if(!Thread::isShuttingDown())
	{
		// Avoid use of itsDefaultDecoupler during shutdown cleanup
		itsDefaultDecoupler->post(0,theMessage);
	}
	else
	{
		// Nobody else will ever see this message: release it now.
		delete theMessage;
	}
	TRACE("Decoupler::deferredBroadcast(static) - end")	
}

// Wraps (theTarget, theMessage) in a new DeferredMessage and enqueues the
// wrapper on this decoupler's own queue.
void Decoupler::post(MQHANDLE theTarget,Message* theMessage)
{
	TRACE("Decoupler::post - start")	

	DeferredMessage* aWrapper=new DeferredMessage(theTarget,theMessage);
	MessageQueue::post(aWrapper);

	TRACE("Decoupler::post - end")	
}

// Handles a message dequeued by the decoupler thread.
//
// Only messages whose class is "DeferredMessage" are processed: the wrapped
// message is posted to the wrapped target, or broadcast when that target
// is 0. Any other message is ignored.
void Decoupler::onMessage(Message* theMessage)
{
	TRACE("Decoupler::onMessage - start")	

	if(theMessage->is("DeferredMessage"))
	{
		DeferredMessage* aDeferred=(DeferredMessage*)theMessage;

		if(aDeferred->getTarget()!=0)
		{
			// Addressed delivery.
			MessageQueue::post(aDeferred->getTarget(),aDeferred->getMessage());
		}
		else
		{
			// Target 0 stands for "every queue".
			MessageQueue::broadcast(aDeferred->getMessage());
		}
	}

	TRACE("Decoupler::onMessage - end")	
}



// syntax highlighted by Code2HTML, v. 0.9.1