///////////////////////////////////////////////////////////////////////////////
// MQ4CPP - Message queuing for C++
// Copyright (C) 2004-2007 Riccardo Pompeo (Italy)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
#define SILENT
#include "Session.h"
#include "Trace.h"
ReplicationHost::ReplicationHost(const char* theName, const char* theHost,int thePort, const char* theTarget)
:Client(theName,theHost,thePort,theTarget)
{
TRACE("ReplicationHost::ReplicationHost - start")
itsState=STOP;
TRACE("ReplicationHost::ReplicationHost - end")
}
ReplicationHost::~ReplicationHost()
{
TRACE("ReplicationHost::~ReplicationHost - start")
TRACE("ReplicationHost::~ReplicationHost - end")
}
bool ReplicationHost::send(string theBuffer)
{
TRACE("ReplicationHost::send - start")
bool ret=Client::send(theBuffer);
if(ret)
itsState=WAIT;
TRACE("ReplicationHost::send - end")
return ret;
}
void ReplicationHost::success(string theBuffer)
{
TRACE("ReplicationHost::success - start")
itsState=SUCCESS;
TRACE("ReplicationHost::success - end")
}
void ReplicationHost::fail(string theError)
{
TRACE("ReplicationHost::fail - start")
itsState=FAIL;
TRACE("ReplicationHost::fail - end")
}
Session::Session(const char* theName,bool theAutoCommit) : Server(theName)
{
TRACE("Session::Session - start")
itsModified=false;
itsAutoCommit=theAutoCommit;
TRACE("Session::Session - end")
}
Session::~Session()
{
TRACE("Session::~Session - start")
if(!isShuttingDown())
{
for(vector<ReplicationHost*>::iterator i = itsReplicaList.begin(); i < itsReplicaList.end(); ++i)
delete *i;
}
itsReplicaList.clear();
TRACE("Session::~Session - end")
}
void Session::addReplicationHost(char* theHostName,int thePort)
{
TRACE("Session::addReplicationHost - start")
ostrstream aNewName;
aNewName << getName() << "(" << itsReplicaList.size() << ")" << ends;
char* aString=aNewName.str();
TRACE("Added " << aString)
wait();
itsReplicaList.push_back(new ReplicationHost(aString,theHostName,thePort,getName()));
release();
delete [] aString;
TRACE("Session::addReplicationHost - end")
}
char Session::getChar(const char* thePropertyName)
{
TRACE("Session::getChar - start")
TRACE("Property=" << thePropertyName)
char ret=0;
wait();
Property* aProperty=itsPropertiesList.get(thePropertyName);
if(aProperty!=NULL)
{
TRACE("Property found in ListProperty")
if(aProperty->is(PROPERTY_CHAR))
ret=((CharProperty*)aProperty)->get();
}
release();
TRACE("Value=" << (int)ret)
TRACE("Session::getChar - end")
return ret;
}
void Session::setChar(const char* thePropertyName,char theValue)
{
TRACE("Session::setChar - start")
TRACE("Property=" << thePropertyName)
TRACE("Value=" << (int)theValue)
itsModified=true;
wait();
Property* aProperty=itsPropertiesList.get(thePropertyName);
if (aProperty == NULL)
{
TRACE("Add new Property in ListProperty")
CharProperty* aProperty=new CharProperty(thePropertyName);
aProperty->set(theValue);
itsPropertiesList.add(aProperty);
}
else
{
TRACE("Property already stored in ListProperty")
if(aProperty->is(PROPERTY_CHAR))
((CharProperty*)aProperty)->set(theValue);
}
if(itsAutoCommit)
replication();
release();
TRACE("Session::setChar - end")
}
short Session::getShortInt(const char* thePropertyName)
{
TRACE("Session::getShortInt - start")
TRACE("Property=" << thePropertyName)
short ret;
wait();
Property* aProperty=itsPropertiesList.get(thePropertyName);
if(aProperty!=NULL)
{
TRACE("Property found in ListProperty")
if(aProperty->is(PROPERTY_SHORTINT))
ret=((ShortIntProperty*)aProperty)->get();
}
release();
TRACE("Value=" << ret)
TRACE("Session::getShortInt - end")
return ret;
}
void Session::setShortInt(const char* thePropertyName,short theValue)
{
TRACE("Session::setShortInt - start")
TRACE("Property=" << thePropertyName)
itsModified=true;
wait();
Property* aProperty=itsPropertiesList.get(thePropertyName);
if (aProperty == NULL)
{
TRACE("Add new Property in ListProperty")
ShortIntProperty* aProperty=new ShortIntProperty(thePropertyName);
aProperty->set(theValue);
itsPropertiesList.add(aProperty);
}
else
{
TRACE("Property already stored in ListProperty")
if(aProperty->is(PROPERTY_SHORTINT))
((ShortIntProperty*)aProperty)->set(theValue);
}
if(itsAutoCommit)
replication();
release();
TRACE("Session::setShortInt - end")
}
long Session::getLong(const char* thePropertyName)
{
TRACE("Session::getLong - start")
TRACE("Property=" << thePropertyName)
long ret=0;
wait();
Property* aProperty=itsPropertiesList.get(thePropertyName);
if(aProperty!=NULL)
{
TRACE("Property found in ListProperty")
if(aProperty->is(PROPERTY_LONGINT))
ret=((LongIntProperty*)aProperty)->get();
}
release();
TRACE("Value=" << ret)
TRACE("Session::getLong - end")
return ret;
}
void Session::setLong(const char* thePropertyName,long theValue)
{
TRACE("Session::setLong - start")
TRACE("Property=" << thePropertyName)
TRACE("Value=" << theValue)
itsModified=true;
wait();
Property* aProperty=itsPropertiesList.get(thePropertyName);
if (aProperty == NULL)
{
TRACE("Add new Property in ListProperty")
LongIntProperty* aProperty=new LongIntProperty(thePropertyName);
aProperty->set(theValue);
itsPropertiesList.add(aProperty);
}
else
{
TRACE("Property already stored in ListProperty")
if(aProperty->is(PROPERTY_LONGINT))
((LongIntProperty*)aProperty)->set(theValue);
}
if(itsAutoCommit)
replication();
release();
TRACE("Session::setLong - end")
}
string Session::getString(const char* thePropertyName)
{
TRACE("Session::getString - start")
TRACE("Property=" << thePropertyName)
string ret;
wait();
Property* aProperty=itsPropertiesList.get(thePropertyName);
if(aProperty!=NULL)
{
TRACE("Property found in ListProperty")
if(aProperty->is(PROPERTY_STRING))
ret=((StringProperty*)aProperty)->get();
}
release();
TRACE("Value=" << ret)
TRACE("Session::getString - end")
return ret;
}
void Session::setString(const char* thePropertyName,const char* theValue)
{
TRACE("Session::setString - start")
TRACE("Property=" << thePropertyName)
TRACE("Value=" << theValue)
itsModified=true;
wait();
Property* aProperty=itsPropertiesList.get(thePropertyName);
if (aProperty == NULL)
{
TRACE("Add new Property in ListProperty")
StringProperty* aProperty=new StringProperty(thePropertyName);
aProperty->set(theValue);
itsPropertiesList.add(aProperty);
}
else
{
TRACE("Property already stored in ListProperty")
if(aProperty->is(PROPERTY_STRING))
((StringProperty*)aProperty)->set(theValue);
}
if(itsAutoCommit)
replication();
release();
TRACE("Session::setString - end")
}
ListProperty* Session::getList(const char* thePropertyName) //++v1.3
{
TRACE("Session::getList - start")
TRACE("Property=" << thePropertyName)
ListProperty* ret=NULL;
wait();
Property* aProperty=itsPropertiesList.get(thePropertyName);
if(aProperty!=NULL)
{
TRACE("Property found in ListProperty")
if(aProperty->is(PROPERTY_LIST))
ret=(ListProperty*)aProperty;
}
release();
TRACE("Session::getList - end")
return ret;
}
void Session::setList(const char* thePropertyName,ListProperty* theValue) //++v1.3
{
TRACE("Session::setList - start")
TRACE("Property=" << thePropertyName)
itsModified=true;
wait();
itsPropertiesList.remove(thePropertyName);
itsPropertiesList.add(theValue);
if(itsAutoCommit)
replication();
release();
TRACE("Session::setList - end")
}
void Session::commit()
{
TRACE("Session::commit - start")
wait();
if(itsModified)
replication();
release();
TRACE("Session::commit - end")
}
void Session::replication()
{
TRACE("Session::replication - start")
if(itsReplicaList.size()>0)
{
TRACE("Found " << itsReplicaList.size() << " replications host(s)")
ostrstream aStream;
itsPropertiesList.serialize(aStream);
TRACE("Serialization done")
string aString;
int aLen=aStream.pcount();
char* aBuffer=aStream.str();
DUMP("Session::replication Tx buffer",aBuffer,aLen);
aString.assign(aBuffer,aLen);
delete [] aBuffer;
TRACE("Sending to all replication host")
for(vector<ReplicationHost*>::iterator i = itsReplicaList.begin(); i < itsReplicaList.end(); ++i)
{
ReplicationHost* aClient=*i;
aClient->sendMessage(aString);
}
}
itsModified=false;
TRACE("Session::replication - end")
}
string Session::service(string theBuffer)
{
TRACE("Session::service - start")
DUMP("Session::service Rx buffer",(char*)theBuffer.data(),theBuffer.length());
istrstream aStream(theBuffer.data(),theBuffer.length());
wait();
itsPropertiesList.deserialize(aStream,true);
release();
TRACE("Session::service - end")
return "";
}
bool Session::store(const char* theFileName)
{
TRACE("Session::store - start")
TRACE("File name=" << theFileName)
bool ret=false;
try
{
ofstream aStream(theFileName);
wait();
itsPropertiesList.serialize(aStream);
release();
aStream.close();
ret=true;
}
catch(...)
{
release();
TRACE("Exception during storing of file " << theFileName)
}
TRACE("Session::store - end")
return ret;
}
bool Session::load(const char* theFileName)
{
TRACE("Session::load - start")
TRACE("File name=" << theFileName)
bool ret=false;
try
{
ifstream aStream(theFileName);
if(!aStream==false)
{
wait();
itsPropertiesList.free();
TRACE("Repository freed")
itsPropertiesList.deserialize(aStream,true);
TRACE("Deserialization done")
replication();
TRACE("Replication done")
release();
aStream.close();
ret=true;
}
else
{
TRACE("File not found")
}
}
catch(...)
{
release();
TRACE("Exception during loading of file " << theFileName)
}
TRACE("Session::load - end")
return ret;
}
StatefulServer::StatefulServer(const char* theName) : Server(theName)
{
TRACE("StatefulServer::StatefulServer - start")
ostrstream aStream;
aStream << "Session(" << getName() << ")" << ends;
char* aString=aStream.str();
itsSession=new Session(aString);
delete [] aString;
TRACE("StatefulServer::StatefulServer - end")
}
StatefulServer::~StatefulServer()
{
TRACE("StatefulServer::~StatefulServer - start")
if(!isShuttingDown())
delete itsSession;
TRACE("StatefulServer::~StatefulServer - end")
}
void StatefulServer::addReplicationHost(char* theHostName,int thePort)
{
TRACE("StatefulServer::addReplicationHost - start")
itsSession->addReplicationHost(theHostName,thePort);
TRACE("StatefulServer::addReplicationHost - end")
}
syntax highlighted by Code2HTML, v. 0.9.1