//-< SERVER.CPP >----------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 13-Jan-2000 K.A. Knizhnik * / [] \ *
// Last update: 13-Jan-2000 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// CLI multithreaded server implementation
//-------------------------------------------------------------------*--------*
#include <ctype.h>
#include "fastdb.h"
#include "compiler.h"
#include "wwwapi.h"
#include "subsql.h"
#include "symtab.h"
#include "hashtab.h"
#include "ttree.h"
#include "cli.h"
#include "cliproto.h"
#include "server.h"
#include "localcli.h"
#if !THREADS_SUPPORTED
#error Server requires multithreading support
#endif
int dbColumnBinding::unpackArray(char* dst, size_t offs)
{
int len = this->len;
int i;
if (cliType >= cli_array_of_oid) {
switch (sizeof_type[cliType - cli_array_of_oid]) {
case 1:
memcpy(dst + offs, ptr + 4, len);
break;
case 2:
for (i = 0; i < len; i++) {
unpack2(dst + offs + i*2, ptr + 4 + i*2);
}
break;
case 4:
for (i = 0; i < len; i++) {
unpack4(dst + offs + i*4, ptr + 4 + i*4);
}
break;
case 8:
for (i = 0; i < len; i++) {
unpack8(dst + offs + i*8, ptr + 4 + i*8);
}
break;
default:
assert(false);
}
} else { // string
memcpy(dst + offs, ptr + 4, len);
}
return len;
}
void dbColumnBinding::unpackScalar(char* dst)
{
if (cliType == cli_autoincrement) {
assert(fd->type == dbField::tpInt4);
#ifdef AUTOINCREMENT_SUPPORT
*(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount;
#else
*(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows;
#endif
return;
}
switch (fd->type) {
case dbField::tpBool:
case dbField::tpInt1:
switch (sizeof_type[cliType]) {
case 1:
*(dst + fd->dbsOffs) = *ptr;
break;
case 2:
*(dst + fd->dbsOffs) = (char)unpack2(ptr);
break;
case 4:
*(dst + fd->dbsOffs) = (char)unpack4(ptr);
break;
case 8:
*(dst + fd->dbsOffs) = (char)unpack8(ptr);
break;
default:
assert(false);
}
break;
case dbField::tpInt2:
switch (sizeof_type[cliType]) {
case 1:
*(int2*)(dst+fd->dbsOffs) = *ptr;
break;
case 2:
unpack2(dst+fd->dbsOffs, ptr);
break;
case 4:
*(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr);
break;
case 8:
*(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr);
break;
default:
assert(false);
}
break;
case dbField::tpInt4:
switch (sizeof_type[cliType]) {
case 1:
*(int4*)(dst+fd->dbsOffs) = *ptr;
break;
case 2:
*(int4*)(dst+fd->dbsOffs) = unpack2(ptr);
break;
case 4:
unpack4(dst+fd->dbsOffs, ptr);
break;
case 8:
*(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr);
break;
default:
assert(false);
}
break;
case dbField::tpInt8:
switch (sizeof_type[cliType]) {
case 1:
*(db_int8*)(dst+fd->dbsOffs) = *ptr;
break;
case 2:
*(db_int8*)(dst+fd->dbsOffs) = unpack2(ptr);
break;
case 4:
*(db_int8*)(dst+fd->dbsOffs) = unpack4(ptr);
break;
case 8:
unpack8(dst+fd->dbsOffs, ptr);
break;
default:
assert(false);
}
break;
case dbField::tpReal4:
switch (cliType) {
case cli_real4:
unpack4(dst+fd->dbsOffs, ptr);
break;
case cli_real8:
{
real8 temp;
unpack8((char*)&temp, ptr);
*(real4*)(dst + fd->dbsOffs) = (real4)temp;
}
break;
default:
assert(false);
}
break;
case dbField::tpReal8:
switch (cliType) {
case cli_real4:
{
real4 temp;
unpack4((char*)&temp, ptr);
*(real8*)(dst + fd->dbsOffs) = temp;
}
break;
case cli_real8:
unpack8(dst+fd->dbsOffs, ptr);
break;
default:
assert(false);
}
break;
default:
assert(false);
}
}
void dbStatement::reset()
{
dbColumnBinding *cb, *next;
for (cb = columns; cb != NULL; cb = next) {
next = cb->next;
delete cb;
}
columns = NULL;
delete[] params;
params = NULL;
delete cursor;
cursor = NULL;
query.reset();
table = NULL;
}
int dbQueryScanner::get()
{
int i = 0, ch, digits;
do {
if ((ch = *p++) == '\0') {
return tkn_eof;
}
} while (isspace(ch));
if (ch == '*') {
return tkn_all;
} else if (isdigit(ch) || ch == '+' || ch == '-') {
do {
buf[i++] = ch;
if (i == dbQueryMaxIdLength) {
// Numeric constant too long
return tkn_error;
}
ch = *p++;
} while (ch != '\0'
&& (isdigit(ch) || ch == '+' || ch == '-' || ch == 'e' ||
ch == 'E' || ch == '.'));
p -= 1;
buf[i] = '\0';
if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1) {
// Bad integer constant
return tkn_error;
}
if (digits != i) {
if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i) {
// Bad float constant
return tkn_error;
}
return tkn_fconst;
}
return tkn_iconst;
} else if (isalpha(ch) || ch == '$' || ch == '_') {
do {
buf[i++] = ch;
if (i == dbQueryMaxIdLength) {
// Identifier too long
return tkn_error;
}
ch = *p++;
} while (ch != EOF && (isalnum(ch) || ch == '$' || ch == '_'));
p -= 1;
buf[i] = '\0';
ident = buf;
return dbSymbolTable::add(ident, tkn_ident);
} else {
// Invalid symbol
return tkn_error;
}
}
dbServer* dbServer::chain;
inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id)
{
for (dbStatement* stmt = session->stmts; stmt != NULL; stmt = stmt->next)
{
if (stmt->id == stmt_id) {
return stmt;
}
}
return NULL;
}
void thread_proc dbServer::serverThread(void* arg)
{
((dbServer*)arg)->serveClient();
}
void thread_proc dbServer::acceptLocalThread(void* arg)
{
dbServer* server = (dbServer*)arg;
server->acceptConnection(server->localAcceptSock);
}
void thread_proc dbServer::acceptGlobalThread(void* arg)
{
dbServer* server = (dbServer*)arg;
server->acceptConnection(server->globalAcceptSock);
}
dbServer::dbServer(dbDatabase* db,
char const* serverURL,
int optimalNumberOfThreads,
int connectionQueueLen)
{
char buf[256];
next = chain;
chain = this;
this->db = db;
this->optimalNumberOfThreads = optimalNumberOfThreads;
this->URL = new char[strlen(serverURL)+1];
strcpy(URL, serverURL);
globalAcceptSock =
socket_t::create_global(serverURL, connectionQueueLen);
if (!globalAcceptSock->is_ok()) {
globalAcceptSock->get_error_text(buf, sizeof buf);
dbTrace((char*)"Failed to create global socket: %s\n", buf);
delete globalAcceptSock;
globalAcceptSock = NULL;
}
localAcceptSock =
socket_t::create_local(serverURL, connectionQueueLen);
if (!localAcceptSock->is_ok()) {
localAcceptSock->get_error_text(buf, sizeof buf);
dbTrace((char*)"Failed to create local socket: %s\n", buf);
delete localAcceptSock;
localAcceptSock = NULL;
}
freeList = activeList = waitList = NULL;
waitListLength = 0;
}
dbServer* dbServer::find(char const* URL)
{
for (dbServer* server = chain; server != NULL; server = server->next) {
if (strcmp(URL, server->URL) == 0) {
return server;
}
}
return NULL;
}
void dbServer::cleanup()
{
dbServer *server, *next;
for (server = chain; server != NULL; server = next) {
next = server->next;
delete server;
}
}
void dbServer::start()
{
nActiveThreads = nIdleThreads = 0;
cancelWait = cancelSession = cancelAccept = false;
go.open();
done.open();
if (globalAcceptSock != NULL) {
globalAcceptThread.create(acceptGlobalThread, this);
}
if (localAcceptSock != NULL) {
localAcceptThread.create(acceptLocalThread, this);
}
}
void dbServer::stop()
{
cancelAccept = true;
if (globalAcceptSock != NULL) {
globalAcceptSock->cancel_accept();
globalAcceptThread.join();
}
delete globalAcceptSock;
globalAcceptSock = NULL;
if (localAcceptSock != NULL) {
localAcceptSock->cancel_accept();
localAcceptThread.join();
}
delete localAcceptSock;
localAcceptSock = NULL;
dbCriticalSection cs(mutex);
cancelSession = true;
while (activeList != NULL) {
activeList->sock->shutdown();
done.wait(mutex);
}
cancelWait = true;
while (nIdleThreads != 0) {
go.signal();
done.wait(mutex);
}
while (waitList != NULL) {
dbSession* next = waitList->next;
delete waitList->sock;
waitList->next = freeList;
freeList = waitList;
waitList = next;
}
waitListLength = 0;
assert(nActiveThreads == 0);
done.close();
go.close();
}
bool dbServer::freeze(dbSession* session, int stmt_id)
{
dbStatement* stmt = findStatement(session, stmt_id);
int4 response = cli_ok;
if (stmt == NULL || stmt->cursor == NULL) {
response = cli_bad_descriptor;
} else {
stmt->cursor->freeze();
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::unfreeze(dbSession* session, int stmt_id)
{
dbStatement* stmt = findStatement(session, stmt_id);
int4 response = cli_ok;
if (stmt == NULL || stmt->cursor == NULL) {
response = cli_bad_descriptor;
} else {
stmt->cursor->unfreeze();
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::get_first(dbSession* session, int stmt_id)
{
dbStatement* stmt = findStatement(session, stmt_id);
int4 response;
if (stmt == NULL || stmt->cursor == NULL) {
response = cli_bad_descriptor;
} else if (!stmt->cursor->gotoFirst()) {
response = cli_not_found;
} else {
return fetch(session, stmt);
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::get_last(dbSession* session, int stmt_id)
{
dbStatement* stmt = findStatement(session, stmt_id);
int4 response;
if (stmt == NULL || stmt->cursor == NULL) {
response = cli_bad_descriptor;
} else if (!stmt->cursor->gotoLast()) {
response = cli_not_found;
} else {
return fetch(session, stmt);
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::get_next(dbSession* session, int stmt_id)
{
dbStatement* stmt = findStatement(session, stmt_id);
int4 response;
if (stmt == NULL || stmt->cursor == NULL) {
response = cli_bad_descriptor;
}
else if (!((stmt->firstFetch && stmt->cursor->gotoFirst()) ||
(!stmt->firstFetch && stmt->cursor->gotoNext())))
{
response = cli_not_found;
} else {
return fetch(session, stmt);
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::get_prev(dbSession* session, int stmt_id)
{
dbStatement* stmt = findStatement(session, stmt_id);
int4 response;
if (stmt == NULL || stmt->cursor == NULL) {
response = cli_bad_descriptor;
}
else if (!((stmt->firstFetch && stmt->cursor->gotoLast()) ||
(!stmt->firstFetch && stmt->cursor->gotoPrev())))
{
response = cli_not_found;
} else {
return fetch(session, stmt);
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::skip(dbSession* session, int stmt_id, char* buf)
{
dbStatement* stmt = findStatement(session, stmt_id);
int4 response;
if (stmt == NULL || stmt->cursor == NULL) {
response = cli_bad_descriptor;
} else {
int n = unpack4(buf);
if ((n > 0 && !((stmt->firstFetch && stmt->cursor->gotoFirst() && stmt->cursor->skip(n-1)
|| (!stmt->firstFetch && stmt->cursor->skip(n)))))
|| (n < 0 && !((stmt->firstFetch && stmt->cursor->gotoLast() && stmt->cursor->skip(n+1)
|| (!stmt->firstFetch && stmt->cursor->skip(n))))))
{
response = cli_not_found;
} else {
return fetch(session, stmt);
}
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::seek(dbSession* session, int stmt_id, char* buf)
{
dbStatement* stmt = findStatement(session, stmt_id);
int4 response;
if (stmt == NULL || stmt->cursor == NULL) {
response = cli_bad_descriptor;
} else {
oid_t oid = unpack_oid(buf);
int pos = stmt->cursor->seek(oid);
if (pos < 0) {
response = cli_not_found;
} else {
return fetch(session, stmt, pos);
}
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::fetch(dbSession* session, dbStatement* stmt, oid_t result)
{
int4 response;
dbColumnBinding* cb;
stmt->firstFetch = false;
if (stmt->cursor->isEmpty()) {
response = cli_not_found;
pack4(response);
return session->sock->write(&response, sizeof response);
}
int msg_size = sizeof(cli_oid_t) + 4;
char* data = (char*)db->getRow(stmt->cursor->currId);
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
if (cb->cliType == cli_autoincrement) {
msg_size += 4;
} else if (cb->cliType >= cli_array_of_oid) {
msg_size += 4 + ((dbVarying*)(data + cb->fd->dbsOffs))->size
* sizeof_type[cb->cliType - cli_array_of_oid];
} else if (cb->cliType >= cli_asciiz) {
msg_size += 4 + ((dbVarying*)(data + cb->fd->dbsOffs))->size;
} else {
msg_size += sizeof_type[cb->cliType];
}
}
if (stmt->buf_size < msg_size) {
delete[] stmt->buf;
stmt->buf = new char[msg_size];
stmt->buf_size = msg_size;
}
char* p = stmt->buf;
p = pack4(p, msg_size);
p = pack_oid(p, result);
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
char* src = data + cb->fd->dbsOffs;
switch (cb->fd->type) {
case dbField::tpBool:
case dbField::tpInt1:
switch (sizeof_type[cb->cliType]) {
case 1:
*p++ = *src;
break;
case 2:
p = pack2(p, (int2)*(char*)src);
break;
case 4:
p = pack4(p, (int4)*(char*)src);
break;
case 8:
p = pack8(p, (db_int8)*(char*)src);
break;
default:
assert(false);
}
break;
case dbField::tpInt2:
switch (sizeof_type[cb->cliType]) {
case 1:
*p++ = (char)*(int2*)src;
break;
case 2:
p = pack2(p, src);
break;
case 4:
p = pack4(p, (int4)*(int2*)src);
break;
case 8:
p = pack8(p, (db_int8)*(int2*)src);
break;
default:
assert(false);
}
break;
case dbField::tpInt4:
switch (sizeof_type[cb->cliType]) {
case 1:
*p++ = (char)*(int4*)src;
break;
case 2:
p = pack2(p, (int2)*(int4*)src);
break;
case 4:
p = pack4(p, src);
break;
case 8:
p = pack8(p, (db_int8)*(int4*)src);
break;
default:
assert(false);
}
break;
case dbField::tpInt8:
switch (sizeof_type[cb->cliType]) {
case 1:
*p++ = (char)*(db_int8*)src;
break;
case 2:
p = pack2(p, (int2)*(db_int8*)src);
break;
case 4:
p = pack4(p, (int4)*(db_int8*)src);
break;
case 8:
p = pack8(p, src);
break;
default:
assert(false);
}
break;
case dbField::tpReal4:
switch (cb->cliType) {
case cli_real4:
p = pack4(p, src);
break;
case cli_real8:
{
real8 temp = *(real4*)src;
p = pack8(p, (char*)&temp);
}
break;
default:
assert(false);
}
break;
case dbField::tpReal8:
switch (cb->cliType) {
case cli_real4:
{
real4 temp = (real4)*(real8*)src;
p = pack4(p, (char*)&temp);
}
break;
case cli_real8:
p = pack8(p, src);
break;
default:
assert(false);
}
break;
case dbField::tpString:
{
dbVarying* v = (dbVarying*)src;
p = pack4(p, v->size);
memcpy(p, data + v->offs, v->size);
p += v->size;
}
break;
case dbField::tpReference:
p = pack_oid(p, *(oid_t*)src);
break;
case dbField::tpArray:
{
dbVarying* v = (dbVarying*)src;
int n = v->size;
p = pack4(p, n);
src = data + v->offs;
switch (sizeof_type[cb->cliType-cli_array_of_oid]) {
case 2:
while (--n >= 0) {
p = pack2(p, src);
src += 2;
}
break;
case 4:
while (--n >= 0) {
p = pack4(p, src);
src += 4;
}
break;
case 8:
while (--n >= 0) {
p = pack8(p, src);
src += 8;
}
break;
default:
memcpy(p, src, n);
p += n;
}
break;
}
default:
assert(false);
}
}
assert(p - stmt->buf == msg_size);
return session->sock->write(stmt->buf, msg_size);
}
bool dbServer::remove(dbSession* session, int stmt_id)
{
dbStatement* stmt = findStatement(session, stmt_id);
int4 response;
if (stmt == NULL) {
response = cli_bad_descriptor;
} else {
if (stmt->cursor->isEmpty()) {
response = cli_not_found;
} else {
stmt->cursor->removeAllSelected();
response = cli_ok;
}
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::update(dbSession* session, int stmt_id, char* new_data)
{
dbStatement* stmt = findStatement(session, stmt_id);
dbColumnBinding* cb;
int4 response;
if (stmt == NULL) {
response = cli_bad_descriptor;
pack4(response);
return session->sock->write(&response, sizeof response);
}
if (stmt->cursor->isEmpty()) {
response = cli_not_found;
pack4(response);
return session->sock->write(&response, sizeof response);
}
char* old_data = stmt->buf + sizeof(cli_oid_t) + 4;
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
cb->ptr = new_data;
if (cb->cliType >= cli_asciiz) {
int new_len = unpack4(new_data);
int old_len = unpack4(old_data);
cb->len = new_len;
if (cb->fd->indexType & (HASHED|INDEXED)
&& memcmp(new_data, old_data, new_len+4) != 0)
{
cb->fd->attr |= dbFieldDescriptor::Updated;
}
if (cb->cliType >= cli_array_of_oid) {
new_len *= sizeof_type[cb->cliType - cli_array_of_oid];
old_len *= sizeof_type[cb->cliType - cli_array_of_oid];
}
new_data += 4 + new_len;
old_data += 4 + old_len;
} else {
int size = sizeof_type[cb->cliType];
if (cb->fd->indexType & (HASHED|INDEXED)
&& memcmp(new_data, old_data, size) != 0)
{
cb->fd->attr |= dbFieldDescriptor::Updated;
}
new_data += size;
old_data += size;
}
}
db->beginTransaction(dbDatabase::dbExclusiveLock);
dbRecord* rec = db->getRow(stmt->cursor->currId);
dbTableDescriptor* table = stmt->query.table;
dbFieldDescriptor *first = table->columns, *fd = first;
size_t offs = table->fixedSize;
do {
if (fd->type == dbField::tpArray || fd->type == dbField::tpString)
{
int len = ((dbVarying*)((char*)rec + fd->dbsOffs))->size;
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
if (cb->fd == fd) {
len = cb->len;
break;
}
}
offs = DOALIGN(offs, fd->components->alignment)
+ len*fd->components->dbsSize;
}
} while ((fd = fd->next) != first);
old_data = new char[rec->size];
memcpy(old_data, rec, rec->size);
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
if (cb->fd->attr & dbFieldDescriptor::Updated) {
if (cb->fd->indexType & HASHED) {
dbHashTable::remove(db, cb->fd->hashTable,
stmt->cursor->currId,
cb->fd->type, cb->fd->dbsSize, cb->fd->dbsOffs);
}
if (cb->fd->indexType & INDEXED) {
dbTtree::remove(db, cb->fd->tTree, stmt->cursor->currId,
cb->fd->type, cb->fd->dbsSize, cb->fd->comparator, cb->fd->dbsOffs);
}
}
}
db->modified = true;
new_data = (char*)db->putRow(stmt->cursor->currId, offs);
fd = first;
offs = table->fixedSize;
do {
if (fd->type == dbField::tpArray|| fd->type == dbField::tpString)
{
int len = ((dbVarying*)(old_data + fd->dbsOffs))->size;
offs = DOALIGN(offs, fd->components->alignment);
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
if (cb->fd == fd) {
len = cb->unpackArray(new_data, offs);
break;
}
}
if (cb == NULL) {
memcpy(new_data + offs,
old_data + ((dbVarying*)(old_data + fd->dbsOffs))->offs,
len*fd->components->dbsSize);
}
((dbVarying*)(new_data + fd->dbsOffs))->size = len;
((dbVarying*)(new_data + fd->dbsOffs))->offs = offs;
offs += len*fd->components->dbsSize;
} else {
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
if (cb->fd == fd) {
cb->unpackScalar(new_data);
break;
}
}
if (cb == NULL) {
memcpy(new_data + fd->dbsOffs, old_data + fd->dbsOffs,
fd->dbsSize);
}
}
} while ((fd = fd->next) != first);
delete[] old_data;
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
if (cb->fd->attr & dbFieldDescriptor::Updated) {
cb->fd->attr &= ~dbFieldDescriptor::Updated;
if (cb->fd->indexType & HASHED) {
dbHashTable::insert(db, cb->fd->hashTable,
stmt->cursor->currId,
cb->fd->type, cb->fd->dbsSize, cb->fd->dbsOffs, 0);
}
if (cb->fd->indexType & INDEXED) {
dbTtree::insert(db, cb->fd->tTree, stmt->cursor->currId,
cb->fd->type, cb->fd->dbsSize, cb->fd->comparator, cb->fd->dbsOffs);
}
}
}
response = cli_ok;
pack4(response);
return session->sock->write(&response, sizeof response);
}
char* dbServer::checkColumns(dbStatement* stmt, int n_columns,
dbTableDescriptor* desc, char* data,
int4& response)
{
dbColumnBinding** cpp = &stmt->columns;
response = cli_ok;
while (--n_columns >= 0) {
int cliType = *data++;
char* columnName = data;
dbSymbolTable::add(columnName, tkn_ident, true);
dbFieldDescriptor* fd = desc->findSymbol(columnName);
data += strlen(data) + 1;
if (fd != NULL) {
if ((cliType == cli_oid
&& fd->type == dbField::tpReference)
|| (cliType >= cli_bool && cliType <= cli_int8
&& fd->type >= dbField::tpBool
&& fd->type <= dbField::tpInt8)
|| (cliType >= cli_real4 && cliType <= cli_real8
&& fd->type >= dbField::tpReal4
&& fd->type <= dbField::tpReal8)
|| ((cliType == cli_asciiz || cliType == cli_pasciiz)
&& fd->type == dbField::tpString)
|| (cliType == cli_array_of_oid &&
fd->type == dbField::tpArray &&
fd->components->type == dbField::tpReference)
|| (cliType >= cli_array_of_bool
&& fd->type == dbField::tpArray
&& cliType-cli_array_of_bool
== fd->components->type-dbField::tpBool))
{
dbColumnBinding* cb = new dbColumnBinding(fd, cliType);
*cpp = cb;
cpp = &cb->next;
} else {
response = cli_incompatible_type;
break;
}
} else {
TRACE_MSG(("Field '%s' not found\n", columnName));
response = cli_column_not_found;
break;
}
}
return data;
}
bool dbServer::insert(dbSession* session, int stmt_id, char* data, bool prepare)
{
dbStatement* stmt = findStatement(session, stmt_id);
dbTableDescriptor* desc = NULL;
dbColumnBinding* cb;
int4 response;
char reply_buf[sizeof(cli_oid_t) + 8];
char* dst;
oid_t oid = 0;
size_t offs;
int n_columns;
if (stmt == NULL) {
if (!prepare) {
response = cli_bad_statement;
goto return_response;
}
stmt = new dbStatement(stmt_id);
stmt->next = session->stmts;
session->stmts = stmt;
} else {
if (prepare) {
stmt->reset();
} else if ((desc = stmt->table) == NULL) {
response = cli_bad_descriptor;
goto return_response;
}
}
if (prepare) {
session->scanner.reset(data);
if (session->scanner.get() != tkn_insert
|| session->scanner.get() != tkn_into
|| session->scanner.get() != tkn_ident)
{
response = cli_bad_statement;
goto return_response;
}
desc = db->findTable(session->scanner.ident);
if (desc == NULL) {
response = cli_table_not_found;
goto return_response;
}
data += strlen(data)+1;
n_columns = *data++;
data = checkColumns(stmt, n_columns, desc, data, response);
if (response != cli_ok) {
goto return_response;
}
stmt->table = desc;
}
offs = desc->fixedSize;
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
cb->ptr = data;
if (cb->cliType == cli_autoincrement) {
;
} else if (cb->cliType >= cli_asciiz) {
cb->len = unpack4(data);
data += 4 + cb->len*cb->fd->components->dbsSize;
offs = DOALIGN(offs, cb->fd->components->alignment)
+ cb->len*cb->fd->components->dbsSize;
} else {
data += sizeof_type[cb->cliType];
}
}
db->beginTransaction(dbDatabase::dbExclusiveLock);
db->modified = true;
oid = db->allocateRow(desc->tableId, offs);
dst = (char*)db->getRow(oid);
offs = desc->fixedSize;
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
dbFieldDescriptor* fd = cb->fd;
if (fd->type == dbField::tpArray || fd->type == dbField::tpString) {
offs = DOALIGN(offs, fd->components->alignment);
((dbVarying*)(dst + fd->dbsOffs))->offs = offs;
((dbVarying*)(dst + fd->dbsOffs))->size = cb->len;
offs += cb->unpackArray(dst, offs)*fd->components->dbsSize;
} else {
cb->unpackScalar(dst);
}
}
for (cb = stmt->columns; cb != NULL; cb = cb->next) {
if (cb->fd->indexType & HASHED) {
dbHashTable::insert(db, cb->fd->hashTable, oid,
cb->fd->type, cb->fd->dbsSize, cb->fd->dbsOffs, 0);
}
if (cb->fd->indexType & INDEXED) {
dbTtree::insert(db, cb->fd->tTree, oid,
cb->fd->type, cb->fd->dbsSize, cb->fd->comparator, cb->fd->dbsOffs);
}
}
response = cli_ok;
return_response:
pack4(reply_buf, response);
if (desc == NULL) {
pack4(reply_buf+4, 0);
} else {
#ifdef AUTOINCREMENT_SUPPORT
pack4(reply_buf+4, desc->autoincrementCount);
#else
pack4(reply_buf+4, ((dbTable*)db->getRow(desc->tableId))->nRows);
#endif
}
pack_oid(reply_buf+8, oid);
return session->sock->write(reply_buf, sizeof reply_buf);
}
bool dbServer::describe_table(dbSession* session, char const* table)
{
dbTableDescriptor* desc = db->findTableByName(table);
if (desc == NULL) {
char response[8];
pack4(response, 0);
pack4(response+4, -1);
return session->sock->write(response, sizeof response);
} else {
int i, length = 0;
dbFieldDescriptor* fd = desc->columns;
for (i = desc->nColumns; --i >= 0;) {
length += strlen(fd->name)+2+3;
if (fd->refTableName != NULL) {
length += strlen(fd->refTableName);
} else if (fd->type == dbField::tpArray && fd->components->refTableName != NULL) {
length += strlen(fd->components->refTableName);
}
if (fd->inverseRefName != NULL) {
length += strlen(fd->inverseRefName);
}
fd = fd->next;
}
dbSmallBuffer response(length+8);
char* p = (char*)response;
pack4(p, length);
pack4(p+4, desc->nColumns);
p += 8;
for (i = desc->nColumns, fd = desc->columns; --i >= 0;) {
int flags = 0;
*p++ = map_type(fd);
if (fd->tTree != 0) {
flags |= cli_indexed;
}
if (fd->hashTable != 0) {
flags |= cli_hashed;
}
*p++ = (char)flags;
strcpy(p, fd->name);
p += strlen(fd->name)+1;
if (fd->refTableName != NULL) {
strcpy(p, fd->refTableName);
p += strlen(p) + 1;
} else if (fd->type == dbField::tpArray && fd->components->refTableName != NULL) {
strcpy(p, fd->components->refTableName);
p += strlen(p) + 1;
} else {
*p++ = '\0';
}
if (fd->inverseRefName != NULL) {
strcpy(p, fd->inverseRefName);
p += strlen(p) + 1;
} else {
*p++ = '\0';
}
fd = fd->next;
}
return session->sock->write(response, length+8);
}
}
bool dbServer::show_tables(dbSession* session)
{
dbTableDescriptor* desc=db->tables;
if (desc == NULL) {
char response[8];
pack4(response, 0);
pack4(response+4, -1);
return session->sock->write(response, sizeof response);
} else {
int length = 0, n = 0;
for (desc=db->tables; desc != NULL; desc=desc->nextDbTable) {
if (strcmp(desc->name, "Metatable")) {
length += strlen(desc->name)+1;
n++;
}
}
dbSmallBuffer response(length+8);
char* p = (char*)response;
pack4(p, length);
pack4(p+4, n);
p += 8;
for (desc=db->tables; desc != NULL; desc=desc->nextDbTable) {
if (strcmp(desc->name, "Metatable")) {
strcpy(p, desc->name);
p += strlen(desc->name)+1;
}
}
return session->sock->write(response, length+8);
}
}
bool dbServer::create_table(dbSession* session, char* data)
{
db->beginTransaction(dbDatabase::dbExclusiveLock);
db->modified = true;
char* tableName = data;
data += strlen(data) + 1;
int nColumns = *data++ & 0xFF;
cli_field_descriptor* columns = new cli_field_descriptor[nColumns];
for (int i = 0; i < nColumns; i++) {
columns[i].type = (cli_var_type)*data++;
columns[i].flags = *data++ & 0xFF;
columns[i].name = data;
data += strlen(data) + 1;
if (*data != 0) {
columns[i].refTableName = data;
data += strlen(data) + 1;
} else {
columns[i].refTableName = NULL;
data += 1;
}
if (*data != 0) {
columns[i].inverseRefFieldName = data;
data += strlen(data) + 1;
} else {
columns[i].inverseRefFieldName = NULL;
data += 1;
}
}
if (session->existed_tables == NULL) {
session->existed_tables = db->tables;
}
int4 response = dbCLI::create_table(db, tableName, nColumns, columns);
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::drop_table(dbSession* session, char* tableName)
{
db->beginTransaction(dbDatabase::dbExclusiveLock);
dbTableDescriptor* desc = db->findTableByName(tableName);
int4 response = cli_ok;
if (desc != NULL) {
db->dropTable(desc);
if (desc == session->existed_tables) {
session->existed_tables = desc->nextDbTable;
}
db->unlinkTable(desc);
desc->nextDbTable = session->dropped_tables;
session->dropped_tables = desc;
} else {
response = cli_table_not_found;
}
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::alter_index(dbSession* session, char* data)
{
char* tableName = data;
data += strlen(data) + 1;
char* fieldName = data;
data += strlen(data) + 1;
int newFlags = *data++ & 0xFF;
int4 response = dbCLI::alter_index(db, tableName, fieldName, newFlags);
pack4(response);
return session->sock->write(&response, sizeof response);
}
bool dbServer::select(dbSession* session, int stmt_id, char* msg, bool prepare)
{
int4 response;
int i, n_params, tkn, n_columns;
dbStatement* stmt = findStatement(session, stmt_id);
dbCursorType cursorType;
dbTableDescriptor* desc;
if (prepare) {
if (stmt == NULL) {
stmt = new dbStatement(stmt_id);
stmt->next = session->stmts;
session->stmts = stmt;
} else {
stmt->reset();
}
stmt->n_params = *msg++;
stmt->n_columns = n_columns = *msg++;
stmt->params = new dbParameterBinding[stmt->n_params];
stmt->firstFetch = true;
int len = unpack2(msg);
msg += 2;
session->scanner.reset(msg);
char *p, *end = msg + len;
if (session->scanner.get() != tkn_select) {
response = cli_bad_statement;
goto return_response;
}
if ((tkn = session->scanner.get()) == tkn_all) {
tkn = session->scanner.get();
}
if (tkn == tkn_from && session->scanner.get() == tkn_ident) {
if ((desc = db->findTable(session->scanner.ident)) != NULL) {
msg = checkColumns(stmt, n_columns, desc, end, response);
if (response != cli_ok) {
goto return_response;
}
stmt->cursor = new dbAnyCursor(*desc, dbCursorViewOnly, NULL);
stmt->cursor->setPrefetchMode(false);
} else {
response = cli_table_not_found;
goto return_response;
}
} else {
response = cli_bad_statement;
goto return_response;
}
p = session->scanner.p;
for (i = 0; p < end; i++) {
stmt->query.append(dbQueryElement::qExpression, p);
p += strlen(p) + 1;
if (p < end) {
int cliType = *p++;
static const dbQueryElement::ElementType type_map[] = {
dbQueryElement::qVarReference, // cli_oid
dbQueryElement::qVarBool, // cli_bool
dbQueryElement::qVarInt1, // cli_int1
dbQueryElement::qVarInt2, // cli_int2
dbQueryElement::qVarInt4, // cli_int4
dbQueryElement::qVarInt8, // cli_int8
dbQueryElement::qVarReal4, // cli_real4
dbQueryElement::qVarReal8, // cli_real8
dbQueryElement::qVarStringPtr, // cli_asciiz
dbQueryElement::qVarStringPtr, // cli_pasciiz
};
stmt->params[i].type = cliType;
stmt->query.append(type_map[cliType], &stmt->params[i].u);
}
}
} else {
if (stmt == NULL) {
response = cli_bad_descriptor;
goto return_response;
}
}
cursorType = *msg++ ? dbCursorForUpdate : dbCursorViewOnly;
for (i = 0, n_params = stmt->n_params; i < n_params; i++) {
switch (stmt->params[i].type) {
case cli_oid:
stmt->params[i].u.oid = unpack_oid(msg);
msg += sizeof(cli_oid_t);
break;
case cli_int1:
stmt->params[i].u.i1 = *msg++;
break;
case cli_int2:
msg = unpack2((char*)&stmt->params[i].u.i2, msg);
break;
case cli_int4:
msg = unpack4((char*)&stmt->params[i].u.i4, msg);
break;
case cli_int8:
msg = unpack8((char*)&stmt->params[i].u.i8, msg);
break;
case cli_real4:
msg = unpack4((char*)&stmt->params[i].u.r4, msg);
break;
case cli_real8:
msg = unpack8((char*)&stmt->params[i].u.r8, msg);
break;
case cli_bool:
stmt->params[i].u.b = *msg++;
break;
case cli_asciiz:
case cli_pasciiz:
stmt->params[i].u.str = msg;
msg += strlen(msg) + 1;
break;
default:
response = cli_bad_statement;
goto return_response;
}
}
#ifdef THROW_EXCEPTION_ON_ERROR
try {
response = stmt->cursor->select(stmt->query, cursorType);
} catch (dbException const& x) {
response = (x.getErrCode() == dbDatabase::QueryError)
? cli_bad_statement : cli_runtime_error;
}
#else
{
dbDatabaseThreadContext* ctx = db->threadContext.get();
ctx->catched = true;
int errorCode = setjmp(ctx->unwind);
if (errorCode == 0) {
response = stmt->cursor->select(stmt->query, cursorType);
} else {
response = (errorCode == dbDatabase::QueryError)
? cli_bad_statement : cli_runtime_error;
}
ctx->catched = false;
}
#endif
return_response:
pack4(response);
return session->sock->write(&response, sizeof response);
}
void dbServer::serveClient()
{
dbStatement *sp, **spp;
db->attach();
while (true) {
dbSession* session;
{
dbCriticalSection cs(mutex);
do {
go.wait(mutex);
if (cancelWait) {
nIdleThreads -= 1;
done.signal();
db->detach();
return;
}
} while (waitList == NULL);
session = waitList;
waitList = waitList->next;
session->next = activeList;
activeList = session;
nIdleThreads -= 1;
nActiveThreads += 1;
waitListLength -= 1;
}
cli_request req;
int4 response = cli_ok;
bool online = true;
while (online && session->sock->read(&req, sizeof req)) {
req.unpack();
int length = req.length - sizeof(req);
dbSmallBuffer msg(length);
if (length > 0) {
if (!session->sock->read(msg, length)) {
break;
}
}
switch(req.cmd) {
case cli_cmd_close_session:
while (session->dropped_tables != NULL) {
dbTableDescriptor* next = session->dropped_tables->nextDbTable;
delete session->dropped_tables;
session->dropped_tables = next;
}
db->commit();
session->in_transaction = false;
online = false;
break;
case cli_cmd_prepare_and_execute:
online = select(session, req.stmt_id, msg, true);
session->in_transaction = true;
break;
case cli_cmd_execute:
online = select(session, req.stmt_id, msg, false);
break;
case cli_cmd_get_first:
online = get_first(session, req.stmt_id);
break;
case cli_cmd_get_last:
online = get_last(session, req.stmt_id);
break;
case cli_cmd_get_next:
online = get_next(session, req.stmt_id);
break;
case cli_cmd_get_prev:
online = get_prev(session, req.stmt_id);
break;
case cli_cmd_skip:
online = skip(session, req.stmt_id, msg);
break;
case cli_cmd_seek:
online = seek(session, req.stmt_id, msg);
break;
case cli_cmd_freeze:
online = freeze(session, req.stmt_id);
break;
case cli_cmd_unfreeze:
online = unfreeze(session, req.stmt_id);
break;
case cli_cmd_free_statement:
for (spp = &session->stmts; (sp = *spp) != NULL; spp = &sp->next)
{
if (sp->id == req.stmt_id) {
*spp = sp->next;
delete sp;
break;
}
}
break;
case cli_cmd_abort:
while (session->dropped_tables != NULL) {
dbTableDescriptor* next = session->dropped_tables->nextDbTable;
db->linkTable(session->dropped_tables, session->dropped_tables->tableId);
session->dropped_tables = next;
}
if (session->existed_tables != NULL) {
while (db->tables != session->existed_tables) {
dbTableDescriptor* table = db->tables;
db->unlinkTable(table);
delete table;
}
session->existed_tables = NULL;
}
db->rollback();
session->in_transaction = false;
online = session->sock->write(&response, sizeof response);
break;
case cli_cmd_commit:
while (session->dropped_tables != NULL) {
dbTableDescriptor* next = session->dropped_tables->nextDbTable;
delete session->dropped_tables;
session->dropped_tables = next;
}
session->existed_tables = NULL;
db->commit();
session->in_transaction = false;
online = session->sock->write(&response, sizeof response);
break;
case cli_cmd_precommit:
db->precommit();
online = session->sock->write(&response, sizeof response);
break;
case cli_cmd_update:
update(session, req.stmt_id, msg);
break;
case cli_cmd_remove:
remove(session, req.stmt_id);
break;
case cli_cmd_prepare_and_insert:
insert(session, req.stmt_id, msg, true);
session->in_transaction = true;
break;
case cli_cmd_insert:
insert(session, req.stmt_id, msg, false);
break;
case cli_cmd_describe_table:
describe_table(session, (char*)msg);
break;
case cli_cmd_show_tables:
show_tables(session);
break;
case cli_cmd_create_table:
online = create_table(session, msg);
break;
case cli_cmd_drop_table:
online = drop_table(session, msg);
break;
case cli_cmd_alter_index:
online = alter_index(session, msg);
break;
}
}
if (session->in_transaction) {
while (session->dropped_tables != NULL) {
dbTableDescriptor* next = session->dropped_tables->nextDbTable;
db->linkTable(session->dropped_tables, session->dropped_tables->tableId);
session->dropped_tables = next;
}
if (session->existed_tables != NULL) {
while (db->tables != session->existed_tables) {
dbTableDescriptor* table = db->tables;
db->unlinkTable(table);
delete table;
}
session->existed_tables = NULL;
}
db->rollback();
}
// Finish session
{
dbCriticalSection cs(mutex);
dbSession** spp;
delete session->sock;
for (spp = &activeList; *spp != session; spp = &(*spp)->next);
*spp = session->next;
session->next = freeList;
freeList = session;
nActiveThreads -= 1;
if (cancelSession) {
done.signal();
break;
}
if (nActiveThreads + nIdleThreads >= optimalNumberOfThreads) {
break;
}
nIdleThreads += 1;
}
}
db->detach();
}
void dbServer::acceptConnection(socket_t* acceptSock)
{
while (true) {
socket_t* sock = acceptSock->accept();
dbCriticalSection cs(mutex);
if (cancelAccept) {
return;
}
if (sock != NULL) {
if (freeList == NULL) {
freeList = new dbSession;
freeList->next = NULL;
}
dbSession* session = freeList;
freeList = session->next;
session->sock = sock;
session->stmts = NULL;
session->next = waitList;
session->in_transaction = false;
session->existed_tables = NULL;
session->dropped_tables = NULL;
waitList = session;
waitListLength += 1;
if (nIdleThreads < waitListLength) {
dbThread thread;
nIdleThreads += 1;
thread.create(serverThread, this);
thread.detach();
}
go.signal();
}
}
}
dbServer::~dbServer()
{
dbServer** spp;
for (spp = &chain; *spp != this; spp = &(*spp)->next);
*spp = next;
delete globalAcceptSock;
delete localAcceptSock;
delete[] URL;
}
syntax highlighted by Code2HTML, v. 0.9.1