///////////////////////////////////////////////////////////////////////////////
// MQ4CPP - Message queuing for C++
// Copyright (C) 2004-2007 Riccardo Pompeo (Italy)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
#define SILENT
#include "StoreForward.h"
#include "Logger.h"
// Extension appended to the file name of every stored message
// (see MessageStorer::send).
#define LOGEXT ".tlog"
// Search pattern handed to Directory::search() by MessageForwarder::scan().
#define LOGSRC "*.tlog"
// Threshold compared against a difference of Timer::time() values in
// MessageForwarder::purge(). Presumably milliseconds (2 minutes) - confirm
// against Timer::time() units. Parenthesized so the macro expands safely
// inside larger expressions (e.g. x / FWAIT).
#define FWAIT (2*60*1000)
// Delay passed to SCHEDULE() when MessageForwarder is constructed.
#define SCAN_TIME 1000
// Creates a storer registered under theName.
//   theWorkingPath   - directory (looked up from the current directory) that
//                      receives the stored message files; created if the
//                      lookup returns NULL
//   theHost/thePort/theRemoteService - destination recorded with each message
MessageStorer::MessageStorer(const char* theName, const char* theWorkingPath, const char* theHost, short thePort,const char* theRemoteService)
:Observer(theName)
{
    TRACE("MessageStorer::MessageStorer - start")

    // Remember the destination that send() writes into every stored file.
    itsHost = theHost;
    itsPort = thePort;
    itsService = theRemoteService;

    // File names built by send() start counting from zero.
    itsMsgCnt = 0;

    // Locate the working directory, creating it when it does not exist yet.
    itsCurDir = Directory::getCurrent();
    itsTlogDir = (Directory*)itsCurDir->get(theWorkingPath);
    if(!itsTlogDir)
    {
        itsTlogDir = itsCurDir->mkdir(theWorkingPath);
        TRACE(itsTlogDir->getName() << " created")
    }

    // The start time is mixed into the message IDs generated by send().
    itsStartTime = Timer::time();

    TRACE("MessageStorer::MessageStorer - end")
}
// Releases the directory object obtained from Directory::getCurrent().
// itsTlogDir is not deleted here; presumably it is owned by itsCurDir
// (TODO confirm against Directory).
MessageStorer::~MessageStorer()
{
    TRACE("MessageStorer::~MessageStorer - start")

    delete itsCurDir;

    TRACE("MessageStorer::~MessageStorer - end")
}
// Persists theBuffer as a new file in the working directory.
//
// The file holds a serialized ListProperty with the entries "Source",
// "Timestamp", "Host", "Port", "Service" and "Message"; these are the keys
// MessageForwarder::scan() reads back. The file name is
// "<observer name>.<id><LOGEXT>", where the id combines the low 16 bits of
// the construction time with the running message counter.
void MessageStorer::send(string theBuffer)
{
    TRACE("MessageStorer::send - start")
    TRACE("Host=" << itsHost)
    TRACE("Port=" << itsPort)
    TRACE("Service=" << itsService)
    unsigned long aTime=Timer::time();
    TRACE("Time=" << aTime)

    // Properties are heap allocated and handed to the list; presumably
    // ListProperty takes ownership of them (TODO confirm).
    ListProperty aPropertiesList;

    StringProperty* aSource=new StringProperty("Source");
    aSource->set(getName());
    aPropertiesList.add(aSource);

    LongIntProperty* aTimestamp=new LongIntProperty("Timestamp");
    aTimestamp->set(aTime);
    aPropertiesList.add(aTimestamp);

    StringProperty* anHost=new StringProperty("Host");
    anHost->set(itsHost);
    aPropertiesList.add(anHost);

    ShortIntProperty* aPort=new ShortIntProperty("Port");
    aPort->set(itsPort);
    aPropertiesList.add(aPort);

    StringProperty* aService=new StringProperty("Service");
    aService->set(itsService);
    aPropertiesList.add(aService);

    StringProperty* aMsg=new StringProperty("Message");
    aMsg->set(theBuffer);
    aPropertiesList.add(aMsg);

    char aFileName[256];
    ostrstream aName(aFileName,sizeof(aFileName));
    unsigned long anID=((itsStartTime & 0xffff) << 16) + itsMsgCnt;
    aName << getName() << "." << anID << LOGEXT << ends;
    // A fixed-buffer ostrstream stops writing when the buffer is full, so
    // the trailing 'ends' may be dropped for over-long names. Force
    // termination so the buffer is always a valid C string.
    aFileName[sizeof(aFileName)-1]='\0';

    TRACE("Creating '" << aFileName << "' at '" << itsTlogDir->getName() << "'")
    File* aFile=itsTlogDir->create(aFileName);
    fstream& aStream=aFile->create();
    TRACE("Serializing properties....")
    aPropertiesList.serialize(aStream);
    aFile->close();
    itsMsgCnt++;
    TRACE("MessageStorer::send - end")
}
// Creates a client for service theTarget on theHost:thePort. The object
// starts in state STOP with no file attached, and records its creation time.
TargetHost::TargetHost(const char* theName, const char* theHost,int thePort, const char* theTarget)
:Client(theName,theHost,thePort,theTarget)
{
    TRACE("TargetHost::TargetHost - start")

    itsTime = Timer::time();
    itsState = STOP;
    itsFile = NULL;

    TRACE("TargetHost::TargetHost - end")
}
// Frees the File object still attached to this client, if any. Only the
// in-memory object is deleted; File::remove() is not called here.
TargetHost::~TargetHost()
{
    TRACE("TargetHost::~TargetHost - start")

    // delete on a null pointer is a no-op, so no explicit check is needed.
    delete itsFile;

    TRACE("TargetHost::~TargetHost - end")
}
// Returns the name of the file currently attached to this client, or an
// empty string when no file is attached.
string TargetHost::getFileName()
{
    if(itsFile==NULL)
        return "";

    return itsFile->getName();
}
// Sends theBuffer through Client::send() and, when that call reports
// success, attaches the file theFileName to this client and switches to
// state SENDING.
//
// Returns the result of Client::send(). On failure neither the attached
// file nor the state is changed.
bool TargetHost::send(string theBuffer, string theFileName)
{
    TRACE("TargetHost::send - start")
    bool ret=Client::send(theBuffer);
    if(ret)
    {
        // Release a File left over from an earlier send so that repeated
        // calls do not leak it (deleting a null pointer is a no-op).
        delete itsFile;
        itsFile=new File(theFileName);
        itsState=SENDING;
    }
    TRACE("TargetHost::send - end")
    return ret;
}
// Marks the transfer as successful: removes the attached file through
// File::remove(), frees the File object and switches to state SUCCESS.
// theBuffer is not used.
void TargetHost::success(string theBuffer)
{
    TRACE("TargetHost::success - start")
    // Guard against being called while no file is attached (e.g. a repeated
    // notification); the original dereferenced a null pointer in that case.
    if(itsFile!=NULL)
    {
        itsFile->remove();
        delete itsFile;
        itsFile=NULL;
    }
    itsState=SUCCESS;
    TRACE("TargetHost::success - end")
}
// Marks the transfer as failed: frees the attached File object (the file
// itself is not removed), switches to state FAIL and logs a warning that
// names the message file and the destination. theError is not used.
void TargetHost::fail(string theError)
{
    TRACE("TargetHost::fail - start")
    // Capture the name before the File object is released so it can be
    // reported in the warning below.
    string aFileName=getFileName();
    delete itsFile;
    itsFile=NULL;
    itsState=FAIL;

    char aMessage[1024];
    ostrstream aStream(aMessage,sizeof(aMessage));
    // 'ends' terminates the C string; without it the buffer handed to
    // WARNING holds uninitialized bytes after the text.
    aStream << "Fail to send message '" << aFileName << "' to service '" << itsTarget
            << "' hosted on '" << itsHost << ":" << itsPort << "'" << ends;
    // If the text overflowed the buffer, 'ends' was dropped as well.
    aMessage[sizeof(aMessage)-1]='\0';
    WARNING(aMessage);
    TRACE("TargetHost::fail - end")
}
// Creates a forwarder registered under theName that watches theWorkingPath
// (looked up from the current directory, created if the lookup returns NULL)
// and schedules itself with SCAN_TIME.
MessageForwarder::MessageForwarder(const char* theName,const char* theWorkingPath)
:Observer(theName)
{
    TRACE("MessageForwarder::MessageForwarder - start")

    // Locate the directory holding the stored messages, creating it if needed.
    itsCurDir = Directory::getCurrent();
    itsTlogDir = (Directory*)itsCurDir->get(theWorkingPath);
    if(!itsTlogDir)
    {
        itsTlogDir = itsCurDir->mkdir(theWorkingPath);
        TRACE(itsTlogDir->getName() << " created")
    }

    itsLastScanTime = Timer::time();

    // Request wake-ups; onWakeup() performs the scan/purge cycle.
    SCHEDULE(this,SCAN_TIME);

    TRACE("MessageForwarder::MessageForwarder - end")
}
// Deletes the tracked TargetHost objects (skipped while isShuttingDown()
// reports true), empties the list and releases the current-directory object.
MessageForwarder::~MessageForwarder()
{
    TRACE("MessageForwarder::~MessageForwarder - start")

    if(!isShuttingDown())
    {
        vector<TargetHost*>::iterator aHost = itsHostList.begin();
        while(aHost != itsHostList.end())
        {
            delete *aHost;
            ++aHost;
        }
    }
    itsHostList.clear();

    delete itsCurDir;

    TRACE("MessageForwarder::~MessageForwarder - end")
}
// Wake-up handler: records the time of this pass, then scans the working
// directory for new message files and purges finished clients.
// theMessage is not used.
void MessageForwarder::onWakeup(Wakeup* theMessage)
{
    TRACE("MessageForwarder::onWakeup - start")

    itsLastScanTime = Timer::time();

    // Order matters: pick up new files first, then drop finished clients.
    scan();
    purge();

    TRACE("MessageForwarder::onWakeup - end")
}
// Scans the working directory for stored message files (pattern LOGSRC).
// For every file whose name does not match the file name of a TargetHost
// already in itsHostList, the file's properties are loaded, a TargetHost is
// created for the recorded host/port/service, appended to itsHostList and
// asked to send the recorded message.
//
// Files lacking any of the "Host", "Port", "Service" or "Message" properties
// are skipped (they are looked at again on the next scan). Exceptions of
// type Exception are traced and swallowed.
void MessageForwarder::scan()
{
    TRACE("MessageForwarder::scan - start")
    try
    {
        itsTlogDir->search(LOGSRC);
        for(vector<Persistent*>::iterator i = itsTlogDir->getIterator(); itsTlogDir->testIterator(i) ; ++i)
        {
            Persistent* anObj=*i;
            if(!anObj->is("File"))
                continue;

            File* aFile=(File*)anObj;
            string aFileName=aFile->getName();
            TRACE("Processing file name " << aFileName)

            // Is a client already handling this file?
            bool found=false;
            TRACE("Cache size=" << itsHostList.size())
            for(vector<TargetHost*>::iterator t = itsHostList.begin(); t != itsHostList.end(); ++t)
            {
                if((*t)->getFileName()==aFileName)
                {
                    TRACE("File name already recorded")
                    found=true;
                    break;
                }
            }
            if(found)
                continue;

            TRACE("Adding in queue")
            fstream& aStream=aFile->open();
            ListProperty aPropertiesList;
            aPropertiesList.deserialize(aStream,true);
            aFile->close();

            TRACE("Find properties")
            // Look the properties up first and validate them before any
            // dereference: a truncated or foreign file may lack entries.
            // ("Source" and "Timestamp" are also written by MessageStorer
            // but are not needed for forwarding.)
            StringProperty* anHostProp=(StringProperty*)aPropertiesList.get("Host");
            ShortIntProperty* aPortProp=(ShortIntProperty*)aPropertiesList.get("Port");
            StringProperty* aServiceProp=(StringProperty*)aPropertiesList.get("Service");
            StringProperty* aMessageProp=(StringProperty*)aPropertiesList.get("Message");
            if(anHostProp==NULL || aPortProp==NULL || aServiceProp==NULL || aMessageProp==NULL)
            {
                TRACE("Skipping file " << aFileName << " - required property missing")
                continue;
            }

            string anHost=anHostProp->get();
            unsigned short aPort=aPortProp->get();
            string aService=aServiceProp->get();
            string aMessage=aMessageProp->get();
            string aFullName=aFile->encodeFullName();

            char aClientName[256];
            ostrstream aName(aClientName,sizeof(aClientName));
            aName << getName() << "(" << aFileName << ")" << ends;
            // 'ends' is dropped when the text overflows the fixed buffer;
            // force termination so the name is always a valid C string.
            aClientName[sizeof(aClientName)-1]='\0';

            TargetHost* aClient=new TargetHost(aClientName,anHost.c_str(),aPort, aService.c_str());
            itsHostList.push_back(aClient);
            // NOTE(review): the result of send() is ignored. When it returns
            // false the client keeps no file name and stays in state STOP,
            // which purge() never removes, so the same file is picked up
            // again on the next scan - confirm whether that is intended.
            aClient->send(aMessage,aFullName);
            TRACE("Added file " << aFullName)
        }
    }
    catch(Exception& exc)
    {
        TRACE(exc.getMessage())
    }
    TRACE("MessageForwarder::scan - end")
}
// Removes finished clients from itsHostList and deletes them:
//   - clients in state SUCCESS are always removed;
//   - clients in state FAIL are removed when the time test against FWAIT
//     below holds.
// Nothing is done while isShuttingDown() reports true.
void MessageForwarder::purge()
{
    TRACE("MessageForwarder::purge - start")
    unsigned long aCurTime=Timer::time();
    if(!isShuttingDown())
    {
        vector<TargetHost*>::iterator i = itsHostList.begin();
        while(i != itsHostList.end())
        {
            TargetHost* aClient=*i;
            bool aRemove=false;
            switch(aClient->getState())
            {
                case TargetHost::SUCCESS:
                    aRemove=true;
                    break;
                case TargetHost::FAIL:
                    // NOTE(review): if getCreationTime() is unsigned like
                    // aCurTime, this difference wraps for any client created
                    // before aCurTime, making the test effectively always
                    // true; the operands look reversed. Left unchanged here:
                    // swapping them would keep failed clients listed for
                    // FWAIT while scan() (which no longer matches them by
                    // file name once fail() dropped the File) creates new
                    // ones - confirm the intended back-off behaviour.
                    if(aClient->getCreationTime()-aCurTime > FWAIT)
                        aRemove=true;
                    break;
                default:
                    break;
            }
            if(aRemove)
            {
                // erase() invalidates i; continue from the iterator it
                // returns instead of incrementing the stale one (which also
                // skipped the element following each removal).
                i=itsHostList.erase(i);
                delete aClient;
            }
            else
            {
                ++i;
            }
        }
    }
    TRACE("MessageForwarder::purge - end")
}
// syntax highlighted by Code2HTML, v. 0.9.1