//-< DATABASE.CPP >--------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *
// Last update: 14-Jan-99 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// Database memory management, query execution, scheme evaluation
//-------------------------------------------------------------------*--------*
// #define FASTDB_VERBOSE_LOGGING
#define INSIDE_FASTDB
#include <ctype.h>
#include <math.h>
#include <sys/stat.h>
#include "fastdb.h"
#include "compiler.h"
#include "hashtab.h"
#include "ttree.h"
#include "symtab.h"
#include "os/OsFS.h"
#include "os/OsSysLog.h"
// String comparison configuration.
// STRING_COMPARE(x, y) expands to the comparison routine selected by the build flags:
//   USE_LOCALE_SETTINGS and IGNORE_CASE -> stricoll
//   USE_LOCALE_SETTINGS only            -> strcoll
//   IGNORE_CASE only                    -> strcasecmp
//   neither                             -> strcmp
#ifdef USE_LOCALE_SETTINGS
#ifdef IGNORE_CASE
#define STRING_COMPARE(x, y) stricoll(x, y)
#else
#define STRING_COMPARE(x, y) strcoll(x, y)
#endif
#else
#ifdef IGNORE_CASE
#include <os/OsDefs.h>
#define STRING_COMPARE(x, y) strcasecmp(x, y)
#else
#define STRING_COMPARE(x, y) strcmp(x, y)
#endif
#endif
// GET_CHAR(c) yields the character value compared by the pattern matchers in this file:
// upper-cased through toupper() when IGNORE_CASE is defined, otherwise the character unchanged.
#ifdef IGNORE_CASE
#define GET_CHAR(c) toupper((byte)(c))
#else
#define GET_CHAR(c) (c)
#endif
// Global instance of dbNullReference.
dbNullReference null;
// Name string "Metatable".
// NOTE(review): presumably the name under which the schema metatable is registered - confirm against its users.
char const* const dbMetaTableName = "Metatable";
// Default value of the static dbDatabase::dbParallelScanThreshold (1000).
// NOTE(review): unit and exact use are not visible in this part of the file - confirm where it is read.
unsigned dbDatabase::dbParallelScanThreshold = 1000;
/**
 * Helper returning the identifier of the calling process.
 *
 * GetCurrentProcessId() is used when WIN32 is defined, getpid() otherwise.
 *
 * @return the process id converted to int
 */
int
getProcessID()
{
#ifdef WIN32
    return static_cast<int>(GetCurrentProcessId());
#else
    return static_cast<int>(getpid());
#endif
}
// Table of byte sizes for internal database objects: a zero entry, two page-sized
// entries (first and last) and the sizes of the T-tree and hash table structures.
// NOTE(review): presumably indexed by the internal object tag declared in the
// headers - confirm that the entry order matches that enumeration.
size_t dbDatabase::internalObjectSize[] = {
0,
dbPageSize,
sizeof(dbTtree),
sizeof(dbTtreeNode),
sizeof(dbHashTable),
sizeof(dbHashTableItem),
dbPageSize
};
/**
 * Report the library version.
 *
 * @return the value of the FASTDB_VERSION macro
 */
int dbDatabase::getVersion()
{
    int const version = FASTDB_VERSION;
    return version;
}
/**
 * Replace the integer held in sattr.ivalue by its textual form.
 *
 * The number is formatted with INT8_FORMAT and copied into a string value
 * created through dbStringValue::create(); sattr.array then describes that
 * string, with array.size counting the terminating NUL.
 *
 * @param iattr evaluation context handed to dbStringValue::create()
 * @param sattr in: ivalue, out: array.base / array.size
 */
inline void convertIntToString(dbInheritedAttribute& iattr,
                               dbSynthesizedAttribute& sattr)
{
    char text[32];
    // Format first: the integer must be read before array.size is stored.
    int const written = sprintf(text, INT8_FORMAT, sattr.ivalue);
    sattr.array.size = written + 1;
    sattr.array.base = dbStringValue::create(text, iattr);
}
/**
 * Replace the floating point number held in sattr.fvalue by its textual form.
 *
 * The number is formatted with "%f" and copied into a string value created
 * through dbStringValue::create(); sattr.array then describes that string,
 * with array.size counting the terminating NUL.
 *
 * @param iattr evaluation context handed to dbStringValue::create()
 * @param sattr in: fvalue, out: array.base / array.size
 */
inline void convertRealToString(dbInheritedAttribute& iattr,
                                dbSynthesizedAttribute& sattr)
{
    // "%f" never switches to exponent notation, so a large magnitude is printed
    // with all of its integer digits: the largest finite double needs 309 digits
    // plus sign, decimal point and six decimals. The former 32 byte buffer was
    // overrun by such values; 512 bytes covers every finite 8-byte double.
    char buf[512];
    sattr.array.size = sprintf(buf, "%f", sattr.fvalue) + 1;
    sattr.array.base = dbStringValue::create(buf, iattr);
}
/**
 * Concatenate the strings described by sattr and sattr2.
 *
 * A new string value is created through dbStringValue::create(); on return
 * sattr describes the concatenation. Both array.size fields count the
 * terminating NUL, so the result needs size1 + size2 - 1 bytes.
 *
 * @param iattr  evaluation context handed to dbStringValue::create()
 * @param sattr  in: left string, out: concatenated string
 * @param sattr2 right string (not modified)
 */
inline void concatenateStrings(dbInheritedAttribute& iattr,
                               dbSynthesizedAttribute& sattr,
                               dbSynthesizedAttribute& sattr2)
{
    int const leftLen = sattr.array.size - 1; // characters of the left string without its NUL
    // The size must account for the RIGHT operand as well; the previous code
    // added sattr.array.size twice and under-allocated whenever the right
    // string was longer than the left one.
    char* str =
        dbStringValue::create(sattr.array.size + sattr2.array.size - 1, iattr);
    memcpy(str, sattr.array.base, leftLen);
    // Copies the right string including its terminating NUL.
    memcpy(str + leftLen, sattr2.array.base, sattr2.array.size);
    sattr.array.base = str;
    sattr.array.size += sattr2.array.size - 1;
}
/**
 * Compare two string attributes with STRING_COMPARE.
 *
 * @return the STRING_COMPARE result: zero when the strings compare equal
 */
inline int compareStringsForEquality(dbSynthesizedAttribute& sattr1,
                                     dbSynthesizedAttribute& sattr2)
{
    char* const lhs = sattr1.array.base;
    char* const rhs = sattr2.array.base;
    return STRING_COMPARE(lhs, rhs);
}
/**
 * Order two string attributes with STRING_COMPARE.
 *
 * @return the STRING_COMPARE result: negative, zero or positive
 */
inline int compareStrings(dbSynthesizedAttribute& sattr1,
                          dbSynthesizedAttribute& sattr2)
{
    char* const lhs = sattr1.array.base;
    char* const rhs = sattr2.array.base;
    return STRING_COMPARE(lhs, rhs);
}
// Match the string in sattr1 against the wildcard pattern in sattr2, honouring
// an escape character.
// dbMatchAnySubstring matches any run of characters, dbMatchAnyOneChar matches
// exactly one character, and escapeChar followed by a character matches that
// character literally. Characters are compared through GET_CHAR.
// Returns true when the whole string matches the whole pattern.
inline bool matchStrings(dbSynthesizedAttribute& sattr1,
dbSynthesizedAttribute& sattr2,
char escapeChar)
{
char *str = sattr1.array.base;
char *pattern = sattr2.array.base;
// Pattern position just after the most recent substring wildcard (NULL if none seen yet).
char *wildcard = NULL;
// String position at which that wildcard currently stops absorbing characters.
char *strpos = NULL;
while ( true ) {
int ch = GET_CHAR(*str);
if ( *pattern == dbMatchAnySubstring ) {
// Remember where to resume should the rest of the pattern fail to match.
wildcard = ++pattern;
strpos = str;
} else if ( ch == '\0' ) {
// String exhausted: a match only if the pattern is exhausted as well.
return(*pattern == '\0');
} else if ( *pattern == escapeChar && GET_CHAR(pattern[1]) == ch ) {
// Escaped pattern character equals the current string character:
// consume one string character and both pattern characters.
str += 1;
pattern += 2;
} else if ( *pattern != escapeChar
&& (ch == GET_CHAR(*pattern)
|| *pattern == dbMatchAnyOneChar) )
{
// Ordinary character match or single-character wildcard.
str += 1;
pattern += 1;
} else if ( wildcard ) {
// Mismatch after a substring wildcard: let the wildcard absorb one more
// string character and retry the pattern tail.
str = ++strpos;
pattern = wildcard;
} else {
// Mismatch with no wildcard to fall back on.
return false;
}
}
}
// Match the string in sattr1 against the wildcard pattern in sattr2 (no escape
// character).
// dbMatchAnySubstring matches any run of characters and dbMatchAnyOneChar
// matches exactly one character. Characters are compared through GET_CHAR.
// Returns true when the whole string matches the whole pattern.
inline bool matchStrings(dbSynthesizedAttribute& sattr1,
dbSynthesizedAttribute& sattr2)
{
char *str = sattr1.array.base;
char *pattern = sattr2.array.base;
// Pattern position just after the most recent substring wildcard (NULL if none seen yet).
char *wildcard = NULL;
// String position at which that wildcard currently stops absorbing characters.
char *strpos = NULL;
while ( true ) {
int ch = GET_CHAR(*str);
if ( *pattern == dbMatchAnySubstring ) {
// Remember where to resume should the rest of the pattern fail to match.
wildcard = ++pattern;
strpos = str;
} else if ( ch == '\0' ) {
// String exhausted: a match only if the pattern is exhausted as well.
return(*pattern == '\0');
} else if ( ch == GET_CHAR(*pattern) || *pattern == dbMatchAnyOneChar ) {
// Ordinary character match or single-character wildcard.
str += 1;
pattern += 1;
} else if ( wildcard ) {
// Mismatch after a substring wildcard: let the wildcard absorb one more
// string character and retry the pattern tail.
str = ++strpos;
pattern = wildcard;
} else {
// Mismatch with no wildcard to fall back on.
return false;
}
}
}
/**
 * Replace the string described by sattr with a lower-cased copy.
 *
 * The copy is created through dbStringValue::create() with the same size and
 * filled character by character with tolower(), terminating NUL included.
 * sattr.array.size is left unchanged.
 *
 * @param iattr evaluation context handed to dbStringValue::create()
 * @param sattr in: source string, out: array.base points to the copy
 */
inline void lowercaseString(dbInheritedAttribute& iattr,
                            dbSynthesizedAttribute& sattr)
{
    char* out = dbStringValue::create(sattr.array.size, iattr);
    char* in = sattr.array.base;
    sattr.array.base = out;
    for (;;) {
        char const converted = tolower(byte(*in++));
        *out++ = converted;
        if ( converted == '\0' ) {
            break;
        }
    }
}
/**
 * Replace the string described by sattr with an upper-cased copy.
 *
 * The copy is created through dbStringValue::create() with the same size and
 * filled character by character with toupper(), terminating NUL included.
 * sattr.array.size is left unchanged.
 *
 * @param iattr evaluation context handed to dbStringValue::create()
 * @param sattr in: source string, out: array.base points to the copy
 */
inline void uppercaseString(dbInheritedAttribute& iattr,
                            dbSynthesizedAttribute& sattr)
{
    char* out = dbStringValue::create(sattr.array.size, iattr);
    char* in = sattr.array.base;
    sattr.array.base = out;
    for (;;) {
        char const converted = toupper(byte(*in++));
        *out++ = converted;
        if ( converted == '\0' ) {
            break;
        }
    }
}
/**
 * Take over a NUL-terminated string allocated with new[].
 *
 * The text is copied into a string value created through
 * dbStringValue::create(), sattr.array is set to describe the copy
 * (size includes the terminating NUL) and the original buffer is released
 * with delete[].
 *
 * @param iattr evaluation context handed to dbStringValue::create()
 * @param sattr receives array.base / array.size
 * @param str   buffer to copy and then delete[]
 */
inline void copyString(dbInheritedAttribute& iattr,
                       dbSynthesizedAttribute& sattr, char* str)
{
    char* const copy = dbStringValue::create(str, iattr);
    sattr.array.base = copy;
    sattr.array.size = strlen(str) + 1;
    delete[] str;
}
/**
 * Test whether the boolean in sattr.bvalue occurs in the bool array described
 * by sattr2.array. The answer replaces sattr.bvalue.
 */
inline void searchArrayOfBool(dbSynthesizedAttribute& sattr,
                              dbSynthesizedAttribute& sattr2)
{
    bool const* items = (bool*)sattr2.array.base;
    int const count = sattr2.array.size;
    bool const needle = (bool)sattr.bvalue;
    for ( int i = 0; i < count; i++ ) {
        if ( items[i] == needle ) {
            sattr.bvalue = true;
            return;
        }
    }
    sattr.bvalue = false;
}
/**
 * Test whether sattr.ivalue, narrowed to int1, occurs in the int1 array
 * described by sattr2.array. The answer is stored in sattr.bvalue.
 */
inline void searchArrayOfInt1(dbSynthesizedAttribute& sattr,
                              dbSynthesizedAttribute& sattr2)
{
    int1 const* items = (int1*)sattr2.array.base;
    int const count = sattr2.array.size;
    int1 const needle = (int1)sattr.ivalue;
    for ( int i = 0; i < count; i++ ) {
        if ( items[i] == needle ) {
            sattr.bvalue = true;
            return;
        }
    }
    sattr.bvalue = false;
}
/**
 * Test whether sattr.ivalue, narrowed to int2, occurs in the int2 array
 * described by sattr2.array. The answer is stored in sattr.bvalue.
 */
inline void searchArrayOfInt2(dbSynthesizedAttribute& sattr,
                              dbSynthesizedAttribute& sattr2)
{
    int2 const* items = (int2*)sattr2.array.base;
    int const count = sattr2.array.size;
    int2 const needle = (int2)sattr.ivalue;
    for ( int i = 0; i < count; i++ ) {
        if ( items[i] == needle ) {
            sattr.bvalue = true;
            return;
        }
    }
    sattr.bvalue = false;
}
/**
 * Test whether sattr.ivalue, narrowed to int4, occurs in the int4 array
 * described by sattr2.array. The answer is stored in sattr.bvalue.
 */
inline void searchArrayOfInt4(dbSynthesizedAttribute& sattr,
                              dbSynthesizedAttribute& sattr2)
{
    int4 const* items = (int4*)sattr2.array.base;
    int const count = sattr2.array.size;
    int4 const needle = (int4)sattr.ivalue;
    for ( int i = 0; i < count; i++ ) {
        if ( items[i] == needle ) {
            sattr.bvalue = true;
            return;
        }
    }
    sattr.bvalue = false;
}
/**
 * Test whether sattr.ivalue occurs in the db_int8 array described by
 * sattr2.array. The answer is stored in sattr.bvalue.
 */
inline void searchArrayOfInt8(dbSynthesizedAttribute& sattr,
                              dbSynthesizedAttribute& sattr2)
{
    db_int8 const* items = (db_int8*)sattr2.array.base;
    int const count = sattr2.array.size;
    db_int8 const needle = sattr.ivalue;
    for ( int i = 0; i < count; i++ ) {
        if ( items[i] == needle ) {
            sattr.bvalue = true;
            return;
        }
    }
    sattr.bvalue = false;
}
/**
 * Test whether sattr.fvalue, narrowed to real4, occurs in the real4 array
 * described by sattr2.array (exact == comparison). The answer is stored in
 * sattr.bvalue.
 */
inline void searchArrayOfReal4(dbSynthesizedAttribute& sattr,
                               dbSynthesizedAttribute& sattr2)
{
    real4 const* items = (real4*)sattr2.array.base;
    int const count = sattr2.array.size;
    real4 const needle = (real4)sattr.fvalue;
    for ( int i = 0; i < count; i++ ) {
        if ( needle == items[i] ) {
            sattr.bvalue = true;
            return;
        }
    }
    sattr.bvalue = false;
}
/**
 * Test whether sattr.fvalue occurs in the real8 array described by
 * sattr2.array (exact == comparison). The answer is stored in sattr.bvalue.
 */
inline void searchArrayOfReal8(dbSynthesizedAttribute& sattr,
                               dbSynthesizedAttribute& sattr2)
{
    real8 const* items = (real8*)sattr2.array.base;
    int const count = sattr2.array.size;
    real8 const needle = sattr.fvalue;
    for ( int i = 0; i < count; i++ ) {
        if ( needle == items[i] ) {
            sattr.bvalue = true;
            return;
        }
    }
    sattr.bvalue = false;
}
/**
 * Test whether the object identifier in sattr.oid occurs in the oid_t array
 * described by sattr2.array. The answer is stored in sattr.bvalue.
 */
inline void searchArrayOfReference(dbSynthesizedAttribute& sattr,
                                   dbSynthesizedAttribute& sattr2)
{
    oid_t const* items = (oid_t*)sattr2.array.base;
    int const count = sattr2.array.size;
    oid_t const needle = sattr.oid;
    for ( int i = 0; i < count; i++ ) {
        if ( needle == items[i] ) {
            sattr.bvalue = true;
            return;
        }
    }
    sattr.bvalue = false;
}
/**
 * Test whether the string in sattr occurs in the array of strings described
 * by sattr2.
 *
 * The array elements are dbVarying descriptors; each element's text is found
 * at sattr2.base + offs and compared with strcmp(). The answer is stored in
 * sattr.bvalue.
 */
inline void searchArrayOfString(dbSynthesizedAttribute& sattr,
                                dbSynthesizedAttribute& sattr2)
{
    dbVarying const* entries = (dbVarying*)sattr2.array.base;
    int const count = sattr2.array.size;
    char* const needle = sattr.array.base;
    char* const origin = (char*)sattr2.base;
    for ( int i = 0; i < count; i++ ) {
        if ( strcmp(origin + entries[i].offs, needle) == 0 ) {
            sattr.bvalue = true;
            return;
        }
    }
    sattr.bvalue = false;
}
/**
 * Test whether the string in sattr (the pattern) occurs as a substring of the
 * string in sattr2. The answer is stored in sattr.bvalue.
 *
 * Both array.size fields count the terminating NUL. A pattern longer than the
 * text can never match. Texts longer than dbBMsearchThreshold are scanned
 * with a shift table built from the pattern (right-to-left comparison);
 * shorter texts are handed to strstr().
 *
 * @param sattr  in: pattern string, out: bvalue
 * @param sattr2 text to search in (not modified)
 */
inline void searchInString(dbSynthesizedAttribute& sattr,
                           dbSynthesizedAttribute& sattr2)
{
    if ( sattr.array.size > sattr2.array.size ) {
        sattr.bvalue = false;
    } else if ( sattr2.array.size > dbBMsearchThreshold ) {
        int len = sattr.array.size - 2;   // index of the last pattern character
        int n = sattr2.array.size - 1;    // text length without the NUL
        if ( len < 0 ) {
            // Empty pattern: the table-driven scan below would start at index -1
            // (reading before both buffers) and every shift would be zero, so it
            // could loop forever. An empty pattern is contained in any text,
            // which is also what the strstr() branch reports.
            sattr.bvalue = true;
            return;
        }
        int i, j, k;
        int shift[256];
        byte* pattern = (byte*)sattr.array.base;
        byte* str = (byte*)sattr2.array.base;
        // Default shift: the whole pattern length.
        for ( i = 0; i < (int)itemsof(shift); i++ ) {
            shift[i] = len+1;
        }
        // Characters occurring in the pattern (except its last position) shift
        // by their distance to the pattern end.
        for ( i = 0; i < len; i++ ) {
            shift[pattern[i]] = len-i;
        }
        // i is the text index aligned with the last pattern character.
        for ( i = len; i < n; i += shift[str[i]] ) {
            j = len;
            k = i;
            while ( pattern[j] == str[k] ) {
                k -= 1;
                if ( --j < 0 ) {
                    sattr.bvalue = true;
                    return;
                }
            }
        }
        sattr.bvalue = false;
    } else {
        sattr.bvalue = strstr(sattr2.array.base, sattr.array.base) != NULL;
    }
}
/**
 * Raise an integer to an integer power by repeated squaring.
 *
 * A negative exponent first replaces x by the integer quotient 1/x, so the
 * caller must not pass x == 0 together with y < 0 (division by zero).
 *
 * @param x base
 * @param y exponent
 * @return x raised to the power y in db_int8 arithmetic
 */
inline db_int8 powerIntInt(db_int8 x, db_int8 y)
{
    // Work on the exponent magnitude in unsigned arithmetic: negating the most
    // negative signed value overflows, and the former signed loop
    // (y >>= 1 until y == 0) never terminated for it.
    unsigned long long n = (unsigned long long)y;
    if ( y < 0 ) {
        x = 1/x;
        n = 0 - n;
    }
    db_int8 res = 1;
    while ( n != 0 ) {
        if ( n & 1 ) {
            res *= x;
        }
        n >>= 1;
        // Square only when another bit remains, so an unused final squaring
        // cannot overflow.
        if ( n != 0 ) {
            x *= x;
        }
    }
    return res;
}
/**
 * Raise a floating point number to an integer power by repeated squaring.
 *
 * A negative exponent is handled by raising 1/x to the exponent magnitude.
 *
 * @param x base
 * @param y exponent
 * @return x raised to the power y
 */
inline real8 powerRealInt(real8 x, db_int8 y)
{
    // Work on the exponent magnitude in unsigned arithmetic: negating the most
    // negative signed value overflows, and the former signed loop
    // (y >>= 1 until y == 0) never terminated for it.
    unsigned long long n = (unsigned long long)y;
    if ( y < 0 ) {
        x = 1/x;
        n = 0 - n;
    }
    real8 res = 1.0;
    while ( n != 0 ) {
        if ( n & 1 ) {
            res *= x;
        }
        x *= x;
        n >>= 1;
    }
    return res;
}
/**
 * Evaluate a compiled condition for one record.
 *
 * Builds the inherited evaluation context (database, object id, table, row
 * address from getRow(), parameter base of the cursor) and runs execute().
 *
 * @param expr   root of the expression to evaluate
 * @param oid    object identifier of the record
 * @param table  table stored into the evaluation context
 * @param cursor cursor supplying paramBase
 * @return true when the expression yields a non-zero boolean value
 */
bool dbDatabase::evaluate(dbExprNode* expr, oid_t oid, dbTable* table, dbAnyCursor* cursor)
{
    dbInheritedAttribute iattr;
    dbSynthesizedAttribute sattr;

    iattr.db        = this;
    iattr.oid       = oid;
    iattr.table     = table;
    iattr.record    = (byte*)getRow(oid);
    iattr.paramBase = (size_t)cursor->paramBase;

    execute(expr, iattr, sattr);

    bool const satisfied = (sattr.bvalue != 0);
    return satisfied;
}
/**
 * Recursively interpret one node of a compiled query expression.
 *
 * Dispatches on expr->cop, evaluates operand sub-expressions by calling
 * itself, and leaves the node's value in the member of sattr that matches the
 * opcode (bvalue, ivalue, fvalue, oid, raw, base or array).
 *
 * Errors detected here (division by zero, index out of range, null
 * dereference, invalid power arguments) are reported through
 * iattr.db->handleError() after iattr.removeTemporaries(). An out-of-range
 * index that comes from a dbvmVariable operand is not reported as an error:
 * it longjmp()s to the unwind buffer armed by the matching dbvmExists node,
 * which then yields false.
 *
 * @param expr  expression node to evaluate
 * @param iattr evaluation context (database, current record, table, oid,
 *              parameter base, exists-iterator state)
 * @param sattr receives the value of the node
 */
void _fastcall dbDatabase::execute(dbExprNode* expr,
                                   dbInheritedAttribute& iattr,
                                   dbSynthesizedAttribute& sattr)
{
    dbSynthesizedAttribute sattr2, sattr3;
    switch ( expr->cop ) {
    case dbvmVoid:
        sattr.bvalue = true; // empty condition
        return;
    case dbvmCurrent:
        sattr.oid = iattr.oid;
        return;
    case dbvmFirst:
        sattr.oid = iattr.table->firstRow;
        return;
    case dbvmLast:
        sattr.oid = iattr.table->lastRow;
        return;

    // ---- field loads relative to the base produced by operand[0] ----
    case dbvmLoadBool:
        execute(expr->operand[0], iattr, sattr);
        sattr.bvalue = *(bool*)(sattr.base+expr->offs);
        return;
    case dbvmLoadInt1:
        execute(expr->operand[0], iattr, sattr);
        sattr.ivalue = *(int1*)(sattr.base+expr->offs);
        return;
    case dbvmLoadInt2:
        execute(expr->operand[0], iattr, sattr);
        sattr.ivalue = *(int2*)(sattr.base+expr->offs);
        return;
    case dbvmLoadInt4:
        execute(expr->operand[0], iattr, sattr);
        sattr.ivalue = *(int4*)(sattr.base+expr->offs);
        return;
    case dbvmLoadInt8:
        execute(expr->operand[0], iattr, sattr);
        sattr.ivalue = *(db_int8*)(sattr.base+expr->offs);
        return;
    case dbvmLoadReal4:
        execute(expr->operand[0], iattr, sattr);
        sattr.fvalue = *(real4*)(sattr.base+expr->offs);
        return;
    case dbvmLoadReal8:
        execute(expr->operand[0], iattr, sattr);
        sattr.fvalue = *(real8*)(sattr.base+expr->offs);
        return;
    case dbvmLoadReference:
        execute(expr->operand[0], iattr, sattr);
        sattr.oid = *(oid_t*)(sattr.base+expr->offs);
        return;
    case dbvmLoadArray:
    case dbvmLoadString:
        // The base goes into sattr2 because both the data offset and the size
        // are read from the dbVarying descriptor after sattr.array.base is set.
        execute(expr->operand[0], iattr, sattr2);
        sattr.array.base = (char*)sattr2.base
            + ((dbVarying*)(sattr2.base + expr->offs))->offs;
        sattr.array.size = ((dbVarying*)(sattr2.base + expr->offs))->size;
        return;
    case dbvmLoadRawBinary:
        execute(expr->operand[0], iattr, sattr);
        sattr.raw = (void*)(sattr.base+expr->offs);
        return;

    // ---- field loads relative to the current record ----
    case dbvmLoadSelfBool:
        sattr.bvalue = *(bool*)(iattr.record+expr->offs);
        return;
    case dbvmLoadSelfInt1:
        sattr.ivalue = *(int1*)(iattr.record+expr->offs);
        return;
    case dbvmLoadSelfInt2:
        sattr.ivalue = *(int2*)(iattr.record+expr->offs);
        return;
    case dbvmLoadSelfInt4:
        sattr.ivalue = *(int4*)(iattr.record+expr->offs);
        return;
    case dbvmLoadSelfInt8:
        sattr.ivalue = *(db_int8*)(iattr.record+expr->offs);
        return;
    case dbvmLoadSelfReal4:
        sattr.fvalue = *(real4*)(iattr.record+expr->offs);
        return;
    case dbvmLoadSelfReal8:
        sattr.fvalue = *(real8*)(iattr.record+expr->offs);
        return;
    case dbvmLoadSelfReference:
        sattr.oid = *(oid_t*)(iattr.record+expr->offs);
        return;
    case dbvmLoadSelfArray:
    case dbvmLoadSelfString:
        sattr.array.base = (char*)iattr.record +
            ((dbVarying*)(iattr.record + expr->offs))->offs;
        sattr.array.size = ((dbVarying*)(iattr.record + expr->offs))->size;
        return;
    case dbvmLoadSelfRawBinary:
        sattr.raw = (void*)(iattr.record+expr->offs);
        return;

    // ---- method invocation on the object produced by ref.base ----
    // The method writes its result in its native width; it is then re-read
    // with that width and widened into the attribute member.
    case dbvmInvokeMethodBool:
        execute(expr->ref.base, iattr, sattr);
        expr->ref.field->method->invoke(sattr.base, &sattr.bvalue);
        sattr.bvalue = *(bool*)&sattr.bvalue;
        return;
    case dbvmInvokeMethodInt1:
        execute(expr->ref.base, iattr, sattr);
        expr->ref.field->method->invoke(sattr.base, &sattr.ivalue);
        sattr.ivalue = *(int1*)&sattr.ivalue;
        return;
    case dbvmInvokeMethodInt2:
        execute(expr->ref.base, iattr, sattr);
        expr->ref.field->method->invoke(sattr.base, &sattr.ivalue);
        sattr.ivalue = *(int2*)&sattr.ivalue;
        return;
    case dbvmInvokeMethodInt4:
        execute(expr->ref.base, iattr, sattr);
        expr->ref.field->method->invoke(sattr.base, &sattr.ivalue);
        sattr.ivalue = *(int4*)&sattr.ivalue;
        return;
    case dbvmInvokeMethodInt8:
        execute(expr->ref.base, iattr, sattr);
        expr->ref.field->method->invoke(sattr.base, &sattr.ivalue);
        return;
    case dbvmInvokeMethodReal4:
        execute(expr->ref.base, iattr, sattr);
        expr->ref.field->method->invoke(sattr.base, &sattr.fvalue);
        sattr.fvalue = *(real4*)&sattr.fvalue;
        return;
    case dbvmInvokeMethodReal8:
        execute(expr->ref.base, iattr, sattr);
        expr->ref.field->method->invoke(sattr.base, &sattr.fvalue);
        return;
    case dbvmInvokeMethodReference:
        execute(expr->ref.base, iattr, sattr);
        expr->ref.field->method->invoke(sattr.base, &sattr.oid);
        return;
    case dbvmInvokeMethodString:
        // The method hands back a new[]-allocated string: copy it into a string
        // value and release the original.
        execute(expr->ref.base, iattr, sattr);
        expr->ref.field->method->invoke(sattr.base, &sattr2.array.base);
        sattr.array.size = strlen(sattr2.array.base) + 1;
        sattr.array.base = dbStringValue::create(sattr2.array.base, iattr);
        delete[] sattr2.array.base;
        return;

    // ---- method invocation on the current record ----
    case dbvmInvokeSelfMethodBool:
        expr->ref.field->method->invoke(iattr.record, &sattr.bvalue);
        sattr.bvalue = *(bool*)&sattr.bvalue;
        return;
    case dbvmInvokeSelfMethodInt1:
        expr->ref.field->method->invoke(iattr.record, &sattr.ivalue);
        sattr.ivalue = *(int1*)&sattr.ivalue;
        return;
    case dbvmInvokeSelfMethodInt2:
        expr->ref.field->method->invoke(iattr.record, &sattr.ivalue);
        sattr.ivalue = *(int2*)&sattr.ivalue;
        return;
    case dbvmInvokeSelfMethodInt4:
        expr->ref.field->method->invoke(iattr.record, &sattr.ivalue);
        sattr.ivalue = *(int4*)&sattr.ivalue;
        return;
    case dbvmInvokeSelfMethodInt8:
        expr->ref.field->method->invoke(iattr.record, &sattr.ivalue);
        return;
    case dbvmInvokeSelfMethodReal4:
        expr->ref.field->method->invoke(iattr.record, &sattr.fvalue);
        sattr.fvalue = *(real4*)&sattr.fvalue;
        return;
    case dbvmInvokeSelfMethodReal8:
        expr->ref.field->method->invoke(iattr.record, &sattr.fvalue);
        return;
    case dbvmInvokeSelfMethodReference:
        expr->ref.field->method->invoke(iattr.record, &sattr.oid);
        return;
    case dbvmInvokeSelfMethodString:
        expr->ref.field->method->invoke(iattr.record, &sattr2.array.base);
        sattr.array.size = strlen(sattr2.array.base) + 1;
        sattr.array.base = dbStringValue::create(sattr2.array.base, iattr);
        delete[] sattr2.array.base;
        return;

    // ---- array / string element access ----
    case dbvmLength:
        execute(expr->operand[0], iattr, sattr);
        sattr.ivalue = sattr.array.size;
        return;
    case dbvmStringLength:
        // array.size counts the terminating NUL of a string.
        execute(expr->operand[0], iattr, sattr);
        sattr.ivalue = sattr.array.size - 1;
        return;
    case dbvmGetAt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        // Unsigned comparison rejects negative indices as well.
        if ( (nat8)sattr2.ivalue >= (nat8)sattr.array.size ) {
            if ( expr->operand[1]->cop == dbvmVariable ) {
                // Iteration variable ran off the end: unwind to the dbvmExists node.
                longjmp(iattr.exists_iterator[expr->operand[1]->offs].unwind, 1);
            }
            iattr.removeTemporaries();
            iattr.db->handleError(IndexOutOfRangeError, NULL,
                                  int(sattr2.ivalue));
        }
        sattr.base = (byte*)sattr.array.base + int(sattr2.ivalue)*expr->offs;
        return;
    case dbvmCharAt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( (nat8)sattr2.ivalue >= (nat8)(sattr.array.size-1) ) {
            if ( expr->operand[1]->cop == dbvmVariable ) {
                longjmp(iattr.exists_iterator[expr->operand[1]->offs].unwind, 1);
            }
            iattr.removeTemporaries();
            iattr.db->handleError(IndexOutOfRangeError, NULL,
                                  int(sattr2.ivalue));
        }
        sattr.ivalue = (byte)sattr.array.base[int(sattr2.ivalue)];
        return;
    case dbvmExists:
        // Evaluate the operand for index 0, 1, 2, ... until it yields true.
        // The loop is left with false when an element access longjmp()s back
        // here because the index went out of range.
        iattr.exists_iterator[expr->offs].index = 0;
        if ( setjmp(iattr.exists_iterator[expr->offs].unwind) == 0 ) {
            do {
                execute(expr->operand[0], iattr, sattr);
                iattr.exists_iterator[expr->offs].index += 1;
            } while ( !sattr.bvalue );
        } else {
            sattr.bvalue = false;
        }
        return;
    case dbvmVariable:
        sattr.ivalue = iattr.exists_iterator[expr->offs].index;
        return;

    // ---- query parameters located at expr->var + iattr.paramBase ----
    case dbvmLoadVarBool:
        sattr.bvalue = *(bool*)((char*)expr->var + iattr.paramBase);
        return;
    case dbvmLoadVarInt1:
        sattr.ivalue = *(int1*)((char*)expr->var + iattr.paramBase);
        return;
    case dbvmLoadVarInt2:
        sattr.ivalue = *(int2*)((char*)expr->var + iattr.paramBase);
        return;
    case dbvmLoadVarInt4:
        sattr.ivalue = *(int4*)((char*)expr->var + iattr.paramBase);
        return;
    case dbvmLoadVarInt8:
        sattr.ivalue = *(db_int8*)((char*)expr->var + iattr.paramBase);
        return;
    case dbvmLoadVarReal4:
        sattr.fvalue = *(real4*)((char*)expr->var + iattr.paramBase);
        return;
    case dbvmLoadVarReal8:
        sattr.fvalue = *(real8*)((char*)expr->var + iattr.paramBase);
        return;
    case dbvmLoadVarReference:
        sattr.oid = *(oid_t*)((char*)expr->var + iattr.paramBase);
        return;
    case dbvmLoadVarString:
        sattr.array.base = (char*)((char*)expr->var + iattr.paramBase);
        sattr.array.size = strlen((char*)sattr.array.base) + 1;
        return;
    case dbvmLoadVarStringPtr:
        sattr.array.base = *(char**)((char*)expr->var + iattr.paramBase);
        sattr.array.size = strlen((char*)sattr.array.base) + 1;
        return;
    case dbvmLoadVarArray:
        sattr.array.base = (char*)((dbAnyArray*)((char*)expr->var + iattr.paramBase))->base();
        sattr.array.size = ((dbAnyArray*)((char*)expr->var + iattr.paramBase))->length();
        return;
    case dbvmLoadVarArrayPtr:
        {
            dbAnyArray* arr = *(dbAnyArray**)((char*)expr->var + iattr.paramBase);
            sattr.array.base = (char*)arr->base();
            sattr.array.size = arr->length();
            return;
        }
    case dbvmLoadVarRawBinary:
        sattr.raw = (void*)((char*)expr->var + iattr.paramBase);
        return;
#ifdef USE_STD_STRING
    case dbvmLoadVarStdString:
        sattr.array.base = (char*)((std::string*)((char*)expr->var + iattr.paramBase))->c_str();
        sattr.array.size = ((std::string*)((char*)expr->var + iattr.paramBase))->length() + 1;
        return;
#endif

    // ---- constants ----
    case dbvmLoadTrue:
        sattr.bvalue = true;
        return;
    case dbvmLoadFalse:
        sattr.bvalue = false;
        return;
    case dbvmLoadNull:
        sattr.oid = 0;
        return;
    case dbvmLoadIntConstant:
        sattr.ivalue = expr->ivalue;
        return;
    case dbvmLoadRealConstant:
        sattr.fvalue = expr->fvalue;
        return;
    case dbvmLoadStringConstant:
        sattr.array.base = expr->svalue.str;
        sattr.array.size = expr->svalue.len;
        return;

    // ---- boolean operators (the second operand is evaluated only when needed) ----
    case dbvmOrBool:
        execute(expr->operand[0], iattr, sattr);
        if ( sattr.bvalue == 0 ) {
            execute(expr->operand[1], iattr, sattr);
        }
        return;
    case dbvmAndBool:
        execute(expr->operand[0], iattr, sattr);
        if ( sattr.bvalue != 0 ) {
            execute(expr->operand[1], iattr, sattr);
        }
        return;
    case dbvmNotBool:
        execute(expr->operand[0], iattr, sattr);
        sattr.bvalue = !sattr.bvalue;
        return;
    case dbvmIsNull:
        execute(expr->operand[0], iattr, sattr);
        sattr.bvalue = sattr.oid == 0;
        return;

    // ---- integer arithmetic ----
    case dbvmNegInt:
        execute(expr->operand[0], iattr, sattr);
        sattr.ivalue = -sattr.ivalue;
        return;
    case dbvmAddInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.ivalue += sattr2.ivalue;
        return;
    case dbvmSubInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.ivalue -= sattr2.ivalue;
        return;
    case dbvmMulInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.ivalue *= sattr2.ivalue;
        return;
    case dbvmDivInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( sattr2.ivalue == 0 ) {
            iattr.removeTemporaries();
            iattr.db->handleError(ArithmeticError, "Division by zero");
        } else {
            sattr.ivalue /= sattr2.ivalue;
        }
        return;
    case dbvmAndInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.ivalue &= sattr2.ivalue;
        return;
    case dbvmOrInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.ivalue |= sattr2.ivalue;
        return;
    case dbvmNotInt:
        execute(expr->operand[0], iattr, sattr);
        sattr.ivalue = ~sattr.ivalue;
        return;
    case dbvmAbsInt:
        execute(expr->operand[0], iattr, sattr);
        if ( sattr.ivalue < 0 ) {
            sattr.ivalue = -sattr.ivalue;
        }
        return;
    case dbvmPowerInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( sattr.ivalue == 2 ) {
            // Power of two as a shift. The exponent is tested as an unsigned
            // value so that a negative exponent also takes the 0 branch: a
            // signed "< 64" test let negative counts through to the shift,
            // which is undefined behaviour. 0 is what integer arithmetic
            // gives for 2 raised to a negative power.
            sattr.ivalue = (nat8)sattr2.ivalue < 64
                ? (nat8)1 << (int)sattr2.ivalue : 0;
        } else if ( sattr.ivalue == 0 && sattr2.ivalue < 0 ) {
            iattr.removeTemporaries();
            iattr.db->handleError(ArithmeticError,
                                  "Raise zero to negative power");
        } else {
            sattr.ivalue = powerIntInt(sattr.ivalue, sattr2.ivalue);
        }
        return;

    // ---- integer comparison ----
    case dbvmEqInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.ivalue == sattr2.ivalue;
        return;
    case dbvmNeInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.ivalue != sattr2.ivalue;
        return;
    case dbvmGtInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.ivalue > sattr2.ivalue;
        return;
    case dbvmGeInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.ivalue >= sattr2.ivalue;
        return;
    case dbvmLtInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.ivalue < sattr2.ivalue;
        return;
    case dbvmLeInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.ivalue <= sattr2.ivalue;
        return;
    case dbvmBetweenInt:
        // The upper bound is evaluated only if the lower bound test passed.
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( sattr.ivalue < sattr2.ivalue ) {
            sattr.bvalue = false;
        } else {
            execute(expr->operand[2], iattr, sattr2);
            sattr.bvalue = sattr.ivalue <= sattr2.ivalue;
        }
        return;

    // ---- real arithmetic ----
    case dbvmNegReal:
        execute(expr->operand[0], iattr, sattr);
        sattr.fvalue = -sattr.fvalue;
        return;
    case dbvmAddReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.fvalue += sattr2.fvalue;
        return;
    case dbvmSubReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.fvalue -= sattr2.fvalue;
        return;
    case dbvmMulReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.fvalue *= sattr2.fvalue;
        return;
    case dbvmDivReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( sattr2.fvalue == 0.0 ) {
            iattr.removeTemporaries();
            iattr.db->handleError(ArithmeticError, "Division by zero");
        } else {
            sattr.fvalue /= sattr2.fvalue;
        }
        return;
    case dbvmAbsReal:
        execute(expr->operand[0], iattr, sattr);
        if ( sattr.fvalue < 0 ) {
            sattr.fvalue = -sattr.fvalue;
        }
        return;
    case dbvmPowerReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( sattr.fvalue < 0 ) {
            iattr.removeTemporaries();
            iattr.db->handleError(ArithmeticError,
                                  "Power operator returns complex result");
        } else if ( sattr.fvalue == 0.0 && sattr2.fvalue < 0 ) {
            iattr.removeTemporaries();
            iattr.db->handleError(ArithmeticError,
                                  "Raise zero to negative power");
        } else {
            sattr.fvalue = pow(sattr.fvalue, sattr2.fvalue);
        }
        return;
    case dbvmPowerRealInt:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( sattr.fvalue == 0.0 && sattr2.ivalue < 0 ) {
            iattr.removeTemporaries();
            iattr.db->handleError(ArithmeticError,
                                  "Raise zero to negative power");
        } else {
            sattr.fvalue = powerRealInt(sattr.fvalue, sattr2.ivalue);
        }
        return;

    // ---- real comparison ----
    case dbvmEqReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.fvalue == sattr2.fvalue;
        return;
    case dbvmNeReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.fvalue != sattr2.fvalue;
        return;
    case dbvmGtReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.fvalue > sattr2.fvalue;
        return;
    case dbvmGeReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.fvalue >= sattr2.fvalue;
        return;
    case dbvmLtReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.fvalue < sattr2.fvalue;
        return;
    case dbvmLeReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.fvalue <= sattr2.fvalue;
        return;
    case dbvmBetweenReal:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( sattr.fvalue < sattr2.fvalue ) {
            sattr.bvalue = false;
        } else {
            execute(expr->operand[2], iattr, sattr2);
            sattr.bvalue = sattr.fvalue <= sattr2.fvalue;
        }
        return;

    // ---- raw binary comparison through the comparator stored in func.fptr ----
    case dbvmEqBinary:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = (*(dbUDTComparator)expr->func.fptr)(sattr.raw, sattr2.raw, expr->offs) == 0;
        return;
    case dbvmNeBinary:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = (*(dbUDTComparator)expr->func.fptr)(sattr.raw, sattr2.raw, expr->offs) != 0;
        return;
    case dbvmGtBinary:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = (*(dbUDTComparator)expr->func.fptr)(sattr.raw, sattr2.raw, expr->offs) > 0;
        return;
    case dbvmGeBinary:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = (*(dbUDTComparator)expr->func.fptr)(sattr.raw, sattr2.raw, expr->offs) >= 0;
        return;
    case dbvmLtBinary:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = (*(dbUDTComparator)expr->func.fptr)(sattr.raw, sattr2.raw, expr->offs) < 0;
        return;
    case dbvmLeBinary:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = (*(dbUDTComparator)expr->func.fptr)(sattr.raw, sattr2.raw, expr->offs) <= 0;
        return;
    case dbvmBetweenBinary:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( (*(dbUDTComparator)expr->func.fptr)(sattr.raw, sattr2.raw, expr->offs) < 0 ) {
            sattr.bvalue = false;
        } else {
            execute(expr->operand[2], iattr, sattr2);
            sattr.bvalue = (*(dbUDTComparator)expr->func.fptr)(sattr.raw, sattr2.raw, expr->offs) <= 0;
        }
        return;

    // ---- conversions ----
    case dbvmIntToReference:
        execute(expr->operand[0], iattr, sattr);
        sattr.oid = (oid_t)sattr.ivalue;
        return;
    case dbvmIntToReal:
        execute(expr->operand[0], iattr, sattr);
        sattr.fvalue = (real8)sattr.ivalue;
        return;
    case dbvmRealToInt:
        execute(expr->operand[0], iattr, sattr);
        sattr.ivalue = (db_int8)sattr.fvalue;
        return;
    case dbvmIntToString:
        execute(expr->operand[0], iattr, sattr);
        convertIntToString(iattr, sattr);
        return;
    case dbvmRealToString:
        execute(expr->operand[0], iattr, sattr);
        convertRealToString(iattr, sattr);
        return;

    // ---- string operators ----
    case dbvmStringConcat:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        concatenateStrings(iattr, sattr, sattr2);
        return;
    case dbvmUpperString:
        execute(expr->operand[0], iattr, sattr);
        uppercaseString(iattr, sattr);
        return;
    case dbvmLowerString:
        execute(expr->operand[0], iattr, sattr);
        lowercaseString(iattr, sattr);
        return;
    case dbvmEqString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = compareStrings(sattr, sattr2) == 0;
        return;
    case dbvmNeString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = compareStrings(sattr, sattr2) != 0;
        return;
    case dbvmGtString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = compareStrings(sattr, sattr2) > 0;
        return;
    case dbvmGeString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = compareStrings(sattr, sattr2) >= 0;
        return;
    case dbvmLtString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = compareStrings(sattr, sattr2) < 0;
        return;
    case dbvmLeString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = compareStrings(sattr, sattr2) <= 0;
        return;
    case dbvmLikeString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = matchStrings(sattr, sattr2);
        return;
    case dbvmLikeEscapeString:
        // The escape character is the first character of the third operand.
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        execute(expr->operand[2], iattr, sattr3);
        sattr.bvalue = matchStrings(sattr, sattr2, *sattr3.array.base);
        return;
    case dbvmBetweenString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        if ( compareStrings(sattr, sattr2) < 0 ) {
            sattr.bvalue = false;
        } else {
            execute(expr->operand[2], iattr, sattr2);
            sattr.bvalue = compareStrings(sattr, sattr2) <= 0;
        }
        return;

    // ---- boolean / reference comparison ----
    case dbvmEqBool:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.bvalue == sattr2.bvalue;
        return;
    case dbvmNeBool:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.bvalue != sattr2.bvalue;
        return;
    case dbvmEqReference:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.oid == sattr2.oid;
        return;
    case dbvmNeReference:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        sattr.bvalue = sattr.oid != sattr2.oid;
        return;
    case dbvmDeref:
        // Turn an object identifier into the address of the object.
        execute(expr->operand[0], iattr, sattr);
        if ( sattr.oid == 0 ) {
            iattr.removeTemporaries();
            iattr.db->handleError(NullReferenceError);
        }
        assert(!(iattr.db->currIndex[sattr.oid]
                 & (dbInternalObjectMarker|dbFreeHandleMarker)));
        sattr.base = iattr.db->baseAddr + iattr.db->currIndex[sattr.oid];
        return;

    // ---- user functions taking dbUserFunctionArgument parameters ----
    case dbvmFuncArg2Bool:
        sattr.bvalue = (*(bool(*)(dbUserFunctionArgument const&))expr->func.fptr)
            (dbUserFunctionArgument(expr, iattr, sattr, 0));
        return;
    case dbvmFuncArg2Int:
        sattr.ivalue = (*(db_int8(*)(dbUserFunctionArgument const&))expr->func.fptr)
            (dbUserFunctionArgument(expr, iattr, sattr, 0));
        return;
    case dbvmFuncArg2Real:
        sattr.fvalue = (*(real8(*)(dbUserFunctionArgument const&))expr->func.fptr)
            (dbUserFunctionArgument(expr, iattr, sattr, 0));
        return;
    case dbvmFuncArg2Str:
        copyString(iattr, sattr,
                   (*(char*(*)(dbUserFunctionArgument const&))expr->func.fptr)
                   (dbUserFunctionArgument(expr, iattr, sattr, 0)));
        return;
    case dbvmFuncArgArg2Bool:
        sattr.bvalue = (*(bool(*)(dbUserFunctionArgument const&, dbUserFunctionArgument const&))expr->func.fptr)
            (dbUserFunctionArgument(expr, iattr, sattr, 0),
             dbUserFunctionArgument(expr, iattr, sattr, 1));
        return;
    case dbvmFuncArgArg2Int:
        sattr.ivalue = (*(db_int8(*)(dbUserFunctionArgument const&, dbUserFunctionArgument const&))expr->func.fptr)
            (dbUserFunctionArgument(expr, iattr, sattr, 0),
             dbUserFunctionArgument(expr, iattr, sattr, 1));
        return;
    case dbvmFuncArgArg2Real:
        sattr.fvalue = (*(real8(*)(dbUserFunctionArgument const&, dbUserFunctionArgument const&))expr->func.fptr)
            (dbUserFunctionArgument(expr, iattr, sattr, 0),
             dbUserFunctionArgument(expr, iattr, sattr, 1));
        return;
    case dbvmFuncArgArg2Str:
        copyString(iattr, sattr,
                   (*(char*(*)(dbUserFunctionArgument const&, dbUserFunctionArgument const&))expr->func.fptr)
                   (dbUserFunctionArgument(expr, iattr, sattr, 0),
                    dbUserFunctionArgument(expr, iattr, sattr, 1)));
        return;
    case dbvmFuncArgArgArg2Bool:
        sattr.bvalue = (*(bool(*)(dbUserFunctionArgument const&, dbUserFunctionArgument const&, dbUserFunctionArgument const&))expr->func.fptr)
            (dbUserFunctionArgument(expr, iattr, sattr, 0),
             dbUserFunctionArgument(expr, iattr, sattr, 1),
             dbUserFunctionArgument(expr, iattr, sattr, 2));
        return;
    case dbvmFuncArgArgArg2Int:
        sattr.ivalue = (*(db_int8(*)(dbUserFunctionArgument const&, dbUserFunctionArgument const&, dbUserFunctionArgument const&))expr->func.fptr)
            (dbUserFunctionArgument(expr, iattr, sattr, 0),
             dbUserFunctionArgument(expr, iattr, sattr, 1),
             dbUserFunctionArgument(expr, iattr, sattr, 2));
        return;
    case dbvmFuncArgArgArg2Real:
        sattr.fvalue = (*(real8(*)(dbUserFunctionArgument const&, dbUserFunctionArgument const&, dbUserFunctionArgument const&))expr->func.fptr)
            (dbUserFunctionArgument(expr, iattr, sattr, 0),
             dbUserFunctionArgument(expr, iattr, sattr, 1),
             dbUserFunctionArgument(expr, iattr, sattr, 2));
        return;
    case dbvmFuncArgArgArg2Str:
        copyString(iattr, sattr,
                   (*(char*(*)(dbUserFunctionArgument const&, dbUserFunctionArgument const&, dbUserFunctionArgument const&))expr->func.fptr)
                   (dbUserFunctionArgument(expr, iattr, sattr, 0),
                    dbUserFunctionArgument(expr, iattr, sattr, 1),
                    dbUserFunctionArgument(expr, iattr, sattr, 2)));
        return;

    // ---- user functions of one scalar or string argument ----
    case dbvmFuncInt2Bool:
        execute(expr->func.arg[0], iattr, sattr);
        sattr.bvalue = (*(bool(*)(db_int8))expr->func.fptr)(sattr.ivalue);
        return;
    case dbvmFuncReal2Bool:
        execute(expr->func.arg[0], iattr, sattr);
        sattr.bvalue = (*(bool(*)(real8))expr->func.fptr)(sattr.fvalue);
        return;
    case dbvmFuncStr2Bool:
        execute(expr->func.arg[0], iattr, sattr);
        sattr.bvalue =
            (*(bool(*)(char const*))expr->func.fptr)(sattr.array.base);
        return;
    case dbvmFuncInt2Int:
        execute(expr->func.arg[0], iattr, sattr);
        sattr.ivalue = (*(db_int8(*)(db_int8))expr->func.fptr)(sattr.ivalue);
        return;
    case dbvmFuncReal2Int:
        execute(expr->func.arg[0], iattr, sattr);
        sattr.ivalue = (*(db_int8(*)(real8))expr->func.fptr)(sattr.fvalue);
        return;
    case dbvmFuncStr2Int:
        execute(expr->func.arg[0], iattr, sattr);
        sattr.ivalue =
            (*(db_int8(*)(char const*))expr->func.fptr)(sattr.array.base);
        return;
    case dbvmFuncInt2Real:
        execute(expr->func.arg[0], iattr, sattr);
        sattr.fvalue = (*(real8(*)(db_int8))expr->func.fptr)(sattr.ivalue);
        return;
    case dbvmFuncReal2Real:
        execute(expr->func.arg[0], iattr, sattr);
        sattr.fvalue = (*(real8(*)(real8))expr->func.fptr)(sattr.fvalue);
        return;
    case dbvmFuncStr2Real:
        execute(expr->func.arg[0], iattr, sattr);
        sattr.fvalue =
            (*(real8(*)(char const*))expr->func.fptr)(sattr.array.base);
        return;
    case dbvmFuncInt2Str:
        execute(expr->func.arg[0], iattr, sattr);
        copyString(iattr, sattr,
                   (*(char*(*)(db_int8))expr->func.fptr)(sattr.ivalue));
        return;
    case dbvmFuncReal2Str:
        execute(expr->func.arg[0], iattr, sattr);
        copyString(iattr, sattr,
                   (*(char*(*)(real8))expr->func.fptr)(sattr.fvalue));
        return;
    case dbvmFuncStr2Str:
        execute(expr->func.arg[0], iattr, sattr);
        copyString(iattr, sattr,
                   (*(char*(*)(char const*))expr->func.fptr)(sattr.array.base));
        return;

    // ---- membership tests ----
    case dbvmInArrayBool:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchArrayOfBool(sattr, sattr2);
        return;
    case dbvmInArrayInt1:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchArrayOfInt1(sattr, sattr2);
        return;
    case dbvmInArrayInt2:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchArrayOfInt2(sattr, sattr2);
        return;
    case dbvmInArrayInt4:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchArrayOfInt4(sattr, sattr2);
        return;
    case dbvmInArrayInt8:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchArrayOfInt8(sattr, sattr2);
        return;
    case dbvmInArrayReal4:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchArrayOfReal4(sattr, sattr2);
        return;
    case dbvmInArrayReal8:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchArrayOfReal8(sattr, sattr2);
        return;
    case dbvmInArrayString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchArrayOfString(sattr, sattr2);
        return;
    case dbvmInArrayReference:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchArrayOfReference(sattr, sattr2);
        return;
    case dbvmInString:
        execute(expr->operand[0], iattr, sattr);
        execute(expr->operand[1], iattr, sattr2);
        searchInString(sattr, sattr2);
        return;
    default:
        assert(false);
    }
}
/**
 * Central error-reporting hook of the database.
 *
 * The installed errorHandler callback (if any) is always invoked first.
 * What happens afterwards depends on the build:
 *  - THROW_EXCEPTION_ON_ERROR: DatabaseOpenError is printed to stderr, every
 *    other class except NoError is thrown as dbException;
 *  - otherwise the error is written to the system log.  QueryError,
 *    DatabaseOpenError, InconsistentInverseReference and classes not listed
 *    in the switch return to the caller; all remaining classes end in abort().
 *
 * @param error error class being reported
 * @param msg   message text (printed with "%s" by several classes)
 * @param arg   class-specific number: position in the query, offending index,
 *              code handed to dbFile::errorText(), or requested byte count
 */
void dbDatabase::handleError(dbErrorClass error, char const* msg, int arg)
{
#ifdef FASTDB_VERBOSE_LOGGING
    OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::handleError - msg=%s, arg=%d",
                  msg, arg);
#endif
    // Give the application-supplied handler the first chance to react.
    if ( errorHandler != NULL ) {
        (*errorHandler)(error, msg, arg);
    }
#ifdef THROW_EXCEPTION_ON_ERROR
    if ( error == NoError ) {
        return;
    }
    if ( error == DatabaseOpenError ) {
        // open failures are reported through open()'s return value, not thrown
        fprintf(stderr, "%s\n", msg);
    } else {
        throw dbException(error, msg, arg);
    }
#else
    switch ( error ) {
      // --- classes that are logged and then returned to the caller ---------
      case QueryError:
        OsSysLog::add(FAC_DB, PRI_ERR, "%s in position %d\n", msg, arg);
        return;
      case DatabaseOpenError:
        OsSysLog::add(FAC_DB, PRI_ERR, "%s\n", msg);
        return;
      case InconsistentInverseReference:
        OsSysLog::add(FAC_DB, PRI_ERR, "%s\n", msg);
        return;

      // --- classes that are logged and then abort the process --------------
      case ArithmeticError:
        OsSysLog::add(FAC_DB, PRI_ERR, "%s\n", msg);
        break;
      case IndexOutOfRangeError:
        OsSysLog::add(FAC_DB, PRI_ERR, "Index %d is out of range\n", arg);
        break;
      case FileError:
      {
        char errText[256]; // scratch buffer filled by dbFile::errorText()
        OsSysLog::add(FAC_DB, PRI_ERR, "%s: %s\n", msg,
                      dbFile::errorText(arg, errText, sizeof(errText)));
        break;
      }
      case OutOfMemoryError:
        OsSysLog::add(FAC_DB, PRI_ERR,"Not enough memory: failed to allocate %d bytes\n",arg);
        break;
      case NullReferenceError:
        OsSysLog::add(FAC_DB, PRI_ERR, "Null object reference is accessed\n");
        break;
      case Deadlock:
        OsSysLog::add(FAC_DB, PRI_ERR, "Deadlock is caused by upgrading shared locks to exclusive");
        break;
      case LockRevoked:
        OsSysLog::add(FAC_DB, PRI_ERR, "Lock is revoked by some other client");
        break;
      case DatabaseReadOnly:
        OsSysLog::add(FAC_DB, PRI_ERR, "Attempt to modify readonly database");
        break;

      default:
        // unknown class: nothing to log, nothing to abort
        return;
    }
    abort();
#endif
}
/**
 * Build the descriptor of the meta table ("Metatable") inside a freshly
 * created database file.
 *
 * The record is laid out as: dbTable header, then one dbField per entry of
 * metaTableFields, then the varying part holding the table name followed,
 * for every field, by its name and two empty strings (tableName, inverse).
 * String offsets stored in a dbField are relative to that dbField, which is
 * why the running offset is rebased after every field.
 */
void dbDatabase::initializeMetaTable()
{
    static struct {
        char const* name;
        int         type;
        int         size;
        int         offs;
    } metaTableFields[] = {
        { "name", dbField::tpString, sizeof(dbVarying),
          offsetof(dbTable, name)},
        { "fields", dbField::tpArray, sizeof(dbVarying),
          offsetof(dbTable, fields)},
        { "fields[]", dbField::tpStructure, sizeof(dbField), 0},
        { "fields[].name", dbField::tpString, sizeof(dbVarying),
          offsetof(dbField, name)},
        { "fields[].tableName",dbField::tpString,sizeof(dbVarying),
          offsetof(dbField, tableName)},
        { "fields[].inverse", dbField::tpString, sizeof(dbVarying),
          offsetof(dbField, inverse)},
        { "fields[].type", dbField::tpInt4, 4, offsetof(dbField, type)},
        { "fields[].offset", dbField::tpInt4, 4, offsetof(dbField, offset)},
        { "fields[].size", dbField::tpInt4, 4, offsetof(dbField, size)},
        { "fields[].hashTable", dbField::tpReference, sizeof(oid_t),
          offsetof(dbField, hashTable)},
        { "fields[].tTree", dbField::tpReference, sizeof(oid_t),
          offsetof(dbField, tTree)},
        { "fixedSize", dbField::tpInt4, 4, offsetof(dbTable, fixedSize)},
        { "nRows", dbField::tpInt4, 4, offsetof(dbTable, nRows)},
        { "nColumns", dbField::tpInt4, 4, offsetof(dbTable, nColumns)},
        { "firstRow", dbField::tpReference, sizeof(oid_t), offsetof(dbTable, firstRow)},
        { "lastRow", dbField::tpReference, sizeof(oid_t), offsetof(dbTable, lastRow)}
#ifdef AUTOINCREMENT_SUPPORT
        ,{ "count", dbField::tpInt4, 4, offsetof(dbTable, count)}
#endif
    };

    unsigned i;

    // Varying part: table name + (field name, '\0', '\0') for every field.
    size_t const tableNameSize = strlen(dbMetaTableName)+1;
    size_t varyingSize = tableNameSize;
    for ( i = 0; i < itemsof(metaTableFields); i++ ) {
        varyingSize += strlen(metaTableFields[i].name) + 3;
    }

    // Fixed part: table header followed by the array of field descriptors.
    size_t const fixedPartSize = sizeof(dbTable)
                               + sizeof(dbField)*itemsof(metaTableFields);
    size_t const recordSize = fixedPartSize + varyingSize;

    offs_t metaTableOffs = allocate(recordSize);
    index[0][dbMetaTableId] = metaTableOffs;

    dbTable* table = (dbTable*)(baseAddr + metaTableOffs);
    table->size = recordSize;
    table->next = table->prev = 0;

    // The table name is the first string of the varying part.
    int offs = fixedPartSize;
    table->name.offs = offs;
    table->name.size = tableNameSize;
    strcpy((char*)table + offs, dbMetaTableName);
    offs += table->name.size;

    table->fields.offs = sizeof(dbTable);
    table->fields.size = itemsof(metaTableFields);
    table->fixedSize = sizeof(dbTable);
    table->nRows = 0;
    table->nColumns = 5;
    table->firstRow = 0;
    table->lastRow = 0;
#ifdef AUTOINCREMENT_SUPPORT
    table->count = 0;
#endif

    // From here on 'offs' is relative to the current dbField, not to the table.
    dbField* field = (dbField*)((char*)table + table->fields.offs);
    offs -= sizeof(dbTable);
    for ( i = 0; i < itemsof(metaTableFields); i++, field++ ) {
        // field name
        field->name.offs = offs;
        field->name.size = strlen(metaTableFields[i].name) + 1;
        strcpy((char*)field + offs, metaTableFields[i].name);
        offs += field->name.size;

        // empty referenced-table name
        field->tableName.offs = offs;
        field->tableName.size = 1;
        *((char*)field + offs++) = '\0';

        // empty inverse-reference name
        field->inverse.offs = offs;
        field->inverse.size = 1;
        *((char*)field + offs++) = '\0';

        field->type = metaTableFields[i].type;
        field->size = metaTableFields[i].size;
        field->offset = metaTableFields[i].offs;
        field->hashTable = 0;
        field->tTree = 0;

        // rebase the running offset onto the next dbField
        offs -= sizeof(dbField);
    }
}
/**
 * Open the database, creating and initializing it when this process is the
 * first one to attach.
 *
 * The function opens the shared synchronization objects (names are built as
 * "<dbName>.<suffix>"), attaches the calling thread, and then either
 *  - initializes the shared monitor, maps the file, creates or recovers the
 *    on-disk structures and loads the scheme with alteration allowed (when
 *    the initialization mutex reports NotYetInitialized), or
 *  - opens the already existing critical sections and loads the scheme
 *    without alteration.
 * Finally the user counter in the monitor is incremented and, when a commit
 * delay was requested, the delayed-commit thread is started.
 *
 * @param dbName              database name, used as prefix of all object names
 * @param fiName              database file path; NULL means "<dbName>.fdb"
 * @param waitLockTimeoutMsec value stored into dbWaitLockTimeout
 * @param commitDelaySec      non-zero enables delayed commit with this delay
 * @return true on success; false after the failure has been reported through
 *         handleError(DatabaseOpenError, ...) or when loadScheme() fails
 */
bool dbDatabase::open(char const* dbName, char const* fiName,
                      time_t waitLockTimeoutMsec, time_t commitDelaySec)
{
#ifdef FASTDB_VERBOSE_LOGGING
    {
        OsSysLog::add(FAC_DB, PRI_DEBUG, "Entering dbDatabase::open");
    }
#endif
    dbWaitLockTimeout = waitLockTimeoutMsec;
    delete[] databaseName;
    delete[] fileName;
    commitDelay = 0;
    commitTimeout = 0;
    commitTimerStarted = 0;
    backupFileName = NULL;
    backupPeriod = 0;
    opened = false;
    stopDelayedCommitThread = false;

    // 'name' is a scratch buffer reused for every "<dbName>.<suffix>" object
    // name below.  Note that databaseName aliases the very same buffer.
    databaseNameLen = strlen(dbName);
    char* name = new char[databaseNameLen+16];
    sprintf(name, "%s.in", dbName);
    databaseName = name;
    if ( fiName == NULL ) {
        fileName = new char[databaseNameLen + 5];
        sprintf(fileName, "%s.fdb", dbName);
    } else {
        fileName = new char[strlen(fiName)+1];
        strcpy(fileName, fiName);
    }

    // ---- shared synchronization objects ---------------------------------
    dbInitializationMutex::initializationStatus status;
    status = initMutex.initialize(name);
    if ( status == dbInitializationMutex::InitializationError ) {
        handleError(DatabaseOpenError,
                    "Failed to start database initialization");
        return false;
    }
    sprintf(name, "%s.dm", dbName);
    if ( !shm.open(name) ) {
        handleError(DatabaseOpenError, "Failed to open database monitor");
        return false;
    }
    monitor = shm.get();
    sprintf(name, "%s.ws", dbName);
    if ( !writeSem.open(name) ) {
        handleError(DatabaseOpenError,
                    "Failed to initialize database writers semaphore");
        return false;
    }
    sprintf(name, "%s.rs", dbName);
    if ( !readSem.open(name) ) {
        handleError(DatabaseOpenError,
                    "Failed to initialize database readers semaphore");
        return false;
    }
    sprintf(name, "%s.us", dbName);
    if ( !upgradeSem.open(name) ) {
        handleError(DatabaseOpenError,
                    "Failed to initialize database upgrade semaphore");
        return false;
    }
    sprintf(name, "%s.bce", dbName);
    if ( !backupCompletedEvent.open(name) ) {
        handleError(DatabaseOpenError,
                    "Failed to initialize database backup completed event");
        return false;
    }
    if ( commitDelaySec != 0 ) {
        sprintf(name, "%s.dce", dbName);
        if ( !delayedCommitStopTimerEvent.open(name) ) {
            handleError(DatabaseOpenError,
                        "Failed to initialize delayed commit event");
            return false;
        }
        delayedCommitStartTimerEvent.open();
        commitThreadSyncEvent.open();
    }
    backupInitEvent.open();
    backupFileName = NULL;

    // ---- initial sizes of the object index and of the file --------------
    allocatedSize = 0;
    size_t indexSize = initIndexSize < dbFirstUserId
                       ? size_t(dbFirstUserId) : initIndexSize;
    indexSize = DOALIGN(indexSize, dbHandlesPerPage);
    size_t fileSize = initSize ? initSize : dbDefaultInitDatabaseSize;
    if ( fileSize < indexSize*sizeof(offs_t)*4 ) {
        fileSize = indexSize*sizeof(offs_t)*4;
    }
    fileSize = DOALIGN(fileSize, dbBitmapSegmentSize);
#ifdef DISKLESS_CONFIGURATION
    mmapSize = fileSize;
#else
    mmapSize = 0;
#endif

    // ---- per-process allocator / scheme state ---------------------------
    for ( int i = dbBitmapId + dbBitmapPages; --i >= 0; ) {
        bitmapPageAvailableSpace[i] = INT_MAX;
    }
    currRBitmapPage = currPBitmapPage = dbBitmapId;
    currRBitmapOffs = currPBitmapOffs = 0;
    reservedChain = NULL;
    tables = NULL;
    modified = false;
    threadContextList.reset();
    attach();

    if ( status == dbInitializationMutex::NotYetInitialized ) {
        // ---- this process initializes the shared state ------------------
        sprintf(name, "%s.cs", dbName);
        if ( !cs.create(name, &monitor->sem) ) {
            handleError(DatabaseOpenError,
                        "Failed to initialize database monitor");
            return false;
        }
        if ( accessType == dbConcurrentUpdate ) {
            sprintf(name, "%s.mcs", dbName);
            if ( !mutatorCS.create(name, &monitor->mutatorSem) ) {
                handleError(DatabaseOpenError,
                            "Failed to initialize database monitor");
                return false;
            }
        }
        readSem.reset();
        writeSem.reset();
        upgradeSem.reset();
        monitor->nReaders = 0;
        monitor->nWriters = 0;
        monitor->nWaitReaders = 0;
        monitor->nWaitWriters = 0;
        monitor->waitForUpgrade = false;
        monitor->version = version = 1;
        monitor->users = 0;
        monitor->backupInProgress = 0;
        monitor->forceCommitCount = 0;
        monitor->lastDeadlockRecoveryTime = 0;
        monitor->delayedCommitContext = NULL;
        monitor->concurrentTransId = 1;
        monitor->commitInProgress = false;
        monitor->uncommittedChanges = false;
        memset(monitor->dirtyPagesMap, 0, dbDirtyPageBitmapSize);

        // The mapping object is named "<dbName>.<version>".
        sprintf(databaseName, "%s.%d", dbName, version);
        int rc = file.open(fileName, databaseName,
                           accessType == dbReadOnly || accessType == dbConcurrentRead, fileSize, false);
        if ( rc != dbFile::ok )
        {
            char msgbuf[64];
            file.errorText(rc, msgbuf, sizeof msgbuf);
            TRACE_MSG(("File open error: %s\n", msgbuf));
            handleError(DatabaseOpenError, "Failed to create database file");
            return false;
        }
        baseAddr = (byte*)file.getAddr();
        fileSize = file.getSize();
        header = (dbHeader*)baseAddr;
        updatedRecordId = 0;
        if ( (unsigned)header->curr > 1 ) {
            handleError(DatabaseOpenError, "Database file was corrupted: "
                        "invalid root index");
            return false;
        }
        if ( header->initialized != 1 ) {
            // ---- brand new file: lay out header, indices and bitmap -----
            if ( accessType == dbReadOnly || accessType == dbConcurrentRead ) {
                handleError(DatabaseOpenError, "Can not open uninitialized "
                            "file in read only mode");
                return false;
            }
            monitor->curr = header->curr = 0;
            header->size = fileSize;
            // first page holds the header, the two index copies follow it
            size_t used = dbPageSize;
            header->root[0].index = used;
            header->root[0].indexSize = indexSize;
            header->root[0].indexUsed = dbFirstUserId;
            header->root[0].freeList = 0;
            used += indexSize*sizeof(offs_t);
            header->root[1].index = used;
            header->root[1].indexSize = indexSize;
            header->root[1].indexUsed = dbFirstUserId;
            header->root[1].freeList = 0;
            used += indexSize*sizeof(offs_t);
            // each root uses the other root's index as its shadow
            header->root[0].shadowIndex = header->root[1].index;
            header->root[1].shadowIndex = header->root[0].index;
            header->root[0].shadowIndexSize = indexSize;
            header->root[1].shadowIndexSize = indexSize;
            header->majorVersion= FASTDB_MAJOR_VERSION;
            header->minorVersion = FASTDB_MINOR_VERSION;
            index[0] = (offs_t*)(baseAddr + header->root[0].index);
            index[1] = (offs_t*)(baseAddr + header->root[1].index);
            index[0][dbInvalidId] = dbFreeHandleMarker;
            // allocation bitmap: mark everything used so far (including the
            // bitmap pages themselves) as allocated
            size_t bitmapPages =
                (used + dbPageSize*(dbAllocationQuantum*8-1) - 1)
                / (dbPageSize*(dbAllocationQuantum*8-1));
            memset(baseAddr+used, 0xFF, (used + bitmapPages*dbPageSize)
                   / (dbAllocationQuantum*8));
            size_t i;
            for ( i = 0; i < bitmapPages; i++ ) {
                index[0][dbBitmapId + i] = used + dbPageObjectMarker;
                used += dbPageSize;
            }
            while ( i < dbBitmapPages ) {
                index[0][dbBitmapId+i] = dbFreeHandleMarker;
                i += 1;
            }
            currIndex = index[0];
            currIndexSize = dbFirstUserId;
            committedIndexSize = 0;
            initializeMetaTable();
            header->dirty = true;
            memcpy(index[1], index[0], currIndexSize*sizeof(offs_t));
            // flush the content first and only then mark the file initialized
            file.markAsDirty(0, used);
            file.flush(true);
            header->initialized = true;
            file.markAsDirty(0, sizeof(dbHeader));
            file.flush(true);
        } else {
            // ---- existing file: recover it if it was left dirty ---------
            monitor->curr = header->curr;
            if ( header->dirty ) {
                TRACE_MSG(("Database was not normally closed: "
                           "start recovery\n"));
                if ( accessType == dbReadOnly || accessType == dbConcurrentRead ) {
                    handleError(DatabaseOpenError,
                                "Can not open dirty file in read only mode");
                    return false;
                }
                recovery();
                TRACE_MSG(("Recovery completed\n"));
            } else {
                if ( file.getSize() != header->size ) {
                    handleError(DatabaseOpenError, "Database file was "
                                "corrupted: file size in header differs "
                                "from actual file size");
                    return false;
                }
            }
        }
        if ( !loadScheme(true) ) {
            return false;
        }
        initMutex.done();
    } else {
        // ---- shared state already initialized by another opener ----------
        sprintf(name, "%s.cs", dbName);
        if ( !cs.open(name, &monitor->sem) ) {
            handleError(DatabaseOpenError, "Failed to open shared semaphore");
            return false;
        }
        if ( accessType == dbConcurrentUpdate ) {
            sprintf(name, "%s.mcs", dbName);
            if ( !mutatorCS.open(name, &monitor->mutatorSem) ) {
                handleError(DatabaseOpenError, "Failed to open shared semaphore");
                return false;
            }
        }
        version = 0;
        if ( !loadScheme(false) ) {
            return false;
        }
    }

    // ---- register this opener in the shared monitor ---------------------
    cs.enter();
    monitor->users += 1;
#ifdef FASTDB_VERBOSE_LOGGING
    {
        OsSysLog::add(FAC_DB, PRI_DEBUG,
                      "dbDatabase::open - users (%d)", monitor->users );
    }
#endif
    cs.leave();
    opened = true;

    if ( commitDelaySec != 0 ) {
        // start the delayed-commit thread and wait on commitThreadSyncEvent
        // (presumably signalled by the thread once it is running)
        dbCriticalSection cs(delayedCommitStartTimerMutex);
        commitTimeout = commitDelay = commitDelaySec;
        commitThread.create((dbThread::thread_proc_t)delayedCommitProc, this);
        commitThreadSyncEvent.wait(delayedCommitStartTimerMutex);
    }
#ifdef FASTDB_VERBOSE_LOGGING
    {
        OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::open");
    }
#endif
    return true;
}
/**
 * Start periodic backups.
 *
 * Does nothing when a backup file name is already set.  Otherwise a private
 * copy of the name is stored, the period is recorded and the backup
 * scheduler thread is created.
 *
 * @param fileName path of the backup file
 * @param period   value stored into backupPeriod (the scheduler multiplies
 *                 it by 1000 when waiting)
 */
void dbDatabase::scheduleBackup(char const* fileName, time_t period)
{
    if ( backupFileName != NULL ) {
        return; // a backup schedule is already active
    }
    size_t const bufferSize = strlen(fileName) + 1;
    backupFileName = new char[bufferSize];
    strcpy(backupFileName, fileName);
    backupPeriod = period;
    backupThread.create((dbThread::thread_proc_t)backupSchedulerProc, this);
}
/**
 * Body of the backup scheduler thread.
 *
 * Runs with backupMutex held and loops until backupFileName becomes NULL
 * (close() clears it and signals backupInitEvent).  Each round waits for the
 * backup period, shortened by the age of an existing backup file, and then
 * performs a backup:
 *  - a name ending in '?' produces a new file per backup, with the '?'
 *    replaced by "-YYYY.MM.DD_HH.MM.SS";
 *  - any other name is written to "<name>.new" first and then renamed over
 *    the previous backup.
 *
 * The name is checked for NULL before every use.  close() can clear it
 * before this thread ever acquires the mutex, and the original code
 * dereferenced it unconditionally at the top of the loop.  An empty name is
 * treated as a plain (non-timestamped) name instead of indexing position -1.
 */
void dbDatabase::backupScheduler()
{
    backupThread.setPriority(dbThread::THR_PRI_LOW);
    dbCriticalSection cs(backupMutex);
    while ( true ) {
        if ( backupFileName == NULL ) {
            // schedule was cancelled before we got to wait
            return;
        }
        time_t timeout = backupPeriod;
        size_t nameLen = strlen(backupFileName);
        if ( nameLen == 0 || backupFileName[nameLen-1] != '?' ) {
            // Fixed backup file: take the age of the existing file into
            // account so that a restart does not postpone the next backup.
            struct stat st;
            if ( ::stat(backupFileName, &st) == 0 ) {
                time_t howOld = time(NULL) - st.st_atime;
                if ( timeout < howOld ) {
                    timeout = 0;
                } else {
                    timeout -= howOld;
                }
            }
        }
        // NOTE(review): wait() presumably releases backupMutex while blocked,
        // otherwise close() could never take it - confirm in the event class.
        backupInitEvent.wait(backupMutex, timeout*1000);
        if ( backupFileName == NULL ) {
            // close() cancelled the schedule while we were waiting
            return;
        }
        nameLen = strlen(backupFileName);
        if ( nameLen != 0 && backupFileName[nameLen-1] == '?' ) {
            // timestamped backup: strip the trailing '?' and append the time
            time_t currTime = time(NULL);
            char* fileName = new char[nameLen + 32];
            struct tm* t = localtime(&currTime);
            sprintf(fileName, "%.*s-%04d.%02d.%02d_%02d.%02d.%02d",
                    (int)nameLen-1, backupFileName,
                    t->tm_year + 1900, t->tm_mon+1, t->tm_mday,
                    t->tm_hour, t->tm_min, t->tm_sec);
            backup(fileName, false);
            delete[] fileName;
        } else {
            // write to a temporary file and swap it in, so that a failed
            // backup does not destroy the previous one while being written
            char* newFileName = new char[nameLen + 5];
            sprintf(newFileName,"%s.new", backupFileName);
            backup(newFileName, false);
            ::remove(backupFileName);
            ::rename(newFileName, backupFileName);
            delete[] newFileName;
        }
    }
}
/**
 * Bring a database that was not closed normally back to the state described
 * by the root selected by header->curr.
 *
 * The other root is rebuilt from the current one (its index/shadow index are
 * swapped relative to the current root), the index entries of the current
 * root are copied over the entries of the other root, and finally the row
 * lists of all tables are repaired.
 */
void dbDatabase::recovery()
{
    int const active = header->curr;
    int const other  = 1 - active;

    header->size = file.getSize();

    // Rebuild the inactive root as the mirror image of the active one.
    header->root[other].indexUsed       = header->root[active].indexUsed;
    header->root[other].freeList        = header->root[active].freeList;
    header->root[other].index           = header->root[active].shadowIndex;
    header->root[other].indexSize       = header->root[active].shadowIndexSize;
    header->root[other].shadowIndex     = header->root[active].index;
    header->root[other].shadowIndexSize = header->root[active].indexSize;

    offs_t* dst = (offs_t*)(baseAddr+header->root[other].index);
    offs_t* src = (offs_t*)(baseAddr+header->root[active].index);
    currIndex = dst;

    // Copy only the entries that differ, marking just those as dirty.
    int const nUsed = header->root[active].indexUsed;
    for ( int slot = 0; slot < nUsed; slot++ ) {
        if ( dst[slot] == src[slot] ) {
            continue;
        }
        dst[slot] = src[slot];
        file.markAsDirty(header->root[other].index + slot*sizeof(offs_t), sizeof(offs_t));
    }

    //
    // Restore consistency of table rows l2-list
    //
    restoreTablesConsistency();
    file.markAsDirty(0, sizeof(dbHeader));
}
void dbDatabase::restoreTablesConsistency()
{
dbTable* table = (dbTable*)getRow(dbMetaTableId);
oid_t lastId = table->lastRow;
if ( lastId != 0 ) {
dbRecord* record = getRow(lastId);
if ( record->next != 0 ) {
record->next = 0;
file.markAsDirty(currIndex[lastId], sizeof(dbTable));
}
}
oid_t tableId = table->firstRow;
while ( tableId != 0 ) {
table = (dbTable*)getRow(tableId);
lastId = table->lastRow;
if ( lastId != 0 ) {
dbRecord* record = getRow(lastId);
if ( record->next != 0 ) {
record->next = 0;
file.markAsDirty(currIndex[lastId], sizeof(dbTable));
}
}
tableId = table->next;
}
}
/**
 * Set the number of threads stored in parThreads.
 *
 * @param nThreads requested thread count; 0 means "use the number of
 *                 processors reported by dbThread".  The value is capped at
 *                 dbMaxParallelSearchThreads.
 */
void dbDatabase::setConcurrency(unsigned nThreads)
{
    // 0 requests autodetection of the number of processors
    unsigned requested = nThreads;
    if ( requested == 0 ) {
        requested = dbThread::numberOfProcessors();
    }
    if ( requested > dbMaxParallelSearchThreads ) {
        requested = dbMaxParallelSearchThreads;
    }
    parThreads = requested;
}
bool dbDatabase::loadScheme(bool alter)
{
if ( !beginTransaction(accessType != dbReadOnly && accessType != dbConcurrentRead
? dbExclusiveLock : dbSharedLock) )
{
return false;
}
dbTableDescriptor *desc, *next;
dbTable* metaTable = (dbTable*)getRow(dbMetaTableId);
oid_t first = metaTable->firstRow;
oid_t last = metaTable->lastRow;
int nTables = metaTable->nRows;
oid_t tableId = first;
for ( desc = dbTableDescriptor::chain; desc != NULL; desc = next ) {
next = desc->next;
if ( desc->db != NULL && desc->db != DETACHED_TABLE && desc->db != this ) {
continue;
}
if ( desc->db == DETACHED_TABLE ) {
desc = desc->clone();
}
dbFieldDescriptor* fd;
for ( fd = desc->firstField; fd != NULL; fd = fd->nextField ) {
fd->tTree = 0;
fd->hashTable = 0;
fd->attr &= ~dbFieldDescriptor::Updated;
}
int n = nTables;
while ( --n >= 0 ) {
dbTable* table = (dbTable*)getRow(tableId);
oid_t next = table->next;
if ( strcmp(desc->name, (char*)table + table->name.offs) == 0 ) {
if ( !desc->equal(table) ) {
if ( !alter ) {
handleError(DatabaseOpenError, "Incompatible class"
" definition in application");
return false;
}
modified = true;
if ( table->nRows == 0 ) {
TRACE_MSG(("Replace definition of table '%s'\n",
desc->name));
updateTableDescriptor(desc, tableId);
} else {
reformatTable(tableId, desc);
}
} else {
linkTable(desc, tableId);
}
desc->setFlags();
break;
}
if ( tableId == last ) {
tableId = first;
} else {
tableId = next;
}
}
if ( n < 0 ) { // no match found
if ( accessType == dbReadOnly || accessType == dbConcurrentRead ) {
handleError(DatabaseOpenError, "New table definition can not "
"be added to read only database");
return false;
} else {
TRACE_MSG(("Create new table '%s' in database\n", desc->name));
addNewTable(desc);
modified = true;
}
}
if ( accessType != dbReadOnly && accessType != dbConcurrentRead ) {
if ( !addIndices(alter, desc) ) {
handleError(DatabaseOpenError, "Failed to alter indices with"
" active applications");
rollback();
return false;
}
}
}
for ( desc = tables; desc != NULL; desc = desc->nextDbTable ) {
if ( desc->cloneOf != NULL ) {
for ( dbFieldDescriptor *fd = desc->firstField; fd != NULL; fd = fd->nextField )
{
if ( fd->refTable != NULL ) {
fd->refTable = lookupTable(fd->refTable);
}
}
}
desc->checkRelationship();
}
commit();
return true;
}
/**
 * Adapt a non-empty stored table to a changed application definition.
 *
 * When desc->match() accepts the stored definition only the stored
 * descriptor is replaced.  Otherwise the descriptor is replaced and every
 * row is re-allocated with its new size and converted.
 *
 * @param tableId object id of the stored table
 * @param desc    new table definition
 */
void dbDatabase::reformatTable(oid_t tableId, dbTableDescriptor* desc)
{
    dbTable* table = (dbTable*)putRow(tableId);

    if ( desc->match(table, confirmDeleteColumns) ) {
        TRACE_MSG(("New version of table '%s' is compatible with old one\n",
                   desc->name));
        updateTableDescriptor(desc, tableId);
        return;
    }

    TRACE_MSG(("Reformat table '%s'\n", desc->name));
    // Read the head of the row list before the descriptor is rewritten.
    oid_t rowId = table->firstRow;
    updateTableDescriptor(desc, tableId);

    while ( rowId != 0 ) {
        dbRecord* oldRecord = getRow(rowId);
        size_t newSize =
            desc->columns->calculateNewRecordSize((byte*)oldRecord,
                                                  desc->fixedSize);
        // remember where the old image lives: putRow() hands out a new one
        offs_t oldOffs = currIndex[rowId];
        dbRecord* newRecord = putRow(rowId, newSize);
        desc->columns->convertRecord((byte*)newRecord, baseAddr + oldOffs,
                                     desc->fixedSize);
        rowId = newRecord->next;
    }
}
/**
 * Remove all rows of a table (the table itself stays defined).
 *
 * The row list of the table is emptied first; then every former row has its
 * inverse references removed, its storage released and its object id freed.
 * Finally all hash tables and T-tree indices of the table are purged.
 *
 * @param desc descriptor of the table to empty
 */
void dbDatabase::deleteTable(dbTableDescriptor* desc)
{
    beginTransaction(dbExclusiveLock);
    modified = true;

    dbTable* table = (dbTable*)putRow(desc->tableId);
    oid_t rowId = table->firstRow;
    table->firstRow = table->lastRow = 0;
    table->nRows = 0;

    while ( rowId != 0 ) {
        dbRecord* row = getRow(rowId);
        oid_t  const followingId = row->next;
        size_t const rowSize     = row->size;

        removeInverseReferences(desc, rowId);

        // A row whose entry is the same in both index copies is released
        // through cloneBitmap(); any other row is deallocated directly.
        bool const sameInBothIndices =
            rowId < committedIndexSize && index[0][rowId] == index[1][rowId];
        if ( sameInBothIndices ) {
            cloneBitmap(currIndex[rowId], rowSize);
        } else {
            deallocate(currIndex[rowId], rowSize);
        }
        freeId(rowId);
        rowId = followingId;
    }

    dbFieldDescriptor* fd = desc->hashedFields;
    while ( fd != NULL ) {
        dbHashTable::purge(this, fd->hashTable);
        fd = fd->nextHashedField;
    }
    fd = desc->indexedFields;
    while ( fd != NULL ) {
        dbTtree::purge(this, fd->tTree);
        fd = fd->nextIndexedField;
    }
}
/**
 * Drop the hash table of a field.
 *
 * The hash table object is dropped, the field descriptor is updated and
 * unlinked from the list of hashed fields of its table, and the reference
 * stored in the table's database descriptor is cleared.
 *
 * @param fd field whose hash table is dropped; it is expected to be present
 *           in fd->defTable->hashedFields (the unlink loop does not check
 *           for the end of the list)
 */
void dbDatabase::dropHashTable(dbFieldDescriptor* fd)
{
    beginTransaction(dbExclusiveLock);
    modified = true;

    dbHashTable::drop(this, fd->hashTable);
    fd->hashTable = 0;
    fd->indexType &= ~HASHED;

    // unlink fd from the singly linked list of hashed fields
    dbFieldDescriptor** link;
    for ( link = &fd->defTable->hashedFields;
          *link != fd;
          link = &(*link)->nextHashedField )
    {
    }
    *link = fd->nextHashedField;

    // clear the reference kept in the stored table descriptor
    dbTable* table = (dbTable*)putRow(fd->defTable->tableId);
    dbField* storedFields = (dbField*)((char*)table + table->fields.offs);
    storedFields[fd->fieldNo].hashTable = 0;
}
/**
 * Drop the T-tree index of a field.
 *
 * The T-tree object is dropped, the field descriptor is updated and unlinked
 * from the list of indexed fields of its table, and the reference stored in
 * the table's database descriptor is cleared.
 *
 * @param fd field whose index is dropped; it is expected to be present in
 *           fd->defTable->indexedFields (the unlink loop does not check for
 *           the end of the list)
 */
void dbDatabase::dropIndex(dbFieldDescriptor* fd)
{
    beginTransaction(dbExclusiveLock);
    modified = true;

    dbTtree::drop(this, fd->tTree);
    fd->tTree = 0;
    fd->indexType &= ~INDEXED;

    // unlink fd from the singly linked list of indexed fields
    dbFieldDescriptor** link;
    for ( link = &fd->defTable->indexedFields;
          *link != fd;
          link = &(*link)->nextIndexedField )
    {
    }
    *link = fd->nextIndexedField;

    // clear the reference kept in the stored table descriptor
    dbTable* table = (dbTable*)putRow(fd->defTable->tableId);
    dbField* storedFields = (dbField*)((char*)table + table->fields.offs);
    storedFields[fd->fieldNo].tTree = 0;
}
/**
 * Create a hash table for a field and fill it with all existing rows.
 *
 * A hash table is allocated with twice the current number of rows as size
 * argument, the field is pushed onto the list of hashed fields of its table,
 * the new table is recorded in the stored descriptor and every row of the
 * table is inserted.
 *
 * @param fd field to be hashed
 */
void dbDatabase::createHashTable(dbFieldDescriptor* fd)
{
    beginTransaction(dbExclusiveLock);
    modified = true;

    dbTable* table = (dbTable*)getRow(fd->defTable->tableId);
    int nRows = table->nRows;

    fd->hashTable = dbHashTable::allocate(this, 2*nRows);
    fd->attr &= ~dbFieldDescriptor::Updated;

    // push fd onto the head of the table's hashed-field list
    fd->nextHashedField = fd->defTable->hashedFields;
    fd->defTable->hashedFields = fd;
    fd->indexType |= HASHED;

    // record the new hash table in the stored table descriptor
    table = (dbTable*)putRow(fd->defTable->tableId);
    dbField* storedFields = (dbField*)((char*)table + table->fields.offs);
    storedFields[fd->fieldNo].hashTable = fd->hashTable;

    // populate the hash table with the existing rows
    oid_t rowId = table->firstRow;
    while ( rowId != 0 ) {
        dbHashTable::insert(this, fd->hashTable, rowId, fd->type, fd->dbsSize, fd->dbsOffs,
                            nRows);
        rowId = getRow(rowId)->next;
    }
}
/**
 * Create a T-tree index for a field and fill it with all existing rows.
 *
 * A T-tree is allocated, the field is pushed onto the list of indexed fields
 * of its table, the new tree is recorded in the stored descriptor and every
 * row of the table is inserted.
 *
 * @param fd field to be indexed
 */
void dbDatabase::createIndex(dbFieldDescriptor* fd)
{
    beginTransaction(dbExclusiveLock);
    modified = true;

    fd->tTree = dbTtree::allocate(this);
    fd->attr &= ~dbFieldDescriptor::Updated;

    // push fd onto the head of the table's indexed-field list
    fd->nextIndexedField = fd->defTable->indexedFields;
    fd->defTable->indexedFields = fd;
    fd->indexType |= INDEXED;

    // record the new tree in the stored table descriptor
    dbTable* table = (dbTable*)putRow(fd->defTable->tableId);
    dbField* storedFields = (dbField*)((char*)table + table->fields.offs);
    storedFields[fd->fieldNo].tTree = fd->tTree;

    // populate the index with the existing rows
    oid_t rowId = table->firstRow;
    while ( rowId != 0 ) {
        dbTtree::insert(this, fd->tTree, rowId, fd->type, fd->dbsSize, fd->comparator, fd->dbsOffs);
        rowId = getRow(rowId)->next;
    }
}
/**
 * Remove a table from the database: delete all its rows, free its row in
 * the meta table and drop every hash table and T-tree index of its fields.
 *
 * @param desc descriptor of the table to drop
 */
void dbDatabase::dropTable(dbTableDescriptor* desc)
{
    deleteTable(desc);
    freeRow(dbMetaTableId, desc->tableId);

    dbFieldDescriptor* fd = desc->hashedFields;
    while ( fd != NULL ) {
        dbHashTable::drop(this, fd->hashTable);
        fd = fd->nextHashedField;
    }

    fd = desc->indexedFields;
    while ( fd != NULL ) {
        dbTtree::drop(this, fd->tTree);
        fd = fd->nextIndexedField;
    }
}
/**
 * Bring the indices stored for a table in line with the indices requested by
 * the application's field descriptors.
 *
 * Pass 1 walks all fields: a requested but missing hash table / T-tree is
 *        allocated, a stored but no longer requested one is scheduled for
 *        removal (only allowed when 'alter' is set).
 * Pass 2 (only if something was allocated) inserts every row into the
 *        indices that are not yet part of both index copies.
 * Pass 3 (if anything changed) stores the new index references in the table
 *        descriptor and drops the removed indices.
 *
 * @param alter whether this process may create/remove indices of a table
 *              that is already present in both index copies
 * @param desc  table descriptor (already linked to its stored table)
 * @return false when a change would be needed but is not permitted
 */
bool dbDatabase::addIndices(bool alter, dbTableDescriptor* desc)
{
    dbFieldDescriptor* fd;
    oid_t tableId = desc->tableId;
    dbTable* table = (dbTable*)getRow(tableId);
    int nRows = table->nRows;
    oid_t firstId = table->firstRow;
    int nCreated = 0;
    int nRemoved = 0;

    // ---- pass 1: decide what has to be created or removed ---------------
    for ( fd = desc->firstField; fd != NULL; fd = fd->nextField ) {
        // hash table
        if ( (fd->indexType & HASHED) && fd->type != dbField::tpStructure ) {
            if ( fd->hashTable == 0 ) {
                if ( !alter && tableId < committedIndexSize
                     && index[0][tableId] == index[1][tableId] )
                {
                    // If there are some other active applications which
                    // can use this table, then they will not know
                    // about newly created index, which leads to inconsistency
                    return false;
                }
                fd->hashTable = dbHashTable::allocate(this, nRows);
                nCreated += 1;
                TRACE_MSG(("Create hash table for field '%s'\n", fd->name));
            }
        } else if ( fd->hashTable != 0 ) {
            if ( !alter ) {
                return false;
            }
            TRACE_MSG(("Remove hash table for field '%s'\n", fd->name));
            nRemoved += 1;
            fd->hashTable = 0;
        }

        // T-tree index
        if ( (fd->indexType & INDEXED) && fd->type != dbField::tpStructure ) {
            if ( fd->tTree == 0 ) {
                if ( !alter && tableId < committedIndexSize
                     && index[0][tableId] == index[1][tableId] )
                {
                    return false;
                }
                fd->tTree = dbTtree::allocate(this);
                nCreated += 1;
                TRACE_MSG(("Create index for field '%s'\n", fd->name));
            }
        } else if ( fd->tTree != 0 ) {
            if ( !alter ) {
                return false;
            }
            nRemoved += 1;
            TRACE_MSG(("Remove index for field '%s'\n", fd->name));
            fd->tTree = 0;
        }
    }

    // ---- pass 2: fill freshly created indices with the existing rows ----
    if ( nCreated > 0 ) {
        modified = true;
        oid_t rowId = firstId;
        while ( rowId != 0 ) {
            for ( fd = desc->hashedFields; fd != NULL; fd=fd->nextHashedField ) {
                // only indices whose entry is beyond the committed index or
                // differs between the two index copies are filled
                if ( fd->hashTable >= committedIndexSize
                     || index[0][fd->hashTable] != index[1][fd->hashTable] )
                {
                    dbHashTable::insert(this, fd->hashTable, rowId,
                                        fd->type, fd->dbsSize, fd->dbsOffs, 2*nRows);
                }
            }
            for ( fd=desc->indexedFields; fd != NULL; fd=fd->nextIndexedField ) {
                if ( fd->tTree >= committedIndexSize
                     || index[0][fd->tTree] != index[1][fd->tTree] )
                {
                    dbTtree::insert(this, fd->tTree, rowId,
                                    fd->type, fd->dbsSize, fd->comparator, fd->dbsOffs);
                }
            }
            rowId = getRow(rowId)->next;
        }
    }

    // ---- pass 3: update the stored descriptor, drop removed indices -----
    if ( nCreated + nRemoved != 0 ) {
        table = (dbTable*)putRow(tableId);
        offs_t fieldOffs = currIndex[tableId] + table->fields.offs;
        for ( fd = desc->firstField; fd != NULL; fd = fd->nextField ) {
            dbField* stored = (dbField*)(baseAddr + fieldOffs);
            if ( stored->hashTable != fd->hashTable ) {
                if ( stored->hashTable != 0 ) {
                    assert(fd->hashTable == 0);
                    modified = true;
                    dbHashTable::drop(this, stored->hashTable);
                    // the pointer is recomputed from baseAddr after drop()
                    stored = (dbField*)(baseAddr + fieldOffs);
                }
                stored->hashTable = fd->hashTable;
            }
            if ( stored->tTree != fd->tTree ) {
                if ( stored->tTree != 0 ) {
                    assert(fd->tTree == 0);
                    modified = true;
                    dbTtree::drop(this, stored->tTree);
                    // the pointer is recomputed from baseAddr after drop()
                    stored = (dbField*)(baseAddr + fieldOffs);
                }
                stored->tTree = fd->tTree;
            }
            fieldOffs += sizeof(dbField);
        }
    }
    return true;
}
/**
 * Replace the stored definition of a table with the application's one.
 *
 * The descriptor is linked to the table, all hash tables and T-trees
 * referenced by the old stored fields are dropped, the table record is
 * re-allocated with the size required by the new definition and the new
 * definition is stored.  The row list (first/last row, row count) of the
 * table is carried over.
 *
 * @param desc    new table definition
 * @param tableId object id of the stored table
 */
void dbDatabase::updateTableDescriptor(dbTableDescriptor* desc, oid_t tableId)
{
    size_t const newSize = sizeof(dbTable) + desc->nFields*sizeof(dbField)
                         + desc->totalNamesLength();
    linkTable(desc, tableId);

    // Save what has to survive the rewrite of the table record.
    dbTable* table = (dbTable*)getRow(tableId);
    int   const savedRows  = table->nRows;
    oid_t const savedFirst = table->firstRow;
    oid_t const savedLast  = table->lastRow;
#ifdef AUTOINCREMENT_SUPPORT
    desc->autoincrementCount = table->count;
#endif

    // Drop every index of the old definition.  Both ids are read before the
    // first drop and the field address is derived from baseAddr each time.
    offs_t fieldOffs = currIndex[tableId] + table->fields.offs;
    for ( int remaining = table->fields.size; --remaining >= 0; ) {
        dbField* stored = (dbField*)(baseAddr + fieldOffs);
        oid_t const hashTableId = stored->hashTable;
        oid_t const tTreeId     = stored->tTree;
        fieldOffs += sizeof(dbField);
        if ( hashTableId != 0 ) {
            dbHashTable::drop(this, hashTableId);
        }
        if ( tTreeId != 0 ) {
            dbTtree::drop(this, tTreeId);
        }
    }

    table = (dbTable*)putRow(tableId, newSize);
    desc->storeInDatabase(table);
    table->firstRow = savedFirst;
    table->lastRow  = savedLast;
    table->nRows    = savedRows;
}
/**
 * Create a new table in the database from an application descriptor.
 *
 * A row large enough for the table header, its field descriptors and all
 * names is allocated in the meta table, the descriptor is linked to it and
 * the definition is stored.
 *
 * @param desc definition of the new table
 * @return object id of the created table
 */
oid_t dbDatabase::addNewTable(dbTableDescriptor* desc)
{
    size_t const recordSize = sizeof(dbTable) + desc->nFields*sizeof(dbField)
                            + desc->totalNamesLength();
    oid_t const newTableId = allocateRow(dbMetaTableId, recordSize);
    linkTable(desc, newTableId);
    dbTable* stored = (dbTable*)getRow(newTableId);
    desc->storeInDatabase(stored);
    return newTableId;
}
void dbDatabase::close()
{
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "Entering dbDatabase::close");
}
# endif
detach();
if ( backupFileName != NULL ) {
{
dbCriticalSection cs(backupMutex);
delete[] backupFileName;
backupFileName = NULL;
backupInitEvent.signal();
}
backupThread.join();
}
if ( commitDelay != 0 ) {
delayedCommitStopTimerEvent.signal();
{
dbCriticalSection cs(delayedCommitStartTimerMutex);
stopDelayedCommitThread = true;
delayedCommitStartTimerEvent.signal();
}
commitDelay = 0;
commitThread.join();
delayedCommitStartTimerEvent.close();
}
{
dbCriticalSection cs(threadContextListMutex);
while ( !threadContextList.isEmpty() ) {
delete (dbDatabaseThreadContext*)threadContextList.next;
}
}
backupInitEvent.close();
delete[] databaseName;
delete[] fileName;
databaseName = NULL;
fileName = NULL;
opened = false;
cs.enter();
monitor->users -= 1;
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG,
"dbDatabase::close - users (%d)", monitor->users );
}
# endif
if ( header != NULL && header->dirty && accessType != dbReadOnly && accessType != dbConcurrentRead && monitor->nWriters == 0 ) {
file.flush(true);
header->dirty = false;
file.markAsDirty(0, sizeof(dbHeader));
}
cs.leave();
dbTableDescriptor *desc, *next;
for ( desc = tables; desc != NULL; desc = next ) {
next = desc->nextDbTable;
desc->tableId = 0;
if ( desc->cloneOf != NULL ) {
delete desc;
} else if ( !desc->fixedDatabase ) {
desc->db = NULL;
}
}
file.close();
if ( initMutex.close() ) {
cs.erase();
shm.erase();
readSem.erase();
writeSem.erase();
upgradeSem.erase();
backupCompletedEvent.erase();
file.erase();
if ( commitDelay != 0 ) {
delayedCommitStopTimerEvent.erase();
}
if ( accessType == dbConcurrentUpdate ) {
mutatorCS.erase();
}
initMutex.erase();
} else {
cs.close();
shm.close();
readSem.close();
writeSem.close();
upgradeSem.close();
backupCompletedEvent.close();
if ( commitDelay != 0 ) {
delayedCommitStopTimerEvent.close();
}
if ( accessType == dbConcurrentUpdate ) {
mutatorCS.close();
}
}
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "Leaving dbDatabase::close");
}
# endif
}
/**
 * Find the descriptor attached to this database that corresponds to the
 * given descriptor: either the descriptor itself or a clone of it.
 *
 * @param origDesc descriptor to look for
 * @return the matching descriptor from the 'tables' list, or NULL
 */
dbTableDescriptor* dbDatabase::lookupTable(dbTableDescriptor* origDesc)
{
    dbTableDescriptor* candidate = tables;
    while ( candidate != NULL ) {
        if ( candidate == origDesc || candidate->cloneOf == origDesc ) {
            break;
        }
        candidate = candidate->nextDbTable;
    }
    return candidate;
}
void dbDatabase::attach()
{
if ( threadContext.get() == NULL ) {
dbDatabaseThreadContext* ctx = new dbDatabaseThreadContext();
{
dbCriticalSection cs(threadContextListMutex);
threadContextList.link(ctx);
}
threadContext.set(ctx);
}
}
void dbDatabase::detach(int flags)
{
if ( flags & COMMIT ) {
commit();
} else {
monitor->uncommittedChanges = true;
precommit();
}
if ( flags & DESTROY_CONTEXT ) {
dbDatabaseThreadContext* ctx = threadContext.get();
if ( commitDelay != 0 ) {
dbCriticalSection cs(delayedCommitStopTimerMutex);
if ( monitor->delayedCommitContext == ctx && ctx->commitDelayed ) {
ctx->removeContext = true;
} else {
dbCriticalSection cs(threadContextListMutex);
delete ctx;
}
} else {
dbCriticalSection cs(threadContextListMutex);
delete ctx;
}
threadContext.set(NULL);
}
}
/**
 * Check whether a reference path expression can be traversed backwards,
 * i.e. whether every reference/array field on the path has an inverse
 * reference.
 *
 * The expression is walked from the outermost node towards its base:
 *  - a load from the current record ends the walk with the answer for that
 *    field;
 *  - a reference that is an array component is skipped;
 *  - any other reference or array load must have an inverse reference;
 *  - an index operation must use the iteration variable whose offset equals
 *    the next lower exists-clause number;
 *  - a dereference is skipped;
 *  - anything else makes the path unusable.
 *
 * @param expr           expression to analyse
 * @param nExistsClauses number of enclosing exists clauses
 */
bool dbDatabase::existsInverseReference(dbExprNode* expr, int nExistsClauses)
{
    for (;;) {
        int const cop = expr->cop;

        if ( cop == dbvmLoadSelfReference || cop == dbvmLoadSelfArray ) {
            return expr->ref.field->inverseRef != NULL;
        }

        if ( cop == dbvmLoadReference || cop == dbvmLoadArray ) {
            bool const arrayComponent = cop == dbvmLoadReference
                && (expr->ref.field->attr & dbFieldDescriptor::ComponentOfArray);
            if ( !arrayComponent && expr->ref.field->inverseRef == NULL ) {
                return false;
            }
            expr = expr->ref.base;
            continue;
        }

        if ( cop == dbvmGetAt ) {
            dbExprNode* subscript = expr->operand[1];
            if ( subscript->cop != dbvmVariable ) {
                return false;
            }
            nExistsClauses -= 1;
            if ( subscript->offs != nExistsClauses ) {
                return false;
            }
            expr = expr->operand[0];
            continue;
        }

        if ( cop == dbvmDeref ) {
            expr = expr->operand[0];
            continue;
        }

        return false;
    }
}
/**
 * Follow the inverse reference of a path expression starting from record
 * 'iref' and add the records found at the end of the path to the cursor.
 *
 * Index operations, dereferences and references that are array components
 * are skipped first.  The inverse field of the remaining node is then read
 * from record 'iref'; it is either an array of references or a single
 * reference.  For a node that loads from the current record the referenced
 * records are (after the optional filter 'andExpr') added to the cursor; for
 * any other node the traversal recurses into the base expression.
 *
 * @param expr    path expression
 * @param andExpr additional condition for candidate records, or NULL
 * @param cursor  cursor receiving the selected records
 * @param iref    record from which the inverse reference is followed
 * @return false as soon as cursor->add() refuses a record, true otherwise
 */
bool dbDatabase::followInverseReference(dbExprNode* expr, dbExprNode* andExpr,
                                        dbAnyCursor* cursor, oid_t iref)
{
    // Skip the nodes that do not correspond to an inverse reference.
    for (;;) {
        bool const transparent =
            expr->cop == dbvmGetAt || expr->cop == dbvmDeref ||
            (expr->cop == dbvmLoadReference
             && (expr->ref.field->attr & dbFieldDescriptor::ComponentOfArray));
        if ( !transparent ) {
            break;
        }
        expr = expr->operand[0];
    }

    dbTable* table = (dbTable*)getRow(cursor->table->tableId);
    dbFieldDescriptor* fd = expr->ref.field->inverseRef;
    // true: the node loads from the current record, i.e. the end of the path
    bool const endOfPath = expr->cop >= dbvmLoadSelfReference;

    if ( fd->type == dbField::tpArray ) {
        byte* rec = (byte*)getRow(iref);
        dbVarying* arr = (dbVarying*)(rec + fd->dbsOffs);
        oid_t* refs = (oid_t*)(rec + arr->offs);
        for ( int left = arr->size; --left >= 0; ) {
            oid_t oid = *refs++;
            if ( oid == 0 ) {
                continue; // null reference
            }
            if ( endOfPath ) {
                if ( andExpr == NULL || evaluate(andExpr, oid, table, cursor) ) {
                    if ( !cursor->add(oid) ) {
                        return false;
                    }
                }
            } else if ( !followInverseReference(expr->ref.base, andExpr,
                                                cursor, oid) )
            {
                return false;
            }
        }
        return true;
    }

    assert(fd->type == dbField::tpReference);
    oid_t oid = *(oid_t*)((byte*)getRow(iref) + fd->dbsOffs);
    if ( oid == 0 ) {
        return true; // null reference: nothing to follow
    }
    if ( endOfPath ) {
        if ( andExpr == NULL || evaluate(andExpr, oid, table, cursor) ) {
            if ( !cursor->add(oid) ) {
                return false;
            }
        }
    } else if ( !followInverseReference(expr->ref.base, andExpr,
                                        cursor, oid) )
    {
        return false;
    }
    return true;
}
static int referenceComparator(void* p, void* q, size_t) {
return *(oid_t*)p < *(oid_t*)q ? -1 : *(oid_t*)p == *(oid_t*)q ? 0 : 1;
}
// Try to satisfy `expr` with an index (hash table or T-tree) instead of a scan.
//
// cursor       - cursor that receives the selected records
// expr         - comparison expression, possibly wrapped in dbvmExists nodes
// andExpr      - enclosing AND node, or NULL; its operand[1] is used as the
//                additional condition for the index searches started here
// indexedField - out: set to the field whose index was used when true is returned
//
// Returns true when an index search was performed (selected records have been
// added to the cursor), false when no index could be applied.
bool dbDatabase::isIndexApplicable(dbAnyCursor* cursor,
dbExprNode* expr, dbExprNode* andExpr,
dbFieldDescriptor* &indexedField)
{
// Strip leading EXISTS nodes, remembering how many were removed.
int nExistsClauses = 0;
while ( expr->cop == dbvmExists ) {
expr = expr->operand[0];
nExistsClauses += 1;
}
int cmpCop = expr->cop;
// Only operators with at least two operands can be answered by an index.
if ( dbExprNode::nodeOperands[cmpCop] < 2 ) {
return false;
}
// loadCop is unsigned, so each "x - lo > hi - lo" test rejects opcodes outside
// [lo, hi] with one comparison. The first operand must be a field load from
// either the dbvmLoadSelf* or the dbvmLoad* opcode range.
unsigned loadCop = expr->operand[0]->cop;
if ( loadCop - dbvmLoadSelfBool > dbvmLoadSelfRawBinary - dbvmLoadSelfBool
&& loadCop - dbvmLoadBool > dbvmLoadRawBinary - dbvmLoadBool )
{
return false;
}
dbFieldDescriptor* field = expr->operand[0]->ref.field;
// The loaded field must have a hash table or a T-tree.
if ( field->hashTable == 0 && field->tTree == 0 ) {
return false;
}
if ( loadCop >= dbvmLoadSelfBool ) {
// dbvmLoadSelf* opcode: search the index directly with the 3-argument overload.
if ( isIndexApplicable(cursor, expr, andExpr) ) {
indexedField = field;
return true;
}
} else if ( existsInverseReference(expr->operand[0]->ref.base, nExistsClauses) )
{
// Field is reached through a reference path that has inverse references:
// search the index of the field's own table into a temporary cursor, then
// walk the inverse references back for every row found.
dbAnyCursor tmpCursor(*field->defTable, dbCursorViewOnly, NULL);
tmpCursor.paramBase = cursor->paramBase;
if ( isIndexApplicable(&tmpCursor, expr, NULL) ) {
expr = expr->operand[0]->ref.base;
indexedField = field;
cursor->checkForDuplicates();
if ( andExpr != NULL ) {
andExpr = andExpr->operand[1];
}
for ( dbSelection::segment* curr = tmpCursor.selection.first;
curr != NULL;
curr = curr->next )
{
for ( int i = 0, n = curr->nRows; i < n; i++ ) {
if ( !followInverseReference(expr, andExpr,
cursor, curr->rows[i]) )
{
// followInverseReference() returns false when cursor->add() fails;
// the index was still applied, so report success.
return true;
}
}
}
return true;
}
} else if ( expr->operand[0]->ref.base->cop == dbvmDeref ) {
// Field is reached by dereferencing a dbvmLoadSelfReference field: if that
// reference field is indexed, find the matching rows through the index of
// `field` and then look each of them up in the reference field's index.
dbExprNode* ref = expr->operand[0]->ref.base->operand[0];
if ( ref->cop == dbvmLoadSelfReference ) {
dbFieldDescriptor* refField = ref->ref.field;
if ( refField->hashTable == 0 && refField->tTree == 0 ) {
return false;
}
assert(refField->type == dbField::tpReference);
dbAnyCursor tmpCursor(*refField->defTable, dbCursorViewOnly, NULL);
tmpCursor.paramBase = cursor->paramBase;
if ( isIndexApplicable(&tmpCursor, expr, NULL) ) {
// `oid` is the search key; sc.firstKey/lastKey point at it and it is
// reassigned before every lookup in the loop below.
oid_t oid;
indexedField = refField;
dbSearchContext sc;
sc.db = this;
sc.type = dbField::tpReference;
sc.sizeofType = sizeof(oid_t);
sc.offs = refField->dbsOffs;
sc.cursor = cursor;
sc.comparator = &referenceComparator;
sc.condition = andExpr ? andExpr->operand[1] : (dbExprNode*)0;
sc.firstKey = sc.lastKey = (char*)&oid;
sc.firstKeyInclusion = sc.lastKeyInclusion = true;
for ( dbSelection::segment* curr = tmpCursor.selection.first;
curr != NULL;
curr = curr->next )
{
for ( int i = 0, n = curr->nRows; i < n; i++ ) {
oid = curr->rows[i];
sc.probes = 0;
// Prefer the hash table when the reference field has one.
if ( refField->hashTable != 0 ) {
dbHashTable::find(this, refField->hashTable, sc);
TRACE_MSG(("Hash table search for field %s.%s: %d probes\n",
refField->defTable->name, refField->longName, sc.probes));
} else {
dbTtree::find(this, refField->tTree, sc);
TRACE_MSG(("Index search for field %s.%s: %d probes\n",
refField->defTable->name, refField->longName, sc.probes));
}
}
}
return true;
}
}
}
return false;
}
// Locate the first character of `pattern` that is a LIKE wildcard
// (dbMatchAnyOneChar or dbMatchAnySubstring) or, when `escape` is not NULL,
// equal to the first character of `escape`.
// Returns a pointer to that character, or NULL when the end of the pattern
// is reached first.
inline char* findWildcard(char* pattern, char* escape)
{
    const bool hasEscape = (escape != NULL);
    const char esc = hasEscape ? *escape : '\0';
    for ( char* p = pattern; ; p++ ) {
        const char ch = *p;
        // The special-character test comes before the terminator test.
        if ( ch == dbMatchAnyOneChar
             || ch == dbMatchAnySubstring
             || (hasEscape && ch == esc) )
        {
            return p;
        }
        if ( ch == '\0' ) {
            return NULL;
        }
    }
}
// Perform an index search for a comparison on a field of the cursor's table.
//
// cursor  - cursor that receives the selected records; its paramBase is added
//           to the address of every query variable operand
// expr    - comparison node; operand[0] loads the indexed field, the remaining
//           operands must be constants or query variables
// andExpr - enclosing AND node, or NULL; its operand[1] becomes the additional
//           search condition
//
// Returns true when a search was executed through the field's hash table or
// T-tree, false when the operands or the operator cannot be handled by the
// indices available for the field.
bool dbDatabase::isIndexApplicable(dbAnyCursor* cursor,
dbExprNode* expr, dbExprNode* andExpr)
{
int n = dbExprNode::nodeOperands[expr->cop];
dbFieldDescriptor* field = expr->operand[0]->ref.field;
dbSearchContext sc;
size_t paramBase = (size_t)cursor->paramBase;
// Key values converted to the type of the indexed field; at most two keys
// are filled in (n-1 operands follow the field load).
union {
bool b;
int1 i1;
int2 i2;
int4 i4;
db_int8 i8;
real4 f4;
real8 f8;
oid_t oid;
char* s;
} literal[2];
// strop: keys are addressed through literal[i].s rather than stored in the union.
// binop: the key is a raw binary variable.
bool strop = false;
bool binop = false;
char* s;
literal[0].i8 = 0;
literal[1].i8 = 0;
// Fetch the value of every operand after the field load.
for ( int i = 0; i < n-1; i++ ) {
bool bval = false;
db_int8 ival = 0;
real8 fval = 0;
oid_t oid = 0;
char* sval = NULL;
dbExprNode* opd = expr->operand[i+1];
switch ( opd->cop ) {
case dbvmLoadVarBool:
bval = *(bool*)((char*)opd->var + paramBase);
break;
case dbvmLoadVarInt1:
ival = *(int1*)((char*)opd->var + paramBase);
break;
case dbvmLoadVarInt2:
ival = *(int2*)((char*)opd->var + paramBase);
break;
case dbvmLoadVarInt4:
ival = *(int4*)((char*)opd->var + paramBase);
break;
case dbvmLoadVarInt8:
ival = *(db_int8*)((char*)opd->var + paramBase);
break;
case dbvmLoadVarReference:
oid = *(oid_t*)((char*)opd->var + paramBase);
break;
case dbvmLoadVarReal4:
fval = *(real4*)((char*)opd->var + paramBase);
break;
case dbvmLoadVarReal8:
fval = *(real8*)((char*)opd->var + paramBase);
break;
case dbvmLoadVarString:
sval = (char*)((char*)opd->var + paramBase);
strop = true;
break;
#ifdef USE_STD_STRING
case dbvmLoadVarStdString:
sval = (char*)((std::string*)((char*)opd->var + paramBase))->c_str();
strop = true;
break;
#endif
case dbvmLoadVarStringPtr:
sval = *(char**)((char*)opd->var + paramBase);
strop = true;
break;
case dbvmLoadTrue:
bval = true;
break;
case dbvmLoadFalse:
bval = false;
break;
case dbvmLoadIntConstant:
ival = opd->ivalue;
break;
case dbvmLoadRealConstant:
fval = opd->fvalue;
break;
case dbvmLoadStringConstant:
sval = (char*)opd->svalue.str;
strop = true;
break;
case dbvmLoadVarRawBinary:
sval = (char*)((char*)opd->var + paramBase);
strop = true;
binop = true;
break;
default:
// Any other operand kind cannot be used as an index key.
return false;
}
// Store the value in the representation of the indexed field.
switch ( field->type ) {
case dbField::tpBool:
literal[i].b = bval;
break;
case dbField::tpInt1:
literal[i].i1 = (int1)ival;
break;
case dbField::tpInt2:
literal[i].i2 = (int2)ival;
break;
case dbField::tpInt4:
literal[i].i4 = (int4)ival;
break;
case dbField::tpInt8:
literal[i].i8 = ival;
break;
case dbField::tpReference:
literal[i].oid = oid;
break;
case dbField::tpReal4:
literal[i].f4 = (real4)fval;
break;
case dbField::tpReal8:
literal[i].f8 = fval;
break;
case dbField::tpString:
literal[i].s = sval;
break;
case dbField::tpRawBinary:
literal[i].s = sval;
break;
default:
assert(false);
}
}
// Search context fields shared by all operators.
sc.db = this;
sc.type = field->type;
sc.sizeofType = field->dbsSize;
sc.offs = field->dbsOffs;
sc.comparator = field->comparator;
sc.cursor = cursor;
sc.condition = andExpr ? andExpr->operand[1] : (dbExprNode*)0;
sc.probes = 0;
switch ( expr->cop ) {
// Equality: exact-match search, hash table preferred over T-tree.
case dbvmEqReference:
case dbvmEqInt:
case dbvmEqReal:
case dbvmEqString:
case dbvmEqBinary:
sc.firstKey = sc.lastKey =
strop ? literal[0].s : (char*)&literal[0];
sc.firstKeyInclusion = sc.lastKeyInclusion = true;
if ( field->hashTable != 0 ) {
dbHashTable::find(this, field->hashTable, sc);
TRACE_MSG(("Hash table search for field %s.%s: %d probes\n",
field->defTable->name, field->longName, sc.probes));
} else {
dbTtree::find(this, field->tTree, sc);
TRACE_MSG(("Index search for field %s.%s: %d probes\n",
field->defTable->name, field->longName, sc.probes));
}
return true;
// field > key: T-tree search with an exclusive lower bound and no upper bound.
case dbvmGtInt:
case dbvmGtReal:
case dbvmGtString:
case dbvmGtBinary:
if ( field->tTree != 0 ) {
sc.firstKey = strop ? literal[0].s : (char*)&literal[0];
sc.lastKey = NULL;
sc.firstKeyInclusion = false;
dbTtree::find(this, field->tTree, sc);
TRACE_MSG(("Index search for field %s.%s: %d probes\n",
field->defTable->name, field->longName, sc.probes));
return true;
}
return false;
// field >= key: inclusive lower bound, no upper bound.
case dbvmGeInt:
case dbvmGeReal:
case dbvmGeString:
case dbvmGeBinary:
if ( field->tTree != 0 ) {
sc.firstKey = strop ? literal[0].s : (char*)&literal[0];
sc.lastKey = NULL;
sc.firstKeyInclusion = true;
dbTtree::find(this, field->tTree, sc);
TRACE_MSG(("Index search for field %s.%s: %d probes\n",
field->defTable->name, field->longName, sc.probes));
return true;
}
return false;
// field < key: no lower bound, exclusive upper bound.
case dbvmLtInt:
case dbvmLtReal:
case dbvmLtString:
case dbvmLtBinary:
if ( field->tTree != 0 ) {
sc.firstKey = NULL;
sc.lastKey = strop ? literal[0].s : (char*)&literal[0];
sc.lastKeyInclusion = false;
dbTtree::find(this, field->tTree, sc);
TRACE_MSG(("Index search for field %s.%s: %d probes\n",
field->defTable->name, field->longName, sc.probes));
return true;
}
return false;
// field <= key: no lower bound, inclusive upper bound.
case dbvmLeInt:
case dbvmLeReal:
case dbvmLeString:
case dbvmLeBinary:
if ( field->tTree != 0 ) {
sc.firstKey = NULL;
sc.lastKey = strop ? literal[0].s : (char*)&literal[0];
sc.lastKeyInclusion = true;
dbTtree::find(this, field->tTree, sc);
TRACE_MSG(("Index search for field %s.%s: %d probes\n",
field->defTable->name, field->longName, sc.probes));
return true;
}
return false;
// BETWEEN: when both bounds are equal the hash table can answer it as an
// exact match; otherwise a T-tree range search with inclusive bounds is used.
case dbvmBetweenInt:
case dbvmBetweenReal:
case dbvmBetweenString:
case dbvmBetweenBinary:
if ( field->hashTable != 0 &&
((strop && ((binop && memcmp(literal[0].s, literal[1].s, sc.sizeofType) == 0)
|| (!binop && strcmp(literal[0].s, literal[1].s) == 0)))
|| (!strop && literal[0].i8 == literal[1].i8)) )
{
sc.firstKey = strop ? literal[0].s : (char*)&literal[0];
dbHashTable::find(this, field->hashTable, sc);
TRACE_MSG(("Hash table search for field %s.%s: %d probes\n",
field->defTable->name, field->longName, sc.probes));
return true;
} else if ( field->tTree != 0 ) {
sc.firstKey = strop ? literal[0].s : (char*)&literal[0];
sc.firstKeyInclusion = true;
sc.lastKey = strop ? literal[1].s : (char*)&literal[1];
sc.lastKeyInclusion = true;
dbTtree::find(this, field->tTree, sc);
TRACE_MSG(("Index search for field %s.%s: %d probes\n",
field->defTable->name, field->longName, sc.probes));
return true;
}
return false;
// LIKE: literal[0].s is the pattern and literal[1].s the escape string
// (left zeroed when the operator has no escape operand).
case dbvmLikeString:
case dbvmLikeEscapeString:
// Usable when the pattern has no special character at all, or when a
// T-tree exists and the first special character is either the last
// character of the pattern or is preceded by a non-empty prefix.
if ( (s = findWildcard(literal[0].s, literal[1].s)) == NULL
|| ((s[1] == '\0' || s != literal[0].s) && field->tTree != 0) )
{
if ( s == NULL ) {
// No special characters: the pattern is an exact-match key.
sc.firstKey = sc.lastKey = literal[0].s;
sc.firstKeyInclusion = sc.lastKeyInclusion = true;
if ( field->hashTable != 0 ) {
dbHashTable::find(this, field->hashTable, sc);
TRACE_MSG(("Hash table search for field %s.%s: "
"%d probes\n", field->defTable->name,
field->longName, sc.probes));
} else {
dbTtree::find(this, field->tTree, sc);
TRACE_MSG(("Index search for field %s.%s: %d probes\n",
field->defTable->name, field->longName,
sc.probes));
}
} else {
// len is the length of the literal prefix before the first special character.
int len = s - literal[0].s;
if ( len == 0 ) {
// The pattern is a single special character; only dbMatchAnySubstring
// is handled, by an unbounded T-tree search.
if ( *s != dbMatchAnySubstring ) {
return false;
}
sc.firstKey = NULL;
sc.lastKey = NULL;
dbTtree::find(this, field->tTree, sc);
TRACE_MSG(("Use index for ordering records by field "
"%s.%s\n", field->defTable->name,
field->longName));
} else {
// Prefix search: lower bound is the prefix itself (inclusive).
sc.firstKey = new char[len+1];
memcpy(sc.firstKey, literal[0].s, len);
sc.firstKey[len] = '\0';
sc.firstKeyInclusion = true;
// Upper bound: the prefix with its last byte incremented (exclusive),
// or, when that byte is already 0xFF, the prefix followed by three
// 0xFF bytes (inclusive).
sc.lastKey = new char[len+4];
memcpy(sc.lastKey, literal[0].s, len);
if ( (byte)sc.lastKey[len-1] != 0xFF ) {
sc.lastKey[len-1] += 1;
sc.lastKey[len] = '\0';
sc.lastKeyInclusion = false;
} else {
sc.lastKey[len] = (char)0xFF;
sc.lastKey[len+1] = (char)0xFF;
sc.lastKey[len+2] = (char)0xFF;
sc.lastKey[len+3] = '\0';
sc.lastKeyInclusion = true;
}
if ( s[0] != dbMatchAnySubstring || s[1] != '\0' ) {
// Records selected by index do not necessarily
// match the pattern, so include pattern matching in
// condition expression
if ( andExpr == NULL ) {
if ( expr->operand[0]->cop != dbvmLoadSelfString ) {
// Build a temporary LIKE node on the stack whose first operand is a
// dbvmLoadSelfString load of the same field; it is only used for
// the duration of the search below.
dbExprNode load(dbvmLoadSelfString,
expr->operand[0]->ref.field);
dbExprNode like(expr->cop, &load,
expr->operand[1],
expr->operand[2]);
sc.condition = &like;
dbTtree::find(this, field->tTree, sc);
like.cop = dbvmVoid;// do not destruct operands
} else {
sc.condition = expr;
dbTtree::find(this, field->tTree, sc);
}
} else {
// Re-check the whole AND expression (which contains the LIKE).
sc.condition = andExpr;
dbTtree::find(this, field->tTree, sc);
}
} else {
// Pattern is "<prefix>" followed only by dbMatchAnySubstring:
// the key range alone decides the match.
dbTtree::find(this, field->tTree, sc);
}
TRACE_MSG(("Index search for prefix in LIKE expression "
"for field %s.%s: %d probes\n",
field->defTable->name, field->longName,
sc.probes));
delete[] sc.firstKey;
delete[] sc.lastKey;
}
}
return true;
}
}
// Operator not supported by the available index.
return false;
}
// Argument block handed to parallelSearch(): the shared parallel query
// context and the identifier passed to its search() method.
struct SearchThreadArgument {
dbParallelQueryContext* ctx; // context whose search() is invoked
int id; // value passed to ctx->search()
};
// Thread entry point: `arg` points to a SearchThreadArgument; runs the
// search of its context with the identifier stored in the argument.
static void thread_proc parallelSearch(void* arg)
{
    SearchThreadArgument* const job = static_cast<SearchThreadArgument*>(arg);
    dbParallelQueryContext* const context = job->ctx;
    context->search(job->id);
}
void dbDatabase::traverse(dbAnyCursor* cursor, dbQuery& query)
{
const int defaultStackSize = 1024;
oid_t buf[defaultStackSize];
oid_t *stack = buf;
int stackSize = defaultStackSize;
int sp = 0, len;
dbAnyArray* arr;
oid_t oid, *refs;
dbTable* table = (dbTable*)getRow(cursor->table->tableId);
void* root = (void*)query.root;
switch ( query.startFrom ) {
case dbCompiledQuery::StartFromFirst:
oid = table->firstRow;
if ( oid != 0 ) {
stack[sp++] = oid;
}
break;
case dbCompiledQuery::StartFromLast:
oid = table->lastRow;
if ( oid != 0 ) {
stack[sp++] = oid;
}
break;
case dbCompiledQuery::StartFromRef:
oid = *(oid_t*)root;
if ( oid != 0 ) {
stack[sp++] = oid;
}
break;
case dbCompiledQuery::StartFromArrayPtr:
root = *(dbAnyArray**)root;
// no break
case dbCompiledQuery::StartFromArray:
arr = (dbAnyArray*)root;
len = arr->length();
if ( len > stackSize ) {
stackSize = len;
stack = new oid_t[stackSize];
}
refs = (oid_t*)arr->base();
while ( --len >= 0 ) {
oid = refs[len];
if ( oid != 0 ) {
stack[sp++] = oid;
}
}
break;
default:
assert(false);
}
cursor->checkForDuplicates();
dbExprNode* condition = query.tree;
dbFollowByNode* follow = query.follow;
while ( sp != 0 ) {
oid_t curr = stack[--sp];
if ( condition->cop == dbvmVoid || evaluate(condition, curr, table, cursor) ) {
if ( !cursor->add(curr) ) {
break;
}
} else {
cursor->mark(curr);
}
byte* record = (byte*)getRow(curr);
for ( dbFollowByNode* fp = follow; fp != NULL; fp = fp->next ) {
dbFieldDescriptor* fd = fp->field;
if ( fd->type == dbField::tpArray ) {
dbVarying* vp = (dbVarying*)(record + fd->dbsOffs);
len = vp->size;
if ( sp + len > stackSize ) {
int newSize = len > stackSize ? len*2 : stackSize*2;
oid_t* newStack = new oid_t[newSize];
memcpy(newStack, stack, stackSize*sizeof(oid_t));
stackSize = newSize;
if ( stack != buf ) {
delete[] stack;
}
stack = newStack;
}
refs = (oid_t*)(record + vp->offs);
while ( --len >= 0 ) {
oid = refs[len];
if ( oid != 0 && !cursor->isMarked(oid) ) {
stack[sp++] = oid;
}
}
} else {
assert(fd->type == dbField::tpReference);
if ( sp == stackSize ) {
int newSize = stackSize*2;
oid_t* newStack = new oid_t[newSize];
memcpy(newStack, stack, stackSize*sizeof(oid_t));
stackSize = newSize;
if ( stack != buf ) {
delete[] stack;
}
stack = newStack;
}
oid = *(oid_t*)(record + fd->dbsOffs);
if ( oid != 0 && !cursor->isMarked(oid) ) {
stack[sp++] = oid;
}
}
}
}
if ( stack != buf ) {
delete[] stack;
}
if ( query.order != NULL ) {
cursor->selection.sort(this, query.order);
}
}
/**
 * Execute `query` for `cursor` and collect the matching records in the
 * cursor's selection.
 *
 * The query is compiled first if necessary (under query.mutex); nothing is
 * selected when compilation fails. Strategies are then tried in this order:
 *   1. traverse() when the query has an explicit starting point;
 *   2. select(cursor) when there is neither a condition nor an ordering;
 *   3. cursor->setCurrent() for "current = <reference variable>";
 *   4. index search through isIndexApplicable() for an AND condition
 *      (first operand) or for every branch of an OR chain;
 *   5. T-tree traversal when ordering by a single indexed field;
 *   6. sequential scan of the table, split over parThreads threads when the
 *      table is large enough and the cursor has no selection limit.
 *
 * @param cursor cursor receiving the result; its type selects the lock mode
 * @param query  query to execute
 */
void dbDatabase::select(dbAnyCursor* cursor, dbQuery& query)
{
#if defined(FASTDB_VERBOSE_LOGGING) || FASTDB_DEBUG == DEBUG_TRACE
    // Scratch buffer for query.dump(). It lives at function scope because
    // every verbose "Leaving" log site below dumps the query as well.
    char buf[4096];
#endif
#   ifdef FASTDB_VERBOSE_LOGGING
    {
        // NOTE(review): the (int) casts of pointers in the log calls of this
        // function truncate on targets where pointers are wider than int.
        OsSysLog::add(FAC_DB, PRI_DEBUG,
                      "Entering dbDatabase::select cursor(0x%08x) select * from %s where %s",
                      (int)cursor, (query.table)? query.table->name:((char*)"NULL"),
                      query.dump(buf)
                      );
    }
#   endif
    assert(opened);
    dbDatabaseThreadContext* ctx = threadContext.get();
    dbFieldDescriptor* indexedField = NULL;
    assert(ctx != NULL);
    {
        // Compile the query if it was not compiled yet or was compiled for
        // another table.
        dbCriticalSection cs(query.mutex);
        query.mutexLocked = true;
        if ( !query.compiled() || cursor->table != query.table ) {
            if ( !ctx->compiler.compile(cursor->table, query) ) {
                query.mutexLocked = false;
                return;
            }
        }
        query.mutexLocked = false;
    }
#if FASTDB_DEBUG == DEBUG_TRACE
    TRACE_MSG(("Query: select * from %s where %s\n", query.table->name, query.dump(buf)));
#endif
    beginTransaction(cursor->type == dbCursorForUpdate
                     ? dbDatabase::dbExclusiveLock : dbDatabase::dbSharedLock);

    // 1. Explicit starting point: follow references from it.
    if ( query.startFrom != dbCompiledQuery::StartFromAny ) {
        ctx->cursors.link(cursor);
        traverse(cursor, query);
#       ifdef FASTDB_VERBOSE_LOGGING
        {
            OsSysLog::add(FAC_DB, PRI_DEBUG,
                          "Leaving dbDatabase::select:(line=%d) cursor(0x%08x) "
                          "select * from %s where %s ",
                          __LINE__, (int)cursor, (query.table)? query.table->name:((char*)"NULL"),
                          query.dump(buf));
            OsSysLog::flush();
        }
#       endif
        return;
    }

    dbExprNode* condition = query.tree;
    if ( condition->cop == dbvmVoid && query.order == NULL ) {
        // Empty select condition: select all records in the table
        select(cursor);
#       ifdef FASTDB_VERBOSE_LOGGING
        {
            OsSysLog::add(FAC_DB, PRI_DEBUG,
                          "Leaving dbDatabase::select:(line=%d) cursor(0x%08x) query(0x%08x)\n",
                          __LINE__, (int)cursor, (int)&query);
            OsSysLog::flush();
        }
#       endif
        return;
    }

    // 3. "current = <reference variable>" (either operand order) positions
    //    the cursor directly on the referenced record.
    if ( condition->cop == dbvmEqReference ) {
        if ( condition->operand[0]->cop == dbvmCurrent
             && condition->operand[1]->cop == dbvmLoadVarReference )
        {
            cursor->setCurrent(*(dbAnyReference*)((char*)condition->operand[1]->var + (size_t)cursor->paramBase));
            return;
        }
        if ( condition->operand[1]->cop == dbvmCurrent
             && condition->operand[0]->cop == dbvmLoadVarReference )
        {
            cursor->setCurrent(*(dbAnyReference*)((char*)condition->operand[0]->var + (size_t)cursor->paramBase));
#           ifdef FASTDB_VERBOSE_LOGGING
            {
                OsSysLog::add(FAC_DB, PRI_DEBUG,
                              "Leaving dbDatabase::select:(line=%d) cursor(0x%08x) "
                              "select * from %s where %s ",
                              __LINE__, (int)cursor,
                              (query.table)? query.table->name:((char*)"NULL"), query.dump(buf)
                              );
                OsSysLog::flush();
            }
#           endif
            return;
        }
    }

    ctx->cursors.link(cursor);

    // 4. Index search.
    if ( condition->cop == dbvmAndBool ) {
        if ( isIndexApplicable(cursor, condition->operand[0],
                               condition, indexedField) )
        {
            if ( query.order != NULL ) {
                // The index already delivers the requested order only when
                // ordering by exactly the indexed field of the query table.
                if ( indexedField->defTable != query.table
                     || query.order->next != NULL
                     || query.order->field != indexedField )
                {
                    cursor->selection.sort(this, query.order);
                } else if ( !query.order->ascent ) {
                    cursor->selection.reverse();
                }
            }
#           ifdef FASTDB_VERBOSE_LOGGING
            {
                OsSysLog::add(FAC_DB, PRI_DEBUG, "Leaving dbDatabase::select:(line=%d) "
                              "cursor(0x%08x) select * from %s where %s ",
                              __LINE__, (int)cursor,
                              (query.table)? query.table->name:((char*)"NULL"), query.dump(buf));
                OsSysLog::flush();
            }
#           endif
            return;
        }
    } else {
        if ( condition->cop == dbvmOrBool ) {
            cursor->checkForDuplicates();
        }
        // Consume the branches of an OR chain for as long as each of them
        // can be answered by an index.
        while ( condition->cop == dbvmOrBool
                && !cursor->isLimitReached()
                && isIndexApplicable(cursor, condition->operand[0], NULL,
                                     indexedField) )
        {
            condition = condition->operand[1];
        }
        if ( !cursor->isLimitReached()
             && isIndexApplicable(cursor, condition, NULL, indexedField) )
        {
            if ( query.order != NULL ) {
                if ( indexedField->defTable != query.table
                     || condition != query.tree
                     || query.order->next != NULL
                     || query.order->field != indexedField )
                {
                    cursor->selection.sort(this, query.order);
                } else if ( !query.order->ascent ) {
                    cursor->selection.reverse();
                }
            }
#           ifdef FASTDB_VERBOSE_LOGGING
            {
                OsSysLog::add(FAC_DB, PRI_DEBUG, "Leaving dbDatabase::select:(line=%d) "
                              "cursor(0x%08x) select * from %s where %s ",
                              __LINE__, (int)cursor,
                              (query.table)? query.table->name:((char*)"NULL"), query.dump(buf));
                OsSysLog::flush();
            }
#           endif
            return;
        }
    }

    // 5. Ordering by a single T-tree indexed field: walk the tree in order,
    //    filtering with the (remaining) condition.
    if ( query.order != NULL && query.order->next == NULL
         && query.order->field != NULL && query.order->field->tTree != 0 )
    {
        dbFieldDescriptor* field = query.order->field;
        TRACE_MSG(("Use index for ordering records by field %s.%s\n",
                   query.table->name, field->longName));
        if ( condition->cop == dbvmVoid ) {
            if ( query.order->ascent ) {
                dbTtree::traverseForward(this, field->tTree, cursor);
            } else {
                dbTtree::traverseBackward(this, field->tTree, cursor);
            }
        } else {
            if ( query.order->ascent ) {
                dbTtree::traverseForward(this,field->tTree,cursor,condition);
            } else {
                dbTtree::traverseBackward(this,field->tTree,cursor,condition);
            }
        }
#       ifdef FASTDB_VERBOSE_LOGGING
        {
            OsSysLog::add(FAC_DB, PRI_DEBUG,
                          "Leaving dbDatabase::select:(line=%d) cursor(0x%08x) "
                          "select * from %s where %s ",
                          __LINE__, (int)cursor,
                          (query.table)? query.table->name:((char*)"NULL"), query.dump(buf));
            OsSysLog::flush();
        }
#       endif
        return;
    }

    // 6. Table scan.
    dbTable* table = (dbTable*)getRow(cursor->table->tableId);
    int n = parThreads-1;
    if ( cursor->getNumberOfRecords() == 0
         && n > 0 && table->nRows >= dbParallelScanThreshold
         && cursor->limit >= dbDefaultSelectionLimit )
    {
        // Parallel scan: n pooled threads plus the calling thread, which
        // takes the last identifier (n).
        dbPooledThread* thread[dbMaxParallelSearchThreads];
        SearchThreadArgument sa[dbMaxParallelSearchThreads];
        dbParallelQueryContext par(this, table, &query, cursor);
        int i;
        for ( i = 0; i < n; i++ ) {
            sa[i].id = i;
            // Address of the shared context. Written with parentheses so the
            // ampersand/identifier/semicolon sequence cannot be mistaken for
            // an HTML character entity by text-processing tools.
            sa[i].ctx = &(par);
            thread[i] = threadPool.create((dbThread::thread_proc_t)parallelSearch, &sa[i]);
        }
        par.search(i);
        for ( i = 0; i < n; i++ ) {
            threadPool.join(thread[i]);
        }
        if ( query.order != NULL ) {
            // Merge the per-thread selections: repeatedly take the smallest
            // head record according to query.order.
            dbRecord* rec[dbMaxParallelSearchThreads];
            for ( i = 0; i <= n; i++ ) {
                if ( par.selection[i].first != NULL ) {
                    rec[i] = getRow(par.selection[i].first->rows[0]);
                } else {
                    rec[i] = NULL;
                }
            }
            while ( true ) {
                int min = -1;
                for ( i = 0; i <= n; i++ ) {
                    if ( rec[i] != NULL
                         && (min < 0 || dbSelection::compare(rec[i], rec[min],
                                                             query.order) < 0) )
                    {
                        min = i;
                    }
                }
                if ( min < 0 ) {
                    // Every per-thread selection is exhausted.
#                   ifdef FASTDB_VERBOSE_LOGGING
                    {
                        OsSysLog::add(FAC_DB, PRI_DEBUG,
                                      "Leaving dbDatabase::select:(line=%d) cursor(0x%08x) "
                                      "select * from %s where %s ",
                                      __LINE__, (int)cursor,
                                      (query.table)? query.table->name:((char*)"NULL"),
                                      query.dump(buf));
                        OsSysLog::flush();
                    }
#                   endif
                    return;
                }
                oid_t oid =
                    par.selection[min].first->rows[par.selection[min].pos];
                cursor->selection.add(oid);
                par.selection[min].pos += 1;
                if ( par.selection[min].pos == par.selection[min].first->nRows ){
                    // Current segment consumed: release it and move on.
                    par.selection[min].pos = 0;
                    dbSelection::segment* next=par.selection[min].first->next;
                    delete par.selection[min].first;
                    if ( (par.selection[min].first = next) == NULL ) {
                        rec[min] = NULL;
                        continue;
                    }
                }
                oid = par.selection[min].first->rows[par.selection[min].pos];
                rec[min] = getRow(oid);
            }
        } else {
            // No ordering: splice the per-thread segment lists onto the
            // cursor's selection.
            for ( i = 0; i <= n; i++ ) {
                if ( par.selection[i].first != NULL ) {
                    par.selection[i].first->prev = cursor->selection.last;
                    if ( cursor->selection.last == NULL ) {
                        cursor->selection.first = par.selection[i].first;
                    } else {
                        cursor->selection.last->next = par.selection[i].first;
                    }
                    cursor->selection.last = par.selection[i].last;
                    cursor->selection.nRows += par.selection[i].nRows;
                }
            }
        }
    } else {
        // Sequential scan over the table's row list.
        oid_t oid = table->firstRow;
        if ( !cursor->isLimitReached() ) {
            while ( oid != 0 ) {
                if ( evaluate(condition, oid, table, cursor) ) {
                    if ( !cursor->add(oid) ) {
                        break;
                    }
                }
                oid = getRow(oid)->next;
            }
        }
        if ( query.order != NULL ) {
            cursor->selection.sort(this, query.order);
        }
    }
#   ifdef FASTDB_VERBOSE_LOGGING
    {
        OsSysLog::add(FAC_DB, PRI_DEBUG, "Leaving dbDatabase::select:(line=%d) cursor(0x%08x) "
                      "select * from %s where %s ",
                      __LINE__, (int)cursor,
                      (query.table)? query.table->name:((char*)"NULL"), query.dump(buf));
        OsSysLog::flush();
    }
#   endif
}
/**
 * Select all records of the cursor's table without evaluating a condition:
 * the cursor is given the first and last row identifiers and the row count
 * of the table and is flagged as covering all records.
 *
 * @param cursor cursor to fill; its type selects exclusive or shared locking
 */
void dbDatabase::select(dbAnyCursor* cursor)
{
#   ifdef FASTDB_VERBOSE_LOGGING
    {
        OsSysLog::add(FAC_DB, PRI_DEBUG,
                      "Entering 2 dbDatabase::select entering cursor(0x%08x), opened(%d)",
                      (int)cursor, (int)opened);
        OsSysLog::flush();
    }
#   endif
    assert(opened);
    beginTransaction(cursor->type == dbCursorForUpdate ? dbExclusiveLock : dbSharedLock);
    dbTable* table = (dbTable*)getRow(cursor->table->tableId);
    cursor->firstId = table->firstRow;
    cursor->lastId = table->lastRow;
    cursor->selection.nRows = table->nRows;
    cursor->allRecords = true;
    threadContext.get()->cursors.link(cursor);
#   ifdef FASTDB_VERBOSE_LOGGING
    {
        // The format has two conversions (cursor, opened), so exactly those
        // two values are passed, matching the "Entering" message above.
        OsSysLog::add(FAC_DB, PRI_DEBUG,
                      "Leaving 2 dbDatabase::select entering cursor(0x%08x), opened(%d)",
                      (int)cursor, (int)opened);
        OsSysLog::flush();
    }
#   endif
}
/**
 * Delete record `delId` of the table described by `desc`: inverse references
 * are removed first, then the record is taken out of every hash table and
 * T-tree of the table, its row is freed and the cursors are notified.
 */
void dbDatabase::remove(dbTableDescriptor* desc, oid_t delId)
{
    modified = true;
    removeInverseReferences(desc, delId);

    dbFieldDescriptor* hashed = desc->hashedFields;
    while ( hashed != NULL ) {
        dbHashTable::remove(this, hashed->hashTable, delId, hashed->type,
                            hashed->dbsSize, hashed->dbsOffs);
        hashed = hashed->nextHashedField;
    }

    dbFieldDescriptor* indexed = desc->indexedFields;
    while ( indexed != NULL ) {
        dbTtree::remove(this, indexed->tTree, delId, indexed->type,
                        indexed->dbsSize, indexed->comparator, indexed->dbsOffs);
        indexed = indexed->nextIndexedField;
    }

    freeRow(desc->tableId, delId);
    updateCursors(delId, true);
}
/**
 * Prepare record `oid` for modification with a new size.
 *
 * If the object is unchanged since the last commit (both index versions
 * agree), its page is flagged dirty, the old location goes through
 * cloneBitmap() and new space is allocated. Otherwise new space is allocated
 * only when the size changes, and the old space is deallocated.
 *
 * @param oid     identifier of the record
 * @param newSize new size of the record in bytes
 * @return pointer to the record at its (possibly new) location, with the
 *         next/prev links copied from the old location and size set
 */
dbRecord* dbDatabase::putRow(oid_t oid, size_t newSize)
{
    offs_t offs = currIndex[oid];
    if ( oid < committedIndexSize && index[0][oid] == index[1][oid] ) {
        size_t pageNo = oid/dbHandlesPerPage;
        monitor->dirtyPagesMap[pageNo >> 5] |= 1 << (pageNo & 31);
        cloneBitmap(offs, getRow(oid)->size);
        // Finish allocate() before indexing currIndex: extend() replaces
        // currIndex when the mapping moves, and before C++17 the operands of
        // "currIndex[oid] = allocate(...)" may be evaluated in either order.
        // NOTE(review): assumes allocate() can reach extend() - confirm.
        offs_t newOffs = allocate(newSize);
        currIndex[oid] = newOffs;
    } else {
        size_t oldSize = getRow(oid)->size;
        if ( oldSize != newSize ) {
            // Same sequencing as above.
            offs_t newOffs = allocate(newSize);
            currIndex[oid] = newOffs;
            cloneBitmap(offs, oldSize);
            deallocate(offs, oldSize);
        }
    }
    dbRecord* record = (dbRecord*)(baseAddr + currIndex[oid]);
    // Carry the list links over from the old location.
    record->next = ((dbRecord*)(baseAddr + offs))->next;
    record->prev = ((dbRecord*)(baseAddr + offs))->prev;
    record->size = newSize;
    return record;
}
/**
 * Allocate storage for a new record `oid` of `size` bytes and append it to
 * the row list of table `tableId`.
 *
 * @param tableId identifier of the table record
 * @param oid     identifier already assigned to the new record
 * @param size    size of the record in bytes
 */
void dbDatabase::allocateRow(oid_t tableId, oid_t oid, size_t size)
{
    // Finish allocate() before indexing currIndex: extend() replaces
    // currIndex when the mapping moves, and before C++17 the operands of
    // "currIndex[oid] = allocate(...)" may be evaluated in either order.
    // NOTE(review): assumes allocate() can reach extend() - confirm.
    offs_t pos = allocate(size);
    currIndex[oid] = pos;
    dbTable* table = (dbTable*)putRow(tableId);
    dbRecord* record = getRow(oid);
    record->size = size;
    record->next = 0;
    record->prev = table->lastRow;
    if ( table->lastRow != 0 ) {
        //
        // Optimisation hack: avoid cloning of the last record.
        // Possible inconsistency in L2-list will be eliminated by recovery
        // procedure.
        //
        getRow(table->lastRow)->next = oid;
        file.markAsDirty(currIndex[table->lastRow], sizeof(dbRecord));
    } else {
        table->firstRow = oid;
    }
    table->lastRow = oid;
    table->nRows += 1;
#ifdef AUTOINCREMENT_SUPPORT
    table->count += 1;
#endif
}
/**
 * Unlink record `oid` from the row list of table `tableId`, release its
 * storage and free its identifier.
 */
void dbDatabase::freeRow(oid_t tableId, oid_t oid)
{
    dbTable* tableRec = (dbTable*)putRow(tableId);
    dbRecord* victim = getRow(oid);
    const size_t recSize = victim->size;
    const oid_t succ = victim->next;
    const oid_t pred = victim->prev;

    tableRec->nRows -= 1;
    // The table header is updated before any neighbour is touched through putRow().
    if ( pred == 0 ) {
        tableRec->firstRow = succ;
    }
    if ( succ == 0 ) {
        tableRec->lastRow = pred;
    }
    // Bypass the removed record in its neighbours.
    if ( pred != 0 ) {
        putRow(pred)->next = succ;
    }
    if ( succ != 0 ) {
        putRow(succ)->prev = pred;
    }

    const bool sameInBothIndices =
        oid < committedIndexSize && index[0][oid] == index[1][oid];
    if ( !sameInBothIndices ) {
        deallocate(currIndex[oid], recSize);
    } else {
        cloneBitmap(currIndex[oid], recSize);
    }
    freeId(oid);
}
/**
 * Release an internal object. The bits of its index entry selected by
 * dbInternalObjectMarker pick the object's size from internalObjectSize[]
 * and are subtracted from the entry to obtain the object's offset.
 */
void dbDatabase::freeObject(oid_t oid)
{
    int marker = currIndex[oid] & dbInternalObjectMarker;
    const bool sameInBothIndices =
        oid < committedIndexSize && index[0][oid] == index[1][oid];
    if ( !sameInBothIndices ) {
        deallocate(currIndex[oid] - marker, internalObjectSize[marker]);
    } else {
        cloneBitmap(currIndex[oid] - marker, internalObjectSize[marker]);
    }
    freeId(oid);
}
/**
 * Replace the stored contents of record `oid` with the application record
 * `record`.
 *
 * Steps, in order:
 *   1. mark the fields whose values differ (markUpdatedFields);
 *   2. bring inverse references in line with the new reference values;
 *   3. remove the record from the indices of updated fields;
 *   4. store the new field values (possibly at a new location);
 *   5. re-insert the record into those indices and clear the Updated flags.
 *
 * @param oid    identifier of the record to update
 * @param desc   descriptor of the record's table
 * @param record application-side image of the record
 */
void dbDatabase::update(oid_t oid, dbTableDescriptor* desc, void const* record)
{
    assert(opened);
    beginTransaction(dbExclusiveLock);
    size_t size =
        desc->columns->calculateRecordSize((byte*)record, desc->fixedSize);
    byte* src = (byte*)record;
    desc->columns->markUpdatedFields((byte*)getRow(oid), src);
    updatedRecordId = oid;
    dbFieldDescriptor* fd;
    for ( fd = desc->inverseFields; fd != NULL; fd = fd->nextInverseField ) {
        if ( fd->type == dbField::tpArray ) {
            dbAnyArray* arr = (dbAnyArray*)(src + fd->appOffs);
            int n = arr->length();
            oid_t* newrefs = (oid_t*)arr->base();
            byte* old = (byte*)getRow(oid);
            int m = ((dbVarying*)(old + fd->dbsOffs))->size;
            int offs = ((dbVarying*)(old + fd->dbsOffs))->offs;
            int i, j, k;
            old += offs;
            // Pass 1: every old reference that no longer occurs in the new
            // array loses its inverse reference. j remembers where the last
            // match was found, so the forward search resumes there and only
            // falls back to scanning the part before it.
            for ( i = j = 0; i < m; i++ ) {
                oid_t oldref = *((oid_t*)old + i);
                if ( oldref != 0 ) {
                    for ( k = j; j < n && newrefs[j] != oldref; j++ );
                    if ( j == n ) {
                        for ( j = k--; k >= 0 && newrefs[k] != oldref; k-- );
                        if ( k < 0 ) {
                            removeInverseReference(fd, oid, oldref);
                            // Re-fetch the row address after the call.
                            old = (byte*)getRow(oid) + offs;
                        }
                    } else {
                        j += 1;
                    }
                }
            }
            // Pass 2: every new reference that did not occur in the old
            // array gains an inverse reference (same search scheme).
            for ( i = j = 0; i < n; i++ ) {
                if ( newrefs[i] != 0 ) {
                    for ( k=j; j < m && newrefs[i] != *((oid_t*)old+j); j++ );
                    if ( j == m ) {
                        for ( j=k--; k >= 0 && newrefs[i] != *((oid_t*)old+k); k-- );
                        if ( k < 0 ) {
                            insertInverseReference(fd, oid, newrefs[i]);
                            old = (byte*)getRow(oid) + offs;
                        }
                    } else {
                        j += 1;
                    }
                }
            }
        } else {
            // Single reference: swap the inverse reference when it changed.
            oid_t newref = *(oid_t*)(src + fd->appOffs);
            byte* old = (byte*)getRow(oid);
            oid_t oldref = *(oid_t*)(old + fd->dbsOffs);
            if ( newref != oldref ) {
                if ( oldref != 0 ) {
                    removeInverseReference(fd, oid, oldref);
                }
                if ( newref != 0 ) {
                    insertInverseReference(fd, oid, newref);
                }
            }
        }
    }
    updatedRecordId = 0;
    // Take the record out of the indices of updated fields while the old
    // values are still stored.
    for ( fd = desc->hashedFields; fd != NULL; fd = fd->nextHashedField ) {
        if ( fd->attr & dbFieldDescriptor::Updated ) {
            dbHashTable::remove(this, fd->hashTable, oid, fd->type, fd->dbsSize, fd->dbsOffs);
        }
    }
    for ( fd = desc->indexedFields; fd != NULL; fd = fd->nextIndexedField ) {
        if ( fd->attr & dbFieldDescriptor::Updated ) {
            dbTtree::remove(this, fd->tTree, oid, fd->type, fd->dbsSize, fd->comparator, fd->dbsOffs);
        }
    }
    byte* old = (byte*)getRow(oid);
    byte* dst = (byte*)putRow(oid, size);
    if ( dst == old ) {
        // Record stays in place: build the new image in a temporary buffer
        // aligned to 8 bytes, then copy everything after the record header.
        dbSmallBuffer buf(size+7);
        byte* temp = (byte*)buf.base();
        // size_t rather than long: long is narrower than a pointer on
        // 64-bit Windows, which would truncate the address.
        temp = (byte*)DOALIGN((size_t)temp, 8);
        desc->columns->storeRecordFields(temp, src, desc->fixedSize, false);
        memcpy(dst+sizeof(dbRecord), temp+sizeof(dbRecord), size-sizeof(dbRecord));
    } else {
        desc->columns->storeRecordFields(dst, src, desc->fixedSize, false);
    }
    modified = true;
    // Re-insert into the indices of updated fields, now with the new values.
    for ( fd = desc->hashedFields; fd != NULL; fd = fd->nextHashedField ) {
        if ( fd->attr & dbFieldDescriptor::Updated ) {
            dbHashTable::insert(this, fd->hashTable, oid, fd->type, fd->dbsSize,
                                fd->dbsOffs, 0);
        }
    }
    for ( fd = desc->indexedFields; fd != NULL; fd = fd->nextIndexedField ) {
        if ( fd->attr & dbFieldDescriptor::Updated ) {
            dbTtree::insert(this, fd->tTree, oid, fd->type, fd->dbsSize, fd->comparator, fd->dbsOffs);
            fd->attr &= ~dbFieldDescriptor::Updated;
        }
    }
    // Hashed fields keep the flag through the T-tree loop above and are
    // cleared last.
    for ( fd = desc->hashedFields; fd != NULL; fd = fd->nextHashedField ) {
        fd->attr &= ~dbFieldDescriptor::Updated;
    }
}
/**
 * Store a new record in the table described by `desc`.
 *
 * The record is allocated and filled from the application image, its
 * identifier is written to `ref`, inverse references are created for all
 * non-null references it contains, and it is inserted into every hash table
 * and T-tree of the table.
 *
 * @param desc   descriptor of the target table
 * @param ref    out: receives the identifier of the new record
 * @param record application-side image of the record
 */
void dbDatabase::insertRecord(dbTableDescriptor* desc, dbAnyReference* ref,
                              void const* record)
{
    assert(opened);
    beginTransaction(dbExclusiveLock);
    modified = true;

    byte* src = (byte*)record;
    size_t size = desc->columns->calculateRecordSize(src, desc->fixedSize);
    oid_t oid = allocateRow(desc->tableId, size);
    byte* dst = (byte*)getRow(oid);
    desc->columns->storeRecordFields(dst, src, desc->fixedSize, true);
    ref->oid = oid;

    dbTable* table = (dbTable*)getRow(desc->tableId);
#ifdef AUTOINCREMENT_SUPPORT
    desc->autoincrementCount = table->count;
#endif
    // Row count captured before inverse references and indices are updated;
    // it is passed to dbHashTable::insert() below.
    int nRows = table->nRows;

    for ( dbFieldDescriptor* inv = desc->inverseFields;
          inv != NULL;
          inv = inv->nextInverseField )
    {
        if ( inv->type != dbField::tpArray ) {
            oid_t target = *(oid_t*)(src + inv->appOffs);
            if ( target != 0 ) {
                insertInverseReference(inv, oid, target);
            }
            continue;
        }
        // Array of references: processed from the last element to the first.
        dbAnyArray* arr = (dbAnyArray*)(src + inv->appOffs);
        int n = arr->length();
        oid_t* refs = (oid_t*)arr->base();
        for ( int i = n - 1; i >= 0; i-- ) {
            if ( refs[i] != 0 ) {
                insertInverseReference(inv, oid, refs[i]);
            }
        }
    }

    dbFieldDescriptor* hashed = desc->hashedFields;
    while ( hashed != NULL ) {
        dbHashTable::insert(this, hashed->hashTable, oid, hashed->type,
                            hashed->dbsSize, hashed->dbsOffs, nRows);
        hashed = hashed->nextHashedField;
    }

    dbFieldDescriptor* indexed = desc->indexedFields;
    while ( indexed != NULL ) {
        dbTtree::insert(this, indexed->tTree, oid, indexed->type,
                        indexed->dbsSize, indexed->comparator, indexed->dbsOffs);
        indexed = indexed->nextIndexedField;
    }
}
/**
 * Make sure the database file is at least `size` bytes long.
 *
 * Nothing happens when the file is already large enough. Otherwise the
 * requested size is rounded up to twice the current size where
 * dbFileSizeLimit permits, the file is resized under a new version name,
 * and, if the mapping address changed, all cursors of the current thread
 * and the database's own pointers into the mapping are rebased.
 *
 * @param size required size of the database in bytes
 */
inline void dbDatabase::extend(offs_t size)
{
    const size_t oldSize = header->size;
    if ( size <= oldSize ) {
        return;
    }
#ifdef DISKLESS_CONFIGURATION
    handleError(FileLimitExeeded);
#endif
    if ( dbFileSizeLimit != 0 && size > dbFileSizeLimit ) {
        handleError(FileLimitExeeded);
    }
    dbDatabaseThreadContext* ctx = threadContext.get();
    assert(ctx != NULL);
    if ( ctx->mutatorCSLocked && !ctx->writeAccess ) {
        beginTransaction(dbCommitLock);
    }

    // Grow to at least double the old size unless the size limit forbids it.
    const size_t doubled = oldSize*2;
    if ( doubled > size ) {
        if ( dbFileSizeLimit == 0 || doubled <= dbFileSizeLimit ) {
            size = doubled;
        } else if ( dbFileSizeLimit > size ) {
            size = dbFileSizeLimit;
        }
    }
    TRACE_MSG(("Extend database file from %ld to %ld bytes\n",
               header->size, size));
    header->size = size;

    // The new monitor version number becomes the suffix of the name passed to setSize().
    version = ++monitor->version;
    sprintf(databaseName + databaseNameLen, ".%d", version);
    int rc = file.setSize(size, databaseName);
    if ( rc != dbFile::ok ) {
        handleError(FileError, "Failed to extend file", rc);
    }
    // file.markAsDirty(oldSize, size - oldSize);

    byte* newBase = (byte*)file.getAddr();
    long shift = newBase - baseAddr;
    if ( shift == 0 ) {
        return;
    }

    // The mapping moved: rebase everything that points into it.
    size_t oldBase = (size_t)baseAddr;
    dbL2List* link = ctx->cursors.next;
    while ( link != &ctx->cursors ) {
        ((dbAnyCursor*)link)->adjustReferences(oldBase, oldSize, shift);
        link = link->next;
    }
    baseAddr = newBase;
    index[0] = (offs_t*)((char*)index[0] + shift);
    index[1] = (offs_t*)((char*)index[1] + shift);
    currIndex = (offs_t*)((char*)currIndex + shift);
    header = (dbHeader*)newBase;
}
// Check the range [pos, pos+size) against every entry of the reserved chain.
// Returns true when `pos` lies inside a reserved location or a reserved
// location starts inside the range (the comparisons rely on the wrap-around
// of the subtractions, which presumes unsigned offsets).
inline bool dbDatabase::wasReserved(offs_t pos, size_t size)
{
    dbLocation* entry = reservedChain;
    while ( entry != NULL ) {
        if ( pos - entry->pos < entry->size ) {
            return true;
        }
        if ( entry->pos - pos < size ) {
            return true;
        }
        entry = entry->next;
    }
    return false;
}
// Record [pos, pos+size) in `location` and push it onto the front of the
// reserved chain. `location` is linked by address, not copied.
inline void dbDatabase::reserveLocation(dbLocation& location, offs_t pos, size_t size)
{
    dbLocation* const previousHead = reservedChain;
    location.pos = pos;
    location.size = size;
    location.next = previousHead;
    reservedChain = &location;
}
inline void dbDatabase::commitLocation()
{
reservedChain = reservedChain->next;
}
// Allocates `size` bytes (rounded up to dbAllocationQuantum) in the database
// file by searching the allocation bitmap pages for a run of clear bits, and
// returns the offset of the allocated area.
//
// size - requested size in bytes.
// oid  - when non-zero, the current contents of object `oid` (located through
//        currIndex[oid]) are copied to the new area and currIndex[oid] is
//        redirected to it, keeping its dbInternalObjectMarker bits.
//
// The search runs in up to two passes: first from the remembered "current"
// bitmap page to the last bitmap page in use, then from dbBitmapId up to the
// page where the first pass started. If no hole is found, new bitmap pages
// are created and the object is placed right before them. handleError() is
// called with OutOfMemoryError when the bitmap page range is exhausted.
offs_t dbDatabase::allocate(size_t size, oid_t oid)
{
// Lookup tables indexed by the value of one bitmap byte (a set bit marks an
// allocated quantum).
// firstHoleSize: number of consecutive clear bits starting at bit 0.
static byte const firstHoleSize [] = {
8,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,4,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,
5,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,4,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,
6,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,4,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,
5,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,4,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,
7,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,4,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,
5,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,4,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,
6,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,4,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,
5,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0,4,0,1,0,2,0,1,0,3,0,1,0,2,0,1,0
};
// lastHoleSize: number of consecutive clear bits ending at bit 7.
static byte const lastHoleSize [] = {
8,7,6,6,5,5,5,5,4,4,4,4,4,4,4,4,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,
2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
};
// maxHoleSize: length of the longest run of clear bits inside the byte.
static byte const maxHoleSize [] = {
8,7,6,6,5,5,5,5,4,4,4,4,4,4,4,4,4,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,
5,4,3,3,2,2,2,2,3,2,2,2,2,2,2,2,4,3,2,2,2,2,2,2,3,2,2,2,2,2,2,2,
6,5,4,4,3,3,3,3,3,2,2,2,2,2,2,2,4,3,2,2,2,1,1,1,3,2,1,1,2,1,1,1,
5,4,3,3,2,2,2,2,3,2,1,1,2,1,1,1,4,3,2,2,2,1,1,1,3,2,1,1,2,1,1,1,
7,6,5,5,4,4,4,4,3,3,3,3,3,3,3,3,4,3,2,2,2,2,2,2,3,2,2,2,2,2,2,2,
5,4,3,3,2,2,2,2,3,2,1,1,2,1,1,1,4,3,2,2,2,1,1,1,3,2,1,1,2,1,1,1,
6,5,4,4,3,3,3,3,3,2,2,2,2,2,2,2,4,3,2,2,2,1,1,1,3,2,1,1,2,1,1,1,
5,4,3,3,2,2,2,2,3,2,1,1,2,1,1,1,4,3,2,2,2,1,1,1,3,2,1,1,2,1,1,0
};
// maxHoleOffset: bit position at which the run counted by maxHoleSize starts.
static byte const maxHoleOffset [] = {
0,1,2,2,3,3,3,3,4,4,4,4,4,4,4,4,0,1,5,5,5,5,5,5,0,5,5,5,5,5,5,5,
0,1,2,2,0,3,3,3,0,1,6,6,0,6,6,6,0,1,2,2,0,6,6,6,0,1,6,6,0,6,6,6,
0,1,2,2,3,3,3,3,0,1,4,4,0,4,4,4,0,1,2,2,0,1,0,3,0,1,0,2,0,1,0,5,
0,1,2,2,0,3,3,3,0,1,0,2,0,1,0,4,0,1,2,2,0,1,0,3,0,1,0,2,0,1,0,7,
0,1,2,2,3,3,3,3,0,4,4,4,4,4,4,4,0,1,2,2,0,5,5,5,0,1,5,5,0,5,5,5,
0,1,2,2,0,3,3,3,0,1,0,2,0,1,0,4,0,1,2,2,0,1,0,3,0,1,0,2,0,1,0,6,
0,1,2,2,3,3,3,3,0,1,4,4,0,4,4,4,0,1,2,2,0,1,0,3,0,1,0,2,0,1,0,5,
0,1,2,2,0,3,3,3,0,1,0,2,0,1,0,4,0,1,2,2,0,1,0,3,0,1,0,2,0,1,0,0
};
setDirty();
size = DOALIGN(size, dbAllocationQuantum);
// object size expressed in bitmap bits (one bit per allocation quantum)
int objBitSize = size >> dbAllocationQuantumBits;
offs_t pos;
oid_t i, firstPage, lastPage;
// length (in bits) of the run of clear bits accumulated so far
int holeBitSize = 0;
// zero when size is a multiple of dbPageSize ("page object")
register int alignment = size & (dbPageSize-1);
// number of bitmap bytes describing one database page
const size_t inc = dbPageSize/dbAllocationQuantum/8;
register size_t offs;
const int pageBits = dbPageSize*8;
int holeBeforeFreePage = 0;
oid_t freeBitmapPage = 0;
dbLocation location;
lastPage = dbBitmapId + dbBitmapPages;
allocatedSize += size;
// Page objects and other objects keep separate "current position" cursors
// (currPBitmap* and currRBitmap*) from which the search starts.
if ( alignment == 0 ) {
firstPage = currPBitmapPage;
offs = DOALIGN(currPBitmapOffs, inc);
} else {
firstPage = currRBitmapPage;
offs = currRBitmapOffs;
}
while ( true ) {
if ( alignment == 0 ) {
// allocate page object
for ( i = firstPage; i < lastPage && currIndex[i] != dbFreeHandleMarker; i++ ){
int spaceNeeded = objBitSize - holeBitSize < pageBits
? objBitSize - holeBitSize : pageBits;
// skip bitmap pages already known not to contain a large enough hole
if ( bitmapPageAvailableSpace[i] <= spaceNeeded ) {
holeBitSize = 0;
offs = 0;
continue;
}
register byte* begin = get(i);
size_t startOffs = offs;
while ( offs < dbPageSize ) {
if ( begin[offs++] != 0 ) {
// byte is not completely free: restart at the next page-aligned position
offs = DOALIGN(offs, inc);
holeBitSize = 0;
} else if ( (holeBitSize += 8) == objBitSize ) {
pos = ((offs_t(i-dbBitmapId)*dbPageSize + offs)*8
- holeBitSize) << dbAllocationQuantumBits;
// the hole is already claimed through reservedChain: keep searching
if ( wasReserved(pos, size) ) {
offs += objBitSize >> 3;
startOffs = offs = DOALIGN(offs, inc);
holeBitSize = 0;
continue;
}
reserveLocation(location, pos, size);
currPBitmapPage = i;
currPBitmapOffs = offs;
extend(pos + size);
if ( oid != 0 ) {
// relocate the existing object and redirect its index entry
offs_t prev = currIndex[oid];
memcpy(baseAddr+pos,
baseAddr+(prev&~dbInternalObjectMarker), size);
currIndex[oid] = (prev & dbInternalObjectMarker) + pos;
}
begin = put(i);
// Mark the hole as allocated, walking backwards over preceding bitmap
// pages when the hole started before page i.
size_t holeBytes = holeBitSize >> 3;
if ( holeBytes > offs ) {
memset(begin, 0xFF, offs);
holeBytes -= offs;
begin = put(--i);
offs = dbPageSize;
}
while ( holeBytes > dbPageSize ) {
memset(begin, 0xFF, dbPageSize);
holeBytes -= dbPageSize;
bitmapPageAvailableSpace[i] = 0;
begin = put(--i);
}
memset(&begin[offs-holeBytes], 0xFF, holeBytes);
commitLocation();
file.markAsDirty(pos, size);
return pos;
}
}
// The whole page was scanned from its start without success: remember
// the size that did not fit so later requests can skip this page.
if ( startOffs == 0 && holeBitSize == 0
&& spaceNeeded < bitmapPageAvailableSpace[i] )
{
bitmapPageAvailableSpace[i] = spaceNeeded;
}
offs = 0;
}
} else {
// allocate an object whose size is not a multiple of the page size
for ( i=firstPage; i<lastPage && currIndex[i] != dbFreeHandleMarker; i++ ){
int spaceNeeded = objBitSize - holeBitSize < pageBits
? objBitSize - holeBitSize : pageBits;
if ( bitmapPageAvailableSpace[i] <= spaceNeeded ) {
holeBitSize = 0;
offs = 0;
continue;
}
register byte* begin = get(i);
size_t startOffs = offs;
while ( offs < dbPageSize ) {
int mask = begin[offs];
// Case 1: the hole accumulated from previous bytes plus the clear low
// bits of this byte is large enough.
if ( holeBitSize + firstHoleSize[mask] >= objBitSize ) {
pos = ((offs_t(i-dbBitmapId)*dbPageSize + offs)*8
- holeBitSize) << dbAllocationQuantumBits;
if ( wasReserved(pos, size) ) {
startOffs = offs += (objBitSize + 7) >> 3;
holeBitSize = 0;
continue;
}
reserveLocation(location, pos, size);
currRBitmapPage = i;
currRBitmapOffs = offs;
extend(pos + size);
if ( oid != 0 ) {
offs_t prev = currIndex[oid];
memcpy(baseAddr+pos,
baseAddr+(prev&~dbInternalObjectMarker), size);
currIndex[oid] = (prev & dbInternalObjectMarker) + pos;
}
begin = put(i);
// set the bits taken from the current byte
begin[offs] |= (1 << (objBitSize - holeBitSize)) - 1;
if ( holeBitSize != 0 ) {
// set the bits of the part of the hole that precedes the current byte,
// walking backwards across bitmap pages if necessary
if ( size_t(holeBitSize) > offs*8 ) {
memset(begin, 0xFF, offs);
holeBitSize -= offs*8;
begin = put(--i);
offs = dbPageSize;
}
while ( holeBitSize > pageBits ) {
memset(begin, 0xFF, dbPageSize);
holeBitSize -= pageBits;
bitmapPageAvailableSpace[i] = 0;
begin = put(--i);
}
while ( (holeBitSize -= 8) > 0 ) {
begin[--offs] = 0xFF;
}
// holeBitSize is now in -7..0: set the remaining high bits of the
// first byte of the hole
begin[offs-1] |= ~((1 << -holeBitSize) - 1);
}
commitLocation();
file.markAsDirty(pos, size);
return pos;
// Case 2: the object fits into a hole located entirely inside this byte.
} else if ( maxHoleSize[mask] >= objBitSize ) {
int holeBitOffset = maxHoleOffset[mask];
pos = ((offs_t(i-dbBitmapId)*dbPageSize + offs)*8 +
holeBitOffset) << dbAllocationQuantumBits;
if ( wasReserved(pos, size) ) {
startOffs = offs += (objBitSize + 7) >> 3;
holeBitSize = 0;
continue;
}
reserveLocation(location, pos, size);
currRBitmapPage = i;
currRBitmapOffs = offs;
extend(pos + size);
if ( oid != 0 ) {
offs_t prev = currIndex[oid];
memcpy(baseAddr+pos,
baseAddr+(prev&~dbInternalObjectMarker), size);
currIndex[oid] = (prev & dbInternalObjectMarker) + pos;
}
begin = put(i);
begin[offs] |= ((1 << objBitSize) - 1) << holeBitOffset;
commitLocation();
file.markAsDirty(pos, size);
return pos;
}
// No fit: advance and carry over the clear bits at the top of this byte.
offs += 1;
if ( lastHoleSize[mask] == 8 ) {
holeBitSize += 8;
} else {
holeBitSize = lastHoleSize[mask];
}
}
if ( startOffs == 0 && holeBitSize == 0
&& spaceNeeded < bitmapPageAvailableSpace[i] )
{
bitmapPageAvailableSpace[i] = spaceNeeded;
}
offs = 0;
}
}
// No hole found in [firstPage, lastPage). If the scan started at the first
// bitmap page, the whole bitmap has been covered and it must be extended.
if ( firstPage == dbBitmapId ) {
if ( freeBitmapPage > i ) {
// use the free page and trailing hole recorded by the first pass
i = freeBitmapPage;
holeBitSize = holeBeforeFreePage;
}
if ( i == dbBitmapId + dbBitmapPages ) {
handleError(OutOfMemoryError, NULL, size);
}
assert(currIndex[i] == dbFreeHandleMarker);
size_t extension = (size > extensionQuantum)
? size : extensionQuantum;
// number of new bitmap pages needed to describe `extension` bytes
int morePages =
(extension + dbPageSize*(dbAllocationQuantum*8-1) - 1)
/ (dbPageSize*(dbAllocationQuantum*8-1));
if ( size_t(i + morePages) > dbBitmapId + dbBitmapPages ) {
// not enough bitmap page ids for the full extension: retry with just `size`
morePages =
(size + dbPageSize*(dbAllocationQuantum*8-1) - 1)
/ (dbPageSize*(dbAllocationQuantum*8-1));
if ( size_t(i + morePages) > dbBitmapId + dbBitmapPages ) {
handleError(OutOfMemoryError, NULL, size);
}
}
objBitSize -= holeBitSize;
// bits reserved for the object, rounded up to a whole database page
int skip = DOALIGN(objBitSize, dbPageSize/dbAllocationQuantum);
// the new bitmap pages are placed right after the area reserved for the object
pos = ((i-dbBitmapId) << (dbPageBits+dbAllocationQuantumBits+3))
+ (skip << dbAllocationQuantumBits);
extend(pos + morePages*dbPageSize);
file.markAsDirty(pos, morePages*dbPageSize);
// initialise the new bitmap pages: all clear, then mark the object's
// quanta and the bitmap pages themselves as allocated
memset(baseAddr + pos, 0, morePages*dbPageSize);
memset(baseAddr + pos, 0xFF, objBitSize>>3);
*(baseAddr + pos + (objBitSize>>3)) = (1 << (objBitSize&7))-1;
memset(baseAddr + pos + (skip>>3), 0xFF,
morePages*(dbPageSize/dbAllocationQuantum/8));
oid_t j = i;
while ( --morePages >= 0 ) {
// register each new bitmap page in the index and flag its index page dirty
monitor->dirtyPagesMap[oid_t(j)/dbHandlesPerPage/32]
|= 1 << int(j/dbHandlesPerPage & 31);
currIndex[j++] = pos + dbPageObjectMarker;
pos += dbPageSize;
}
freeBitmapPage = j;
j = i + objBitSize / pageBits;
if ( alignment != 0 ) {
currRBitmapPage = j;
currRBitmapOffs = 0;
} else {
currPBitmapPage = j;
currPBitmapOffs = 0;
}
while ( j > i ) {
bitmapPageAvailableSpace[--j] = 0;
}
// final object position: starts holeBitSize quanta before the area
// described by bitmap page i
pos = (offs_t(i-dbBitmapId)*dbPageSize*8 - holeBitSize)
<< dbAllocationQuantumBits;
if ( oid != 0 ) {
offs_t prev = currIndex[oid];
memcpy(baseAddr + pos,
baseAddr + (prev & ~dbInternalObjectMarker), size);
currIndex[oid] = (prev & dbInternalObjectMarker) + pos;
}
if ( holeBitSize != 0 ) {
// mark the trailing hole of the preceding bitmap pages as allocated
reserveLocation(location, pos, size);
while ( holeBitSize > pageBits ) {
holeBitSize -= pageBits;
memset(put(--i), 0xFF, dbPageSize);
bitmapPageAvailableSpace[i] = 0;
}
byte* cur = (byte*)put(--i) + dbPageSize;
while ( (holeBitSize -= 8) > 0 ) {
*--cur = 0xFF;
}
*(cur-1) |= ~((1 << -holeBitSize) - 1);
commitLocation();
}
file.markAsDirty(pos, size);
return pos;
}
// Second pass: rescan from the first bitmap page up to (and including) the
// page where the first pass started, remembering where the first pass ended.
freeBitmapPage = i;
holeBeforeFreePage = holeBitSize;
holeBitSize = 0;
lastPage = firstPage + 1;
firstPage = dbBitmapId;
offs = 0;
}
}
// Releases the area [pos, pos+size) by clearing its bits in the allocation
// bitmap (one bit per dbAllocationQuantum bytes).
//
// pos  - offset of the area; must be non-zero and quantum-aligned (asserted).
// size - size in bytes; rounded up to whole quanta.
//
// Also decreases allocatedSize, moves the matching allocation cursor back if
// the freed area precedes it on the same bitmap page, and resets
// bitmapPageAvailableSpace for every touched bitmap page to INT_MAX.
void dbDatabase::deallocate(offs_t pos, size_t size)
{
assert(pos != 0 && (pos & (dbAllocationQuantum-1)) == 0);
size_t quantNo = pos / dbAllocationQuantum;
int objBitSize = (size+dbAllocationQuantum-1) / dbAllocationQuantum;
// locate the bitmap page, the byte inside it, and the bit inside that byte
oid_t pageId = dbBitmapId + quantNo / (dbPageSize*8);
size_t offs = quantNo % (dbPageSize*8) / 8;
byte* p = put(pageId) + offs;
int bitOffs = quantNo & 7;
allocatedSize -= objBitSize*dbAllocationQuantum;
// page-aligned areas of at least one page use the "P" cursor, others the "R" cursor
if ( (size_t(pos) & (dbPageSize-1)) == 0 && size >= dbPageSize ) {
if ( pageId == currPBitmapPage && offs < currPBitmapOffs ) {
currPBitmapOffs = offs;
}
} else {
if ( pageId == currRBitmapPage && offs < currRBitmapOffs ) {
currRBitmapOffs = offs;
}
}
bitmapPageAvailableSpace[pageId] = INT_MAX;
if ( objBitSize > 8 - bitOffs ) {
// the area spans more than the first byte: clear the high bits of the
// first byte, then whole bytes, then the low bits of the last byte
objBitSize -= 8 - bitOffs;
*p++ &= (1 << bitOffs) - 1;
offs += 1;
while ( objBitSize + offs*8 > dbPageSize*8 ) {
// the area continues on the next bitmap page
memset(p, 0, dbPageSize - offs);
p = put(++pageId);
bitmapPageAvailableSpace[pageId] = INT_MAX;
objBitSize -= (dbPageSize - offs)*8;
offs = 0;
}
while ( (objBitSize -= 8) > 0 ) {
*p++ = 0;
}
// objBitSize is now in -7..0, so objBitSize + 8 bits remain to be cleared
*p &= ~((1 << (objBitSize + 8)) - 1);
} else {
// the whole area is described by bits inside a single byte
*p &= ~(((1 << objBitSize) - 1) << bitOffs);
}
}
// Calls put() on every bitmap page that holds allocation bits for the area
// [pos, pos+size), without changing any bit itself.
// NOTE(review): put() presumably creates a writable copy of the page for the
// current transaction -- confirm against its definition.
void dbDatabase::cloneBitmap(offs_t pos, size_t size)
{
    const size_t firstQuantum = pos / dbAllocationQuantum;
    int bitsLeft = (size+dbAllocationQuantum-1) / dbAllocationQuantum;
    oid_t bitmapPage = dbBitmapId + firstQuantum / (dbPageSize*8);
    size_t byteInPage = firstQuantum % (dbPageSize*8) / 8;
    const int bitInByte = firstQuantum & 7;

    put(bitmapPage);
    if ( bitsLeft <= 8 - bitInByte ) {
        // everything fits into the first bitmap byte
        return;
    }
    bitsLeft -= 8 - bitInByte;
    byteInPage += 1;
    // touch each following bitmap page the area extends into
    while ( bitsLeft + byteInPage*8 > dbPageSize*8 ) {
        put(++bitmapPage);
        bitsLeft -= (dbPageSize - byteInPage)*8;
        byteInPage = 0;
    }
}
// Sets header->dirty on the first modification and flushes the header.
// Does nothing when the flag is already set. For a dbReadOnly database
// handleError(DatabaseReadOnly, ...) is raised first.
void dbDatabase::setDirty()
{
    if ( header->dirty ) {
        return;
    }
    if ( accessType == dbReadOnly ) {
        handleError(DatabaseReadOnly, "Attempt to modify readonly database");
    }
    header->dirty = true;
    file.markAsDirty(0, sizeof(dbHeader));
    file.flush(true);
}
// Allocates n consecutive object identifiers in the root selected by
// 1-header->curr and returns the first one.
// For n == 1 an identifier is taken from that root's free list when it is
// not empty. Otherwise identifiers are taken from the end of the index, which
// is first grown by doubling when currIndexSize + n exceeds its capacity.
oid_t dbDatabase::allocateId(int n)
{
    setDirty();
    const int shadow = 1-header->curr;

    if ( n == 1 ) {
        const oid_t recycled = header->root[shadow].freeList;
        if ( recycled != 0 ) {
            // pop the head of the free list; the index slot stores the next
            // free id biased by dbFreeHandleMarker
            header->root[shadow].freeList =
                (oid_t)(currIndex[recycled] - dbFreeHandleMarker);
            unsigned page = recycled / dbHandlesPerPage;
            monitor->dirtyPagesMap[page >> 5] |= 1 << (page & 31);
            return recycled;
        }
    }

    if ( currIndexSize + n > header->root[shadow].indexSize ) {
        size_t oldCapacity = header->root[shadow].indexSize;
        size_t newCapacity = oldCapacity * 2;
        while ( newCapacity < oldCapacity + n ) {
            newCapacity = newCapacity*2;
        }
        TRACE_MSG(("Extend index size from %ld to %ld\n",
                   oldCapacity, newCapacity));
        // header is re-read after allocate() on purpose: no pointer into the
        // header is cached across the call
        offs_t relocated = allocate(newCapacity*sizeof(offs_t));
        memcpy(baseAddr + relocated, currIndex, currIndexSize*sizeof(offs_t));
        currIndex = index[shadow] = (offs_t*)(baseAddr + relocated);
        deallocate(header->root[shadow].index, oldCapacity*sizeof(offs_t));
        header->root[shadow].index = relocated;
        header->root[shadow].indexSize = newCapacity;
        file.markAsDirty(relocated, currIndexSize*sizeof(offs_t));
    }

    const oid_t firstNewId = currIndexSize;
    currIndexSize += n;
    header->root[shadow].indexUsed = currIndexSize;
    return firstNewId;
}
// Returns the n consecutive identifiers starting at oid to the free list of
// the root selected by 1-header->curr. Each freed index slot stores the
// previous list head biased by dbFreeHandleMarker, and the index page holding
// the slot is flagged in monitor->dirtyPagesMap.
void dbDatabase::freeId(oid_t oid, int n)
{
    oid_t head = header->root[1-header->curr].freeList;
    for ( int left = n; left > 0; left-- ) {
        unsigned page = oid / dbHandlesPerPage;
        monitor->dirtyPagesMap[page >> 5] |= 1 << (page & 31);
        currIndex[oid] = head + dbFreeHandleMarker;
        head = oid;
        oid += 1;
    }
    header->root[1-header->curr].freeList = head;
}
// Acquires the lock requested by lockType for the calling thread's context
// and, unless lockType is dbCommitLock, refreshes the per-transaction view of
// the object index (index[], currIndex, currIndexSize, committedIndexSize).
//
// lockType - dbSharedLock, dbExclusiveLock or dbCommitLock.
//
// Returns false only when checkVersion() fails; true otherwise.
//
// Lock state is kept in `monitor` (nReaders, nWriters, nWaitReaders,
// nWaitWriters, waitForUpgrade) and is only modified between cs.enter() and
// cs.leave(). When a semaphore wait times out (dbWaitLockTimeout) the code
// assumes the lock holder died and repairs the counters, rate-limited by
// monitor->lastDeadlockRecoveryTime.
bool dbDatabase::beginTransaction(dbLockType lockType)
{
bool gotSem = false;
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG,
"Entering dbDatabase::beginTransaction lockType(%d)",
(int)lockType);
OsSysLog::flush();
}
# endif
dbDatabaseThreadContext* ctx = threadContext.get();
if ( commitDelay != 0 && lockType != dbCommitLock ) {
// Delayed commit is enabled: announce that a transaction is starting and,
// if this thread's own delayed commit is still pending, cancel it.
dbCriticalSection cs(delayedCommitStopTimerMutex);
monitor->forceCommitCount += 1;
if ( monitor->delayedCommitContext == ctx && ctx->commitDelayed ) {
// skip delayed transaction because this thread is starting new transaction
monitor->delayedCommitContext = NULL;
ctx->commitDelayed = false;
if ( commitTimerStarted != 0 ) {
// charge the time already waited against the remaining commit timeout
long elapsed = time(NULL) - commitTimerStarted;
if ( commitTimeout < elapsed ) {
commitTimeout = 0;
} else {
commitTimeout -= elapsed;
}
delayedCommitStopTimerEvent.signal();
}
}
}
if ( accessType == dbConcurrentUpdate && lockType != dbCommitLock ) {
// concurrent-update mode: mutators are serialized through mutatorCS only
if ( !ctx->mutatorCSLocked ) {
mutatorCS.enter();
ctx->mutatorCSLocked = true;
}
} else if ( lockType != dbSharedLock ) {
// exclusive (or commit) lock requested
if ( !ctx->writeAccess ) {
assert(accessType != dbReadOnly && accessType != dbConcurrentRead);
cs.enter();
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::beginTransaction "
"after cs.enter ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
}
# endif
if ( ctx->readAccess ) {
// upgrade of a shared lock already held by this thread
assert(monitor->nWriters == 0);
TRACE_MSG(("Attempt to upgrade lock from shared to exclusive can cause deadlock\n"));
if ( monitor->nReaders != 1 ) {
// other readers are active: only one upgrader may wait at a time
if ( monitor->waitForUpgrade ) {
handleError(Deadlock);
}
monitor->waitForUpgrade = true;
monitor->nWaitWriters += 1;
cs.leave();
if ( commitDelay != 0 ) {
delayedCommitStopTimerEvent.signal();
}
gotSem = upgradeSem.wait(dbWaitLockTimeout);
while ( !gotSem )
{
// There are no writers, so some reader was died
cs.enter();
unsigned currTime = dbSystem::getCurrentTimeMsec();
if ( currTime - monitor->lastDeadlockRecoveryTime
>= dbWaitLockTimeout )
{
// Ok, let's try to "remove" this reader
monitor->lastDeadlockRecoveryTime = currTime;
if ( --monitor->nReaders == 1 ) {
// Looks like we are recovered
monitor->nWriters = 1;
monitor->nReaders = 0;
monitor->nWaitWriters -= 1;
monitor->waitForUpgrade = false;
cs.leave();
break;
}
}
cs.leave();
gotSem = upgradeSem.wait(dbWaitLockTimeout);
}
assert(monitor->nWriters == 1 && monitor->nReaders == 0);
} else {
// this thread is the only reader: upgrade immediately
monitor->nWriters = 1;
monitor->nReaders = 0;
cs.leave();
}
} else {
// fresh exclusive lock
if ( monitor->nWriters != 0 || monitor->nReaders != 0 ) {
// lock is busy: queue as a waiting writer
monitor->nWaitWriters += 1;
cs.leave();
if ( commitDelay != 0 ) {
delayedCommitStopTimerEvent.signal();
}
gotSem = writeSem.wait(dbWaitLockTimeout);
while ( !gotSem )
{
cs.enter();
unsigned currTime = dbSystem::getCurrentTimeMsec();
if ( currTime - monitor->lastDeadlockRecoveryTime
>= dbWaitLockTimeout )
{
monitor->lastDeadlockRecoveryTime = currTime;
if ( monitor->nWriters != 0 ) {
// writer was died
checkVersion();
recovery();
monitor->nWriters = 1;
monitor->nWaitWriters -= 1;
cs.leave();
break;
} else {
// some reader was died
// Ok, let's try to "remove" this reader
if ( --monitor->nReaders == 0 ) {
// Looks like we are recovered
monitor->nWriters = 1;
monitor->nWaitWriters -= 1;
cs.leave();
break;
}
}
}
cs.leave();
gotSem = writeSem.wait(dbWaitLockTimeout);
}
assert(monitor->nWriters == 1 && monitor->nReaders == 0);
} else {
// lock is free: take it
monitor->nWriters = 1;
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::beginTransaction "
"before cs.leave() ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
OsSysLog::flush();
}
# endif
cs.leave();
}
}
// record this thread as the owner of the exclusive lock
monitor->ownerPid = ctx->currPid;
ctx->writeAccess = true;
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG,
"dbDatabase::beginTransaction setting ownerPid ctx "
"access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
OsSysLog::flush();
}
# endif
} else {
// the context believes it already holds the write lock: verify that the
// monitor still names it as the owner
if ( monitor->ownerPid != ctx->currPid ) {
# ifdef FASTDB_VERBOSE_LOGGING
OsSysLog::add(FAC_DB, PRI_DEBUG,
"dbDatabase::beginTransaction before LockRevoked"
" ctx access (W=%d, R=%d), accessType=%d, ctx->currPid=%d, %d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType,
(int)ctx->currPid.getTid(), (int)ctx->currPid.getPid());
OsSysLog::flush();
# endif
handleError(LockRevoked);
}
}
} else {
// shared lock requested
if ( !ctx->readAccess && !ctx->writeAccess ) {
cs.enter();
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::beginTransaction after cs.enter "
"ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
}
# endif
if ( monitor->nWriters + monitor->nWaitWriters != 0 ) {
// a writer is active or waiting: queue as a waiting reader
monitor->nWaitReaders += 1;
cs.leave();
if ( commitDelay != 0 ) {
delayedCommitStopTimerEvent.signal();
}
gotSem = readSem.wait(dbWaitLockTimeout);
while ( !gotSem )
{
cs.enter();
unsigned currTime = dbSystem::getCurrentTimeMsec();
if ( currTime - monitor->lastDeadlockRecoveryTime
>= dbWaitLockTimeout )
{
monitor->lastDeadlockRecoveryTime = currTime;
if ( monitor->nWriters != 0 ) {
// writer was died
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::beginTransaction:"
"(line=%d) before checkVersion (W=%d, R=%d), "
"accessType=%d",
__LINE__, (int)ctx->writeAccess,
(int)ctx->readAccess, accessType);
}
# endif
checkVersion();
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::beginTransaction:"
"(line=%d) after checkVersion (W=%d, R=%d), "
"accessType=%d",
__LINE__, (int)ctx->writeAccess,
(int)ctx->readAccess, accessType);
}
# endif
recovery();
monitor->nWriters = 0;
} else {
// some potential writer was died
monitor->nWaitWriters -= 1;
}
monitor->nReaders += 1;
monitor->nWaitReaders -= 1;
cs.leave();
break;
}
cs.leave();
gotSem = readSem.wait(dbWaitLockTimeout);
}
assert(monitor->nWriters == 0 && monitor->nReaders > 0);
} else {
// no writer active or waiting: join the readers
monitor->nReaders += 1;
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG,
"dbDatabase::beginTransaction before cs.leave() "
"ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
OsSysLog::flush();
}
# endif
cs.leave();
}
ctx->readAccess = true;
}
}
if ( lockType != dbCommitLock ) {
if ( commitDelay != 0 ) {
// the lock is held now: withdraw the "transaction starting" announcement
dbCriticalSection cs(delayedCommitStopTimerMutex);
monitor->forceCommitCount -= 1;
}
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::beginTransaction:(line=%d) "
"before checkVersion (W=%d, R=%d), accessType=%d",
__LINE__, (int)ctx->writeAccess, (int)ctx->readAccess, accessType);
OsSysLog::flush();
}
# endif
// make sure this process maps the current version of the database file
if ( !checkVersion() ) {
return false;
}
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::beginTransaction:(line=%d) "
"after checkVersion (W=%d, R=%d), accessType=%d",
__LINE__, (int)ctx->writeAccess, (int)ctx->readAccess, accessType);
}
# endif
cs.enter();
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::beginTransaction "
"after cs.enter ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
}
# endif
// recompute the index pointers from the (possibly remapped) header
index[0] = (offs_t*)(baseAddr + header->root[0].index);
index[1] = (offs_t*)(baseAddr + header->root[1].index);
if ( lockType == dbExclusiveLock && !header->dirty ) {
header->dirty = true;
file.markAsDirty(0, sizeof(dbHeader));
file.flush(true);
}
int curr = monitor->curr;
if ( accessType != dbConcurrentRead ) {
// work on the index copy that is not the committed one
currIndex = index[1-curr];
currIndexSize = header->root[1-curr].indexUsed;
committedIndexSize = header->root[curr].indexUsed;
} else {
// concurrent readers use the committed index copy
currIndex = index[curr];
currIndexSize = header->root[curr].indexUsed;
committedIndexSize = header->root[curr].indexUsed;
}
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::beginTransaction "
"before cs.leave() ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
OsSysLog::flush();
}
# endif
cs.leave();
}
return true;
}
// Brings this process's view of the database file in line with
// monitor->version. When the local version differs, the version suffix is
// written into databaseName and the file is either opened (local version 0)
// or resized via file.setSize(); baseAddr and header are then re-read from
// the file object.
// Returns true on success or when nothing had to be done. On failure
// handleError() is called, endTransaction() releases the locks and false is
// returned.
bool dbDatabase::checkVersion()
{
    if ( version == monitor->version ) {
        return true;
    }
    sprintf(databaseName+databaseNameLen, ".%d", monitor->version);
    if ( version != 0 ) {
        int status = file.setSize(header->size, databaseName, false);
        if ( status != dbFile::ok ) {
            handleError(FileError,"Failed to reopen database file",status);
            endTransaction(); // release locks
            return false;
        }
    } else {
        // first access from this process: open the file
        const bool readOnlyMode =
            accessType == dbReadOnly || accessType == dbConcurrentRead;
        if ( file.open(fileName, databaseName, readOnlyMode, mmapSize, false)
             != dbFile::ok )
        {
            handleError(DatabaseOpenError, "Failed to open database file");
            endTransaction(); // release locks
            return false;
        }
    }
    version = monitor->version;
    baseAddr = (byte*)file.getAddr();
    header = (dbHeader*)baseAddr;
    assert(file.getSize() == header->size);
    return true;
}
// Ends the calling thread's transaction without committing it. The current
// monitor->concurrentTransId is stored in the context first.
// No-op when the thread has no context or holds neither read nor write
// access. Must not be used with dbConcurrentUpdate access (asserted).
void dbDatabase::precommit()
{
    assert(accessType != dbConcurrentUpdate);
    dbDatabaseThreadContext* ctx = threadContext.get();
    if ( ctx == NULL ) {
        return;
    }
    if ( !ctx->writeAccess && !ctx->readAccess ) {
        return;
    }
    ctx->concurrentId = monitor->concurrentTransId;
    endTransaction(ctx);
}
// Body of the delayed-commit loop: repeatedly waits for a commit request
// (delayedCommitStartTimerEvent), optionally waits up to commitTimeout for a
// stop signal, and then commits the context stored in
// monitor->delayedCommitContext, if any. Runs until stopDelayedCommitThread
// is set.
// NOTE(review): presumably executed by a dedicated background thread and
// dbCriticalSection is a scoped lock -- confirm against their definitions.
void dbDatabase::delayedCommit()
{
dbCriticalSection cs(delayedCommitStartTimerMutex);
// tell the creator that this loop is up and holds the start mutex
commitThreadSyncEvent.signal();
while ( !stopDelayedCommitThread ) {
// wait until commit() schedules a delayed commit
delayedCommitStartTimerEvent.wait(delayedCommitStartTimerMutex);
delayedCommitStartTimerEvent.reset();
int forceCommitCount;
{
dbCriticalSection cs2(delayedCommitStopTimerMutex);
forceCommitCount = monitor->forceCommitCount;
}
// Only delay when no transaction is currently being started
// (forceCommitCount is raised in beginTransaction).
if ( !stopDelayedCommitThread && forceCommitCount == 0 ) {
commitTimerStarted = time(NULL);
// assumes wait() takes milliseconds and commitTimeout is in seconds -- TODO confirm
delayedCommitStopTimerEvent.wait(commitTimeout*1000);
delayedCommitStopTimerEvent.reset();
}
{
dbCriticalSection cs2(delayedCommitStopTimerMutex);
dbDatabaseThreadContext* ctx = monitor->delayedCommitContext;
// ctx is NULL when the owning thread cancelled the delayed commit meanwhile
if ( ctx != NULL ) {
commitTimeout = commitDelay;
monitor->delayedCommitContext = NULL;
// commit on behalf of the thread that requested it
threadContext.set(ctx);
commit(ctx);
ctx->commitDelayed = false;
if ( ctx->removeContext ) {
dbCriticalSection cs(threadContextListMutex);
delete ctx;
}
}
}
}
}
// Commits the calling thread's transaction.
// No-op when the thread has no context or its commit is already delayed.
// If the context claims write access but monitor->ownerPid names another
// owner, the state is logged and handleError(LockRevoked) is raised.
// When there is something to commit, the commit is either performed at once
// or, with commitDelay != 0, handed over to the delayed-commit loop.
// Otherwise any lock still held by the context is released.
void dbDatabase::commit()
{
    dbDatabaseThreadContext* ctx = threadContext.get();
    if ( ctx == NULL || ctx->commitDelayed ) {
        return;
    }

    if ( ctx->writeAccess && monitor->ownerPid != ctx->currPid ) {
        int pid = getProcessID();
        OsSysLog::add(FAC_DB, PRI_DEBUG, "(pid=%d) dbDatabase::commit before LockRevoked ctx access (W=%d, R=%d), accessType=%d, ctx->currPid=%d, %d", (int)pid, (int)ctx->writeAccess, (int)ctx->readAccess, accessType, (int)ctx->currPid.getTid(), (int)ctx->currPid.getPid());
        OsSysLog::add(FAC_DB, PRI_DEBUG, "(pid=%d) dbDatabase::commit before LockRevoked monitor(owner=%d, %d, nWriters=%d, nReaders=%d, nWaitWriters=%d, nWaitReaders=%d, sem=%d mutatorSem=%d), waitForUpgrade(%d)", (int)pid, (int)monitor->ownerPid.getTid(), (int)monitor->ownerPid.getPid(), monitor->nWriters, monitor->nReaders, monitor->nWaitWriters, monitor->nWaitReaders, LOG_SEM(monitor->sem), LOG_SEM(monitor->mutatorSem), monitor->waitForUpgrade);
        OsSysLog::flush();
        handleError(LockRevoked);
    }

    // decide under the monitor lock whether any work is pending
    cs.enter();
    bool hasSomethingToCommit = modified && !monitor->commitInProgress
        && (monitor->uncommittedChanges || ctx->writeAccess || ctx->mutatorCSLocked || ctx->concurrentId == monitor->concurrentTransId);
    cs.leave();

    if ( !hasSomethingToCommit ) {
        if ( ctx->writeAccess || ctx->readAccess || ctx->mutatorCSLocked ) {
            endTransaction(ctx);
        }
        return;
    }

    if ( !ctx->writeAccess ) {
        beginTransaction(ctx->mutatorCSLocked ? dbCommitLock : dbExclusiveLock);
    }
    if ( commitDelay == 0 ) {
        commit(ctx);
        return;
    }
    // schedule the commit for the delayed-commit loop
    dbCriticalSection startTimerGuard(delayedCommitStartTimerMutex);
    monitor->delayedCommitContext = ctx;
    ctx->commitDelayed = true;
    delayedCommitStartTimerEvent.signal();
}
// Performs the actual commit for the context `ctx`, which must hold write
// access (asserted).
//
// Steps, in order:
//  1. If the working index (root[1-curr]) is larger than the committed one,
//     allocate a new index area as its shadow and free the old committed
//     index.
//  2. Under cs: downgrade the writer to a reader and wake waiting readers.
//  3. For every handle whose entry differs between the working index
//     (currIndex) and the committed index (index[curr]), deallocate the
//     object referenced by the committed entry.
//  4. Flush, wait for a running backup to finish, flip header->curr, flush.
//  5. Copy the now-committed index over the other root (all of it when the
//     index was resized, otherwise only dirty pages plus the tail) and clear
//     the dirty-page map.
//  6. Under cs: reset the modification flags, bump concurrentTransId, and
//     end the transaction.
void dbDatabase::commit(dbDatabaseThreadContext* ctx)
{
//
// commit transaction
//
int curr = header->curr;
int4 *map = monitor->dirtyPagesMap;
size_t oldIndexSize = header->root[curr].indexSize;
size_t newIndexSize = header->root[1-curr].indexSize;
if ( newIndexSize > oldIndexSize ) {
// index grew during the transaction: prepare a shadow area of the new size
offs_t newIndex = allocate(newIndexSize*sizeof(offs_t));
header->root[1-curr].shadowIndex = newIndex;
header->root[1-curr].shadowIndexSize = newIndexSize;
cloneBitmap(header->root[curr].index, oldIndexSize*sizeof(offs_t));
deallocate(header->root[curr].index, oldIndexSize*sizeof(offs_t));
}
//
// Enable read access to the database
//
cs.enter();
assert(ctx->writeAccess);
monitor->commitInProgress = true;
// the committing thread stops being a writer and counts as a reader
monitor->nWriters -= 1;
monitor->nReaders += 1;
monitor->ownerPid.clear();
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::commit after ownerPid.clear() "
"ctx access (W=%d, R=%d), accessType=%d, ctx->currPid=%d, %d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType,
(int)ctx->currPid.getTid(), (int)ctx->currPid.getPid());
OsSysLog::flush();
}
# endif
if ( accessType == dbConcurrentUpdate ) {
// now readers will see updated data
monitor->curr ^= 1;
}
if ( monitor->nWaitReaders != 0 ) {
// admit all waiting readers
monitor->nReaders += monitor->nWaitReaders;
readSem.signal(monitor->nWaitReaders);
monitor->nWaitReaders = 0;
}
ctx->writeAccess = false;
ctx->readAccess = true;
cs.leave();
// number of complete index pages covered by the committed index
size_t nPages = committedIndexSize / dbHandlesPerPage;
offs_t* srcIndex = currIndex;
offs_t* dstIndex = index[curr];
// Pass over complete index pages; only pages flagged dirty are examined.
for ( size_t i = 0; i < nPages; i++ ) {
if ( map[i >> 5] & (1 << (i & 31)) ) {
file.markAsDirty(header->root[1-curr].index + i*dbPageSize, dbPageSize);
for ( size_t j = 0; j < dbHandlesPerPage; j++ ) {
offs_t offs = dstIndex[j];
if ( srcIndex[j] != offs ) {
// the handle was changed: release the previously committed object,
// unless the committed entry was a free handle
if ( !(offs & dbFreeHandleMarker) ) {
size_t marker = offs & dbInternalObjectMarker;
if ( marker != 0 ) {
// internal object: its size is fixed by its kind
deallocate(offs-marker, internalObjectSize[marker]);
} else {
// table record: size is stored in the record header
deallocate(offs, ((dbRecord*)(baseAddr+offs))->size);
}
}
}
}
}
dstIndex += dbHandlesPerPage;
srcIndex += dbHandlesPerPage;
}
file.markAsDirty(header->root[1-curr].index + nPages*dbPageSize,
(currIndexSize - nPages*dbHandlesPerPage)*sizeof(offs_t));
// Handles of the last, incomplete index page are always examined.
offs_t* end = index[curr] + committedIndexSize;
while ( dstIndex < end ) {
offs_t offs = *dstIndex;
if ( *srcIndex != offs ) {
if ( !(offs & dbFreeHandleMarker) ) {
size_t marker = offs & dbInternalObjectMarker;
if ( marker != 0 ) {
deallocate(offs-marker, internalObjectSize[marker]);
} else {
deallocate(offs, ((dbRecord*)(baseAddr+offs))->size);
}
}
}
dstIndex += 1;
srcIndex += 1;
}
file.markAsDirty(0, sizeof(dbHeader));
file.flush();
cs.enter();
// do not switch the current root while a backup is running
while ( monitor->backupInProgress ) {
cs.leave();
backupCompletedEvent.wait();
cs.enter();
}
// switch the committed root
header->curr = curr ^= 1;
cs.leave();
file.markAsDirty(0, sizeof(dbHeader));
file.flush();
file.markAsDirty(0, sizeof(dbHeader));
// From here on `curr` names the freshly committed root and 1-curr the root
// that becomes the new working copy.
header->root[1-curr].indexUsed = currIndexSize;
header->root[1-curr].freeList = header->root[curr].freeList;
if ( newIndexSize != oldIndexSize ) {
// index was resized: exchange index and shadow index, then copy everything
header->root[1-curr].index=header->root[curr].shadowIndex;
header->root[1-curr].indexSize=header->root[curr].shadowIndexSize;
header->root[1-curr].shadowIndex=header->root[curr].index;
header->root[1-curr].shadowIndexSize=header->root[curr].indexSize;
file.markAsDirty(header->root[1-curr].index, currIndexSize*sizeof(offs_t));
memcpy(baseAddr + header->root[1-curr].index, currIndex,
currIndexSize*sizeof(offs_t));
memset(map, 0, 4*((currIndexSize+dbHandlesPerPage*32-1)
/ (dbHandlesPerPage*32)));
} else {
// same size: copy only the dirty pages and clear their dirty bits
byte* srcIndex = (byte*)currIndex;
byte* dstIndex = (byte*)index[1-curr];
for ( size_t i = 0; i < nPages; i++ ) {
if ( map[i >> 5] & (1 << (i & 31)) ) {
map[i >> 5] -= (1 << (i & 31));
memcpy(dstIndex, srcIndex, dbPageSize);
file.markAsDirty(header->root[1-curr].index + i*dbPageSize, dbPageSize);
}
srcIndex += dbPageSize;
dstIndex += dbPageSize;
}
if ( currIndexSize > nPages*dbHandlesPerPage ) {
// copy the handles beyond the last complete committed page
memcpy(dstIndex, srcIndex,
sizeof(offs_t)*(currIndexSize-nPages*dbHandlesPerPage));
file.markAsDirty(header->root[1-curr].index + nPages*dbPageSize,
sizeof(offs_t)*(currIndexSize-nPages*dbHandlesPerPage));
memset(map + (nPages>>5), 0,
((currIndexSize + dbHandlesPerPage*32 - 1)
/ (dbHandlesPerPage*32) - (nPages>>5))*4);
}
}
cs.enter();
modified = false;
monitor->uncommittedChanges = false;
monitor->commitInProgress = false;
if ( accessType != dbConcurrentUpdate ) {
monitor->curr = curr;
}
monitor->concurrentTransId += 1;
cs.leave();
// release whatever lock the context still holds
if ( ctx->writeAccess || ctx->readAccess || ctx->mutatorCSLocked ) {
endTransaction(ctx);
}
}
// Discards the changes of the current transaction and releases the calling
// thread's locks.
//
// When there are changes to undo, the working index (root[1-curr]) is
// restored from the committed index (index[curr]): the whole committed part
// is copied if the working index had been relocated, otherwise only the
// pages flagged in monitor->dirtyPagesMap. The working root's bookkeeping
// fields are reset from the committed root, the dirty-page map and the
// allocation cursors are cleared, and restoreTablesConsistency() is called.
//
// A thread without a thread context holds no lock and cannot start a
// transaction, so for it this call is a no-op (same convention as commit()
// and precommit()).
void dbDatabase::rollback()
{
    dbDatabaseThreadContext* ctx = threadContext.get();
    if ( ctx == NULL ) {
        return;
    }
    if ( modified
         && (monitor->uncommittedChanges || ctx->writeAccess || ctx->mutatorCSLocked || ctx->concurrentId == monitor->concurrentTransId) )
    {
        if ( !ctx->writeAccess && !ctx->mutatorCSLocked ) {
            beginTransaction(dbExclusiveLock);
        }
        int curr = header->curr;
        byte* dstIndex = baseAddr + header->root[curr].shadowIndex;
        byte* srcIndex = (byte*)index[curr];
        // restart allocation searches from the beginning of the bitmap
        currRBitmapPage = currPBitmapPage = dbBitmapId;
        currRBitmapOffs = currPBitmapOffs = 0;
        size_t nPages =
            (committedIndexSize + dbHandlesPerPage - 1) / dbHandlesPerPage;
        int4 *map = monitor->dirtyPagesMap;
        if ( header->root[1-curr].index != header->root[curr].shadowIndex ) {
            // working index was moved during the transaction: restore it all
            memcpy(dstIndex, srcIndex, nPages*dbPageSize);
            file.markAsDirty( header->root[curr].shadowIndex, nPages*dbPageSize);
        } else {
            // restore only the index pages modified by the transaction
            for ( size_t i = 0; i < nPages; i++ ) {
                if ( map[i >> 5] & (1 << (i & 31)) ) {
                    memcpy(dstIndex, srcIndex, dbPageSize);
                    file.markAsDirty(header->root[1-curr].index + i*dbPageSize, dbPageSize);
                }
                srcIndex += dbPageSize;
                dstIndex += dbPageSize;
            }
        }
        header->root[1-curr].indexSize = header->root[curr].shadowIndexSize;
        header->root[1-curr].indexUsed = header->root[curr].indexUsed;
        header->root[1-curr].freeList = header->root[curr].freeList;
        header->root[1-curr].index = header->root[curr].shadowIndex;
        memset(map, 0,
               size_t((currIndexSize+dbHandlesPerPage*32-1) / (dbHandlesPerPage*32))*4);
        file.markAsDirty(0, sizeof(dbHeader));
        modified = false;
        monitor->uncommittedChanges = false;
        monitor->concurrentTransId += 1;
        restoreTablesConsistency();
    }
    if ( monitor->users != 0 ) { // if not abandon
        endTransaction(ctx);
    }
}
// Notifies the calling thread's cursors that object `oid` changed.
// Every cursor positioned on `oid` is either detached (currId set to 0) when
// `removed` is true, or re-fetched when it has a record buffer and is not in
// the middle of its own update. No-op when the thread has no context.
void dbDatabase::updateCursors(oid_t oid, bool removed)
{
    dbDatabaseThreadContext* ctx = threadContext.get();
    if ( ctx == NULL ) {
        return;
    }
    // ctx->cursors is the sentinel of a circular list of cursors
    dbAnyCursor* cur = (dbAnyCursor*)ctx->cursors.next;
    while ( cur != &ctx->cursors ) {
        if ( cur->currId == oid ) {
            if ( removed ) {
                cur->currId = 0;
            } else if ( cur->record != NULL && !cur->updateInProgress ) {
                cur->fetch();
            }
        }
        cur = (dbAnyCursor*)cur->next;
    }
}
// Releases every lock held by the context `ctx` and hands the database lock
// over to waiting threads.
//
// All cursors of the context are reset first. Then, under cs:
//  - a writer gives up the write lock and wakes one waiting writer or,
//    failing that, all waiting readers;
//  - a reader leaves the reader set; the last remaining reader waiting for an
//    upgrade, or else one waiting writer, is granted the write lock.
// Finally the context's access flags are cleared and mutatorCS is left if the
// context holds it.
void dbDatabase::endTransaction(dbDatabaseThreadContext* ctx)
{
// reset() is expected to unlink the cursor from the list, otherwise this
// loop would not terminate -- NOTE(review): confirm in dbAnyCursor::reset
while ( !ctx->cursors.isEmpty() ) {
((dbAnyCursor*)ctx->cursors.next)->reset();
}
if ( ctx->writeAccess ) {
cs.enter();
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "Entering dbDatabase::endTransaction, "
"after cs.enter ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
}
# endif
monitor->nWriters -= 1;
monitor->ownerPid.clear();
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::endTransaction after ownerPid.clear() "
"ctx access (W=%d, R=%d), accessType=%d, ctx->currPid=%d, %d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType,
(int)ctx->currPid.getTid(), (int)ctx->currPid.getPid());
OsSysLog::flush();
}
# endif
assert(monitor->nWriters == 0 && !monitor->waitForUpgrade);
if ( monitor->nWaitWriters != 0 ) {
// pass the write lock directly to one waiting writer
monitor->nWaitWriters -= 1;
monitor->nWriters = 1;
writeSem.signal();
} else if ( monitor->nWaitReaders != 0 ) {
// no writer waiting: admit all waiting readers at once
monitor->nReaders = monitor->nWaitReaders;
monitor->nWaitReaders = 0;
readSem.signal(monitor->nReaders);
}
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::endTransaction, before cs.leave "
"ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
}
# endif
cs.leave();
} else if ( ctx->readAccess ) {
cs.enter();
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "Entering dbDatabase::endTransaction, after cs.enter"
" ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
}
# endif
monitor->nReaders -= 1;
if ( monitor->nReaders == 1 && monitor->waitForUpgrade ) {
// the only reader left is the one waiting to upgrade: make it the writer
assert(monitor->nWriters == 0);
monitor->waitForUpgrade = false;
monitor->nWaitWriters -= 1;
monitor->nWriters = 1;
monitor->nReaders = 0;
upgradeSem.signal();
} else if ( monitor->nReaders == 0 ) {
// last reader gone: let one waiting writer in
if ( monitor->nWaitWriters != 0 ) {
assert(monitor->nWriters == 0 && !monitor->waitForUpgrade);
monitor->nWaitWriters -= 1;
monitor->nWriters = 1;
writeSem.signal();
}
}
# ifdef FASTDB_VERBOSE_LOGGING
{
OsSysLog::add(FAC_DB, PRI_DEBUG, "dbDatabase::endTransaction, before cs.leave "
"ctx access (W=%d, R=%d), accessType=%d",
(int)ctx->writeAccess, (int)ctx->readAccess, accessType);
}
# endif
cs.leave();
}
ctx->writeAccess = false;
ctx->readAccess = false;
if ( ctx->mutatorCSLocked ) {
ctx->mutatorCSLocked = false;
mutatorCS.leave();
}
}
// Attaches a table descriptor to this database: the descriptor is pushed onto
// the head of the database's table list and stamped with the owning database
// and the table id it was assigned.
// The descriptor must not already be linked (its tableId must be 0).
void dbDatabase::linkTable(dbTableDescriptor* table, oid_t tableId)
{
    assert(((void)"Table can be used only in one database",
            table->tableId == 0));

    // Push onto the front of the singly linked table list.
    table->nextDbTable = tables;
    tables = table;

    table->tableId = tableId;
    table->db = this;
}
// Detaches a table descriptor from this database's table list and clears its
// table id. The back pointer to the database is cleared as well unless the
// descriptor is flagged as bound to a fixed database.
// The search has no end-of-list check, so the descriptor must be in the list.
void dbDatabase::unlinkTable(dbTableDescriptor* table)
{
    // Walk the list through the address of each link so that the head and
    // the interior nodes are unlinked by the same assignment.
    dbTableDescriptor** link = &tables;
    while ( *link != table ) {
        link = &(*link)->nextDbTable;
    }
    *link = table->nextDbTable;

    table->tableId = 0;
    if ( table->fixedDatabase ) {
        return;
    }
    table->db = NULL;
}
// Looks up a table by an arbitrary name string.
// The name is first passed through the symbol table and the resulting pointer
// is handed to findTable(), which matches descriptors by pointer identity.
// NOTE(review): presumably dbSymbolTable::add() replaces the pointer with the
// canonical (interned) one - confirm against symtab.h.
dbTableDescriptor* dbDatabase::findTableByName(char const* name)
{
    char* symbol = const_cast<char*>(name);
    dbSymbolTable::add(symbol, tkn_ident, FASTDB_CLONE_ANY_IDENTIFIER);
    return findTable(symbol);
}
// Returns the descriptor in this database's table list whose name compares
// equal (operator ==, not a string comparison function) to `name`, or NULL
// when no descriptor matches.
dbTableDescriptor* dbDatabase::findTable(char const* name)
{
    dbTableDescriptor* candidate = tables;
    while ( candidate != NULL && !(candidate->name == name) ) {
        candidate = candidate->nextDbTable;
    }
    return candidate;
}
// Records `inverseId` in the inverse-reference field of record `targetId`.
//
// fd        - the referencing field; the field actually updated is
//             fd->inverseRef, located in the target record.
// inverseId - object id to store in the target's inverse field.
// targetId  - object id of the record being updated.
//
// For an array-typed inverse field the id is appended to the array, growing
// the record when needed; for a scalar field the stored id is overwritten.
// A self reference (inverseId == targetId) is ignored.
void dbDatabase::insertInverseReference(dbFieldDescriptor* fd, oid_t inverseId,
oid_t targetId)
{
// Stack scratch space for a copy of the old record; larger records fall
// back to dbMalloc below.
byte buf[1024];
if ( inverseId == targetId ) {
return;
}
fd = fd->inverseRef;
if ( fd->type == dbField::tpArray ) {
dbTableDescriptor* desc = fd->defTable;
dbRecord* rec = getRow(targetId);
dbVarying* arr = (dbVarying*)((byte*)rec + fd->dbsOffs);
// Snapshot the array descriptor and the record offset before putRow().
size_t arrSize = arr->size;
size_t arrOffs = arr->offs;
offs_t oldOffs = currIndex[targetId];
size_t newSize = desc->fixedSize;
// NOTE(review): sizeWithoutOneField() appears to accumulate into newSize the
// size of the record without this field and to return the offset of the last
// varying part - confirm against the dbFieldDescriptor implementation.
size_t lastOffs = desc->columns->sizeWithoutOneField(fd, (byte*)rec, newSize);
size_t newArrOffs = DOALIGN(newSize, sizeof(oid_t));
size_t oldSize = rec->size;
newSize = newArrOffs + (arrSize + 1)*sizeof(oid_t);
if ( newSize > oldSize ) {
// The record has to grow: reserve room for twice the new element count.
newSize = newArrOffs + (arrSize+1)*sizeof(oid_t)*2;
} else {
// The extra element fits into the space the record already occupies.
newSize = oldSize;
}
byte* dst = (byte*)putRow(targetId, newSize);
byte* src = baseAddr + oldOffs;
byte* tmp = NULL;
if ( dst == src ) {
// The record was not relocated.
if ( arrOffs == newArrOffs && newArrOffs > lastOffs ) {
// The array already sits at its final offset, past lastOffs:
// append the new id in place and finish.
*((oid_t*)((byte*)rec + newArrOffs) + arrSize) = inverseId;
arr->size += 1;
updateCursors(targetId);
return;
}
// Source and destination are the same memory, so the old record is copied
// aside before it is rebuilt in place.
if ( oldSize > sizeof(buf) ) {
src = tmp = dbMalloc(oldSize);
} else {
src = buf;
}
memcpy(src, rec, oldSize);
}
// Rebuild the record: everything except this field first, then the array
// with the new id appended at its new offset.
desc->columns->copyRecordExceptOneField(fd, dst, src, desc->fixedSize);
arr = (dbVarying*)(dst + fd->dbsOffs);
arr->size = arrSize + 1;
arr->offs = newArrOffs;
memcpy(dst + newArrOffs, src + arrOffs, arrSize*sizeof(oid_t));
*((oid_t*)(dst + newArrOffs) + arrSize) = inverseId;
if ( tmp != NULL ) {
dbFree(tmp);
}
} else {
// Scalar inverse reference: overwrite the stored object id.
*(oid_t*)((byte*)putRow(targetId) + fd->dbsOffs) = inverseId;
}
updateCursors(targetId);
}
// For every inverse field of table `desc`, removes the back references that
// other records hold to record `oid`.
// While this runs, `oid` is pushed onto the visited chain (and popped again on
// exit); removeInverseReference() skips targets found on that chain.
void dbDatabase::removeInverseReferences(dbTableDescriptor* desc, oid_t oid)
{
    dbVisitedObject* const savedChain = visitedChain;
    dbVisitedObject visited(oid, savedChain);
    visitedChain = &visited;

    for ( dbFieldDescriptor* fd = desc->inverseFields;
          fd != NULL;
          fd = fd->nextInverseField )
    {
        if ( fd->type != dbField::tpArray ) {
            // Scalar reference: at most one target to fix up.
            oid_t ref = *(oid_t*)((byte*)getRow(oid) + fd->dbsOffs);
            if ( ref != 0 ) {
                removeInverseReference(fd, oid, ref);
            }
            continue;
        }

        // Array of references: walk it from the last element to the first.
        // The row address is fetched again for every element.
        dbVarying* arr = (dbVarying*)((byte*)getRow(oid) + fd->dbsOffs);
        int count = arr->size;
        int offs = arr->offs + count*sizeof(oid_t);
        for ( int left = count; left > 0; left-- ) {
            offs -= sizeof(oid_t);
            oid_t ref = *(oid_t*)((byte*)getRow(oid) + offs);
            if ( ref != 0 ) {
                removeInverseReference(fd, oid, ref);
            }
        }
    }

    visitedChain = savedChain;
}
// Removes `inverseId` from the inverse-reference field of record `targetId`.
//
// fd        - the referencing field; the field edited is fd->inverseRef.
// inverseId - object id to remove from the target's inverse field.
// targetId  - object id of the record being edited.
//
// Nothing is done for a self reference, for the record currently being
// updated, for a freed handle, or for a target already on the visited chain.
// When the referencing field has DB_FIELD_CASCADE_DELETE set and the target
// would be left without references, the target record is removed instead.
void dbDatabase::removeInverseReference(dbFieldDescriptor* fd,
                                        oid_t inverseId,
                                        oid_t targetId)
{
    if ( inverseId == targetId ) {
        return;
    }
    if ( targetId == updatedRecordId ) {
        return;
    }
    if ( (currIndex[targetId] & dbFreeHandleMarker) != 0 ) {
        return;
    }

    dbVisitedObject* seen = visitedChain;
    while ( seen != NULL ) {
        if ( seen->oid == targetId ) {
            return;
        }
        seen = seen->next;
    }

    byte* rec = (byte*)putRow(targetId);
    dbFieldDescriptor* inverse = fd->inverseRef;

    if ( fd->indexType & DB_FIELD_CASCADE_DELETE ) {
        // A scalar inverse field, or an array holding at most one element,
        // means this was the last reference to the target.
        bool lastReference =
            inverse->type != dbField::tpArray
            || ((dbVarying*)(rec + inverse->dbsOffs))->size <= 1;
        if ( lastReference ) {
            remove(inverse->defTable, targetId);
            return;
        }
    }

    if ( inverse->type == dbField::tpArray ) {
        dbVarying* arr = (dbVarying*)(rec + inverse->dbsOffs);
        oid_t* refs = (oid_t*)(rec + arr->offs);
        int count = arr->size;

        // Locate the last occurrence of inverseId.
        int pos = count - 1;
        while ( pos >= 0 && refs[pos] != inverseId ) {
            pos--;
        }
        if ( pos >= 0 ) {
            // Close the gap by shifting the tail down by one slot.
            for ( int k = pos + 1; k < count; k++ ) {
                refs[k-1] = refs[k];
            }
            arr->size -= 1;
        }
    } else {
        oid_t* slot = (oid_t*)(rec + inverse->dbsOffs);
        if ( *slot == inverseId ) {
            *slot = 0;
        }
    }
    updateCursors(targetId);
}
bool dbDatabase::completeDescriptorsInitialization()
{
dbTableDescriptor* desc;
bool result = true;
for ( desc = tables; desc != NULL; desc = desc->nextDbTable ) {
dbFieldDescriptor* fd;
for ( fd = desc->firstField; fd != NULL; fd = fd->nextField ) {
if ( fd->refTableName != NULL ) {
fd->refTable = findTable(fd->refTableName);
if ( fd->refTable == NULL ) {
result = false;
}
}
}
if ( result ) {
desc->checkRelationship();
}
}
return result;
}
// Writes a backup of the database to `file`.
//
// file       - path of the backup file to create.
// compactify - when true, objects are written back to back (each aligned to
//              dbAllocationQuantum) behind a freshly built header and two
//              copies of a rebuilt object index; when false, the mapped image
//              [baseAddr, baseAddr + header->size) is written as is.
//
// Returns true when every write succeeded. Returns false when the file cannot
// be created, when another backup is already in progress, or when a write
// fails.
bool dbDatabase::backup(char const* file, bool compactify)
{
    dbFile f;
    bool result = true;
    if ( f.create(file, !compactify) != dbFile::ok ) {
        return false;
    }
    backupCompletedEvent.reset();
    cs.enter();
    if ( monitor->backupInProgress ) {
        cs.leave();
        // The backup file was already created above: close it so that its
        // handle is not leaked on this early exit.
        f.close();
        return false; // no two concurrent backups are possible
    }
    monitor->backupInProgress = true;
    cs.leave();
    if ( compactify ) {
        int curr = header->curr;
        size_t nObjects = header->root[1-curr].indexUsed;
        size_t i;
        size_t nIndexPages = (header->root[1-curr].indexSize + dbHandlesPerPage - 1) / dbHandlesPerPage;
        offs_t* newIndex = new offs_t[nIndexPages*dbHandlesPerPage];
        memset(newIndex, 0, nIndexPages*dbPageSize);
        // Layout of the compacted file: one header page, two copies of the
        // index, then the objects.
        offs_t used = (nIndexPages*2 + 1)*dbPageSize;
        // Pass 1: assign every live object its offset in the new file.
        for ( i = 0; i < nObjects; i++ ) {
            offs_t offs = currIndex[i];
            if ( !(offs & dbFreeHandleMarker) ) {
                int marker = offs & dbInternalObjectMarker;
                newIndex[i] = used | marker;
                used += DOALIGN(marker ? internalObjectSize[marker] : getRow(i)->size,
                                dbAllocationQuantum);
            } else {
                // Free handles are copied unchanged.
                newIndex[i] = offs;
            }
        }
        byte page[dbPageSize];
        memset(page, 0, sizeof page);
        dbHeader* newHeader = (dbHeader*)page;
        offs_t newFileSize = DOALIGN(used, dbPageSize);
        newHeader->size = newFileSize;
        newHeader->curr = 0;
        newHeader->dirty = 0;
        newHeader->initialized = true;
        newHeader->majorVersion = header->majorVersion;
        newHeader->minorVersion = header->minorVersion;
        newHeader->root[0].index = newHeader->root[1].shadowIndex = dbPageSize;
        newHeader->root[0].shadowIndex = newHeader->root[1].index = dbPageSize + nIndexPages*dbPageSize;
        newHeader->root[0].shadowIndexSize = newHeader->root[0].indexSize =
            newHeader->root[1].shadowIndexSize = newHeader->root[1].indexSize = nIndexPages*dbHandlesPerPage;
        newHeader->root[0].indexUsed = newHeader->root[1].indexUsed = nObjects;
        newHeader->root[0].freeList = newHeader->root[1].freeList = header->root[1-curr].freeList;
        result &= f.write(page, dbPageSize);
        // The same index image serves as both index copies.
        result &= f.write(newIndex, nIndexPages*dbPageSize);
        result &= f.write(newIndex, nIndexPages*dbPageSize);
        // Pass 2: write the objects in the order their offsets were assigned.
        for ( i = 0; i < nObjects; i++ ) {
            offs_t offs = newIndex[i];
            if ( !(offs & dbFreeHandleMarker) ) {
                int marker = offs & dbInternalObjectMarker;
                size_t size = DOALIGN(marker ? internalObjectSize[marker] : getRow(i)->size,
                                      dbAllocationQuantum);
                result &= f.write(baseAddr + currIndex[i] - marker, size);
            }
        }
        if ( used != newFileSize ) {
            // Pad the file with zeros up to a whole number of pages.
            assert(newFileSize - used < dbPageSize);
            size_t align = (size_t)(newFileSize - used);
            memset(page, 0, align);
            result &= f.write(page, align);
        }
        delete[] newIndex;
    } else { // end if compactify
        result = f.write(baseAddr, header->size);
    }
    monitor->backupInProgress = false;
    backupCompletedEvent.signal();
    f.close();
    return result;
}
// Constructs a closed database object.
//
// type               - access mode, stored in accessType.
// dbInitSize         - stored in initSize.
// dbExtensionQuantum - stored in extensionQuantum.
// dbInitIndexSize    - stored in initIndexSize.
// nThreads           - forwarded to setConcurrency().
// When NO_PTHREADS is defined the constructor takes one more, unnamed and
// unused, bool parameter.
//
// Only bookkeeping state is initialised and the bitmap-page free-space table
// is allocated; nothing is opened here.
dbDatabase::dbDatabase(dbAccessType type, size_t dbInitSize,
                       size_t dbExtensionQuantum, size_t dbInitIndexSize,
                       int nThreads
#ifdef NO_PTHREADS
                       , bool
#endif
                       )
    : accessType(type),
      initSize(dbInitSize),
      extensionQuantum(dbExtensionQuantum),
      initIndexSize(dbInitIndexSize)
{
    bitmapPageAvailableSpace = new int[dbBitmapId + dbBitmapPages];
    setConcurrency(nThreads);

    // Open state.
    opened = false;
    header = NULL;
    tables = NULL;
    visitedChain = NULL;

    // Names and limits.
    databaseName = NULL;
    fileName = NULL;
    dbFileSizeLimit = 0;

    // Delayed-commit settings.
    commitDelay = 0;
    commitTimeout = 0;
    commitTimerStarted = 0;

    // Scheduled-backup settings.
    backupFileName = NULL;
    backupPeriod = 0;

    // Miscellaneous options.
    errorHandler = NULL;
    confirmDeleteColumns = false;
}
// Releases the heap arrays owned by the object: the file name, the database
// name and the bitmap-page free-space table.
dbDatabase::~dbDatabase()
{
    delete[] fileName;
    delete[] databaseName;
    delete[] bitmapPageAvailableSpace;
}
// Installs `newHandler` as the error handler and returns the handler that was
// installed before the call.
dbDatabase::dbErrorHandler dbDatabase::setErrorHandler(dbDatabase::dbErrorHandler newHandler)
{
    dbErrorHandler const previous = errorHandler;
    errorHandler = newHandler;
    return previous;
}
void dbDatabase::cleanup()
{
delete &dbExprNode::mutex;
delete &dbQueryElementAllocator::instance.mutex;
dbExprNodeSegment* segm = dbExprNode::segmentList;
while ( segm != NULL ) {
dbExprNodeSegment* next = segm->next;
delete segm;
segm = next;
}
}
#ifdef REPLICATION_SUPPORT
// Amount added to file.updateCounter by dbReplicatedDatabase::open() once a
// node that did not start as the active one has been started.
#define MAX_LOST_TRANSACTIONS 100
// Timeout of the select() call in the reader thread.
int dbReplicatedDatabase::dbPollInterval = 10*1000; // milliseconds
// How long the node that becomes active at startup waits for a peer's READY.
int dbReplicatedDatabase::dbWaitReadyTimeout = 60*1000; // milliseconds
// How long open() waits for a STATUS reply to a GET_STATUS request.
int dbReplicatedDatabase::dbWaitStatusTimeout = 60*1000; // milliseconds
// Attempt count passed to socket_t::connect() when the local file is dirty
// (recovery start) ...
int dbReplicatedDatabase::dbRecoveryConnectionAttempts = 3;
// ... and in every other case.
int dbReplicatedDatabase::dbStartupConnectionAttempts = 60;
// Printable node status names, indexed by node status value in trace output.
// NOTE(review): the order presumably mirrors the ST_* constants - confirm
// against their declaration.
char const* statusText[] = {
"OFFLINE",
"ONLINE",
"ACTIVE",
"STANDBY",
"RECOVERED"
};
// Printable request names, indexed by ReplicationRequest::op in trace output.
// NOTE(review): the order presumably mirrors the RR_* constants - confirm
// against their declaration.
char const* requestText[] = {
"CONNECT",
"RECOVERY",
"GET_STATUS",
"STATUS",
"UPDATE_PAGE",
"NEW_ACTIVE_NODE",
"CLOSE",
"READY"
};
// Opens this node of a replicated database and negotiates its role with the
// other nodes.
//
// dbName   - database name; the names of the synchronization objects are
//            derived from it by appending a suffix (".in", ".dm", ".ws", ...).
// fiName   - database file name, or NULL to use "<dbName>.fdb".
// id       - index of this node in `servers`.
// servers  - addresses of all nodes (the array is kept, not copied).
// nServers - number of entries in `servers`.
//
// Steps: create/open the synchronization objects and the shared monitor,
// map the database file (initialising header, index and bitmap when the file
// is new), create the accept socket, start the reader thread, then poll the
// other nodes for their status. A node whose status is ONLINE becomes ACTIVE
// when no node with a smaller index is ONLINE; otherwise it sends READY to
// that node. A node whose status is RECOVERED sends a RECOVERY request to the
// active node. Every node that is not ACTIVE then blocks on startEvent.
//
// Returns true when the database is opened and its scheme is loaded, false
// on any failure (most failures are also reported through handleError()).
bool dbReplicatedDatabase::open(char const* dbName, char const* fiName,
                                int id, char* servers[], int nServers)
{
    int i;
    char buf [64];
    ReplicationRequest rr;
    this->id = id;
    this->nServers = nServers;
    con = new dbConnection[nServers];
    serverURL = servers;
    delete[] databaseName;
    delete[] fileName;
    commitDelay = 0;
    commitTimeout = 0;
    commitTimerStarted = 0;
    backupFileName = NULL;
    backupPeriod = 0;
    opened = false;
    header = NULL;
    stopDelayedCommitThread = false;
    databaseNameLen = strlen(dbName);
    // `name` is reused for every derived object name below; the 16 spare
    // bytes leave room for the longest suffix.
    char* name = new char[databaseNameLen+16];
    sprintf(name, "%s.in", dbName);
    databaseName = name;
    if ( fiName == NULL ) {
        fileName = new char[databaseNameLen + 5];
        sprintf(fileName, "%s.fdb", dbName);
    } else {
        fileName = new char[strlen(fiName)+1];
        strcpy(fileName, fiName);
    }
    dbInitializationMutex::initializationStatus initStatus;
    initStatus = initMutex.initialize(name);
    if ( initStatus == dbInitializationMutex::InitializationError ) {
        handleError(DatabaseOpenError,
                    "Failed to start database initialization");
        return false;
    }
    if ( initStatus != dbInitializationMutex::NotYetInitialized ) {
        handleError(DatabaseOpenError, "Database is already started");
        return false;
    }
    sprintf(name, "%s.dm", dbName);
    if ( !shm.open(name) ) {
        handleError(DatabaseOpenError, "Failed to open database monitor");
        return false;
    }
    monitor = shm.get();
    sprintf(name, "%s.ws", dbName);
    if ( !writeSem.open(name) ) {
        handleError(DatabaseOpenError,
                    "Failed to initialize database writers semaphore");
        return false;
    }
    sprintf(name, "%s.rs", dbName);
    if ( !readSem.open(name) ) {
        handleError(DatabaseOpenError,
                    "Failed to initialize database readers semaphore");
        return false;
    }
    sprintf(name, "%s.us", dbName);
    if ( !upgradeSem.open(name) ) {
        handleError(DatabaseOpenError,
                    "Failed to initialize database upgrade semaphore");
        return false;
    }
    sprintf(name, "%s.bce", dbName);
    if ( !backupCompletedEvent.open(name) ) {
        handleError(DatabaseOpenError,
                    "Failed to initialize database backup completed event");
        return false;
    }
    allocatedSize = 0;
    // The index holds at least dbFirstUserId handles and a whole number of
    // index pages.
    size_t indexSize = initIndexSize < dbFirstUserId
        ? size_t(dbFirstUserId) : initIndexSize;
    indexSize = DOALIGN(indexSize, dbHandlesPerPage);
    size_t fileSize = initSize ? initSize : dbDefaultInitDatabaseSize;
    fileSize = DOALIGN(fileSize, dbBitmapSegmentSize);
    if ( fileSize < indexSize*sizeof(offs_t)*4 ) {
        fileSize = indexSize*sizeof(offs_t)*4;
    }
#ifdef DISKLESS_CONFIGURATION
    mmapSize = fileSize;
#else
    mmapSize = 0;
#endif
    for ( i = dbBitmapId + dbBitmapPages; --i >= 0; ) {
        bitmapPageAvailableSpace[i] = INT_MAX;
    }
    currRBitmapPage = currPBitmapPage = dbBitmapId;
    currRBitmapOffs = currPBitmapOffs = 0;
    reservedChain = NULL;
    tables = NULL;
    modified = false;
    attach();
    sprintf(name, "%s.cs", dbName);
    if ( !cs.create(name, &monitor->sem) ) {
        handleError(DatabaseOpenError,
                    "Failed to initialize database monitor");
        return false;
    }
    if ( accessType == dbConcurrentUpdate ) {
        sprintf(name, "%s.mcs", dbName);
        if ( !mutatorCS.create(name, &monitor->mutatorSem) ) {
            handleError(DatabaseOpenError,
                        "Failed to initialize database monitor");
            return false;
        }
    }
    // Reset the shared monitor to its initial state.
    readSem.reset();
    writeSem.reset();
    upgradeSem.reset();
    monitor->nReaders = 0;
    monitor->nWriters = 0;
    monitor->nWaitReaders = 0;
    monitor->nWaitWriters = 0;
    monitor->waitForUpgrade = false;
    monitor->version = version = 1;
    monitor->users = 0;
    monitor->backupInProgress = 0;
    monitor->forceCommitCount = 0;
    monitor->lastDeadlockRecoveryTime = 0;
    monitor->delayedCommitContext = NULL;
    monitor->concurrentTransId = 1;
    monitor->commitInProgress = false;
    monitor->uncommittedChanges = false;
    memset(monitor->dirtyPagesMap, 0, dbDirtyPageBitmapSize);
    sprintf(databaseName, "%s.%d", dbName, version);
    if ( file.open(fileName, databaseName,
                   accessType == dbReadOnly || accessType == dbConcurrentRead, fileSize, true) != dbFile::ok )
    {
        handleError(DatabaseOpenError, "Failed to create database file");
        return false;
    }
    baseAddr = (byte*)file.getAddr();
    fileSize = file.getSize();
    header = (dbHeader*)baseAddr;
    if ( (unsigned)header->curr > 1 ) {
        handleError(DatabaseOpenError, "Database file was corrupted: "
                    "invalid root index");
        return false;
    }
    acceptSock = socket_t::create_global(servers[id]);
    if ( !acceptSock->is_ok() ) {
        acceptSock->get_error_text(buf, sizeof buf);
        dbTrace("<<<FATAL>>> Failed to create accept socket: %s\n", buf);
        return false;
    }
    FD_ZERO(&inputSD);
    int acceptSockHnd = acceptSock->get_handle();
    FD_SET(acceptSockHnd, &inputSD);
    nInputSD = acceptSockHnd+1;
    startEvent.open(false);
    int connectionAttempts = dbStartupConnectionAttempts;
    if ( !header->initialized ) {
        if ( accessType == dbReadOnly || accessType == dbConcurrentRead ) {
            handleError(DatabaseOpenError, "Can not open uninitialized "
                        "file in read only mode");
            return false;
        }
        // Brand new file: lay out the header page, both index copies and the
        // allocation bitmap.
        monitor->curr = header->curr = 0;
        header->size = fileSize;
        size_t used = dbPageSize;
        header->root[0].index = used;
        header->root[0].indexSize = indexSize;
        header->root[0].indexUsed = dbFirstUserId;
        header->root[0].freeList = 0;
        used += indexSize*sizeof(offs_t);
        header->root[1].index = used;
        header->root[1].indexSize = indexSize;
        header->root[1].indexUsed = dbFirstUserId;
        header->root[1].freeList = 0;
        used += indexSize*sizeof(offs_t);
        header->root[0].shadowIndex = header->root[1].index;
        header->root[1].shadowIndex = header->root[0].index;
        header->root[0].shadowIndexSize = indexSize;
        header->root[1].shadowIndexSize = indexSize;
        header->majorVersion= FASTDB_MAJOR_VERSION;
        header->minorVersion = FASTDB_MINOR_VERSION;
        index[0] = (offs_t*)(baseAddr + header->root[0].index);
        index[1] = (offs_t*)(baseAddr + header->root[1].index);
        index[0][dbInvalidId] = dbFreeHandleMarker;
        size_t bitmapPages =
            (used + dbPageSize*(dbAllocationQuantum*8-1) - 1)
            / (dbPageSize*(dbAllocationQuantum*8-1));
        // Mark as allocated the space taken by the header, the indices and
        // the bitmap pages themselves.
        memset(baseAddr+used, 0xFF, (used + bitmapPages*dbPageSize)
               / (dbAllocationQuantum*8));
        size_t i;
        for ( i = 0; i < bitmapPages; i++ ) {
            index[0][dbBitmapId + i] = used + dbPageObjectMarker;
            used += dbPageSize;
        }
        while ( i < dbBitmapPages ) {
            index[0][dbBitmapId+i] = dbFreeHandleMarker;
            i += 1;
        }
        currIndex = index[0];
        currIndexSize = dbFirstUserId;
        committedIndexSize = 0;
        initializeMetaTable();
        header->dirty = true;
        memcpy(index[1], index[0], currIndexSize*sizeof(offs_t));
        file.markAsDirty(0, used);
        file.flush(true);
        // The file is marked as initialized only after its content has been
        // flushed.
        header->initialized = true;
        file.markAsDirty(0, sizeof(dbHeader));
        con[id].status = ST_ONLINE;
    } else {
        monitor->curr = header->curr;
        if ( header->dirty ) {
            dbTrace("Database was not normally closed: start recovery\n");
            con[id].status = ST_RECOVERED;
            connectionAttempts = dbRecoveryConnectionAttempts;
        } else {
            con[id].status = ST_ONLINE;
        }
    }
    opened = true;
    monitor->users += 1;
    if ( accessType == dbConcurrentUpdate ) {
        initMutex.done();
    }
    readerThread.create(startReader, this);
  pollNodes:
    // Ask every other node for its status. The poll is repeated while this
    // node needs recovery and no active node has been found.
    bool startup = true;
    activeNodeId = -1;
    do {
        for ( i = 0; i < nServers; i++ ) {
            if ( i != id ) {
                socket_t* s = con[i].reqSock;
                if ( s == NULL ) {
                    TRACE_MSG(("Try to connect to node %d address '%s'\n", i, servers[i]));
                    s = socket_t::connect(servers[i],
                                          socket_t::sock_global_domain,
                                          connectionAttempts);
                    if ( !s->is_ok() ) {
                        s->get_error_text(buf, sizeof buf);
                        dbTrace("Failed to establish connection with node %d: %s\n",
                                i, buf);
                        delete s;
                        continue;
                    }
                    TRACE_MSG(("Establish connection with node %d address '%s'\n", i, servers[i]));
                }
                rr.op = ReplicationRequest::RR_GET_STATUS;
                rr.nodeId = id;
                bool success = false;
                if ( con[i].reqSock == NULL ) {
                    // Freshly connected socket: exchange request and reply
                    // synchronously on it.
                    if ( !s->write(&rr, sizeof rr) || !s->read(&rr, sizeof rr) ) {
                        dbTrace("Failed to get status from node %d\n", i);
                        delete s;
                    } else {
                        TRACE_MSG(("Node %d returns status %s\n", i, statusText[rr.status]));
                        addConnection(i, s);
                        con[i].status = rr.status;
                        success = true;
                    }
                } else {
                    // Established connection: the reply is signalled through
                    // statusEvent.
                    con[i].statusEvent.reset();
                    con[i].waitStatusEventFlag += 1;
                    TRACE_MSG(("Send GET_STATUS request to node %d\n", i));
                    if ( writeReq(i, rr) ) {
                        dbCriticalSection cs(startCS);
                        lockConnection(i);
                        if ( !con[i].statusEvent.wait(startCS, dbWaitStatusTimeout) ) {
                            dbTrace("Failed to get status from node %d\n", i);
                            deleteConnection(i);
                        } else {
                            TRACE_MSG(("Received response from node %d with status %s\n", i, statusText[con[i].status]));
                            success = true;
                        }
                        unlockConnection(i);
                    }
                    con[i].waitStatusEventFlag -= 1;
                }
                if ( success ) {
                    TRACE_MSG(("Status of node %d is %s\n", i, statusText[con[i].status]));
                    if ( con[i].status == ST_ACTIVE ) {
                        startup = false;
                        activeNodeId = i;
                    } else if ( con[i].status == ST_STANDBY ) {
                        startup = false;
                    }
                }
            }
        }
    } while ( con[id].status == ST_RECOVERED && activeNodeId < 0 );
    if ( !startup ) {
        //
        // The node was activated after the active node start the user application
        // So the node's data is out of date. Mark it as recovered.
        //
        con[id].status = ST_RECOVERED;
    }
    file.configure(this);
    TRACE_MSG(("My status is %s\n", statusText[con[id].status]));
    if ( con[id].status == ST_ONLINE ) {
        // The ONLINE node with the smallest index becomes the active one.
        for ( activeNodeId = 0;
              activeNodeId < id && con[activeNodeId].status != ST_ONLINE;
              activeNodeId++ );
        if ( activeNodeId == id ) {
            dbTrace("Node %d becomes active at startup\n", id);
            //
            // Nobody else pretends to be active, so I will be...
            //
            for ( i = 0; i < nServers; i++ ) {
                lockConnection(i);
                if ( i != id && con[i].status == ST_ONLINE ) {
                    dbCriticalSection cs(startCS);
                    TRACE_MSG(("Waiting ready event from node %d\n", i));
                    if ( !con[i].readyEvent.wait(startCS, dbWaitReadyTimeout) ) {
                        dbTrace("Didn't receive ready event from node %d\n", i);
                        deleteConnection(i);
                        unlockConnection(i);
                        goto pollNodes;
                    }
                    TRACE_MSG(("Received ready event from node %d\n", i));
                }
                unlockConnection(i);
            }
            con[id].status = ST_ACTIVE;
            for ( i = 0; i < nServers; i++ ) {
                if ( con[i].status == ST_ONLINE ) {
                    con[i].status = ST_STANDBY;
                    rr.op = ReplicationRequest::RR_STATUS;
                    rr.nodeId = i;
                    rr.status = ST_STANDBY;
                    TRACE_MSG(("Send STANDBY status to node %d\n", i));
                    writeReq(i, rr);
                }
            }
        } else {
            rr.op = ReplicationRequest::RR_READY;
            rr.nodeId = id;
            // Report the node the request is sent to: the loop index `i` is
            // stale at this point.
            TRACE_MSG(("Send READY status to node %d\n", activeNodeId));
            if ( !writeReq(activeNodeId, rr) ) {
                goto pollNodes;
            }
        }
    } else {
        TRACE_MSG(("Send RECOVERY request to node %d\n", activeNodeId));
        rr.op = ReplicationRequest::RR_RECOVERY;
        rr.size = file.getUpdateCountTableSize();
        rr.nodeId = id;
        if ( !writeReq(activeNodeId, rr, file.diskUpdateCount, rr.size*sizeof(int)) ) {
            goto pollNodes;
        }
    }
    TRACE_MSG(("My new status is %s\n", statusText[con[id].status]));
    if ( con[id].status != ST_ACTIVE ) {
        // Block until startEvent is signalled: either this node was promoted
        // to active or the reader thread has cleared `opened`.
        dbCriticalSection cs(startCS);
        startEvent.wait(startCS);
        baseAddr = (byte*)file.getAddr();
        header = (dbHeader*)baseAddr;
        if ( opened ) {
            int curr = header->curr;
            monitor->curr = curr;
            // Bring the index of root[1-curr] in line with the one of
            // root[curr].
            offs_t* shadowIndex = (offs_t*)(baseAddr + header->root[curr].index);
            offs_t* currIndex = (offs_t*)(baseAddr + header->root[1-curr].index);
            for ( size_t i = 0, size = header->root[curr].indexUsed; i < size; i++ ) {
                if ( currIndex[i] != shadowIndex[i] ) {
                    currIndex[i] = shadowIndex[i];
                    file.markAsDirty(header->root[1-curr].index + i*sizeof(offs_t), sizeof(offs_t));
                }
            }
            this->currIndex = currIndex;
            header->size = file.getSize();
            header->root[1-curr].index = header->root[curr].shadowIndex;
            header->root[1-curr].indexSize = header->root[curr].shadowIndexSize;
            header->root[1-curr].shadowIndex = header->root[curr].index;
            header->root[1-curr].shadowIndexSize = header->root[curr].indexSize;
            header->root[1-curr].indexUsed = header->root[curr].indexUsed;
            header->root[1-curr].freeList = header->root[curr].freeList;
            file.markAsDirty(0, sizeof(dbHeader));
            file.updateCounter += MAX_LOST_TRANSACTIONS;
            restoreTablesConsistency();
            file.flush();
        }
    }
    if ( opened ) {
        if ( loadScheme(true) ) {
            if ( accessType != dbConcurrentUpdate ) {
                initMutex.done();
            }
            return true;
        }
    } else {
        // `opened` was cleared while this node was waiting: shut down.
        file.flush(true);
        header->dirty = false;
        file.close();
        delete[] con;
    }
    monitor->users -= 1;
    return false;
}
// Thread entry point: `arg` is the dbReplicatedDatabase whose reader() loop
// is run on the calling thread.
void thread_proc dbReplicatedDatabase::startReader(void* arg)
{
    dbReplicatedDatabase* const self = static_cast<dbReplicatedDatabase*>(arg);
    self->reader();
}
void dbReplicatedDatabase::deleteConnection(int nodeId)
{
dbTrace("Delete connection with node %d\n", nodeId);
{
dbCriticalSection cs(sockCS);
while ( con[nodeId].useCount > 1 ) {
con[nodeId].waitUseEventFlag += 1;
con[nodeId].useCount -= 1;
con[nodeId].useEvent.reset();
con[nodeId].useEvent.wait(sockCS);
con[nodeId].waitUseEventFlag -= 1;
con[nodeId].useCount += 1;
}
con[nodeId].status = ST_OFFLINE;
if ( con[nodeId].reqSock != NULL ) {
FD_CLR(con[nodeId].reqSock->get_handle(), &inputSD);
delete con[nodeId].reqSock;
con[nodeId].reqSock = NULL;
}
if ( con[nodeId].respSock != NULL ) {
FD_CLR(con[nodeId].respSock->get_handle(), &inputSD);
delete con[nodeId].respSock;
con[nodeId].respSock = NULL;
}
}
if ( nodeId == activeNodeId ) {
changeActiveNode();
}
}
// Forgets the current active node and, when this node is a stand-by, takes
// part in choosing a new one.
// If a GET_STATUS request can be delivered to any ONLINE or STANDBY node with
// a smaller index, the function returns and leaves the decision pending.
// Otherwise this node announces itself to every reachable node as the new
// active node, becomes ACTIVE and signals startEvent.
void dbReplicatedDatabase::changeActiveNode()
{
    ReplicationRequest rr;
    activeNodeId = -1;
    TRACE_MSG(("Change active node\n"));
    if ( con[id].status != ST_STANDBY ) {
        return;
    }

    for ( int lower = 0; lower < id; lower++ ) {
        if ( con[lower].status != ST_ONLINE && con[lower].status != ST_STANDBY ) {
            continue;
        }
        rr.op = ReplicationRequest::RR_GET_STATUS;
        TRACE_MSG(("Send GET_STATUS request to node %d to choose new active node\n", lower));
        rr.nodeId = id;
        if ( writeReq(lower, rr) ) {
            return;
        }
    }

    dbTrace("Activate stand-by server %d\n", id);
    for ( int peer = 0; peer < nServers; peer++ ) {
        if ( con[peer].status == ST_OFFLINE || peer == id ) {
            continue;
        }
        TRACE_MSG(("Send NEW_ACTIVE_NODE request to node %d\n", peer));
        rr.op = ReplicationRequest::RR_NEW_ACTIVE_NODE;
        rr.nodeId = id;
        if ( writeReq(peer, rr) ) {
            con[peer].status = ST_STANDBY;
        }
    }
    con[id].status = ST_ACTIVE;

    dbCriticalSection cs(startCS);
    startEvent.signal();
}
// Body of the replication reader thread. It runs while `opened` is set and
// multiplexes, with select(), the accept socket and the request/response
// sockets of all nodes.
//
//  - On timeout a stand-by node probes the active node with GET_STATUS; when
//    the previous probe is still unanswered, or no active node is known, it
//    starts the choice of a new active node.
//  - When select() fails (other than EINTR) every descriptor is probed on its
//    own and the broken ones are dropped.
//  - A new incoming connection must start with a GET_STATUS request; it is
//    answered with this node's status and kept as the peer's response socket.
//  - Requests arriving on established sockets are dispatched by rr.op.
//
// In dbConcurrentUpdate mode the thread owns a private
// dbDatabaseThreadContext that is deleted when the thread leaves.
void dbReplicatedDatabase::reader()
{
    char buf[256];
    ReplicationRequest rr;
    bool statusRequested = false;
    bool closed = false;
    dbDatabaseThreadContext* ctx = NULL;
    if ( accessType == dbConcurrentUpdate ) {
        ctx = new dbDatabaseThreadContext();
        threadContext.set(ctx);
    }
    while ( opened ) {
        timeval tv;
        tv.tv_sec = dbPollInterval / 1000;
        tv.tv_usec = dbPollInterval % 1000 * 1000;
        fd_set ready = inputSD;
        int rc = ::select(nInputSD, &ready, NULL, NULL, &tv);
        if ( rc == 0 ) { // timeout
            if ( !closed && con[id].status == ST_STANDBY ) {
                if ( statusRequested || activeNodeId < 0 ) {
                    changeActiveNode();
                } else {
                    rr.op = ReplicationRequest::RR_GET_STATUS;
                    rr.nodeId = id;
                    if ( !writeResp(activeNodeId, rr) ) {
                        dbTrace("Connection with active server lost\n");
                        activeNodeId = -1;
                        changeActiveNode();
                    } else {
                        statusRequested = true;
                    }
                }
            }
        } else if ( rc < 0 ) {
            if ( errno != EINTR ) {
                dbTrace("Select failed: %d\n", errno);
                // Probe each descriptor separately with a zero timeout to
                // find the ones select() rejects.
                tv.tv_sec = 0;
                tv.tv_usec = 0;
                FD_ZERO(&ready);
                for ( int i = nInputSD; --i >= 0; ) {
                    FD_SET(i, &ready);
                    if ( ::select(i+1, &ready, NULL, NULL, &tv) < 0 ) {
                        FD_CLR(i, &inputSD);
                        for ( int j = nServers; --j >= 0; ) {
                            lockConnection(j);
                            if ( con[j].respSock != NULL
                                 && con[j].respSock->get_handle() == i )
                            {
                                deleteConnection(j);
                            }
                            unlockConnection(j);
                        }
                    }
                    FD_CLR(i, &ready);
                }
            }
        } else {
            statusRequested = false;
            if ( FD_ISSET(acceptSock->get_handle(), &ready) ) {
                socket_t* s = acceptSock->accept();
                if ( s != NULL && s->read(&rr, sizeof rr) ) {
                    int op = rr.op;
                    int nodeId = rr.nodeId;
                    if ( op != ReplicationRequest::RR_GET_STATUS ) {
                        dbTrace("Receive unexpected request %d\n", rr.op);
                        delete s;
                    } else {
                        TRACE_MSG(("Send STATUS response %s for GET_STATUS request from node %d\n",
                                   statusText[con[id].status], nodeId));
                        rr.op = ReplicationRequest::RR_STATUS;
                        rr.nodeId = id;
                        rr.status = con[id].status;
                        if ( !s->write(&rr, sizeof rr) ) {
                            s->get_error_text(buf, sizeof buf);
                            dbTrace("Failed to write response: %s\n", buf);
                            delete s;
                        } else {
                            // Keep the accepted socket as the peer's response
                            // socket, replacing an older connection.
                            lockConnection(nodeId);
                            if ( con[nodeId].respSock != NULL ) {
                                deleteConnection(nodeId);
                            }
                            {
                                dbCriticalSection cs(sockCS);
                                con[nodeId].respSock = s;
                                int hnd = s->get_handle();
                                if ( hnd >= nInputSD ) {
                                    nInputSD = hnd+1;
                                }
                                FD_SET(hnd, &inputSD);
                            }
                            unlockConnection(nodeId);
                        }
                    }
                } else if ( s == NULL ) {
                    acceptSock->get_error_text(buf, sizeof buf);
                    dbTrace("Accept failed: %s\n", buf);
                } else {
                    s->get_error_text(buf, sizeof buf);
                    dbTrace("Failed to read login request: %s\n", buf);
                    delete s;
                }
            }
            for ( int i = nServers; --i >= 0; ) {
                lockConnection(i);
                socket_t* s;
                if ( ((s = con[i].respSock) != NULL && FD_ISSET(s->get_handle(), &ready))
                     || ((s = con[i].reqSock) != NULL && FD_ISSET(s->get_handle(), &ready)) )
                {
                    if ( !s->read(&rr, sizeof rr) ) {
                        if ( closed && i == activeNodeId ) {
                            // The active node announced CLOSE and then
                            // dropped the connection: stop this node too.
                            dbCriticalSection cs(startCS);
                            opened = false;
                            startEvent.signal();
                            delete ctx;
                            return;
                        }
                        deleteConnection(i);
                    } else {
                        TRACE_MSG(("Receive request %s, status %s, size %d from node %d\n",
                                   requestText[rr.op],
                                   ((rr.status <= ST_RECOVERED) ? statusText[rr.status] : "?"),
                                   rr.size, rr.nodeId));
                        switch ( rr.op ) {
                          case ReplicationRequest::RR_GET_STATUS:
                            rr.op = ReplicationRequest::RR_STATUS;
                            rr.nodeId = id;
                            rr.status = con[id].status;
                            TRACE_MSG(("Send RR_STATUS response %s to node %d\n",
                                       statusText[con[id].status], rr.nodeId));
                            writeResp(i, rr);
                            break;
                          case ReplicationRequest::RR_STATUS:
                            con[rr.nodeId].status = rr.status;
                            if ( con[rr.nodeId].waitStatusEventFlag ) {
                                con[rr.nodeId].statusEvent.signal();
                            } else if ( activeNodeId < 0 && rr.nodeId < id && rr.status == ST_RECOVERED ) {
                                changeActiveNode();
                            }
                            break;
                          case ReplicationRequest::RR_NEW_ACTIVE_NODE:
                            activeNodeId = rr.nodeId;
                            statusRequested = false;
                            break;
                          case ReplicationRequest::RR_CLOSE:
                            closed = true;
                            break;
                          case ReplicationRequest::RR_RECOVERY:
                          {
                            size_t maxPages = file.getMaxPages();
                            int* updateCountTable = new int[maxPages];
                            // The table size comes from the peer: refuse a
                            // size that does not fit into the buffer instead
                            // of reading past its end.
                            if ( (size_t)rr.size > maxPages
                                 || (rr.size != 0 && !s->read(updateCountTable, rr.size*sizeof(int))) )
                            {
                                // Nothing else refers to the table on this
                                // path, so release it here.
                                delete[] updateCountTable;
                                deleteConnection(i);
                            } else {
                                con[i].status = ST_RECOVERED;
                                // NOTE(review): the table is handed over to
                                // file.recovery() and not released here -
                                // presumably recovery() takes ownership.
                                file.recovery(i, updateCountTable, rr.size);
                            }
                            break;
                          }
                          case ReplicationRequest::RR_UPDATE_PAGE:
                            TRACE_MSG(("Update page at address %x size %d\n", rr.page.offs, rr.size));
                            if ( (accessType == dbConcurrentUpdate
                                  && !file.concurrentUpdatePages(i, rr.page.offs, rr.page.updateCount, rr.size))
                                 || (accessType != dbConcurrentUpdate
                                     && !file.updatePages(i, rr.page.offs, rr.page.updateCount,
                                                          rr.size)) )
                            {
                                dbTrace("Failed to update page %lx\n", (long)rr.page.offs);
                                activeNodeId = -1;
                            }
                            break;
                          case ReplicationRequest::RR_READY:
                            con[rr.nodeId].readyEvent.signal();
                            break;
                          default:
                            dbTrace("Unexpected requerst %d from node %d\n", rr.op, i);
                        }
                    }
                }
                unlockConnection(i);
            }
        }
    }
    delete ctx;
}
void dbReplicatedDatabase::close()
{
detach();
opened = false;
readerThread.join();
file.flush();
ReplicationRequest rr;
rr.op = ReplicationRequest::RR_CLOSE;
rr.nodeId = id;
for ( int i = nServers; --i >= 0; ) {
if ( con[i].reqSock != NULL
&& (con[i].status == ST_STANDBY || con[i].status == ST_RECOVERED) )
{
con[i].reqSock->write(&rr, sizeof rr);
}
}
dbDatabase::close();
delete[] con;
startEvent.close();
}
bool dbReplicatedDatabase::writeReq(int nodeId, ReplicationRequest const& hdr,
void* body, size_t bodySize)
{
bool result;
lockConnection(nodeId);
dbCriticalSection cs(con[nodeId].writeCS);
socket_t* s = con[nodeId].reqSock;
if ( s == NULL ) {
s = con[nodeId].respSock;
}
if ( s != NULL ) {
if ( !s->write(&hdr, sizeof hdr) ||
(bodySize != 0 && !s->write(body, bodySize)) )
{
char buf[64];
s->get_error_text(buf, sizeof buf);
dbTrace("Failed to write request to node %d: %s\n", nodeId, buf);
deleteConnection(nodeId);
result = false;
} else {
result = true;
}
} else {
TRACE_MSG(("Connection %d request socket is not opened\n", nodeId));
result = false;
}
unlockConnection(nodeId);
return result;
}
// Sends a request header to node `nodeId` over its response socket.
// Returns true when the header was written. Returns false when the node has
// no response socket, or when the write fails - in which case the connection
// is deleted.
bool dbReplicatedDatabase::writeResp(int nodeId, ReplicationRequest const& hdr)
{
    lockConnection(nodeId);
    bool result = false;
    if ( con[nodeId].respSock != NULL ) {
        if ( con[nodeId].respSock->write(&hdr, sizeof hdr) ) {
            result = true;
        } else {
            char buf[64];
            con[nodeId].respSock->get_error_text(buf, sizeof buf);
            dbTrace("Failed to write request to node %d: %s\n", nodeId, buf);
            deleteConnection(nodeId);
        }
    }
    unlockConnection(nodeId);
    return result;
}
void dbReplicatedDatabase::lockConnection(int nodeId)
{
dbCriticalSection cs(sockCS);
con[nodeId].useCount += 1;
}
void dbReplicatedDatabase::unlockConnection(int nodeId)
{
dbCriticalSection cs(sockCS);
if ( --con[nodeId].useCount == 0 && con[nodeId].waitUseEventFlag ) {
con[nodeId].useEvent.signal();
}
}
// Installs `s` as the request socket of node `nodeId`, replacing (and
// deleting) an existing connection, and adds its handle to the descriptor
// set watched by the reader thread.
void dbReplicatedDatabase::addConnection(int nodeId, socket_t* s)
{
    TRACE_MSG(("Add connection with node %d\n", nodeId));
    lockConnection(nodeId);
    if ( con[nodeId].reqSock != NULL ) {
        deleteConnection(nodeId);
    }
    {
        dbCriticalSection cs(sockCS);
        con[nodeId].reqSock = s;
        const int handle = s->get_handle();
        // nInputSD is kept one above the largest watched handle.
        if ( nInputSD <= handle ) {
            nInputSD = handle + 1;
        }
        FD_SET(handle, &inputSD);
    }
    unlockConnection(nodeId);
}
#endif
// syntax highlighted by Code2HTML, v. 0.9.1