///////////////////////////////////////////////////////////////////////////////
// MQ4CPP - Message queuing for C++
// Copyright (C) 2004-2007 Riccardo Pompeo (Italy)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
#define SILENT
#include "Router.h"
#include "Logger.h"
#include <string>
#include <strstream>
using namespace std;
//#define DISPLAY_TRAFFIC
#ifdef DISPLAY_TRAFFIC
#define DISPLAYC2S { cout << ">"; }
#define DISPLAYS2C { cout << "<"; }
#define DISPLAYB2S { cout << "*"; }
#else
#define DISPLAYC2S
#define DISPLAYS2C
#define DISPLAYB2S
#endif
RemoteRouter::RemoteRouter(const char* theName, const char* theHost,int thePort, const char* theTarget)
:MessageQueue(theName)
{
TRACE("RemoteRouter::RemoteRouter - start")
TRACE("Queue name=" << getName())
itsSeqNum=0;
itsProxy=0;
itsServer=0;
itsHost=theHost;
itsPort=thePort;
itsTarget=theTarget;
itsConnected=false;
for(int i=0; i < MAXSESSIONS; i++)
{
itsSessions[i].proxy=0;
itsSessions[i].client=0;
itsSessions[i].seqnum=0;
}
SCHEDULE(this,5000);
MessageProxyFactory::lookupAt(itsHost.c_str(),itsPort,itsTarget.c_str(),this);
TRACE("RemoteRouter::RemoteRouter - end")
}
RemoteRouter::~RemoteRouter()
{
TRACE("RemoteRouter::~RemoteRouter - start")
TRACE("RemoteRouter::~RemoteRouter - end")
}
void RemoteRouter::onLookup(LookupReplyMessage* theMessage)
{
TRACE("RemoteRouter::onLookup - start")
if(itsConnected==false && !theMessage->isFailed())
{
itsServer=theMessage->getHandle();
itsProxy=theMessage->getSender();
itsConnected=true;
LOG("Remote thread lookup ok.")
}
TRACE("RemoteRouter::onLookup - end")
}
void RemoteRouter::onWakeup(Wakeup* theMessage)
{
TRACE("RemoteRouter::onWakeup - start")
if(itsConnected==false || (itsConnected==true && !isStillAvailable(itsProxy)))
{
itsConnected=false;
TRACE("Try to lookup service")
MessageProxyFactory::lookupAt(itsHost.c_str(),itsPort,itsTarget.c_str(),this);
}
TRACE("RemoteRouter::onWakeup - end")
}
void RemoteRouter::onMessage(Message* theMessage)
{
TRACE("RemoteRouter::onMessage - start")
TRACE("Thread name=" << getName())
try
{
if(theMessage->is("Wakeup") && !isShuttingDown())
{
TRACE("Call onWakeup")
onWakeup((Wakeup*)theMessage);
}
else if(theMessage->is("LookupReplyMessage") && !isShuttingDown())
{
TRACE("Call onLookup")
onLookup((LookupReplyMessage*)theMessage);
}
else if(theMessage->is("NetworkMessage") && itsConnected && !isShuttingDown())
{
NetworkMessage* aRequest=(NetworkMessage*)theMessage;
if(itsProxy==aRequest->getSender() && itsServer==aRequest->getRemoteSender() && !aRequest->isBroadcasting())
{
TRACE("Handling message from server")
DISPLAYS2C
int anIndex=aRequest->getSequenceNumber() % MAXSESSIONS;
if(itsSessions[anIndex].proxy!=0 && isStillAvailable(itsSessions[anIndex].proxy))
{
_TIMEVAL current=Timer::timeExt();
long elapsed=Timer::subtractMillisecs(&itsSessions[anIndex].time,¤t);
if(elapsed < SESSION_EXPIRATION_TIME)
{
NetworkMessage* aReply=(NetworkMessage*)aRequest->clone();
aReply->setSender(getID());
aReply->setRemoteSender(0);
aReply->setTarget(itsSessions[anIndex].client);
aReply->setSequenceNumber(itsSessions[anIndex].seqnum);
post(itsSessions[anIndex].proxy,aReply);
}
itsSessions[anIndex].proxy=0;
itsSessions[anIndex].client=0;
itsSessions[anIndex].seqnum=0;
}
}
else if(!aRequest->isBroadcasting() && !isShuttingDown())
{
TRACE("Handling message from client")
if(itsConnected==true)
{
DISPLAYC2S
unsigned short anIndex=itsSeqNum % MAXSESSIONS;
itsSessions[anIndex].proxy=aRequest->getSender();
itsSessions[anIndex].client=aRequest->getRemoteSender();
itsSessions[anIndex].seqnum=aRequest->getSequenceNumber();
itsSessions[anIndex].time=Timer::timeExt();
NetworkMessage* aNewRequest=(NetworkMessage*)aRequest->clone();
aNewRequest->setSender(getID());
aNewRequest->setRemoteSender(0);
aNewRequest->setTarget(itsServer);
aNewRequest->setSequenceNumber(itsSeqNum);
post(itsProxy,aNewRequest);
itsSeqNum++;
}
}
}
}
catch(Exception& ex)
{
DISPLAY("RemoteRouter::onMessage(" << getName() << ") : " << ex.getMessage().c_str())
}
catch(...)
{
DISPLAY("RemoteRouter::onMessage(" << getName() << ") : Unhandled exception")
}
TRACE("RemoteRouter::onMessage - end")
}
LocalRouter::LocalRouter(const char* theName, const char* theTarget)
:MessageQueue(theName)
{
TRACE("LocalRouter::LocalRouter - start")
TRACE("Queue name=" << getName())
itsSeqNum=0;
if(!lookup(theTarget,itsServer))
throw ThreadException("Lookup of local service failed");
for(int i=0; i < MAXSESSIONS; i++)
{
itsSessions[i].proxy=0;
itsSessions[i].client=0;
itsSessions[i].seqnum=0;
}
TRACE("LocalRouter::LocalRouter - end")
}
LocalRouter::~LocalRouter()
{
TRACE("LocalRouter::~LocalRouter - start")
TRACE("LocalRouter::~LocalRouter - end")
}
void LocalRouter::onMessage(Message* theMessage)
{
TRACE("LocalRouter::onMessage - start")
TRACE("Thread name=" << getName())
try
{
if(theMessage->is("NetworkMessage") && !isShuttingDown())
{
NetworkMessage* aRequest=(NetworkMessage*)theMessage;
if(itsServer==aRequest->getSender() && aRequest->getRemoteSender()==0 && !aRequest->isBroadcasting())
{
TRACE("Handling message from server")
int anIndex=aRequest->getSequenceNumber() % MAXSESSIONS;
if(itsSessions[anIndex].proxy!=0 && isStillAvailable(itsSessions[anIndex].proxy))
{
DISPLAYS2C
_TIMEVAL current=Timer::timeExt();
long elapsed=Timer::subtractMillisecs(&itsSessions[anIndex].time,¤t);
if(elapsed < SESSION_EXPIRATION_TIME)
{
NetworkMessage* aReply=(NetworkMessage*)aRequest->clone();
aReply->setSender(getID());
aReply->setRemoteSender(0);
aReply->setTarget(itsSessions[anIndex].client);
aReply->setSequenceNumber(itsSessions[anIndex].seqnum);
post(itsSessions[anIndex].proxy,aReply);
}
itsSessions[anIndex].proxy=0;
itsSessions[anIndex].client=0;
itsSessions[anIndex].seqnum=0;
}
}
else if (!aRequest->isBroadcasting() && !isShuttingDown())
{
TRACE("Handling message from client")
DISPLAYC2S
unsigned short anIndex=itsSeqNum % MAXSESSIONS;
itsSessions[anIndex].proxy=aRequest->getSender();
itsSessions[anIndex].client=aRequest->getRemoteSender();
itsSessions[anIndex].seqnum=aRequest->getSequenceNumber();
itsSessions[anIndex].time=Timer::timeExt();
NetworkMessage* aNewRequest=(NetworkMessage*)aRequest->clone();
aNewRequest->setSender(getID());
aNewRequest->setRemoteSender(getID());
aNewRequest->setTarget(itsServer);
aNewRequest->setSequenceNumber(itsSeqNum);
post(itsServer,aNewRequest);
itsSeqNum++;
}
}
}
catch(Exception& ex)
{
DISPLAY("LocalRouter::onMessage(" << getName() << ") : " << ex.getMessage().c_str())
}
catch(...)
{
DISPLAY("LocalRouter::onMessage(" << getName() << ") : Unhandled exception")
}
TRACE("LocalRouter::onMessage - end")
}
Switch::Switch(const char* theName) : MessageProxy(theName)
{
TRACE("Switch::Switch - start")
itsActiveRouter=NULL;
itsSeqNum=0;
for(int i=0; i < MAXSESSIONS; i++)
{
itsSessions[i].proxy=0;
itsSessions[i].client=0;
itsSessions[i].seqnum=0;
}
TRACE("Switch::Switch - end")
}
Switch::~Switch()
{
TRACE("Switch::~Switch - start")
if(!isShuttingDown())
{
for(vector<MessageQueue*>::iterator i = itsRouters.begin(); i < itsRouters.end(); ++i)
{
MessageQueue* aQueue=*i;
aQueue->shutdown();
}
}
TRACE("Switch::~Switch - end")
}
MQHANDLE Switch::addRouting(const char* theHost,int thePort, const char* theTarget)
{
TRACE("Switch::addRouting - start")
wait();
string aName="RemoteRouter("+string(getName())+","+string(theHost)+","+string(theTarget)+")";
bool found=false;
MQHANDLE anHandle=0;
if(itsRouters.size()>0)
{
for(vector<MessageQueue*>::iterator i = itsRouters.begin(); i < itsRouters.end(); ++i)
{
MessageQueue* aQueue=*i;
if(aName.compare(aQueue->getName())==0)
{
found=true;
anHandle=aQueue->getID();
break;
}
}
}
if(!found)
{
RemoteRouter* aRouter=new RemoteRouter(aName.c_str(),theHost,thePort,theTarget);
itsRouters.push_back(aRouter);
anHandle=aRouter->getID();
if(itsActiveRouter==NULL) itsActiveRouter=aRouter;
}
release();
TRACE("Switch::addRouting - end")
return anHandle;
}
MQHANDLE Switch::addRouting(const char* theTarget)
{
TRACE("Switch::addRouting - start")
wait();
string aName="LocalRouter("+string(getName())+","+string(theTarget)+")";
bool found=false;
MQHANDLE anHandle=0;
if(itsRouters.size()>0)
{
for(vector<MessageQueue*>::iterator i = itsRouters.begin(); i < itsRouters.end(); ++i)
{
MessageQueue* aQueue=*i;
if(aName.compare(aQueue->getName())==0)
{
found=true;
anHandle=aQueue->getID();
break;
}
}
}
if(!found)
{
LocalRouter* aRouter=new LocalRouter(aName.c_str(),theTarget);
itsRouters.push_back(aRouter);
anHandle=aRouter->getID();
if(itsActiveRouter==NULL) itsActiveRouter=aRouter;
}
release();
TRACE("Switch::addRouting - end")
return anHandle;
}
MQHANDLE Switch::addRouting(MessageQueue* theTarget)
{
TRACE("Switch::addRouting - start")
wait();
MQHANDLE anHandle=0;
if(theTarget!=NULL)
{
itsRouters.push_back(theTarget);
anHandle=theTarget->getID();
if(itsActiveRouter==NULL) itsActiveRouter=theTarget;
}
release();
TRACE("Switch::addRouting - end")
return anHandle;
}
void Switch::addRouting(const char* theTopic,MQHANDLE theHandle)
{
TRACE("Switch::addRouting - start")
wait();
bool found=false;
if(itsMapping.size()>0)
{
for(vector< pair<string,MQHANDLE> >::iterator i = itsMapping.begin(); i < itsMapping.end(); ++i)
{
pair<string,MQHANDLE> aPair=*i;
if(aPair.first.compare(theTopic)==0 && aPair.second==theHandle)
{
found=true;
break;
}
}
}
if(!found && itsRouters.size()>0)
{
for(vector<MessageQueue*>::iterator i = itsRouters.begin(); i < itsRouters.end(); ++i)
{
MessageQueue* aQueue=*i;
if(aQueue->getID()==theHandle)
{
pair<string,MQHANDLE> aPair;
aPair.first=theTopic;
aPair.second=theHandle;
itsMapping.push_back(aPair);
TRACE("Mapping rule added")
break;
}
}
}
release();
TRACE("Switch::addRouting - end")
}
void Switch::resetRouting()
{
TRACE("Switch::resetRouting - start")
wait();
if(!isShuttingDown())
{
if(itsRouters.size()>0)
{
for(vector<MessageQueue*>::iterator i = itsRouters.begin(); i < itsRouters.end(); ++i)
{
MessageQueue* aQueue=*i;
aQueue->shutdown();
}
}
}
itsRouters.clear();
itsActiveRouter=NULL;
itsMapping.clear();
release();
TRACE("Switch::resetRouting - end")
}
void Switch::removeRouting(MQHANDLE theHandle)
{
TRACE("Switch::removeRouting - start")
wait();
if(!isShuttingDown())
{
for(vector<MessageQueue*>::iterator i = itsRouters.begin(); i < itsRouters.end(); ++i)
{
MessageQueue* aQueue=*i;
if(aQueue->getID()==theHandle)
{
aQueue->shutdown();
itsRouters.erase(i);
if(itsActiveRouter==aQueue) itsActiveRouter=NULL;
TRACE("Router removed")
break;
}
}
}
release();
TRACE("Switch::removeRouting - end")
}
void Switch::activate(MQHANDLE theHandle,const char* theTopic)
{
TRACE("Switch::activate - start")
wait();
for(vector<MessageQueue*>::iterator i = itsRouters.begin(); i < itsRouters.end(); ++i)
{
MessageQueue* aQueue=*i;
if(aQueue->getID()==theHandle)
{
itsActiveRouter=aQueue;
itsTopic=theTopic;
TRACE("Router activated")
break;
}
}
release();
TRACE("Switch::activate - end")
}
void Switch::addAlias(const char* theName)
{
TRACE("Switch::addAlias - start")
itsAlias.push_back(theName);
TRACE("Switch::addAlias - end")
}
bool Switch::is(const char* theName,MQHANDLE& theID)
{
TRACE("Switch::is - start")
TRACE("Its name=" << getName() << " Requested name=" << theName)
bool ret=false;
if(Thread::is(theName))
{
theID=itsID;
ret=true;
}
else if(itsAlias.size()>0)
{
for(vector<string>::iterator i = itsAlias.begin(); i < itsAlias.end(); ++i)
{
string& aString=*i;
if(aString.compare(theName)==0)
{
theID=itsID;
ret=true;
break;
}
}
}
TRACE(((ret) ? "Found" : "Not found"))
TRACE("Switch::is - end")
return ret;
}
void Switch::onMessage(Message* theMessage)
{
TRACE("Switch::onMessage - start")
if(theMessage->is("NetworkMessage") && !isShuttingDown())
{
NetworkMessage* aRequest=(NetworkMessage*)theMessage;
bool found=false;
if(itsRouters.size()>0)
{
for(vector<MessageQueue*>::iterator i = itsRouters.begin(); i < itsRouters.end(); ++i)
{
MessageQueue* aQueue=*i;
if(aQueue->getID()==aRequest->getSender())
{
found=true;
break;
}
}
}
if(found && aRequest->getRemoteSender()==0 && !aRequest->isBroadcasting())
{
TRACE("Handling message from server")
int anIndex=aRequest->getSequenceNumber() % MAXSESSIONS;
if(itsSessions[anIndex].proxy!=0 && isStillAvailable(itsSessions[anIndex].proxy))
{
_TIMEVAL current=Timer::timeExt();
long elapsed=Timer::subtractMillisecs(&itsSessions[anIndex].time,¤t);
if(elapsed < SESSION_EXPIRATION_TIME)
{
NetworkMessage* aReply=(NetworkMessage*)aRequest->clone();
aReply->setSender(getID());
aReply->setRemoteSender(0);
aReply->setTarget(itsSessions[anIndex].client);
aReply->setSequenceNumber(itsSessions[anIndex].seqnum);
post(itsSessions[anIndex].proxy,aReply);
}
itsSessions[anIndex].proxy=0;
itsSessions[anIndex].client=0;
itsSessions[anIndex].server=0;
itsSessions[anIndex].seqnum=0;
}
}
else if (!found && !aRequest->isBroadcasting() && !isShuttingDown())
{
TRACE("Handling message from client")
bool fired=false;
if(itsMapping.size()>0)
{
for(vector< pair<string,MQHANDLE> >::iterator i = itsMapping.begin(); i < itsMapping.end(); ++i)
{
pair<string,MQHANDLE> aPair=*i;
if(aPair.first.compare(aRequest->getTopic())==0)
{
unsigned short anIndex=itsSeqNum % MAXSESSIONS;
itsSessions[anIndex].proxy=aRequest->getSender();
itsSessions[anIndex].client=aRequest->getRemoteSender();
itsSessions[anIndex].seqnum=aRequest->getSequenceNumber();
itsSessions[anIndex].server=aPair.second;
itsSessions[anIndex].time=Timer::timeExt();
NetworkMessage* aNewRequest=(NetworkMessage*)aRequest->clone();
aNewRequest->setSender(getID());
aNewRequest->setRemoteSender(getID());
aNewRequest->setTarget(aPair.second);
aNewRequest->setSequenceNumber(itsSeqNum);
post(aPair.second,aNewRequest);
itsSeqNum++;
fired=true;
TRACE("Sent message with topic=" << aRequest->getTopic())
break;
}
}
}
if(!fired && itsActiveRouter!=NULL)
{
unsigned short anIndex=itsSeqNum % MAXSESSIONS;
itsSessions[anIndex].proxy=aRequest->getSender();
itsSessions[anIndex].client=aRequest->getRemoteSender();
itsSessions[anIndex].server=itsActiveRouter->getID();
itsSessions[anIndex].seqnum=aRequest->getSequenceNumber();
itsSessions[anIndex].time=Timer::timeExt();
NetworkMessage* aNewRequest=(NetworkMessage*)aRequest->clone();
if(itsTopic.size()>0) aNewRequest->setTopic(itsTopic);
aNewRequest->setSender(getID());
aNewRequest->setRemoteSender(getID());
aNewRequest->setTarget(itsActiveRouter->getID());
aNewRequest->setSequenceNumber(itsSeqNum);
itsActiveRouter->post(aNewRequest);
itsSeqNum++;
}
}
}
TRACE("Switch::onMessage - end")
}
string Switch::getConnectionAddress(MQHANDLE theCaller,int& thePort)
{
TRACE("Switch::getConnectionAddress - start")
string addr=LOCALHOST;
thePort=LOCALPORT;
if(!isShuttingDown())
{
for(unsigned i=0; i < MAXSESSIONS; i++)
{
if(itsSessions[i].server==theCaller)
{
MessageQueue* aQueue=lookup(itsSessions[i].proxy);
if(aQueue!=NULL)
{
if(string(aQueue->getName()).compare(MESSAGEPROXYHEADER)>=0)
{
MessageProxy* aProxy=(MessageProxy*)aQueue;
addr=aProxy->getConnectionAddress(getID(),thePort);
break;
}
}
}
}
}
TRACE("Switch::getConnectionAddress - end")
return addr;
}
LocalhostRouter::LocalhostRouter()
:MessageProxy((string(MESSAGEPROXYHEADER)+string(LOCALHOST)+",0)").c_str())
{
TRACE("LocalhostRouter::LocalhostRouter - start")
TRACE("LocalhostRouter::LocalhostRouter - end")
}
LocalhostRouter::~LocalhostRouter()
{
TRACE("LocalhostRouter::~LocalhostRouter - start")
TRACE("LocalhostRouter::~LocalhostRouter - end")
}
void LocalhostRouter::onMessage(Message* theMessage)
{
TRACE("LocalhostRouter::onMessage - start")
if(theMessage->is("NetworkMessage") && !isShuttingDown())
{
NetworkMessage* aMessage=(NetworkMessage*)theMessage;
if(!aMessage->isBroadcasting())
{
NetworkMessage* aNetworkMessage=(NetworkMessage*)aMessage->clone();
aNetworkMessage->setSender(getID());
aNetworkMessage->setRemoteSender(aMessage->getSender());
post(aMessage->getTarget(),aNetworkMessage);
TRACE("Message delivered")
}
}
else if(theMessage->is("LookupRequestMessage") && !isShuttingDown())
{
LookupRequestMessage* aMessage=(LookupRequestMessage*)theMessage;
MQHANDLE anHandle;
if(lookup(aMessage->getTarget().c_str(),anHandle))
{
LookupReplyMessage* aReply=new LookupReplyMessage(0,anHandle);
aReply->setSender(getID());
post(aMessage->getSender(),aReply);
}
else
{
LookupReplyMessage* aReply=new LookupReplyMessage();
aReply->setSender(getID());
post(aMessage->getSender(),aReply);
}
}
else if(theMessage->is("PingRequestMessage") && !isShuttingDown())
{
PingRequestMessage* aMessage=(PingRequestMessage*)theMessage;
PingReplyMessage* aNewMessage=new PingReplyMessage(aMessage->getSender());
aMessage->setSender(getID());
post(aMessage->getSender(),aNewMessage);
}
TRACE("LocalhostRouter::onMessage - end")
}
string LocalhostRouter::getConnectionAddress(MQHANDLE theHandle,int& thePort)
{
TRACE("LocalhostRouter::getConnectionAddress - start")
thePort=LOCALPORT;
TRACE("LocalhostRouter::getConnectionAddress - end")
return LOCALHOST;
}
syntax highlighted by Code2HTML, v. 0.9.1