///////////////////////////////////////////////////////////////////////////////
// MQ4CPP - Message queuing for C++
// Copyright (C) 2004-2007 Riccardo Pompeo (Italy)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
#define SILENT
#include "Trace.h"
#include "MessageQueue.h"
Registry* MessageQueue::itsRegistry=NULL;
Decoupler* Decoupler::itsDefaultDecoupler=NULL;
bool Message::is(const char* theName)
{
TRACE("Message::is - start")
TRACE("Name=" << theName)
TRACE("Current message class=" << getClass())
bool ret=false;
if(itsClassName==theName)
{
ret=true;
TRACE("Match")
}
TRACE("Message::is - end")
return ret;
}
void MessageQueue::add(MessageQueue* theQueue)
{
TRACE("MessageQueue::add(static) - start")
if(itsRegistry==NULL)
itsRegistry=new Registry("DefaultRegistry");
itsRegistry->add(theQueue);
TRACE(theQueue->getName() << " added to registry with ID=" << theQueue->getID())
TRACE("MessageQueue::add(static) - end")
}
bool MessageQueue::lookup(const char* theName,MQHANDLE& theID)
{
TRACE("MessageQueue::lookup(static) - start")
bool ret=false;
theID=-1;
if(itsRegistry!=NULL)
ret=itsRegistry->lookup(theName,theID);
TRACE("Returned handle=" << theID)
TRACE(((ret) ? "Found" : "Not found"))
TRACE("MessageQueue::lookup(static) - end")
return ret;
}
MessageQueue* MessageQueue::lookup(MQHANDLE theID)
{
TRACE("MessageQueue::lookup(static) - start")
MessageQueue* ret=NULL;
if(itsRegistry!=NULL)
ret=itsRegistry->lookup(theID);
TRACE("MessageQueue::lookup(static) - end")
return ret;
}
bool MessageQueue::isStillAvailable(MQHANDLE theTarget)
{
TRACE("MessageQueue::isStillAvailable(static) - start")
bool ret=false;
if(itsRegistry!=NULL)
ret=itsRegistry->isStillAvailable(theTarget);
TRACE(((ret) ? "Available" : "Not available"))
TRACE("MessageQueue::isStillAvailable(static) - end")
return ret;
}
void MessageQueue::remove(MessageQueue* theQueue)
{
TRACE("MessageQueue::remove(static) - start")
TRACE("Target=" << theQueue->getName());
if(itsRegistry!=NULL)
itsRegistry->remove(theQueue);
TRACE("MessageQueue::remove(static) - end")
}
void MessageQueue::post(MQHANDLE theTarget,Message* theMessage)
{
TRACE("MessageQueue::post(static) - start")
TRACE("Target=" << theTarget);
if(itsRegistry!=NULL)
itsRegistry->post(theTarget,theMessage);
TRACE("MessageQueue::post(static) - end")
}
void MessageQueue::broadcast(Message* theMessage)
{
TRACE("MessageQueue::broadcast(static) - start")
if(itsRegistry!=NULL)
itsRegistry->broadcast(theMessage);
TRACE("MessageQueue::broadcast(static) - end")
}
void MessageQueue::dump()
{
TRACE("MessageQueue::dump(static) - start")
if(itsRegistry!=NULL)
itsRegistry->dump();
TRACE("MessageQueue::dump(static) - end")
}
void MessageQueue::waitForCompletion()
{
TRACE("MessageQueue::waitForCompletion - start")
if(itsRegistry!=NULL)
{
delete itsRegistry;
itsRegistry=NULL;
}
TRACE("MessageQueue::waitForCompletion - end")
}
MessageQueue::MessageQueue(const char* theThreadName) : Thread(theThreadName)
{
TRACE("MessageQueue constructor - start")
TRACE("Thread name=" << theThreadName)
start();
add(this);
TRACE("MessageQueue constructor - end")
}
MessageQueue::~MessageQueue()
{
TRACE("MessageQueue destructor - start")
stop(false); // ++ v1.5
remove(this);
free();
TRACE("MessageQueue destructor - end")
}
bool MessageQueue::is(const char* theName,MQHANDLE& theID)
{
TRACE("MessageQueue::is - start")
TRACE("Name=" << theName)
TRACE("Current queue=" << getName())
bool ret=false;
theID=-1;
if(Thread::is(theName))
{
theID=itsID;
ret=true;
}
TRACE(((ret) ? "Found" : "Not found"))
TRACE("MessageQueue::is - end")
return ret;
}
void MessageQueue::flush()
{
TRACE("MessageQueue::flush - start")
wait();
free();
release();
TRACE("MessageQueue::flush - end")
}
void MessageQueue::post(Message* theMessage)
{
TRACE("MessageQueue::post - start")
if(isShuttingDown())
{
delete theMessage;
TRACE("MessageQueue::post: Action aborted on shutdown")
return;
}
try
{
wait();
push(theMessage);
if(isSuspended()==true)
{
resume();
}
release();
}
catch(Exception& ex)
{
release();
onException(ex);
}
TRACE("MessageQueue::post - end")
}
void MessageQueue::shutdown()
{
TRACE("MessageQueue::shutdown - start")
itsRunningFlag=false;
if(isSuspended()==true)
resume();
TRACE("MessageQueue::shutdown - end")
}
void MessageQueue::run()
{
TRACE("MessageQueue::run - start")
TRACE("Thread name=" << getName())
while(true)
{
TESTCANCEL
if(m_hThread!=0)
{
try
{
while(true)
{
TESTCANCEL
wait();
TESTCANCEL
if(isEmpty())
{
release();
break;
}
TRACE("MessageQueue::run - dequeue a new message")
TRACE("Thread name=" << getName())
Message* aMessage=(Message*)pop();
release();
TESTCANCEL
if(!isShuttingDown())
onMessage(aMessage);
delete aMessage;
}
TESTCANCEL
suspend();
}
catch(Exception& ex)
{
release();
onException(ex);
}
catch(...)
{
release();
DISPLAY("MessageQueue::run(" << getName() << ") : Unhandled exception")
}
}
}
TRACE("MessageQueue::run - end")
}
void MessageQueue::onException(Exception& ex)
{
DISPLAY("MessageQueue::run(" << getName() << ") : " << ex.getMessage().c_str())
}
void Decoupler::deferredPost(MQHANDLE theTarget,Message* theMessage)
{
TRACE("Decoupler::deferredPost(static) - start")
if(itsDefaultDecoupler==NULL)
itsDefaultDecoupler=new Decoupler("DefaultDecoupler");
if(!Thread::isShuttingDown()) // Avoid use of itsDefultDecoupler during shutdown cleanup
itsDefaultDecoupler->post(theTarget,theMessage);
TRACE("Decoupler::deferredPost(static) - end")
}
void Decoupler::deferredBroadcast(Message* theMessage)
{
TRACE("Decoupler::deferredBroadcast(static) - start")
if(itsDefaultDecoupler==NULL)
itsDefaultDecoupler=new Decoupler("DefaultDecoupler");
if(!Thread::isShuttingDown()) // Avoid use of itsDefultDecoupler during shutdown cleanup
itsDefaultDecoupler->post(0,theMessage);
TRACE("Decoupler::deferredBroadcast(static) - end")
}
void Decoupler::post(MQHANDLE theTarget,Message* theMessage)
{
TRACE("Decoupler::post - start")
MessageQueue::post(new DeferredMessage(theTarget,theMessage));
TRACE("Decoupler::post - end")
}
void Decoupler::onMessage(Message* theMessage)
{
TRACE("Decoupler::onMessage - start")
if(theMessage->is("DeferredMessage"))
{
DeferredMessage* aMessage=(DeferredMessage*)theMessage;
if(aMessage->getTarget()==0)
MessageQueue::broadcast(aMessage->getMessage());
else
MessageQueue::post(aMessage->getTarget(),aMessage->getMessage());
}
TRACE("Decoupler::onMessage - end")
}
syntax highlighted by Code2HTML, v. 0.9.1