//-< SERVER.CPP >----------------------------------------------------*--------* // FastDB Version 1.0 (c) 1999 GARRET * ? * // (Main Memory Database Management System) * /\| * // * / \ * // Created: 13-Jan-2000 K.A. Knizhnik * / [] \ * // Last update: 13-Jan-2000 K.A. Knizhnik * GARRET * //-------------------------------------------------------------------*--------* // CLI multithreaded server implementation //-------------------------------------------------------------------*--------* #include #include "fastdb.h" #include "compiler.h" #include "wwwapi.h" #include "subsql.h" #include "symtab.h" #include "hashtab.h" #include "ttree.h" #include "cli.h" #include "cliproto.h" #include "server.h" #include "localcli.h" #if !THREADS_SUPPORTED #error Server requires multithreading support #endif int dbColumnBinding::unpackArray(char* dst, size_t offs) { int len = this->len; int i; if (cliType >= cli_array_of_oid) { switch (sizeof_type[cliType - cli_array_of_oid]) { case 1: memcpy(dst + offs, ptr + 4, len); break; case 2: for (i = 0; i < len; i++) { unpack2(dst + offs + i*2, ptr + 4 + i*2); } break; case 4: for (i = 0; i < len; i++) { unpack4(dst + offs + i*4, ptr + 4 + i*4); } break; case 8: for (i = 0; i < len; i++) { unpack8(dst + offs + i*8, ptr + 4 + i*8); } break; default: assert(false); } } else { // string memcpy(dst + offs, ptr + 4, len); } return len; } void dbColumnBinding::unpackScalar(char* dst) { if (cliType == cli_autoincrement) { assert(fd->type == dbField::tpInt4); #ifdef AUTOINCREMENT_SUPPORT *(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount; #else *(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows; #endif return; } switch (fd->type) { case dbField::tpBool: case dbField::tpInt1: switch (sizeof_type[cliType]) { case 1: *(dst + fd->dbsOffs) = *ptr; break; case 2: *(dst + fd->dbsOffs) = (char)unpack2(ptr); break; case 4: *(dst + fd->dbsOffs) = (char)unpack4(ptr); break; case 8: *(dst + fd->dbsOffs) = (char)unpack8(ptr); break; default: assert(false); } break; case dbField::tpInt2: switch (sizeof_type[cliType]) { case 1: *(int2*)(dst+fd->dbsOffs) = *ptr; break; case 2: unpack2(dst+fd->dbsOffs, ptr); break; case 4: *(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr); break; case 8: *(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr); break; default: assert(false); } break; case dbField::tpInt4: switch (sizeof_type[cliType]) { case 1: *(int4*)(dst+fd->dbsOffs) = *ptr; break; case 2: *(int4*)(dst+fd->dbsOffs) = unpack2(ptr); break; case 4: unpack4(dst+fd->dbsOffs, ptr); break; case 8: *(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr); break; default: assert(false); } break; case dbField::tpInt8: switch (sizeof_type[cliType]) { case 1: *(db_int8*)(dst+fd->dbsOffs) = *ptr; break; case 2: *(db_int8*)(dst+fd->dbsOffs) = unpack2(ptr); break; case 4: *(db_int8*)(dst+fd->dbsOffs) = unpack4(ptr); break; case 8: unpack8(dst+fd->dbsOffs, ptr); break; default: assert(false); } break; case dbField::tpReal4: switch (cliType) { case cli_real4: unpack4(dst+fd->dbsOffs, ptr); break; case cli_real8: { real8 temp; unpack8((char*)&temp, ptr); *(real4*)(dst + fd->dbsOffs) = (real4)temp; } break; default: assert(false); } break; case dbField::tpReal8: switch (cliType) { case cli_real4: { real4 temp; unpack4((char*)&temp, ptr); *(real8*)(dst + fd->dbsOffs) = temp; } break; case cli_real8: unpack8(dst+fd->dbsOffs, ptr); break; default: assert(false); } break; default: assert(false); } } void dbStatement::reset() { dbColumnBinding *cb, *next; for (cb = columns; cb != NULL; cb = next) { next = cb->next; delete cb; } columns = NULL; delete[] params; params = NULL; delete cursor; cursor = NULL; query.reset(); table = NULL; } int dbQueryScanner::get() { int i = 0, ch, digits; do { if ((ch = *p++) == '\0') { return tkn_eof; } } while (isspace(ch)); if (ch == '*') { return tkn_all; } else if (isdigit(ch) || ch == '+' || ch == '-') { do { buf[i++] = ch; if (i == dbQueryMaxIdLength) { // Numeric constant too long return tkn_error; } ch = *p++; } while (ch != '\0' && (isdigit(ch) || ch == '+' || ch == '-' || ch == 'e' || ch == 'E' || ch == '.')); p -= 1; buf[i] = '\0'; if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1) { // Bad integer constant return tkn_error; } if (digits != i) { if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i) { // Bad float constant return tkn_error; } return tkn_fconst; } return tkn_iconst; } else if (isalpha(ch) || ch == '$' || ch == '_') { do { buf[i++] = ch; if (i == dbQueryMaxIdLength) { // Identifier too long return tkn_error; } ch = *p++; } while (ch != EOF && (isalnum(ch) || ch == '$' || ch == '_')); p -= 1; buf[i] = '\0'; ident = buf; return dbSymbolTable::add(ident, tkn_ident); } else { // Invalid symbol return tkn_error; } } dbServer* dbServer::chain; inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id) { for (dbStatement* stmt = session->stmts; stmt != NULL; stmt = stmt->next) { if (stmt->id == stmt_id) { return stmt; } } return NULL; } void thread_proc dbServer::serverThread(void* arg) { ((dbServer*)arg)->serveClient(); } void thread_proc dbServer::acceptLocalThread(void* arg) { dbServer* server = (dbServer*)arg; server->acceptConnection(server->localAcceptSock); } void thread_proc dbServer::acceptGlobalThread(void* arg) { dbServer* server = (dbServer*)arg; server->acceptConnection(server->globalAcceptSock); } dbServer::dbServer(dbDatabase* db, char const* serverURL, int optimalNumberOfThreads, int connectionQueueLen) { char buf[256]; next = chain; chain = this; this->db = db; this->optimalNumberOfThreads = optimalNumberOfThreads; this->URL = new char[strlen(serverURL)+1]; strcpy(URL, serverURL); globalAcceptSock = socket_t::create_global(serverURL, connectionQueueLen); if (!globalAcceptSock->is_ok()) { globalAcceptSock->get_error_text(buf, sizeof buf); dbTrace((char*)"Failed to create global socket: %s\n", buf); delete globalAcceptSock; globalAcceptSock = NULL; } localAcceptSock = socket_t::create_local(serverURL, connectionQueueLen); if (!localAcceptSock->is_ok()) { localAcceptSock->get_error_text(buf, sizeof buf); dbTrace((char*)"Failed to create local socket: %s\n", buf); delete localAcceptSock; localAcceptSock = NULL; } freeList = activeList = waitList = NULL; waitListLength = 0; } dbServer* dbServer::find(char const* URL) { for (dbServer* server = chain; server != NULL; server = server->next) { if (strcmp(URL, server->URL) == 0) { return server; } } return NULL; } void dbServer::cleanup() { dbServer *server, *next; for (server = chain; server != NULL; server = next) { next = server->next; delete server; } } void dbServer::start() { nActiveThreads = nIdleThreads = 0; cancelWait = cancelSession = cancelAccept = false; go.open(); done.open(); if (globalAcceptSock != NULL) { globalAcceptThread.create(acceptGlobalThread, this); } if (localAcceptSock != NULL) { localAcceptThread.create(acceptLocalThread, this); } } void dbServer::stop() { cancelAccept = true; if (globalAcceptSock != NULL) { globalAcceptSock->cancel_accept(); globalAcceptThread.join(); } delete globalAcceptSock; globalAcceptSock = NULL; if (localAcceptSock != NULL) { localAcceptSock->cancel_accept(); localAcceptThread.join(); } delete localAcceptSock; localAcceptSock = NULL; dbCriticalSection cs(mutex); cancelSession = true; while (activeList != NULL) { activeList->sock->shutdown(); done.wait(mutex); } cancelWait = true; while (nIdleThreads != 0) { go.signal(); done.wait(mutex); } while (waitList != NULL) { dbSession* next = waitList->next; delete waitList->sock; waitList->next = freeList; freeList = waitList; waitList = next; } waitListLength = 0; assert(nActiveThreads == 0); done.close(); go.close(); } bool dbServer::freeze(dbSession* session, int stmt_id) { dbStatement* stmt = findStatement(session, stmt_id); int4 response = cli_ok; if (stmt == NULL || stmt->cursor == NULL) { response = cli_bad_descriptor; } else { stmt->cursor->freeze(); } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::unfreeze(dbSession* session, int stmt_id) { dbStatement* stmt = findStatement(session, stmt_id); int4 response = cli_ok; if (stmt == NULL || stmt->cursor == NULL) { response = cli_bad_descriptor; } else { stmt->cursor->unfreeze(); } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::get_first(dbSession* session, int stmt_id) { dbStatement* stmt = findStatement(session, stmt_id); int4 response; if (stmt == NULL || stmt->cursor == NULL) { response = cli_bad_descriptor; } else if (!stmt->cursor->gotoFirst()) { response = cli_not_found; } else { return fetch(session, stmt); } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::get_last(dbSession* session, int stmt_id) { dbStatement* stmt = findStatement(session, stmt_id); int4 response; if (stmt == NULL || stmt->cursor == NULL) { response = cli_bad_descriptor; } else if (!stmt->cursor->gotoLast()) { response = cli_not_found; } else { return fetch(session, stmt); } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::get_next(dbSession* session, int stmt_id) { dbStatement* stmt = findStatement(session, stmt_id); int4 response; if (stmt == NULL || stmt->cursor == NULL) { response = cli_bad_descriptor; } else if (!((stmt->firstFetch && stmt->cursor->gotoFirst()) || (!stmt->firstFetch && stmt->cursor->gotoNext()))) { response = cli_not_found; } else { return fetch(session, stmt); } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::get_prev(dbSession* session, int stmt_id) { dbStatement* stmt = findStatement(session, stmt_id); int4 response; if (stmt == NULL || stmt->cursor == NULL) { response = cli_bad_descriptor; } else if (!((stmt->firstFetch && stmt->cursor->gotoLast()) || (!stmt->firstFetch && stmt->cursor->gotoPrev()))) { response = cli_not_found; } else { return fetch(session, stmt); } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::skip(dbSession* session, int stmt_id, char* buf) { dbStatement* stmt = findStatement(session, stmt_id); int4 response; if (stmt == NULL || stmt->cursor == NULL) { response = cli_bad_descriptor; } else { int n = unpack4(buf); if ((n > 0 && !((stmt->firstFetch && stmt->cursor->gotoFirst() && stmt->cursor->skip(n-1) || (!stmt->firstFetch && stmt->cursor->skip(n))))) || (n < 0 && !((stmt->firstFetch && stmt->cursor->gotoLast() && stmt->cursor->skip(n+1) || (!stmt->firstFetch && stmt->cursor->skip(n)))))) { response = cli_not_found; } else { return fetch(session, stmt); } } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::seek(dbSession* session, int stmt_id, char* buf) { dbStatement* stmt = findStatement(session, stmt_id); int4 response; if (stmt == NULL || stmt->cursor == NULL) { response = cli_bad_descriptor; } else { oid_t oid = unpack_oid(buf); int pos = stmt->cursor->seek(oid); if (pos < 0) { response = cli_not_found; } else { return fetch(session, stmt, pos); } } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::fetch(dbSession* session, dbStatement* stmt, oid_t result) { int4 response; dbColumnBinding* cb; stmt->firstFetch = false; if (stmt->cursor->isEmpty()) { response = cli_not_found; pack4(response); return session->sock->write(&response, sizeof response); } int msg_size = sizeof(cli_oid_t) + 4; char* data = (char*)db->getRow(stmt->cursor->currId); for (cb = stmt->columns; cb != NULL; cb = cb->next) { if (cb->cliType == cli_autoincrement) { msg_size += 4; } else if (cb->cliType >= cli_array_of_oid) { msg_size += 4 + ((dbVarying*)(data + cb->fd->dbsOffs))->size * sizeof_type[cb->cliType - cli_array_of_oid]; } else if (cb->cliType >= cli_asciiz) { msg_size += 4 + ((dbVarying*)(data + cb->fd->dbsOffs))->size; } else { msg_size += sizeof_type[cb->cliType]; } } if (stmt->buf_size < msg_size) { delete[] stmt->buf; stmt->buf = new char[msg_size]; stmt->buf_size = msg_size; } char* p = stmt->buf; p = pack4(p, msg_size); p = pack_oid(p, result); for (cb = stmt->columns; cb != NULL; cb = cb->next) { char* src = data + cb->fd->dbsOffs; switch (cb->fd->type) { case dbField::tpBool: case dbField::tpInt1: switch (sizeof_type[cb->cliType]) { case 1: *p++ = *src; break; case 2: p = pack2(p, (int2)*(char*)src); break; case 4: p = pack4(p, (int4)*(char*)src); break; case 8: p = pack8(p, (db_int8)*(char*)src); break; default: assert(false); } break; case dbField::tpInt2: switch (sizeof_type[cb->cliType]) { case 1: *p++ = (char)*(int2*)src; break; case 2: p = pack2(p, src); break; case 4: p = pack4(p, (int4)*(int2*)src); break; case 8: p = pack8(p, (db_int8)*(int2*)src); break; default: assert(false); } break; case dbField::tpInt4: switch (sizeof_type[cb->cliType]) { case 1: *p++ = (char)*(int4*)src; break; case 2: p = pack2(p, (int2)*(int4*)src); break; case 4: p = pack4(p, src); break; case 8: p = pack8(p, (db_int8)*(int4*)src); break; default: assert(false); } break; case dbField::tpInt8: switch (sizeof_type[cb->cliType]) { case 1: *p++ = (char)*(db_int8*)src; break; case 2: p = pack2(p, (int2)*(db_int8*)src); break; case 4: p = pack4(p, (int4)*(db_int8*)src); break; case 8: p = pack8(p, src); break; default: assert(false); } break; case dbField::tpReal4: switch (cb->cliType) { case cli_real4: p = pack4(p, src); break; case cli_real8: { real8 temp = *(real4*)src; p = pack8(p, (char*)&temp); } break; default: assert(false); } break; case dbField::tpReal8: switch (cb->cliType) { case cli_real4: { real4 temp = (real4)*(real8*)src; p = pack4(p, (char*)&temp); } break; case cli_real8: p = pack8(p, src); break; default: assert(false); } break; case dbField::tpString: { dbVarying* v = (dbVarying*)src; p = pack4(p, v->size); memcpy(p, data + v->offs, v->size); p += v->size; } break; case dbField::tpReference: p = pack_oid(p, *(oid_t*)src); break; case dbField::tpArray: { dbVarying* v = (dbVarying*)src; int n = v->size; p = pack4(p, n); src = data + v->offs; switch (sizeof_type[cb->cliType-cli_array_of_oid]) { case 2: while (--n >= 0) { p = pack2(p, src); src += 2; } break; case 4: while (--n >= 0) { p = pack4(p, src); src += 4; } break; case 8: while (--n >= 0) { p = pack8(p, src); src += 8; } break; default: memcpy(p, src, n); p += n; } break; } default: assert(false); } } assert(p - stmt->buf == msg_size); return session->sock->write(stmt->buf, msg_size); } bool dbServer::remove(dbSession* session, int stmt_id) { dbStatement* stmt = findStatement(session, stmt_id); int4 response; if (stmt == NULL) { response = cli_bad_descriptor; } else { if (stmt->cursor->isEmpty()) { response = cli_not_found; } else { stmt->cursor->removeAllSelected(); response = cli_ok; } } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::update(dbSession* session, int stmt_id, char* new_data) { dbStatement* stmt = findStatement(session, stmt_id); dbColumnBinding* cb; int4 response; if (stmt == NULL) { response = cli_bad_descriptor; pack4(response); return session->sock->write(&response, sizeof response); } if (stmt->cursor->isEmpty()) { response = cli_not_found; pack4(response); return session->sock->write(&response, sizeof response); } char* old_data = stmt->buf + sizeof(cli_oid_t) + 4; for (cb = stmt->columns; cb != NULL; cb = cb->next) { cb->ptr = new_data; if (cb->cliType >= cli_asciiz) { int new_len = unpack4(new_data); int old_len = unpack4(old_data); cb->len = new_len; if (cb->fd->indexType & (HASHED|INDEXED) && memcmp(new_data, old_data, new_len+4) != 0) { cb->fd->attr |= dbFieldDescriptor::Updated; } if (cb->cliType >= cli_array_of_oid) { new_len *= sizeof_type[cb->cliType - cli_array_of_oid]; old_len *= sizeof_type[cb->cliType - cli_array_of_oid]; } new_data += 4 + new_len; old_data += 4 + old_len; } else { int size = sizeof_type[cb->cliType]; if (cb->fd->indexType & (HASHED|INDEXED) && memcmp(new_data, old_data, size) != 0) { cb->fd->attr |= dbFieldDescriptor::Updated; } new_data += size; old_data += size; } } db->beginTransaction(dbDatabase::dbExclusiveLock); dbRecord* rec = db->getRow(stmt->cursor->currId); dbTableDescriptor* table = stmt->query.table; dbFieldDescriptor *first = table->columns, *fd = first; size_t offs = table->fixedSize; do { if (fd->type == dbField::tpArray || fd->type == dbField::tpString) { int len = ((dbVarying*)((char*)rec + fd->dbsOffs))->size; for (cb = stmt->columns; cb != NULL; cb = cb->next) { if (cb->fd == fd) { len = cb->len; break; } } offs = DOALIGN(offs, fd->components->alignment) + len*fd->components->dbsSize; } } while ((fd = fd->next) != first); old_data = new char[rec->size]; memcpy(old_data, rec, rec->size); for (cb = stmt->columns; cb != NULL; cb = cb->next) { if (cb->fd->attr & dbFieldDescriptor::Updated) { if (cb->fd->indexType & HASHED) { dbHashTable::remove(db, cb->fd->hashTable, stmt->cursor->currId, cb->fd->type, cb->fd->dbsSize, cb->fd->dbsOffs); } if (cb->fd->indexType & INDEXED) { dbTtree::remove(db, cb->fd->tTree, stmt->cursor->currId, cb->fd->type, cb->fd->dbsSize, cb->fd->comparator, cb->fd->dbsOffs); } } } db->modified = true; new_data = (char*)db->putRow(stmt->cursor->currId, offs); fd = first; offs = table->fixedSize; do { if (fd->type == dbField::tpArray|| fd->type == dbField::tpString) { int len = ((dbVarying*)(old_data + fd->dbsOffs))->size; offs = DOALIGN(offs, fd->components->alignment); for (cb = stmt->columns; cb != NULL; cb = cb->next) { if (cb->fd == fd) { len = cb->unpackArray(new_data, offs); break; } } if (cb == NULL) { memcpy(new_data + offs, old_data + ((dbVarying*)(old_data + fd->dbsOffs))->offs, len*fd->components->dbsSize); } ((dbVarying*)(new_data + fd->dbsOffs))->size = len; ((dbVarying*)(new_data + fd->dbsOffs))->offs = offs; offs += len*fd->components->dbsSize; } else { for (cb = stmt->columns; cb != NULL; cb = cb->next) { if (cb->fd == fd) { cb->unpackScalar(new_data); break; } } if (cb == NULL) { memcpy(new_data + fd->dbsOffs, old_data + fd->dbsOffs, fd->dbsSize); } } } while ((fd = fd->next) != first); delete[] old_data; for (cb = stmt->columns; cb != NULL; cb = cb->next) { if (cb->fd->attr & dbFieldDescriptor::Updated) { cb->fd->attr &= ~dbFieldDescriptor::Updated; if (cb->fd->indexType & HASHED) { dbHashTable::insert(db, cb->fd->hashTable, stmt->cursor->currId, cb->fd->type, cb->fd->dbsSize, cb->fd->dbsOffs, 0); } if (cb->fd->indexType & INDEXED) { dbTtree::insert(db, cb->fd->tTree, stmt->cursor->currId, cb->fd->type, cb->fd->dbsSize, cb->fd->comparator, cb->fd->dbsOffs); } } } response = cli_ok; pack4(response); return session->sock->write(&response, sizeof response); } char* dbServer::checkColumns(dbStatement* stmt, int n_columns, dbTableDescriptor* desc, char* data, int4& response) { dbColumnBinding** cpp = &stmt->columns; response = cli_ok; while (--n_columns >= 0) { int cliType = *data++; char* columnName = data; dbSymbolTable::add(columnName, tkn_ident, true); dbFieldDescriptor* fd = desc->findSymbol(columnName); data += strlen(data) + 1; if (fd != NULL) { if ((cliType == cli_oid && fd->type == dbField::tpReference) || (cliType >= cli_bool && cliType <= cli_int8 && fd->type >= dbField::tpBool && fd->type <= dbField::tpInt8) || (cliType >= cli_real4 && cliType <= cli_real8 && fd->type >= dbField::tpReal4 && fd->type <= dbField::tpReal8) || ((cliType == cli_asciiz || cliType == cli_pasciiz) && fd->type == dbField::tpString) || (cliType == cli_array_of_oid && fd->type == dbField::tpArray && fd->components->type == dbField::tpReference) || (cliType >= cli_array_of_bool && fd->type == dbField::tpArray && cliType-cli_array_of_bool == fd->components->type-dbField::tpBool)) { dbColumnBinding* cb = new dbColumnBinding(fd, cliType); *cpp = cb; cpp = &cb->next; } else { response = cli_incompatible_type; break; } } else { TRACE_MSG(("Field '%s' not found\n", columnName)); response = cli_column_not_found; break; } } return data; } bool dbServer::insert(dbSession* session, int stmt_id, char* data, bool prepare) { dbStatement* stmt = findStatement(session, stmt_id); dbTableDescriptor* desc = NULL; dbColumnBinding* cb; int4 response; char reply_buf[sizeof(cli_oid_t) + 8]; char* dst; oid_t oid = 0; size_t offs; int n_columns; if (stmt == NULL) { if (!prepare) { response = cli_bad_statement; goto return_response; } stmt = new dbStatement(stmt_id); stmt->next = session->stmts; session->stmts = stmt; } else { if (prepare) { stmt->reset(); } else if ((desc = stmt->table) == NULL) { response = cli_bad_descriptor; goto return_response; } } if (prepare) { session->scanner.reset(data); if (session->scanner.get() != tkn_insert || session->scanner.get() != tkn_into || session->scanner.get() != tkn_ident) { response = cli_bad_statement; goto return_response; } desc = db->findTable(session->scanner.ident); if (desc == NULL) { response = cli_table_not_found; goto return_response; } data += strlen(data)+1; n_columns = *data++; data = checkColumns(stmt, n_columns, desc, data, response); if (response != cli_ok) { goto return_response; } stmt->table = desc; } offs = desc->fixedSize; for (cb = stmt->columns; cb != NULL; cb = cb->next) { cb->ptr = data; if (cb->cliType == cli_autoincrement) { ; } else if (cb->cliType >= cli_asciiz) { cb->len = unpack4(data); data += 4 + cb->len*cb->fd->components->dbsSize; offs = DOALIGN(offs, cb->fd->components->alignment) + cb->len*cb->fd->components->dbsSize; } else { data += sizeof_type[cb->cliType]; } } db->beginTransaction(dbDatabase::dbExclusiveLock); db->modified = true; oid = db->allocateRow(desc->tableId, offs); dst = (char*)db->getRow(oid); offs = desc->fixedSize; for (cb = stmt->columns; cb != NULL; cb = cb->next) { dbFieldDescriptor* fd = cb->fd; if (fd->type == dbField::tpArray || fd->type == dbField::tpString) { offs = DOALIGN(offs, fd->components->alignment); ((dbVarying*)(dst + fd->dbsOffs))->offs = offs; ((dbVarying*)(dst + fd->dbsOffs))->size = cb->len; offs += cb->unpackArray(dst, offs)*fd->components->dbsSize; } else { cb->unpackScalar(dst); } } for (cb = stmt->columns; cb != NULL; cb = cb->next) { if (cb->fd->indexType & HASHED) { dbHashTable::insert(db, cb->fd->hashTable, oid, cb->fd->type, cb->fd->dbsSize, cb->fd->dbsOffs, 0); } if (cb->fd->indexType & INDEXED) { dbTtree::insert(db, cb->fd->tTree, oid, cb->fd->type, cb->fd->dbsSize, cb->fd->comparator, cb->fd->dbsOffs); } } response = cli_ok; return_response: pack4(reply_buf, response); if (desc == NULL) { pack4(reply_buf+4, 0); } else { #ifdef AUTOINCREMENT_SUPPORT pack4(reply_buf+4, desc->autoincrementCount); #else pack4(reply_buf+4, ((dbTable*)db->getRow(desc->tableId))->nRows); #endif } pack_oid(reply_buf+8, oid); return session->sock->write(reply_buf, sizeof reply_buf); } bool dbServer::describe_table(dbSession* session, char const* table) { dbTableDescriptor* desc = db->findTableByName(table); if (desc == NULL) { char response[8]; pack4(response, 0); pack4(response+4, -1); return session->sock->write(response, sizeof response); } else { int i, length = 0; dbFieldDescriptor* fd = desc->columns; for (i = desc->nColumns; --i >= 0;) { length += strlen(fd->name)+2+3; if (fd->refTableName != NULL) { length += strlen(fd->refTableName); } else if (fd->type == dbField::tpArray && fd->components->refTableName != NULL) { length += strlen(fd->components->refTableName); } if (fd->inverseRefName != NULL) { length += strlen(fd->inverseRefName); } fd = fd->next; } dbSmallBuffer response(length+8); char* p = (char*)response; pack4(p, length); pack4(p+4, desc->nColumns); p += 8; for (i = desc->nColumns, fd = desc->columns; --i >= 0;) { int flags = 0; *p++ = map_type(fd); if (fd->tTree != 0) { flags |= cli_indexed; } if (fd->hashTable != 0) { flags |= cli_hashed; } *p++ = (char)flags; strcpy(p, fd->name); p += strlen(fd->name)+1; if (fd->refTableName != NULL) { strcpy(p, fd->refTableName); p += strlen(p) + 1; } else if (fd->type == dbField::tpArray && fd->components->refTableName != NULL) { strcpy(p, fd->components->refTableName); p += strlen(p) + 1; } else { *p++ = '\0'; } if (fd->inverseRefName != NULL) { strcpy(p, fd->inverseRefName); p += strlen(p) + 1; } else { *p++ = '\0'; } fd = fd->next; } return session->sock->write(response, length+8); } } bool dbServer::show_tables(dbSession* session) { dbTableDescriptor* desc=db->tables; if (desc == NULL) { char response[8]; pack4(response, 0); pack4(response+4, -1); return session->sock->write(response, sizeof response); } else { int length = 0, n = 0; for (desc=db->tables; desc != NULL; desc=desc->nextDbTable) { if (strcmp(desc->name, "Metatable")) { length += strlen(desc->name)+1; n++; } } dbSmallBuffer response(length+8); char* p = (char*)response; pack4(p, length); pack4(p+4, n); p += 8; for (desc=db->tables; desc != NULL; desc=desc->nextDbTable) { if (strcmp(desc->name, "Metatable")) { strcpy(p, desc->name); p += strlen(desc->name)+1; } } return session->sock->write(response, length+8); } } bool dbServer::create_table(dbSession* session, char* data) { db->beginTransaction(dbDatabase::dbExclusiveLock); db->modified = true; char* tableName = data; data += strlen(data) + 1; int nColumns = *data++ & 0xFF; cli_field_descriptor* columns = new cli_field_descriptor[nColumns]; for (int i = 0; i < nColumns; i++) { columns[i].type = (cli_var_type)*data++; columns[i].flags = *data++ & 0xFF; columns[i].name = data; data += strlen(data) + 1; if (*data != 0) { columns[i].refTableName = data; data += strlen(data) + 1; } else { columns[i].refTableName = NULL; data += 1; } if (*data != 0) { columns[i].inverseRefFieldName = data; data += strlen(data) + 1; } else { columns[i].inverseRefFieldName = NULL; data += 1; } } if (session->existed_tables == NULL) { session->existed_tables = db->tables; } int4 response = dbCLI::create_table(db, tableName, nColumns, columns); pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::drop_table(dbSession* session, char* tableName) { db->beginTransaction(dbDatabase::dbExclusiveLock); dbTableDescriptor* desc = db->findTableByName(tableName); int4 response = cli_ok; if (desc != NULL) { db->dropTable(desc); if (desc == session->existed_tables) { session->existed_tables = desc->nextDbTable; } db->unlinkTable(desc); desc->nextDbTable = session->dropped_tables; session->dropped_tables = desc; } else { response = cli_table_not_found; } pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::alter_index(dbSession* session, char* data) { char* tableName = data; data += strlen(data) + 1; char* fieldName = data; data += strlen(data) + 1; int newFlags = *data++ & 0xFF; int4 response = dbCLI::alter_index(db, tableName, fieldName, newFlags); pack4(response); return session->sock->write(&response, sizeof response); } bool dbServer::select(dbSession* session, int stmt_id, char* msg, bool prepare) { int4 response; int i, n_params, tkn, n_columns; dbStatement* stmt = findStatement(session, stmt_id); dbCursorType cursorType; dbTableDescriptor* desc; if (prepare) { if (stmt == NULL) { stmt = new dbStatement(stmt_id); stmt->next = session->stmts; session->stmts = stmt; } else { stmt->reset(); } stmt->n_params = *msg++; stmt->n_columns = n_columns = *msg++; stmt->params = new dbParameterBinding[stmt->n_params]; stmt->firstFetch = true; int len = unpack2(msg); msg += 2; session->scanner.reset(msg); char *p, *end = msg + len; if (session->scanner.get() != tkn_select) { response = cli_bad_statement; goto return_response; } if ((tkn = session->scanner.get()) == tkn_all) { tkn = session->scanner.get(); } if (tkn == tkn_from && session->scanner.get() == tkn_ident) { if ((desc = db->findTable(session->scanner.ident)) != NULL) { msg = checkColumns(stmt, n_columns, desc, end, response); if (response != cli_ok) { goto return_response; } stmt->cursor = new dbAnyCursor(*desc, dbCursorViewOnly, NULL); stmt->cursor->setPrefetchMode(false); } else { response = cli_table_not_found; goto return_response; } } else { response = cli_bad_statement; goto return_response; } p = session->scanner.p; for (i = 0; p < end; i++) { stmt->query.append(dbQueryElement::qExpression, p); p += strlen(p) + 1; if (p < end) { int cliType = *p++; static const dbQueryElement::ElementType type_map[] = { dbQueryElement::qVarReference, // cli_oid dbQueryElement::qVarBool, // cli_bool dbQueryElement::qVarInt1, // cli_int1 dbQueryElement::qVarInt2, // cli_int2 dbQueryElement::qVarInt4, // cli_int4 dbQueryElement::qVarInt8, // cli_int8 dbQueryElement::qVarReal4, // cli_real4 dbQueryElement::qVarReal8, // cli_real8 dbQueryElement::qVarStringPtr, // cli_asciiz dbQueryElement::qVarStringPtr, // cli_pasciiz }; stmt->params[i].type = cliType; stmt->query.append(type_map[cliType], &stmt->params[i].u); } } } else { if (stmt == NULL) { response = cli_bad_descriptor; goto return_response; } } cursorType = *msg++ ? dbCursorForUpdate : dbCursorViewOnly; for (i = 0, n_params = stmt->n_params; i < n_params; i++) { switch (stmt->params[i].type) { case cli_oid: stmt->params[i].u.oid = unpack_oid(msg); msg += sizeof(cli_oid_t); break; case cli_int1: stmt->params[i].u.i1 = *msg++; break; case cli_int2: msg = unpack2((char*)&stmt->params[i].u.i2, msg); break; case cli_int4: msg = unpack4((char*)&stmt->params[i].u.i4, msg); break; case cli_int8: msg = unpack8((char*)&stmt->params[i].u.i8, msg); break; case cli_real4: msg = unpack4((char*)&stmt->params[i].u.r4, msg); break; case cli_real8: msg = unpack8((char*)&stmt->params[i].u.r8, msg); break; case cli_bool: stmt->params[i].u.b = *msg++; break; case cli_asciiz: case cli_pasciiz: stmt->params[i].u.str = msg; msg += strlen(msg) + 1; break; default: response = cli_bad_statement; goto return_response; } } #ifdef THROW_EXCEPTION_ON_ERROR try { response = stmt->cursor->select(stmt->query, cursorType); } catch (dbException const& x) { response = (x.getErrCode() == dbDatabase::QueryError) ? cli_bad_statement : cli_runtime_error; } #else { dbDatabaseThreadContext* ctx = db->threadContext.get(); ctx->catched = true; int errorCode = setjmp(ctx->unwind); if (errorCode == 0) { response = stmt->cursor->select(stmt->query, cursorType); } else { response = (errorCode == dbDatabase::QueryError) ? cli_bad_statement : cli_runtime_error; } ctx->catched = false; } #endif return_response: pack4(response); return session->sock->write(&response, sizeof response); } void dbServer::serveClient() { dbStatement *sp, **spp; db->attach(); while (true) { dbSession* session; { dbCriticalSection cs(mutex); do { go.wait(mutex); if (cancelWait) { nIdleThreads -= 1; done.signal(); db->detach(); return; } } while (waitList == NULL); session = waitList; waitList = waitList->next; session->next = activeList; activeList = session; nIdleThreads -= 1; nActiveThreads += 1; waitListLength -= 1; } cli_request req; int4 response = cli_ok; bool online = true; while (online && session->sock->read(&req, sizeof req)) { req.unpack(); int length = req.length - sizeof(req); dbSmallBuffer msg(length); if (length > 0) { if (!session->sock->read(msg, length)) { break; } } switch(req.cmd) { case cli_cmd_close_session: while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; delete session->dropped_tables; session->dropped_tables = next; } db->commit(); session->in_transaction = false; online = false; break; case cli_cmd_prepare_and_execute: online = select(session, req.stmt_id, msg, true); session->in_transaction = true; break; case cli_cmd_execute: online = select(session, req.stmt_id, msg, false); break; case cli_cmd_get_first: online = get_first(session, req.stmt_id); break; case cli_cmd_get_last: online = get_last(session, req.stmt_id); break; case cli_cmd_get_next: online = get_next(session, req.stmt_id); break; case cli_cmd_get_prev: online = get_prev(session, req.stmt_id); break; case cli_cmd_skip: online = skip(session, req.stmt_id, msg); break; case cli_cmd_seek: online = seek(session, req.stmt_id, msg); break; case cli_cmd_freeze: online = freeze(session, req.stmt_id); break; case cli_cmd_unfreeze: online = unfreeze(session, req.stmt_id); break; case cli_cmd_free_statement: for (spp = &session->stmts; (sp = *spp) != NULL; spp = &sp->next) { if (sp->id == req.stmt_id) { *spp = sp->next; delete sp; break; } } break; case cli_cmd_abort: while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; db->linkTable(session->dropped_tables, session->dropped_tables->tableId); session->dropped_tables = next; } if (session->existed_tables != NULL) { while (db->tables != session->existed_tables) { dbTableDescriptor* table = db->tables; db->unlinkTable(table); delete table; } session->existed_tables = NULL; } db->rollback(); session->in_transaction = false; online = session->sock->write(&response, sizeof response); break; case cli_cmd_commit: while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; delete session->dropped_tables; session->dropped_tables = next; } session->existed_tables = NULL; db->commit(); session->in_transaction = false; online = session->sock->write(&response, sizeof response); break; case cli_cmd_precommit: db->precommit(); online = session->sock->write(&response, sizeof response); break; case cli_cmd_update: update(session, req.stmt_id, msg); break; case cli_cmd_remove: remove(session, req.stmt_id); break; case cli_cmd_prepare_and_insert: insert(session, req.stmt_id, msg, true); session->in_transaction = true; break; case cli_cmd_insert: insert(session, req.stmt_id, msg, false); break; case cli_cmd_describe_table: describe_table(session, (char*)msg); break; case cli_cmd_show_tables: show_tables(session); break; case cli_cmd_create_table: online = create_table(session, msg); break; case cli_cmd_drop_table: online = drop_table(session, msg); break; case cli_cmd_alter_index: online = alter_index(session, msg); break; } } if (session->in_transaction) { while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; db->linkTable(session->dropped_tables, session->dropped_tables->tableId); session->dropped_tables = next; } if (session->existed_tables != NULL) { while (db->tables != session->existed_tables) { dbTableDescriptor* table = db->tables; db->unlinkTable(table); delete table; } session->existed_tables = NULL; } db->rollback(); } // Finish session { dbCriticalSection cs(mutex); dbSession** spp; delete session->sock; for (spp = &activeList; *spp != session; spp = &(*spp)->next); *spp = session->next; session->next = freeList; freeList = session; nActiveThreads -= 1; if (cancelSession) { done.signal(); break; } if (nActiveThreads + nIdleThreads >= optimalNumberOfThreads) { break; } nIdleThreads += 1; } } db->detach(); } void dbServer::acceptConnection(socket_t* acceptSock) { while (true) { socket_t* sock = acceptSock->accept(); dbCriticalSection cs(mutex); if (cancelAccept) { return; } if (sock != NULL) { if (freeList == NULL) { freeList = new dbSession; freeList->next = NULL; } dbSession* session = freeList; freeList = session->next; session->sock = sock; session->stmts = NULL; session->next = waitList; session->in_transaction = false; session->existed_tables = NULL; session->dropped_tables = NULL; waitList = session; waitListLength += 1; if (nIdleThreads < waitListLength) { dbThread thread; nIdleThreads += 1; thread.create(serverThread, this); thread.detach(); } go.signal(); } } } dbServer::~dbServer() { dbServer** spp; for (spp = &chain; *spp != this; spp = &(*spp)->next); *spp = next; delete globalAcceptSock; delete localAcceptSock; delete[] URL; }