/*
* Ascent MMORPG Server
* Copyright (C) 2005-2007 Ascent Team
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*
*/
#include "../DatabaseEnv.h"
#ifdef DATABASE_SUPPORT_PGSQL
PostgreDatabase::PostgreDatabase() : Database(DATABASE_TYPE_PGSQL)
{
Connections = NULL;
InUseMarkers = NULL;
QueryBuffer = NULL;
mConnectionCount = -1; // Not connected.
mNextPing = getMSTime();
mQueryThread = NULL;
}
PostgreDatabase::~PostgreDatabase()
{
// Close connections..
for(int32 i = 0; i < mConnectionCount; ++i)
{
if(Connections)
if(Connections[i])
Disconnect(i);
delete [] QueryBuffer[i];
}
delete [] Connections;
delete [] InUseMarkers;
delete [] QueryBuffer;
delete [] DelayedQueryBuffer;
}
bool PostgreDatabase::Initialize(const char* Hostname, unsigned int port, const char* Username, const char* Password, const char* DatabaseName, uint32 ConnectionCount, uint32 BufferSize)
{
mConnectionCount = ConnectionCount;
// Build the connection string
stringstream ss;
ss << "host = " << Hostname << " port = " << port << " ";
ss << "user = '" << Username << "' ";
if(strlen(Password) > 0)
ss << "password = '" << Password << "' ";
ss << "dbname = '" << DatabaseName << "'";
mConnectionString = ss.str();
Connections = new PGconn*[mConnectionCount];
InUseMarkers = new bool[mConnectionCount];
QueryBuffer = new char*[mConnectionCount];
DelayedQueryBuffer = new char[BufferSize];
for(int i = 0; i < mConnectionCount; ++i)
{
Connections[i] = NULL;
InUseMarkers[i] = false;
QueryBuffer[i] = new char[BufferSize];
}
bool result = Connect();
if(!result) return false;
if(result && mConnectionCount > 1)
{
// Spawn MySQLDatabase thread
//ZThread::Thread t(new PostgreDatabaseThread(this));
//sLog.outString("sql: Spawned delayed MySQLDatabase query thread...");
}
return result;
}
bool PostgreDatabase::Connect()
{
sLog.outString("Connecting to PostgreSQL Database with [%s]", mConnectionString.c_str());
for(uint32 i = 0; i < mConnectionCount; ++i)
{
if(!Connect(i))
return false;
}
//sLog.outString("sql: %u MySQL connections established.", mConnectionCount);
return true;
}
bool PostgreDatabase::Connect(uint32 ConnectionIndex)
{
Connections[ConnectionIndex] = PQconnectdb(mConnectionString.c_str());
if(Connections[ConnectionIndex] == 0) return false;
if(PQstatus(Connections[ConnectionIndex]) != CONNECTION_OK)
{
// failed for some reason
sLog.outError("PostgreSQL connection failed: %s", PQerrorMessage(Connections[ConnectionIndex]));
// free the memory
Disconnect(ConnectionIndex);
return false;
}
return true;
}
bool PostgreDatabase::Disconnect(uint32 ConnectionIndex)
{
if(Connections[ConnectionIndex] == 0) return false;
PQfinish(Connections[ConnectionIndex]);
Connections[ConnectionIndex] = 0;
return true;
}
uint32 PostgreDatabase::GetConnection()
{
int32 index = -1;
while(index == -1)
{
for(uint32 i = 0; i < mConnectionCount; ++i)
{
if(Connections[i] && InUseMarkers[i] == false)
{
index = i;
break;
}
}
Sleep(5);
}
return index;
}
void PostgreDatabase::Shutdown()
{
sLog.outString("sql: Closing all PostgreSQL connections...");
for(uint32 i = 0; i < mConnectionCount; ++i)
Disconnect(i);
sLog.outString("sql: %u connections closed.", mConnectionCount);
}
PGresult * PostgreDatabase::SendQuery(uint32 ConnectionIndex, const char* Sql, bool Self)
{
PGresult * res = PQexec(Connections[ConnectionIndex], Sql);
return res;
}
QueryResult * PostgreDatabase::Query(const char* QueryString, ...)
{
if(QueryString == NULL) return NULL;
va_list vlist;
va_start(vlist, QueryString);
mSearchMutex.acquire();
// Find a free connection
uint32 i = GetConnection();
// Mark the connection as busy
InUseMarkers[i] = true;
mSearchMutex.release();
// Apply parameters
vsprintf(QueryBuffer[i], QueryString, vlist);
va_end(vlist);
// Send the query
PGresult * res = SendQuery(i, QueryBuffer[i], false);
InUseMarkers[i] = false;
// Get the error code
ExecStatusType result = PQresultStatus(res);
if(result != PGRES_TUPLES_OK)
{
sLog.outError("Query failed: %s", PQresultErrorMessage(res));
// command failed.
PQclear(res);
return 0;
}
// Better check the row count.. we don't want to return an empty query..
if(PQntuples(res) == 0)
{
// oh noes!
PQclear(res);
return 0;
}
// get number of columns
uint32 FieldCount = PQnfields(res);
// get number of rows
uint32 RowCount = PQntuples(res);
// Create the QueryResult
PostgreQueryResult * qResult = new PostgreQueryResult(res, FieldCount, RowCount);
return qResult;
}
bool PostgreDatabase::Execute(const char* QueryString, ...)
{
if(QueryString == NULL) return false;
va_list vlist;
va_start(vlist, QueryString);
if(mQueryThread == 0)
{
DelayedQueryBufferMutex.acquire();
vsprintf(DelayedQueryBuffer, QueryString, vlist);
bool res = WaitExecute(DelayedQueryBuffer);
DelayedQueryBufferMutex.release();
return res;
}
/*DelayedQueryBufferMutex.acquire();
vsprintf(DelayedQueryBuffer, QueryString, vlist);
mQueryThread->AddQuery(DelayedQueryBuffer);
DelayedQueryBufferMutex.release();*/
return false;
}
bool PostgreDatabase::WaitExecute(const char* QueryString, ...)
{
if(QueryString == NULL) return false;
va_list vlist;
va_start(vlist, QueryString);
mSearchMutex.acquire();
uint32 Connection = GetConnection();
InUseMarkers[Connection] = true;
mSearchMutex.release();
vsprintf(QueryBuffer[Connection], QueryString, vlist);
PGresult * res = SendQuery(Connection, QueryBuffer[Connection], false);
if(res == 0) return false;
InUseMarkers[Connection] = false;
ExecStatusType result = PQresultStatus(res);
bool passed = false;
if(result == PGRES_TUPLES_OK || result == PGRES_COMMAND_OK)
passed = true;
else
sLog.outError("Execute failed because of [%s]", PQresultErrorMessage(res));
// free up the memory
PQclear(res);
return passed;
}
PostgreQueryResult::PostgreQueryResult(PGresult * res, uint32 FieldCount, uint32 RowCount) : QueryResult(FieldCount, RowCount, DATABASE_TYPE_PGSQL)
{
// set result for later deletion and use
mResult = res;
// starting at row 0
mRow = 0;
// retreieve the data
NextRow();
}
PostgreQueryResult::~PostgreQueryResult()
{
}
void PostgreQueryResult::Destroy()
{
PQclear(mResult);
mResult = 0;
}
bool PostgreQueryResult::NextRow()
{
// check if we reached the end
if(mRow == mRowCount) return false;
// get each field and set it in result
char * value;
for(uint32 i = 0; i < mFieldCount; ++i)
{
value = PQgetvalue(mResult, mRow, i);
if(value == 0) return false;
mCurrentRow[i].SetValue(value);
}
mRow++;
return true;
}
void PostgreDatabase::CheckConnections()
{
// Check every 30 seconds (TODO: MAKE CUSTOMIZABLE)
if(getMSTime() < mNextPing)
return;
mNextPing = getMSTime() + 60000;
for(uint32 i = 0; i < mConnectionCount; ++i)
{
if(Connections[i] != 0 && PQstatus(Connections[i]) != CONNECTION_OK)
{
// disconnect and reconnect
Disconnect(i);
Connect(i);
}
}
}
#endif