///////////////////////////////////////////////////////////////////////////////
// MQ4CPP - Message queuing for C++
// Copyright (C) 2004-2007 Riccardo Pompeo (Italy)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
#include <string>
#include <strstream>
using namespace std;
#define SILENT
#include "MessageProxy.h"
#include "Trace.h"
#include "Logger.h"
#include "GeneralHashFunctions.h"
#define SYNCVAL 0xbeef
#define MAX_CONNECTIONS 100
// Copy constructor (added in v1.5).
//
// Duplicates topic, payload, target, sender, sequence number and the
// unsolicited/broadcast flags of 'o'.  The remote sender handle is copied
// too: the other constructors always initialise itsRemoteSender, and
// Observer::onMessage() reads it through getRemoteSender() to route replies,
// so leaving it out here would expose an indeterminate value.
NetworkMessage::NetworkMessage(NetworkMessage& o) // ++ v1.5
    : Message("NetworkMessage")
{
    itsTopic = o.itsTopic;
    itsBuffer = o.itsBuffer;
    itsTarget = o.itsTarget;
    itsSender = o.itsSender;
    itsRemoteSender = o.itsRemoteSender;
    itsSeqNum = o.itsSeqNum;
    itsUnsolicitedFlag = o.itsUnsolicitedFlag;
    itsBroadcastFlag = o.itsBroadcastFlag;
}
// Builds a message whose payload is a copy of the first 'theLen' bytes of
// 'theBuffer'.  Target, remote sender and sequence number start at 0 and both
// the unsolicited and broadcast flags start cleared.
//
// Throws ThreadException when the payload plus a NetworkMessageHeader would
// not fit in 0xFFFF bytes.
NetworkMessage::NetworkMessage(char* theBuffer, unsigned short theLen)
    : Message("NetworkMessage"),
      itsTarget(0), itsRemoteSender(0), itsSeqNum(0),
      itsUnsolicitedFlag(false), itsBroadcastFlag(false)
{
    const bool isTooLarge = theLen > 0xFFFF - sizeof(NetworkMessage::NetworkMessageHeader);
    if(isTooLarge)
    {
        throw ThreadException("NetworkMessage is exceding permitted size");
    }
    itsBuffer.assign(theBuffer, theLen);
}
// Builds a message whose payload is a copy of 'theBuffer'.  Target, remote
// sender and sequence number start at 0 and both flags start cleared.
//
// Throws ThreadException when the payload plus a NetworkMessageHeader would
// not fit in 0xFFFF bytes.
NetworkMessage::NetworkMessage(string theBuffer)
    : Message("NetworkMessage"),
      itsTarget(0), itsRemoteSender(0), itsSeqNum(0),
      itsUnsolicitedFlag(false), itsBroadcastFlag(false)
{
    const bool isTooLarge = theBuffer.length() > 0xFFFF - sizeof(NetworkMessage::NetworkMessageHeader);
    if(isTooLarge)
    {
        throw ThreadException("NetworkMessage is exceding permitted size");
    }
    itsBuffer = theBuffer;
}
// Serialises the message for transmission: a raw NetworkMessageHeader
// (sender, sequence number, payload length, topic length) followed by the
// topic bytes and then the payload bytes.
//
// Throws ThreadException when header + topic + payload would exceed 0xFFFF
// bytes.  The topic is part of the check because it is written to the wire
// and its length is stored in the header next to the payload length;
// checking the payload alone let an oversized topic through.
string NetworkMessage::toString()
{
    // Largest topic+payload that still leaves room for the header.
    const string::size_type aMaxBody = 0xFFFF - sizeof(NetworkMessage::NetworkMessageHeader);
    // Two-step comparison so the sum of both lengths can never wrap around.
    if(itsBuffer.length() > aMaxBody ||
       itsTopic.length() > aMaxBody - itsBuffer.length())
    {
        throw ThreadException("NetworkMessage is exceding permitted size");
    }

    NetworkMessage::NetworkMessageHeader anHeader;
    anHeader.sender = itsSender;
    anHeader.seqnum = itsSeqNum; // ++ v1.1
    anHeader.buflen = itsBuffer.length();
    anHeader.topiclen = itsTopic.length(); // ++ v1.5

    string aBuffer;
    aBuffer.assign((char*)&anHeader, sizeof(anHeader));
    aBuffer += itsTopic; // ++ v1.5
    aBuffer += itsBuffer;
    return aBuffer;
}
// Writes the raw payload bytes (no header, no topic) to 'theStream'.
void NetworkMessage::toStream(ostream& theStream)
{
    theStream.write(itsBuffer.data(), itsBuffer.size());
}
// Replaces the payload with the result of theEncr->code() applied to it.
void NetworkMessage::code(Encription* theEncr)
{
    string anEncoded = theEncr->code(itsBuffer);
    itsBuffer = anEncoded;
}
// Replaces the payload with the result of theEncr->decode() applied to it.
void NetworkMessage::decode(Encription* theEncr)
{
    string aDecoded = theEncr->decode(itsBuffer);
    itsBuffer = aDecoded;
}
// Replaces the payload with the result of theCompr->inflate() applied to it.
void NetworkMessage::inflate(Compression* theCompr)
{
    string anInflated = theCompr->inflate(itsBuffer);
    itsBuffer = anInflated;
}
// Replaces the payload with the result of theCompr->deflate() applied to it.
void NetworkMessage::deflate(Compression* theCompr)
{
    string aDeflated = theCompr->deflate(itsBuffer);
    itsBuffer = aDeflated;
}
// Creates a ping request that carries 'theSenderID' as its sender handle.
PingRequestMessage::PingRequestMessage(MQHANDLE theSenderID)
    : Message("PingRequestMessage")
{
    this->itsSender = theSenderID;
}
// Serialises the request as the raw bytes of a PingRequest structure whose
// 'sender' field holds this message's sender handle.
string PingRequestMessage::toString()
{
    PingRequest aWireForm;
    aWireForm.sender = itsSender;

    string aSerialized;
    aSerialized.assign((char*)&aWireForm, sizeof(aWireForm));
    return aSerialized;
}
// Creates a ping reply addressed to the queue handle 'theTarget'.
PingReplyMessage::PingReplyMessage(MQHANDLE theTarget)
    : Message("PingReplyMessage")
{
    this->itsTarget = theTarget;
}
// Creates a lookup request for the queue called 'theName', issued on behalf
// of the queue identified by 'theSenderID'.
//
// A NULL 'theName' leaves the name at its default-constructed value instead
// of being assigned, because assigning a null C string pointer to the name
// member is undefined behaviour.
LookupRequestMessage::LookupRequestMessage(const char* theName, MQHANDLE theSenderID)
    : Message("LookupRequestMessage")
{
    if(theName != NULL)
    {
        itsNameToLookup = theName;
    }
    itsSender = theSenderID;
}
// Serialises the request as a raw LookupRequest header (sender handle and
// name length) immediately followed by the name bytes.
//
// Throws ThreadException when the name plus the header would not fit in
// 0xFFFF bytes.
string LookupRequestMessage::toString()
{
    const unsigned aNameLen = itsNameToLookup.length();
    LookupRequest aWireHeader;
    if(aNameLen > 0xFFFF - sizeof(aWireHeader))
    {
        throw ThreadException("LookupRequestMessage is exceding permitted size");
    }

    aWireHeader.sender = itsSender;
    aWireHeader.namelen = aNameLen;

    string aSerialized;
    aSerialized.assign((char*)&aWireHeader, sizeof(aWireHeader));
    aSerialized += itsNameToLookup;
    return aSerialized;
}
// Creates a failed lookup reply: fail flag set, handle 0, target 0.
LookupReplyMessage::LookupReplyMessage()
    : Message("LookupReplyMessage")
{
    itsTarget = 0;
    itsBuffer.handle = 0;
    itsBuffer.fail = true;
}
// Creates a failed lookup reply (fail flag set, handle 0) addressed to
// 'theTarget'.
LookupReplyMessage::LookupReplyMessage(MQHANDLE theTarget)
    : Message("LookupReplyMessage")
{
    itsTarget = theTarget;
    itsBuffer.handle = 0;
    itsBuffer.fail = true;
}
// Creates a reply that mirrors the 'fail' flag and 'handle' of an existing
// LookupReply record; the target is set to 0.
LookupReplyMessage::LookupReplyMessage(LookupReplyMessage::LookupReply& theReply)
    : Message("LookupReplyMessage")
{
    itsTarget = 0;
    itsBuffer.handle = theReply.handle;
    itsBuffer.fail = theReply.fail;
}
// Creates a successful lookup reply (fail flag cleared) that carries
// 'theHandle' and is addressed to 'theTarget'.
LookupReplyMessage::LookupReplyMessage(MQHANDLE theTarget, MQHANDLE theHandle)
    : Message("LookupReplyMessage")
{
    itsTarget = theTarget;
    itsBuffer.handle = theHandle;
    itsBuffer.fail = false;
}
// Serialises the reply as the raw bytes of its internal reply record.
string LookupReplyMessage::toString()
{
    string aSerialized;
    aSerialized.assign((char*)&itsBuffer, sizeof(itsBuffer));
    return aSerialized;
}
// Creates an observer queue named 'theName' with no encryption, no
// compression and a last-proxy handle of 0.
Observer::Observer(const char* theName)
    : MessageQueue(theName)
{
    TRACE("Observer::Observer - start")
    itsLastMessageProxy = 0;
    itsCompression = NULL;
    itsEncription = NULL;
    TRACE("Observer::Observer - end")
}
// Releases the encryption and compression helpers owned by this observer.
//
// setCompression() deletes the previously installed helper, so the observer
// owns it exactly as it owns the encryption helper; the original destructor
// released only the latter and leaked the former.
Observer::~Observer()
{
    TRACE("Observer::~Observer - start")
    delete itsEncription;   // deleting NULL is a no-op
    itsEncription = NULL;
    delete itsCompression;
    itsCompression = NULL;
    TRACE("Observer::~Observer - end")
}
// Installs 'theEncr' as the encryption helper and takes ownership of it; the
// previously installed helper (if any) is deleted.  Passing NULL disables
// encryption.
//
// Re-installing the helper that is already in place is a no-op: deleting it
// first would leave the observer holding a dangling pointer.
void Observer::setEncription(Encription* theEncr)
{
    TRACE("Observer::setEncription - start")
    if(itsEncription != theEncr)
    {
        delete itsEncription;   // deleting NULL is a no-op
        itsEncription = theEncr;
    }
    TRACE("Observer::setEncription - end")
}
// Installs 'theCompr' as the compression helper and takes ownership of it;
// the previously installed helper (if any) is deleted.  Passing NULL disables
// compression.
//
// Re-installing the helper that is already in place is a no-op: deleting it
// first would leave the observer holding a dangling pointer.
void Observer::setCompression(Compression* theCompr)
{
    TRACE("Observer::setCompression - start")
    if(itsCompression != theCompr)
    {
        delete itsCompression;   // deleting NULL is a no-op
        itsCompression = theCompr;
    }
    TRACE("Observer::setCompression - end")
}
// Posts 'theMessage' to the queue 'theTarget'.  When helpers are installed
// the payload is first compressed and then encrypted, in that order.
void Observer::post(MQHANDLE theTarget, NetworkMessage* theMessage)
{
    TRACE("Observer::post(static) - start")
    if(NULL != itsCompression)
    {
        theMessage->deflate(itsCompression);
    }
    if(NULL != itsEncription)
    {
        theMessage->code(itsEncription);
    }
    MessageQueue::post(theTarget, theMessage);
    TRACE("Observer::post(static) - end")
}
// Broadcasts 'theMessage' under 'theTopic'.  A new NetworkMessage is built
// from the payload, flagged as broadcasting, stamped with this queue's ID as
// sender, compressed and then encrypted when helpers are installed, and
// finally handed to MessageQueue::broadcast().
void Observer::publish(string theTopic, string theMessage)
{
    TRACE("Observer::publish - start")
    NetworkMessage* aBroadcast = new NetworkMessage(theMessage);
    aBroadcast->setBroadcasting();
    aBroadcast->setTopic(theTopic);
    aBroadcast->setSender(getID());
    if(NULL != itsCompression)
    {
        aBroadcast->deflate(itsCompression);
    }
    if(NULL != itsEncription)
    {
        aBroadcast->code(itsEncription);
    }
    MessageQueue::broadcast(aBroadcast);
    TRACE("Observer::publish - end")
}
// Dispatches an incoming message to the matching handler:
//   "Wakeup"             -> onWakeup()
//   "PingReplyMessage"   -> onPing()
//   "LookupReplyMessage" -> onLookup()
//   "NetworkMessage"     -> onUnsolicited(), onBroadcast() or onRequest()
//   anything else        -> onLocal()
//
// For network messages the sender handle and topic are recorded first.
// Before a handler sees the payload it is decrypted and then decompressed
// when helpers are installed.  Broadcasts are delivered only when their
// topic appears in itsTopicList.  A non-NULL reply returned by onRequest()
// is addressed back to the requester and posted to the proxy the request
// came from.  Exceptions are reported through DISPLAY and not propagated.
void Observer::onMessage(Message* theMessage)
{
    TRACE("Observer::onMessage - start")
    TRACE("Thread name=" << getName())
    try
    {
        if(theMessage->is("Wakeup"))
        {
            TRACE("Call onWakeup")
            onWakeup((Wakeup*)theMessage);
        }
        else if(theMessage->is("PingReplyMessage"))
        {
            TRACE("Call onPing")
            onPing((PingReplyMessage*)theMessage);
        }
        else if(theMessage->is("LookupReplyMessage"))
        {
            TRACE("Call onLookup")
            onLookup((LookupReplyMessage*)theMessage);
        }
        else if(!theMessage->is("NetworkMessage"))
        {
            TRACE("Call onLocal")
            onLocal(theMessage);
        }
        else
        {
            NetworkMessage* aNetMsg = (NetworkMessage*)theMessage;
            itsLastMessageProxy = aNetMsg->getSender();
            itsLastReceivedTopic = aNetMsg->getTopic();

            if(aNetMsg->isUnsolicited())
            {
                TRACE("Call onUnsolicited")
                if(itsEncription != NULL) { aNetMsg->decode(itsEncription); }
                if(itsCompression != NULL) { aNetMsg->inflate(itsCompression); }
                onUnsolicited(aNetMsg);
            }
            else if(aNetMsg->isBroadcasting())
            {
                TRACE("Call onBroadcast")
                bool isSubscribed = false;
                if(itsTopicList.size() > 0)
                {
                    TRACE("Find enabled topics")
                    // Every entry is visited, even after a match was found.
                    for(vector<string>::iterator aTopic = itsTopicList.begin();
                        aTopic != itsTopicList.end(); ++aTopic)
                    {
                        if(*aTopic == aNetMsg->getTopic())
                        {
                            TRACE("Found topic=" << (*aTopic).c_str())
                            isSubscribed = true;
                        }
                    }
                }
                if(isSubscribed)
                {
                    if(itsEncription != NULL) { aNetMsg->decode(itsEncription); }
                    if(itsCompression != NULL) { aNetMsg->inflate(itsCompression); }
                    onBroadcast(aNetMsg);
                }
            }
            else
            {
                TRACE("Call onNetworkMessage")
                if(itsEncription != NULL) { aNetMsg->decode(itsEncription); }
                if(itsCompression != NULL) { aNetMsg->inflate(itsCompression); }
                NetworkMessage* anAnswer = onRequest(aNetMsg);
                if(anAnswer != NULL)
                {
                    anAnswer->setSender(getID());
                    anAnswer->setTarget(aNetMsg->getRemoteSender());
                    anAnswer->setSequenceNumber(aNetMsg->getSequenceNumber());
                    post(aNetMsg->getSender(), anAnswer);
                }
            }
        }
    }
    catch(Exception& ex)
    {
        DISPLAY("Observer::onMessage(" << getName() << ") : " << ex.getMessage().c_str())
    }
    catch(...)
    {
        DISPLAY("Observer::onMessage(" << getName() << ") : Unhandled exception")
    }
    TRACE("Observer::onMessage - end")
}
// Empties 'theProperties' and refills it by deserialising the bytes held in
// 'theBuffer'.
void Observer::decodeProperties(string& theBuffer, ListProperty& theProperties)
{
    TRACE("Observer::decodeProperties - start")
    theProperties.free();
    istrstream anInput(theBuffer.data(), theBuffer.length());
    theProperties.deserialize(anInput, true);
    TRACE("Observer::decodeProperties - end")
}
void Observer::decodeProperties(char* theBuffer,unsigned long theLen,ListProperty& theProperties)
{
TRACE("Observer::decodeProperties - start")
theProperties.free();
istrstream aStream(theBuffer,theLen);
theProperties.deserialize(aStream,true);
TRACE("Observer::decodeProperties - end")
}
// Serialises 'theProperties' into a memory stream and stores the produced
// bytes in 'theBuffer'.
void Observer::encodeProperties(ListProperty& theProperties, string& theBuffer)
{
    TRACE("Observer::encodeProperties - start")
    ostrstream anOutput;
    theProperties.serialize(anOutput);
    int aByteCount = anOutput.pcount();
    // str() hands the character array over to the caller, hence the
    // delete [] once the bytes have been copied.
    char* aRawBytes = anOutput.str();
    theBuffer.assign(aRawBytes, aByteCount);
    delete [] aRawBytes;
    TRACE("Observer::encodeProperties - end")
}
// Creates a proxy queue named 'theName' that has no socket attached; no
// receiver thread is started by this constructor.
MessageProxy::MessageProxy(const char* theName)
    : MessageQueue(theName)
{
    TRACE("MessageProxy constructor - start")
    TRACE("Name=" << theName)
    this->itsSocket = NULL;
    TRACE("MessageProxy constructor - end")
}
// Creates a proxy queue named 'theName' bound to 'theSocket' and starts the
// receiver thread, which runs _mp_thread_proc with this object as argument.
//
// Throws ThreadException when the thread cannot be created.
MessageProxy::MessageProxy(const char* theName, Socket* theSocket)
    : MessageQueue(theName), itsSocket(theSocket)
{
    TRACE("MessageProxy constructor - start")
    TRACE("Name=" << theName)
#ifdef WIN32
    DWORD aThreadId = 0;
    m_hThreadRx = (unsigned long*)CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)_mp_thread_proc, (Thread*)this, 0, &aThreadId);
    if(NULL == m_hThreadRx)
    {
        throw ThreadException("Failed to create thread");
    }
#else
    const int aRc = pthread_create(&m_hThreadRx, NULL, _mp_thread_proc, this);
    if(0 != aRc)
    {
        throw ThreadException("Failed to create thread");
    }
#endif
    TRACE("MessageProxy constructor - end")
}
// Stops the queue thread and, when a socket is attached, closes it and waits
// for the receiver thread to finish.
MessageProxy::~MessageProxy()
{
    TRACE("MessageProxy destructor - start")
    stop(false);
    if(NULL != itsSocket)
    {
        // The socket is closed before the receiver thread is joined.
        itsSocket->Close();
#ifdef WIN32
        WaitForSingleObject(m_hThreadRx, INFINITE);
        CloseHandle(m_hThreadRx);
#else
        pthread_join(m_hThreadRx, NULL);
#endif
    }
    TRACE("MessageProxy destructor - end")
}
// Extracts the peer address and port encoded in this proxy's queue name,
// which MessageProxyFactory builds as MESSAGEPROXYHEADER + host + "," +
// port + ")".
//
// theCaller  not used by this implementation.
// thePort    receives the port parsed from the name.
// returns    the host part of the name (everything between the header and
//            the first comma).
//
// The host is read into a std::string rather than a fixed char[20]: the name
// is built from whatever host string was passed to the factory, and with the
// fixed array any host longer than 19 characters was truncated and put the
// stream into a failed state, so the port was never parsed.
string MessageProxy::getConnectionAddress(MQHANDLE theCaller,int& thePort)
{
    TRACE("MessageProxy::getConnectionAddress - start")
    string aName=getName();
    istrstream aStream(aName.c_str(),aName.size());
    // Skip the fixed prefix; sizeof counts the literal's terminating NUL.
    aStream.ignore(sizeof(MESSAGEPROXYHEADER)-1);
    string anAddr;
    getline(aStream,anAddr,',');
    aStream >> thePort;
    TRACE("MessageProxy::getConnectionAddress - end")
    return anAddr;
}
// Transmit side of the proxy: wraps an outgoing message in a proxy 'header'
// (sync word, type, target, length) and writes header plus serialised
// message to the socket in a single SendBytes() call.
//
// Only NetworkMessage, LookupRequestMessage, LookupReplyMessage,
// PingRequestMessage and PingReplyMessage are accepted; any other class is
// skipped with a warning.  Messages that are too long are dropped and empty
// ones are skipped, each with a warning.  Exceptions are logged and not
// propagated.
void MessageProxy::onMessage(Message* theMessage)
{
    TRACE("MessageProxy::onMessage - start")
    TRACE("Thread name=" << getName())
    header aFrameHeader;
    aFrameHeader.sync = SYNCVAL;
    TRACE("Message=" << theMessage->getClass())
    try
    {
        // Choose wire type and target from the concrete message class.
        if(theMessage->is("NetworkMessage"))
        {
            NetworkMessage* aNetMsg = (NetworkMessage*)theMessage;
            if(aNetMsg->isUnsolicited())
            {
                aFrameHeader.type = MQ_PROXY_UNSOLICITED;
            }
            else if(aNetMsg->isBroadcasting()) // ++ v1.5
            {
                aFrameHeader.type = MQ_PROXY_BROADCAST;
            }
            else
            {
                aFrameHeader.type = MQ_PROXY_MESSAGE;
            }
            aFrameHeader.target = aNetMsg->getTarget();
        }
        else if(theMessage->is("LookupRequestMessage"))
        {
            aFrameHeader.target = 0;
            aFrameHeader.type = MQ_PROXY_LOOKUP_REQUEST;
        }
        else if(theMessage->is("LookupReplyMessage"))
        {
            aFrameHeader.type = MQ_PROXY_LOOKUP_REPLY;
            aFrameHeader.target = ((LookupReplyMessage*)theMessage)->getTarget();
        }
        else if(theMessage->is("PingRequestMessage"))
        {
            aFrameHeader.target = 0;
            aFrameHeader.type = MQ_PROXY_PING_REQUEST;
        }
        else if(theMessage->is("PingReplyMessage"))
        {
            aFrameHeader.type = MQ_PROXY_PING_REPLY;
            aFrameHeader.target = ((PingReplyMessage*)theMessage)->getTarget();
        }
        else
        {
            WARNING("Message not allowed. Skipped!")
            return;
        }

        string aBody = theMessage->toString();
        int aBodyLen = aBody.length();
        if(aBodyLen + sizeof(NetworkMessage::NetworkMessageHeader) > 0xFFFF) // ++ v1.5
        {
            WARNING("Message too long. Dropped!")
            return;
        }
        aFrameHeader.msglen = aBodyLen;
        TRACE("Type=" << aFrameHeader.type)
        TRACE("Target=" << aFrameHeader.target)
        TRACE("MsgLen=" << aFrameHeader.msglen)
        if(aFrameHeader.msglen > 0)
        {
            DUMP("Tx header",(char*)&aFrameHeader,sizeof(header));
            // Header and body leave in one buffer so they are sent together.
            string aFrame((char*)&aFrameHeader, sizeof(header));
            aFrame += aBody;
            DUMP("Tx buffer",(char*)aFrame.data(),aFrame.length());
            itsSocket->SendBytes(aFrame);
        }
        else
        {
            WARNING("Posted an empty network message. Skipped!")
        }
    }
    catch(Exception& ex)
    {
        WARNING(ex.getMessage().c_str())
    }
    catch(...)
    {
        CRITICAL("Unhandled exception")
    }
    TRACE("MessageProxy::onMessage - end")
}
void MessageProxy::receive()
{
TRACE("MessageProxy::receive - start")
TRACE("Thread name=" << getName())
char* aBuffer=new char[0x10000];
while (true)
{
try
{
TESTCANCEL
TRACE("Wait a message")
header anHeader;
if(itsSocket->ReceiveBuffer(&anHeader,sizeof(header))==false)
{
WARNING("Socket Rx returns an error")
break;
}
TESTCANCEL
DUMP("Rx header",(char*)&anHeader,sizeof(header));
if(anHeader.sync==SYNCVAL)
{
TRACE("Valid sync")
TRACE("Message lenght=" << anHeader.msglen)
if(anHeader.msglen>0)
if(itsSocket->ReceiveBuffer(aBuffer,anHeader.msglen)==false)
{
WARNING("Socket Rx returns an error")
break;
}
//BUFFER((char*)&anHeader,sizeof(header))
TESTCANCEL
DUMP("Rx message",aBuffer,anHeader.msglen);
if(anHeader.type==MQ_PROXY_MESSAGE || anHeader.type==MQ_PROXY_UNSOLICITED || anHeader.type==MQ_PROXY_BROADCAST) // ++v1.5
{
TRACE("type==MQ_PROXY_MESSAGE OR MQ_PROXY_UNSOLICITED OR MQ_PROXY_BROADCAST")
NetworkMessage::NetworkMessageHeader* aNMHeader=(NetworkMessage::NetworkMessageHeader*)aBuffer;
if((aNMHeader->topiclen > 0xFFFF - sizeof(aNMHeader) - aNMHeader->buflen) || // ++ v1.5
(aNMHeader->buflen > 0xFFFF - sizeof(aNMHeader) - aNMHeader->topiclen))
{
WARNING("Buffer overflow detected. Drop connection!")
break;
}
DUMP("NetworkMessage Rx header",(char*)aNMHeader,sizeof(NetworkMessage::NetworkMessageHeader));
TRACE("Sender=" << aNMHeader->sender)
TRACE("Sequence=" << aNMHeader->seqnum)
TRACE("Topic lenght=" << aNMHeader->topiclen) // ++ v1.5
TRACE("Buffer lenght=" << aNMHeader->buflen)
//BUFFER((char*)aNMHeader,sizeof(NetworkMessage::NetworkMessageHeader));
char* aTopic=aBuffer+sizeof(NetworkMessage::NetworkMessageHeader); // ++ v1.5
char* aBufPtr=aTopic+aNMHeader->topiclen;
NetworkMessage* aNetworkMessage=new NetworkMessage(aBufPtr,aNMHeader->buflen);
if(aNMHeader->topiclen>0) // ++ v1.5
aNetworkMessage->setTopic(aTopic,aNMHeader->topiclen);
if(anHeader.type==MQ_PROXY_UNSOLICITED)
aNetworkMessage->setUnsolicited();
else if(anHeader.type==MQ_PROXY_BROADCAST) //++v1.5
aNetworkMessage->setBroadcasting();
aNetworkMessage->setSender(getID()); // v1.5
aNetworkMessage->setRemoteSender(aNMHeader->sender); // v1.5
aNetworkMessage->setTarget(anHeader.target);
aNetworkMessage->setSequenceNumber(aNMHeader->seqnum); // ++ v1.1
if(anHeader.type==MQ_PROXY_BROADCAST)
{
broadcast(aNetworkMessage);
TRACE("Message broadcasted")
}
else
{
post(anHeader.target,aNetworkMessage);
TRACE("Message delivered")
}
}
else if(anHeader.type==MQ_PROXY_LOOKUP_REQUEST)
{
TRACE("type==MQ_PROXY_LOOKUP_REQUEST")
LookupRequestMessage::LookupRequest* aLookup=(LookupRequestMessage::LookupRequest*)aBuffer;
if(aLookup->namelen > 0xFFFF - sizeof(aLookup))
{
WARNING("Buffer overflow detected. Drop connection!")
break;
}
DUMP("Lookup Rx header",(char*)aLookup,sizeof(aLookup));
const char* aNamePtr=aBuffer+sizeof(LookupRequestMessage::LookupRequest);
string aName;
aName.assign(aNamePtr,aLookup->namelen);
MQHANDLE anHandle;
if(lookup(aName.c_str(),anHandle))
{
TRACE("Lookup of " << aName.c_str() << " ok")
TRACE("Sender=" << aLookup->sender)
TRACE("Handle=" << anHandle)
LookupReplyMessage* aMessage=new LookupReplyMessage(aLookup->sender,anHandle); // ++ v1.5
aMessage->setSender(getID()); // ++ v1.5
post(aMessage); // v1.5
}
else
{
TRACE("Lookup failed")
TRACE("Sender=" << aLookup->sender)
LookupReplyMessage* aMessage=new LookupReplyMessage(aLookup->sender); // ++ v1.5
aMessage->setSender(getID()); // ++ v1.5
post(aMessage); // v1.5
}
}
else if(anHeader.type==MQ_PROXY_LOOKUP_REPLY)
{
TRACE("type==MQ_PROXY_LOOKUP_REPLY")
LookupReplyMessage::LookupReply* aLookup=(LookupReplyMessage::LookupReply*)aBuffer;
LookupReplyMessage* aReply;
if(aLookup->fail)
aReply=new LookupReplyMessage();
else
aReply=new LookupReplyMessage(*aLookup);
aReply->setSender(getID());
post(anHeader.target,aReply);
TRACE("Lookup delivered")
}
else if(anHeader.type==MQ_PROXY_PING_REQUEST)
{
TRACE("type==MQ_PROXY_PING_REQUEST")
PingRequestMessage::PingRequest* aPing=(PingRequestMessage::PingRequest*)aBuffer;
PingReplyMessage* aMessage=new PingReplyMessage(aPing->sender);
aMessage->setSender(getID()); // ++ v1.5
post(aMessage); // v1.5
}
else if(anHeader.type==MQ_PROXY_PING_REPLY)
{
TRACE("type==MQ_PROXY_PING_REPLY")
PingReplyMessage* aMessage=new PingReplyMessage(0); // ++ v1.5
aMessage->setSender(getID()); // ++ v1.5
post(anHeader.target,aMessage); // v1.5
}
else
{
WARNING("Invalid Rx type. Flush Rx channel.")
BUFFER((char*)&anHeader,sizeof(header))
string aBuffer=itsSocket->ReceiveBytes();
//BUFFER((char*)aBuffer.c_str(),aBuffer.length())
}
}
else
{
WARNING("Invalid sync. Flush Rx channel.")
BUFFER((char*)&anHeader,sizeof(header))
string aBuffer=itsSocket->ReceiveBytes();
//BUFFER((char*)aBuffer.c_str(),aBuffer.length())
}
}
catch(Exception& ex)
{
WARNING(ex.getMessage().c_str())
}
catch(...)
{
CRITICAL("Unhandled exception")
}
}
TRACE("Close socket and stop also Tx tread")
delete [] aBuffer;
Thread::stop(false);
itsSocket->Close();
WARNING("Connection broken")
TRACE("MessageProxy::receive - end")
}
// Class-wide lock object: MessageProxyFactory::post() brackets its proxy
// lookup/creation with itsMutex.wait() and itsMutex.release().
Thread MessageProxyFactory::itsMutex("MessageProxyFactoryMutex");
void MessageProxyFactory::ping(const char* theHost, unsigned thePort,
MessageQueue* theSourceQueue)
{
TRACE("MessageFactory::ping(static) - start")
PingRequestMessage* aMessage=new PingRequestMessage(theSourceQueue->getID());
MessageProxyFactory::post(theHost,thePort,aMessage,theSourceQueue->getID());
TRACE("MessageFactory::ping(static) - end")
}
void MessageProxyFactory::lookupAt(const char* theHost, unsigned thePort,
const char* theRemoteQueueName,MessageQueue* theSourceQueue)
{
TRACE("MessageFactory::lookupAt(static) - start")
LookupRequestMessage* aMessage=new LookupRequestMessage(theRemoteQueueName,theSourceQueue->getID());
MessageProxyFactory::post(theHost,thePort,aMessage,theSourceQueue->getID());
TRACE("MessageFactory::lookupAt(static) - end")
}
void MessageProxyFactory::post(const char* theHost, unsigned thePort,Message* theMessage,MQHANDLE theSender)
{
TRACE("MessageFactory::post(static) - start")
ostrstream aStream;
aStream << MESSAGEPROXYHEADER << theHost << "," << thePort << ")" << ends;
char* aName = aStream.str();
TRACE("Proxy name=" << aName)
MQHANDLE anHandle;
itsMutex.wait(); // ++ v1.5
if(MessageQueue::lookup(aName,anHandle))
{
TRACE("Using existent connection")
TRACE("Handle=" << anHandle)
MessageQueue::post(anHandle,theMessage);
TRACE("Message sent")
}
else
{
TRACE("Create new connection")
Socket* aSocket=NULL;
MessageProxy* aProxy=NULL;
try
{
aSocket=new SocketClient(string(theHost),thePort);
TRACE("New client connection created")
aProxy=new MessageProxy(aName,aSocket);
aProxy->post(theMessage);
TRACE("Message sent")
char aValue[10];
ostrstream aStream(aValue,sizeof(aValue));
aStream << thePort << ends;
string aMsg=string("Connected to ")+string(theHost)+string(":")+aValue;
LOG(aMsg.c_str())
}
catch(Exception& exc)
{
if(theMessage->is("LookupRequestMessage")) Decoupler::deferredPost(theSender,new LookupReplyMessage());
if(aProxy!=NULL) delete aProxy;
else if(aSocket!=NULL) delete aSocket;
delete[] aName;
delete theMessage;
itsMutex.release();
string aMsg=string("Fail to create new server connection: ") + exc.getMessage();
LOG(aMsg.c_str())
return;
}
}
itsMutex.release(); // ++ v1.5
delete[] aName;
TRACE("MessageFactory::post(static) - end")
}
// Creates a factory thread named 'theName' with a server socket on port
// 'theSocket' (constructed with MAX_CONNECTIONS as second argument) and
// starts the thread immediately.  The connection counter starts at 0.
MessageProxyFactory::MessageProxyFactory(const char* theName, int theSocket)
    : Thread(theName),
      SocketServer(theSocket, MAX_CONNECTIONS),
      itsCount(0),
      itsPort(theSocket)
{
    TRACE("MessageProxyFactory::MessageProxyFactory - start")
    this->start();
    TRACE("MessageProxyFactory::MessageProxyFactory - end")
}
// Shuts the factory down: the listening socket is closed first, then the
// thread is stopped.
MessageProxyFactory::~MessageProxyFactory()
{
    TRACE("MessageProxyFactory::~MessageProxyFactory - start")
    this->Close();      //Close socket
    this->stop(false);  //Wait exit of MessageProxyFactory::run
    TRACE("MessageProxyFactory::~MessageProxyFactory - end")
}
// Accept loop of the factory thread.
//
// Until the thread is shutting down, each iteration accepts one connection,
// logs the peer address and port, increments the connection counter, creates
// a MessageProxy named MESSAGEPROXYHEADER + address + "," + port + ")" for
// the accepted socket and calls onNewConnection().  An Exception raised on
// the way is logged as a warning and the proxy (or, when no proxy was
// created, the socket) is deleted.
//
// The proxy name is copied into a std::string and the stream's array is
// released straight away; previously the array was leaked whenever the
// MessageProxy constructor threw, because the delete[] after it was skipped.
void MessageProxyFactory::run()
{
    TRACE("MessageProxyFactory::run - start")
    while(true)
    {
        TESTCANCEL
        if(Thread::isShuttingDown())
            break;
        Socket* aSocket=NULL;
        MessageProxy* aProxy=NULL;
        try
        {
            char aValue[10];
            aSocket=Accept();
            string anAddr=address();
            unsigned short aPort=port();
            ostrstream aMsgStream(aValue,sizeof(aValue));
            aMsgStream << aPort << ends;
            string aMsg=string("Connected to ")+anAddr+string(":")+aValue;
            LOG(aMsg.c_str())
            itsCount++;
            ostrstream aStream;
            aStream << MESSAGEPROXYHEADER << anAddr << "," << aPort << ")" << ends;
            // str() hands the array to the caller: copy it, then free it
            // before anything that may throw.
            char* aRawName=aStream.str();
            string aName(aRawName);
            delete [] aRawName;
            aProxy=new MessageProxy(aName.c_str(),aSocket);
            TRACE(aName.c_str() << " proxy started")
            onNewConnection(anAddr,aPort);
        }
        catch(Exception& exc)
        {
            string aMsg=string("Fail to create new server connection: ") + exc.getMessage();
            WARNING(aMsg.c_str())
            if(aProxy!=NULL)
                delete aProxy;
            else if(aSocket!=NULL)
                delete aSocket;
        }
    }
    TRACE("MessageProxyFactory::run - end")
}
// Builds an identifier string from three raw binary parts appended in this
// order: the APHash of all adapter MAC strings concatenated, the value of
// Timer::timeExt(), and one rand() value drawn right after seeding the
// generator with Timer::time().
string MessageProxyFactory::getUniqueNetID()
{
    TRACE("MessageProxyFactory::getUniqueNetID - start")
    vector<NetAdapter>* anAdapters=Socket::getAdapters();

    // Concatenate the MAC of every adapter returned by the socket layer.
    string aMacs;
    if(anAdapters!=NULL && anAdapters->size()>0)
    {
        for(vector<NetAdapter>::iterator anIt=anAdapters->begin(); anIt!=anAdapters->end(); ++anIt)
        {
            aMacs+=anIt->getMAC();
        }
    }

    string anID;
    unsigned int aMacHash=APHash(aMacs);
    anID.append((char*)&aMacHash,sizeof(aMacHash));

    _TIMEVAL aNow=Timer::timeExt();
    anID.append((char*)&aNow,sizeof(aNow));

    ::srand(Timer::time());
    unsigned int aRandom=::rand();
    anID.append((char*)&aRandom,sizeof(aRandom));

    delete anAdapters;
    TRACE("MessageProxyFactory::getUniqueNetID - end")
    return anID;
}
#ifdef WIN32
unsigned int _mp_thread_proc(void* param)
{
TRACE("Start _mp_thread_proc")
try
{
MessageProxy* tp = (MessageProxy*)param;
tp->receive();
}
catch(...)
{
DISPLAY("_mp_thread_proc: Unhandled exception")
}
TRACE("End _mp_thread_proc")
return 0;
}
#else
void* _mp_thread_proc(void* param)
{
TRACE("Start _mp_thread_proc")
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
try
{
MessageProxy* tp = (MessageProxy*)param;
tp->receive();
}
catch(...)
{
DISPLAY("_mp_thread_proc: Unhandled exception")
}
TRACE("End _mp_thread_proc")
pthread_exit(NULL);
return NULL;
}
#endif
// syntax highlighted by Code2HTML, v. 0.9.1