///////////////////////////////////////////////////////////////////////////////
// MQ4CPP - Message queuing for C++
// Copyright (C) 2004-2007 Riccardo Pompeo (Italy)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
#include "Session.h"
#include "Logger.h"
#include "FileSystem.h"
#include <string>
using namespace std;
//#define SILENT
class MyClient : public Client
{
public:
MyClient(const char* theName, const char* theHost,int thePort, const char* theTarget)
:Client(theName,theHost,thePort,theTarget) {/* TO-DO */};
virtual ~MyClient() {/* TO-DO */};
protected:
virtual void success(string theBuffer) {/* TO-DO */};
virtual void fail(string theError) {/* TO-DO */};
};
// Server1-ThreadA: background service that receives data from an external
// com-based application (for example from 50 marketplaces, 10 or more dataframes per second),
// stores them in database and in a flatfile (both for logging, reporting and reuse)
// and sends then this data to a second service.
typedef struct _Record
{
int Value1;
string Value2;
short Value3;
} Record;
class ThreadA : public Observer
{
protected:
Directory* itsDir;
File* itsFile;
ListProperty itsStructure;
MyClient* itsClient;
public:
ThreadA(const char* theThreadName,const char* theFileName,const char* theHost,int thePort, const char* theTarget)
: Observer(theThreadName)
{
TRACE("ThreadA::ThreadA - start")
itsClient=new MyClient((string("Client(")+theThreadName+string(")")).c_str(),theHost,thePort,theTarget);
itsDir=Directory::getCurrent();
itsFile=itsDir->create(theFileName);
itsFile->create();
subscribe("ControlTopic");
Message* anEvent=new Message("ThreadAEvent");
anEvent->setSender(getID());
Decoupler::deferredPost(getID(),anEvent);
TRACE("ThreadA::ThreadA - end")
};
virtual ~ThreadA()
{
TRACE("ThreadA::~ThreadA - start")
delete itsDir;
TRACE("ThreadA::~ThreadA - end")
};
protected:
virtual Record* receive()
{
TRACE("ThreadA::receive - start")
// Simulate a socket receive
Record* aRecord=new Record;
sleep(100);
aRecord->Value1=10;
aRecord->Value2="A string";
aRecord->Value3=3;
TRACE("ThreadA::receive - end")
return aRecord;
};
virtual string serialize(Record* theRecord)
{
TRACE("ThreadA::serialize - start")
itsStructure.free();
LongIntProperty* aLongIntProperty=new LongIntProperty("Value1");
aLongIntProperty->set(theRecord->Value1);
itsStructure.add(aLongIntProperty);
StringProperty* aStringProperty=new StringProperty("Value2");
aStringProperty->set(theRecord->Value2);
itsStructure.add(aStringProperty);
ShortIntProperty* aShortIntProperty=new ShortIntProperty("Value3");
aShortIntProperty->set(theRecord->Value3);
itsStructure.add(aShortIntProperty);
string aString;
encodeProperties(itsStructure,aString);
TRACE("ThreadA::serialize - end")
return aString;
}
virtual void onLocal(Message* theMessage)
{
TRACE("ThreadA::onLocal - start")
if(theMessage->is("ThreadAEvent") && theMessage->getSender()==getID() && !isShuttingDown())
{
TRACE("Receive from ext application")
Record* aRecord=receive();
string aMsg=serialize(aRecord);
TRACE("Append record in log file")
fstream& aStream=itsFile->get();
aStream.write(aMsg.c_str(),aMsg.size());
TRACE("Send record to ThreadB")
itsClient->sendMessage(aMsg);
delete aRecord;
TRACE("Recall this function")
Message* anEvent=new Message("ThreadAEvent");
anEvent->setSender(getID());
Decoupler::deferredPost(getID(),anEvent);
}
TRACE("ThreadA::onLocal - end")
};
virtual void onBroadcast(NetworkMessage* theMessage)
{
TRACE("ThreadA::onBroadcast - start")
string aMessage=theMessage->toString();
// Do something
TRACE("ThreadA::onBroadcast - end")
};
};
// Server2-ThreadB: this service works with the data from tread A (some computations for each marketplace
// in a seperate thread) and sends then the prepared dataframes to thread C.
typedef struct _Dataframe
{
int Data1;
int Data2;
int Data3;
} Dataframe;
class ThreadB : public Server
{
protected:
MyClient* itsClient;
ListProperty itsStructure;
public:
ThreadB(const char* theThreadName,const char* theHost,int thePort, const char* theTarget)
: Server(theThreadName)
{
TRACE("ThreadB::ThreadB - start")
itsClient=new MyClient((string("Client(")+theThreadName+string(")")).c_str(),theHost,thePort,theTarget);
subscribe("ControlTopic");
TRACE("ThreadB::ThreadB - end")
};
virtual ~ThreadB()
{
TRACE("ThreadB::~ThreadB - start")
TRACE("ThreadB::~ThreadB - end")
};
protected:
virtual Record* deserialize(string theBuffer)
{
TRACE("ThreadB::deserialize - start")
decodeProperties(theBuffer,itsStructure);
Record* aRecord=new Record;
TRACE("Retrieving data from structure")
Property* aProperty=itsStructure.get("Value1");
if(aProperty!=NULL && aProperty->is(PROPERTY_LONGINT))
aRecord->Value1=((LongIntProperty*)aProperty)->get();
aProperty=itsStructure.get("Value2");
if(aProperty!=NULL && aProperty->is(PROPERTY_STRING))
aRecord->Value2=((StringProperty*)aProperty)->get();
aProperty=itsStructure.get("Value3");
if(aProperty!=NULL && aProperty->is(PROPERTY_SHORTINT))
aRecord->Value3=((ShortIntProperty*)aProperty)->get();
TRACE("ThreadB::deserialize - end")
return aRecord;
};
virtual Dataframe* compute(Record* theRecord)
{
TRACE("ThreadB::compute - start")
Dataframe* aDataframe=new Dataframe;
aDataframe->Data1=1;
aDataframe->Data1=2;
aDataframe->Data1=3;
return aDataframe;
TRACE("ThreadB::compute - start")
};
virtual string serialize(Dataframe* theDataframe)
{
TRACE("ThreadB::serialize - start")
itsStructure.free();
LongIntProperty* aLongIntProperty=new LongIntProperty("Data1");
aLongIntProperty->set(theDataframe->Data1);
itsStructure.add(aLongIntProperty);
aLongIntProperty=new LongIntProperty("Data2");
aLongIntProperty->set(theDataframe->Data2);
itsStructure.add(aLongIntProperty);
aLongIntProperty=new LongIntProperty("Data3");
aLongIntProperty->set(theDataframe->Data3);
itsStructure.add(aLongIntProperty);
string aString;
encodeProperties(itsStructure,aString);
TRACE("ThreadB::serialize - end")
return aString;
}
virtual string service(string theBuffer)
{
TRACE("ThreadB::service - start")
Record* aRecord=deserialize(theBuffer);
Dataframe* aDataframe=compute(aRecord);
string aMsg=serialize(aDataframe);
itsClient->sendMessage(aMsg);
delete aRecord;
delete aDataframe;
TRACE("ThreadB::service - end")
return "OK";
};
virtual void onBroadcast(NetworkMessage* theMessage)
{
TRACE("ThreadB::onBroadcast - start")
string aMessage=theMessage->toString();
// Do something
TRACE("ThreadB::onBroadcast - end")
};
};
// Server1-ThreadC: this service receives data from thread B and do someting
class ThreadC : public Server
{
protected:
ListProperty itsStructure;
public:
ThreadC(const char* theThreadName)
: Server(theThreadName)
{
TRACE("ThreadC::ThreadC - start")
subscribe("ControlTopic");
TRACE("ThreadC::ThreadC - end")
};
virtual ~ThreadC()
{
TRACE("ThreadC::~ThreadC - start")
TRACE("ThreadC::~ThreadC - end")
};
protected:
virtual Dataframe* deserialize(string theBuffer)
{
TRACE("ThreadC::deserialize - start")
decodeProperties(theBuffer,itsStructure);
Dataframe* aDataframe=new Dataframe;
TRACE("Retrieving data from structure")
Property* aProperty=itsStructure.get("Data1");
if(aProperty!=NULL && aProperty->is(PROPERTY_LONGINT))
aDataframe->Data1=((LongIntProperty*)aProperty)->get();
aProperty=itsStructure.get("Data2");
if(aProperty!=NULL && aProperty->is(PROPERTY_LONGINT))
aDataframe->Data2=((LongIntProperty*)aProperty)->get();
aProperty=itsStructure.get("Data3");
if(aProperty!=NULL && aProperty->is(PROPERTY_LONGINT))
aDataframe->Data3=((LongIntProperty*)aProperty)->get();
TRACE("ThreadC::deserialize - end")
return aDataframe;
};
virtual void compute(Dataframe* theDataframe)
{
TRACE("ThreadC::compute - start")
// Do something
TRACE("ThreadC::compute - end")
};
virtual string service(string theBuffer)
{
TRACE("ThreadC::service - start")
Dataframe* aDataframe=deserialize(theBuffer);
compute(aDataframe);
delete aDataframe;
TRACE("ThreadC::service - end")
return "OK";
};
virtual void onBroadcast(NetworkMessage* theMessage)
{
TRACE("ThreadC::onBroadcast - start")
string aMessage=theMessage->toString();
// Do something
TRACE("ThreadC::onBroadcast - end")
};
};
// Server2-ThreadD: the mainservice coordinates the first three services
class ThreadD : public Observer
{
private:
public:
ThreadD(const char* theName) : Observer(theName)
{
TRACE("ThreadD::ThreadD - start")
SCHEDULE(this,2000);
TRACE("ThreadD::ThreadD - end")
};
virtual ~ThreadD()
{
TRACE("ThreadD::~ThreadD - start")
TRACE("ThreadD::~ThreadD - end")
};
protected:
virtual void onWakeup(Wakeup* theMessage)
{
TRACE("ThreadD::onWakeup - start")
publish("ControlTopic","A command");
TRACE("ThreadD::onWakeup - end")
};
};
class MyMessageProxyFactory : public MessageProxyFactory
{
protected:
public:
MyMessageProxyFactory(const char* theFactoryName,int theSocket)
:MessageProxyFactory(theFactoryName,theSocket) {};
~MyMessageProxyFactory() {};
protected:
virtual void onNewConnection(string theAddress,unsigned short thePort)
{
ThreadB* aThreadB=new ThreadB("MyThreadB",theAddress.c_str(),thePort,"MyThreadC");
};
};
void main_sleep(int val)
{
DISPLAY("...wait " << val << " secs...")
Thread::sleep(val*1000);
}
int main(int argv,char* argc[])
{
DISPLAY("MQ4CPP peer.cpp")
DISPLAY("This example shows how to build a peer to peer system")
int servertype=0;
int lport=0;
char* host=NULL;
int hport=0;
if(argv < 3)
{
DISPLAY("Server1 usage: peer -1 hostip port")
DISPLAY("Server2 usage: peer -2 port")
return 0;
}
else if(string(argc[1]).compare("-1")==0 && argv==4)
{
servertype=1;
DISPLAY("Server2 host name=" << argc[2])
host=argc[2];
DISPLAY("Server2 host port=" << argc[3])
hport=atoi(argc[3]);
}
else if(string(argc[1]).compare("-2")==0 && argv==3)
{
servertype=2;
DISPLAY("Server1 host name=" << argc[2])
lport=atoi(argc[2]);
}
else
{
DISPLAY("Server1 usage: peer -1 hostip port")
DISPLAY("Server2 usage: peer -2 port")
return 0;
}
try
{
if(servertype==1)
{
DISPLAY("Starting server1 threads...")
STARTLOGGER("server1.log")
LOG("!!!!!!! peer.cpp - server1 !!!!!!!")
ThreadA* aThreadA=new ThreadA("MyThreadA","records.log",host,hport,"MyThreadB") ;
ThreadC* aThreadC=new ThreadC("MyThreadC");
main_sleep(100);
}
else if(servertype==2)
{
DISPLAY("Starting server2 threads...")
STARTLOGGER("server2.log")
LOG("!!!!!!! peer.cpp - server2 !!!!!!!")
MyMessageProxyFactory aFactory("MyFactory",lport);
ThreadD* aThreadD=new ThreadD("MyThreadD");
main_sleep(100);
}
DISPLAY("...stopping threads...")
Thread::shutdownInProgress();
Thread::sleep(100);
STOPLOGGER()
STOPREGISTRY()
STOPTIMER()
}
catch(Exception& ex)
{
TRACE(ex.getMessage().c_str())
}
catch(...)
{
TRACE("Unhandled exception")
}
DISPLAY("...done!")
return 0;
}
syntax highlighted by Code2HTML, v. 0.9.1