///////////////////////////////////////////////////////////////////////////////
// MQ4CPP - Message queuing for C++
// Copyright (C) 2004-2007 Riccardo Pompeo (Italy)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
#define SILENT
#include "RequestReply.h"
#include "Logger.h"
#include <string>
#include <strstream>
using namespace std;
#define REMOTE_OK "OK:"
#define REMOTE_EXCEPTION "EXCEPTION:"
#define REMOTE_TIMEOUT 5
#define RETRYMAX 5
#define RETRYLOOKUP 3
Client::Client(const char* theName, const char* theTarget)
:Observer(theName)
{
TRACE("Client::Client - start")
TRACE("Queue name=" << getName())
itsProxy=0;
itsServer=0;
itsMsgCnt=0;
itsHost.empty();
itsPort=0;
itsTarget=theTarget;
itsConnected=false;
itsMessage=NULL;
itsSendTime=0;
itsFailoverCnt=0;
itsRetryCount=0;
bool res=MessageQueue::lookup(theTarget,itsProxy);
if(!res)
throw ThreadException("Local service not started");
itsServer=itsProxy;
itsConnected=true;
SCHEDULE(this,500);
TRACE("Client::Client - end")
}
Client::Client(const char* theName, const char* theHost,int thePort, const char* theTarget)
:Observer(theName)
{
TRACE("Client::Client - start")
TRACE("Queue name=" << getName())
itsProxy=0;
itsServer=0;
itsMsgCnt=0;
itsHost=theHost;
itsPort=thePort;
itsTarget=theTarget;
itsConnected=false;
itsMessage=NULL;
itsSendTime=0;
itsFailoverCnt=0;
itsRetryCount=0;
SCHEDULE(this,500);
lookup();
TRACE("Client::Client - end")
}
Client::~Client()
{
TRACE("Client::~Client - start")
if(itsMessage!=NULL)
delete itsMessage;
for (vector<FailoverEntry*>::iterator i = itsFailoverList.begin(); i < itsFailoverList.end(); ++i)
delete *i;
itsFailoverList.clear();
TRACE("Client::~Client - end")
}
bool Client::test(const char* theHost,int thePort, const char* theTarget)
{
TRACE("Client::test - start")
bool ret=false;
wait();
ret=(itsHost.compare(theHost)==0)&&(thePort==itsPort)&&(itsTarget.compare(theTarget)==0);
release();
TRACE("Client::test - end")
return ret;
}
bool Client::isConnected()
{
wait();
TRACE("Client::isConnected - start")
bool ret=false;
if(itsConnected==false && itsProxy==0) // Never connected
ret=true; // Allow the client to estabilish the connection
else if(itsConnected==true && isStillAvailable(itsProxy)) // Continue to be connected
ret=true;
else
ret=false; // Connection lost
release();
TRACE("Client::isConnected - end")
return ret;
}
void Client::setTopic(const char* theTopic)
{
wait();
TRACE("Client::setTopic - start")
itsTopic=theTopic;
release();
TRACE("Client::setTopic - end")
}
void Client::addFailoverHost(char* theHost,int thePort)
{
TRACE("Client::addFailoverHost - start")
wait(); //++v1.4
FailoverEntry* anEntry=new FailoverEntry;
anEntry->host=theHost;
anEntry->port=thePort;
itsFailoverList.push_back(anEntry);
release(); //++v1.4
TRACE("Client::addFailoverHost - end")
}
void Client::lookup(bool findHost)
{
TRACE("Client::lookup - start")
itsRetryCount=0;
if(itsFailoverList.empty())
{
TRACE("Start lookup default host")
if(itsHost.size()==0)
{
bool res=MessageQueue::lookup(itsTarget.c_str(),itsProxy);
if(res)
{
itsServer=itsProxy;
itsConnected=true;
}
}
else
{
MessageProxyFactory::lookupAt(itsHost.c_str(),itsPort,itsTarget.c_str(),this);
}
}
else
{
if(findHost==true)
{
++itsFailoverCnt;
if(itsFailoverCnt > itsFailoverList.size())
itsFailoverCnt=0;
}
if(itsFailoverCnt==0)
{
TRACE("Start lookup default host")
if(itsHost.size()==0)
{
bool res=MessageQueue::lookup(itsTarget.c_str(),itsProxy);
if(res)
{
itsServer=itsProxy;
itsConnected=true;
}
}
else
{
MessageProxyFactory::lookupAt(itsHost.c_str(),itsPort,itsTarget.c_str(),this);
}
}
else
{
WARNING("Start to lookup an alternative host")
FailoverEntry* anEntry=itsFailoverList[itsFailoverCnt-1];
MessageProxyFactory::lookupAt(anEntry->host.c_str(),anEntry->port,itsTarget.c_str(),this);
}
}
TRACE("Client::lookup - end")
}
void Client::onLookup(LookupReplyMessage* theMessage)
{
TRACE("Client::onLookup - start")
itsRetryCount=0;
if(itsConnected==false && !theMessage->isFailed())
{
itsRetryCount=0;
itsServer=theMessage->getHandle();
itsProxy=theMessage->getSender();
itsConnected=true;
LOG("Remote thread lookup ok.")
if(itsMessage!=NULL)
{
LOG("Transmition of queued message")
postToProxy();
}
}
TRACE("Client::onLookup - end")
}
void Client::onWakeup(Wakeup* theMessage)
{
TRACE("Client::onWakeup - start")
if(itsConnected==false || (itsConnected==true && !isStillAvailable(itsProxy)))
{
itsConnected=false;
TRACE("Retry count=" << itsRetryCount)
if(++itsRetryCount>RETRYMAX) // ++ v1.2
{
WARNING("Lost peer connection")
if(itsMessage!=NULL)
{
reset();
fail("Lost connection");
}
itsRetryCount=0;
}
else if(itsRetryCount>RETRYLOOKUP) // ++ v1.4
{
TRACE("Try to lookup service")
lookup(true);
}
}
else if(itsMessage!=NULL && (Timer::time() - itsSendTime > REMOTE_TIMEOUT))
{
if(++itsRetryCount>RETRYMAX) // ++ v1.2
{
WARNING("Peer timeout")
reset();
fail("Timeout");
}
else
{
WARNING("Try to retransmit last message")
postToProxy();
}
}
TRACE("Client::onWakeup - end")
}
void Client::postToProxy()
{
TRACE("Client::postToProxy - start")
if(itsMessage!=NULL) // ++ v1.5
{
NetworkMessage* aMessage=(NetworkMessage*)itsMessage->clone(); //++ v1.5
aMessage->setSender(getID());
aMessage->setTarget(itsServer);
aMessage->setTopic(itsTopic);
itsSendTime=Timer::time();
post(itsProxy,aMessage);
}
TRACE("Client::postToProxy - end")
}
bool Client::sendMessage(string theBuffer) //++v1.4
{
TRACE("Client::sendMessage - start")
wait();
bool ret=send(theBuffer);
release();
TRACE("Client::sendMessage - end")
return ret;
}
bool Client::send(string theBuffer)
{
TRACE("Client::send - start")
bool ret=false;
if(itsMessage==NULL)
{
itsMessage=new NetworkMessage(theBuffer);
itsMessage->setSender(getID());
itsMessage->setSequenceNumber(itsMsgCnt);
itsMessage->setTopic(itsTopic);
if(itsConnected==true && isStillAvailable(itsProxy))
{
TRACE("Already connected. Immediate posting.")
postToProxy();
}
ret=true;
}
else
{
WARNING("Client::send : overlaying request during transmition")
}
TRACE("Client::send - end")
return ret;
}
NetworkMessage* Client::onRequest(NetworkMessage* theMessage)
{
TRACE("Client::onRequest - start")
if(theMessage->getSequenceNumber()==itsMsgCnt)
{
reset(); // ++ v1.2
string response=theMessage->get();
if(response.substr(0,sizeof(REMOTE_OK)-1).compare(REMOTE_OK)==0)
{
TRACE("Service OK")
delete itsMessage;
itsMessage=NULL;
success(response.substr(sizeof(REMOTE_OK)-1,string::npos));
}
else if(response.substr(0,sizeof(REMOTE_EXCEPTION)-1).compare(REMOTE_EXCEPTION)==0)
{
WARNING((string("Service Error/Exception='")+ response + string("'")).c_str())
delete itsMessage;
itsMessage=NULL;
fail(response.substr(sizeof(REMOTE_EXCEPTION)-1,string::npos));
}
else
{
WARNING("Client::onRequest: skipped message with bad message header")
}
}
else
{
WARNING("Client::onRequest: skipped message with bad sequence number")
}
TRACE("Client::onRequest - end")
return NULL; // No reply
}
void Client::reset() // ++ v1.2
{
TRACE("Client::reset - start")
delete itsMessage;
itsMessage=NULL;
itsSendTime=0;
itsRetryCount=0;
itsMsgCnt++;
TRACE("Client::reset - end")
}
Server::Server(const char* theName) : Observer(theName)
{
TRACE("Server::Server - start")
TRACE("Server::Server - end")
}
Server::~Server()
{
TRACE("Server::~Server - start")
TRACE("Server::~Server - end")
}
NetworkMessage* Server::onRequest(NetworkMessage* theMessage)
{
TRACE("Server::onRequest - start")
NetworkMessage* aMessage=NULL;
try
{
TRACE("Service completed with success")
string aBuffer=string(REMOTE_OK) + service(theMessage->get());
aMessage=new NetworkMessage(aBuffer);
}
catch(Exception& exc)
{
WARNING((string("Exception=") + exc.getMessage()).c_str())
ostrstream aStream;
aStream << REMOTE_EXCEPTION << exc.getMessage().c_str() << ends;
char* aString=aStream.str();
int aLen=strlen(aString)+1;
aMessage=new NetworkMessage(aString,aLen);
delete [] aString;
}
catch(...)
{
CRITICAL("Service returns unhandled exception")
ostrstream aStream;
aStream << REMOTE_EXCEPTION << "Unhandled exception" << ends;
char* aString=aStream.str();
int aLen=strlen(aString)+1;
aMessage=new NetworkMessage(aString,aLen);
delete [] aString;
}
TRACE("Server::onRequest - end")
return aMessage; // Send reply message
}
syntax highlighted by Code2HTML, v. 0.9.1