/*#io
SkipDB ioDoc(
		   docCopyright("Steve Dekorte", 2004)
		   docLicense("BSD revised")
		   docObject("SkipDB")    
		   docDescription("BerkeleyDB style database implemented with skip lists instead of a b-tree.")
		   */

#include "SkipDBM.h"
#include "SkipDB.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// lookups 

void SkipDB_clearUpdate(SkipDB *self);

// ops 

void SkipDB_readRootRecord(SkipDB *self);

// ---------------------------------------------------------- 

SkipDB *SkipDB_new(void)
{
	SkipDB *self = (SkipDB *)calloc(1, sizeof(SkipDB));
	self->refCount = 1;
	self->p = SKIPDB_PROBABILITY_DISTRIBUTION; 
	self->stream = BStream_new();
	
	self->cursors = List_new();
	
	self->pidsToRemove = List_new();
	self->dirtyRecords = List_new();
	
	self->cacheHighWaterMark = 1000;
	self->cacheLowWaterMark  = 500;
	
	self->pidToRecord = Hash_new();
	self->randomGen = RandomGen_new();
	return self;
}

void SkipDB_sdbm_(SkipDB *self, void *dbm)
{
	self->dbm = dbm;
}

SkipDB *SkipDB_newWithDBM_(void *dbm)
{
	SkipDB *self = SkipDB_new();
	self->dbm = dbm;
	self->header = SkipDBRecord_newWithDB_(self);
	SkipDBRecord_keyDatum_(self->header, Datum_Empty());
	SkipDBRecord_valueDatum_(self->header, Datum_Empty());
	//self->headerPid = SkipDBRecord_pid(self->header);
	return self;
}

SkipDB *SkipDB_newWithDBM_atPid_(void *dbm, PID_TYPE pid)
{
	/*
	 SkipDB *self = SkipDB_new();
	 self->dbm = dbm;
	 self->headerPid = pid;
	 self->header = SkipDBRecord_newWithDB_(self);
	 SkipDBRecord_keyDatum_(self->header, Datum_Empty());
	 SkipDBRecord_valueDatum_(self->header, Datum_Empty());
	 */
	return SkipDB_newWithDBM_(dbm);
}

void SkipDB_dealloc(SkipDB *self)
{
	if (self->dbm) SkipDBM_willFreeDB_(self->dbm, self);
	SkipDB_clearUpdate(self);
	
	BStream_free(self->stream);
	SkipDB_freeAllCachedRecords(self);
	
	// cursors are allocated and must be freed individually.
	List_do_(self->cursors, (ListDoCallback *)SkipDBCursor_release);
	List_free(self->cursors);
	
	List_free(self->pidsToRemove);
	List_free(self->dirtyRecords);
	Hash_free(self->pidToRecord);
	RandomGen_free(self->randomGen);
	free(self);
}


void SkipDB_release(SkipDB *self)
{
	self->refCount --;
	
	if (self->refCount == 0)
	{
		SkipDB_dealloc(self);
	}
}

void SkipDB_retain(SkipDB *self)
{
	self->refCount ++;
}


void SkipDB_headerPid_(SkipDB *self, PID_TYPE pid)
{
	self->headerPid = pid;
}

PID_TYPE SkipDB_headerPid(SkipDB *self)
{
	return self->headerPid;
}

SkipDBRecord *SkipDB_headerRecord(SkipDB *self)
{
	return self->header;
}

UDB *SkipDB_udb(SkipDB *self)
{
	return SkipDBM_udb(self->dbm);
}

BStream *SkipDB_tmpStream(SkipDB *self)
{
	return self->stream;
}

int SkipDB_delete(SkipDB *self)
{
	int count = 0;
	SkipDBRecord *r = self->header;
	
	while (r)
	{
		PID_TYPE pid = SkipDBRecord_pid(r);
		SkipDBRecord *next = SkipDBRecord_nextRecord(self->header);
		
		if (pid)
		{   
			UDB_removeAt_(SkipDB_udb(self), pid);
		}
		
		r = next;
		count ++;
	}
	
	return count;
}

// notifications --------------------------------- 

void SkipDB_noteNewRecord_(SkipDB *self, SkipDBRecord *r)
{
	self->cachedRecordCount ++;
	SkipDB_noteAccessedRecord_(self, r);
}

void SkipDB_noteAccessedRecord_(SkipDB *self, SkipDBRecord *r)
{
	SkipDBRecord *y = self->youngestRecord;
	
	//SkipDBRecord_showAgeList(r);
	
	if (y != r)
	{    
		if (y)
		{
			SkipDBRecord_removeFromAgeList(r);
			SkipDBRecord_setOlderRecord_(r, y);
			SkipDBRecord_setYoungerRecord_(y, r);
		}
		
		self->youngestRecord = r;
	}
	
	//SkipDBRecord_showAgeList(r);
}

void SkipDB_noteDirtyRecord_(SkipDB *self, SkipDBRecord *r)
{
	if (SkipDB_isOpen(self))
	{
		List_append_(self->dirtyRecords, r);
	}
}

void SkipDB_noteAssignedPidToRecord_(SkipDB *self, SkipDBRecord *r)
{
	Hash_at_put_(self->pidToRecord, (void *)SkipDBRecord_pid(r), r);
}

void SkipDB_noteWillFreeRecord_(SkipDB *self, SkipDBRecord *r)
{
	SkipDBFreeObjectFunc *f = self->objectFreeFunc;
	void *object = SkipDBRecord_object(r);
	
	if (f && object)
	{
		(*f)(object);
	}
	
	self->cachedRecordCount --;
	
	if (r == self->youngestRecord)
	{
		self->youngestRecord = SkipDBRecord_olderRecord(r);
		//printf("will free youngestRecord\n");
	}
	
	SkipDBRecord_removeFromAgeList(r);
	Hash_removeKey_(self->pidToRecord, (void *)SkipDBRecord_pid(r));
}

void SkipDB_freeAllCachedRecords(SkipDB *self)
{
	//SkipDBRecord *r = self->youngestRecord;
	//self->headerPid = SkipDBRecord_pid(self->header);
	
	//SkipDBRecord_showAgeList(self->youngestRecord);
	
	while (self->youngestRecord)
	{
		SkipDBRecord_dealloc(self->youngestRecord);
	}
	
	self->header = NULL;
}

// cache ------------------------------------- 

void SkipDB_setCacheHighWaterMark_(SkipDB *self, size_t recordCount)
{
	self->cacheHighWaterMark = recordCount;
	
	if (recordCount < self->cacheLowWaterMark)
	{
		self->cacheLowWaterMark = recordCount / 2;
	}
}

size_t SkipDB_cacheHighWaterMark(SkipDB *self)
{
	return self->cacheHighWaterMark;
}

void SkipDB_setCacheLowWaterMark_(SkipDB *self, size_t recordCount)
{
	self->cacheLowWaterMark = recordCount;
	
	if (recordCount > self->cacheHighWaterMark)
	{
		self->cacheHighWaterMark = recordCount * 2;
	}
}

size_t SkipDB_cacheLowWaterMark(SkipDB *self)
{
	return self->cacheLowWaterMark;
}

int SkipDB_headerIsEmpty(SkipDB *self)
{
	return SkipDBRecord_pointersAreEmpty(self->header);
}

void SkipDB_freeExcessCachedRecords(SkipDB *self)
{
	// this is only called after a sync, so there are no dirty records 
	//return;
	
	if (self->cachedRecordCount > self->cacheHighWaterMark)
	{
		SkipDBRecord *r;
		SkipDBRecord *lastMarkedRecord = NULL;
		size_t lowMark = self->cacheLowWaterMark;
		
		//printf("SkipDB_freeExcessCachedRecords() start %i\n", (int)self->cachedRecordCount);
		
		// make sure we keep the header 
		
		SkipDB_noteAccessedRecord_(self, self->header);
		r = self->youngestRecord;
		
		// mark the cursor records 
		
		List_do_(self->cursors, (ListDoCallback *)SkipDBCursor_mark);
		
		// mark the recent records 
		
		while (lowMark --)
		{
			r->mark = 1;
			r = r->olderRecord;
		}
		
		// unmark the rest of the records 
		
		while (r)
		{
			r->mark = 0;
			r = r->olderRecord;
		}
		
		// remove the references to older records 
		
		lowMark = self->cacheLowWaterMark;
		r = self->youngestRecord;
		
		while (r->mark)
		{
			lowMark --;
			SkipDBRecord_removeReferencesToUnmarked(r);
			lastMarkedRecord = r; // remember this so we can clip the list 
			r = r->olderRecord;
		}
		
		// dealloc the remaining records 
		
		while (r)
		{
			SkipDBRecord *next = r->olderRecord;
			
			// to avoid SkipDBRecord_removeFromAgeList() issues
			r->olderRecord   = NULL;
			r->youngerRecord = NULL;
			
			if (r->mark)
			{
				printf("error - attempt to dealloc marked record\n");
				exit(-1);
			}
			
			SkipDBRecord_dealloc(r); // what about SkipDBRecord_removeFromAgeList()?
			r = next;
		}
		
		lastMarkedRecord->olderRecord = NULL; // clip the list 
		
		//printf("SkipDB_freeExcessCachedRecords() done %i\n", (int)self->cachedRecordCount);
	}
}

int SkipDB_isOpen(SkipDB *self)
{
	return UDB_isOpen(SkipDB_udb(self));
}

void SkipDB_clearCache(SkipDB *self)
{
	printf("\nclearCache\n\n");
	
	if (self->header)
	{
		SkipDB_freeAllCachedRecords(self);
		SkipDB_readRootRecord(self);
	}
}

// transactions --------------------------------------------------- 

void SkipDB_sync(SkipDB *self)
{
	if (SkipDB_isOpen(self))
	{
		if (!self->headerPid)
		{
			self->headerPid = SkipDBRecord_pidAllocIfNeeded(self->header);
		}
		
		SkipDB_saveDirtyRecords(self);
		SkipDB_deleteRecordsToRemove(self);
	}
	
	List_removeAll(self->dirtyRecords);
	List_removeAll(self->pidsToRemove);
	SkipDB_freeExcessCachedRecords(self);
}

void SkipDB_saveDirtyRecords(SkipDB *self)
{
	List_do_(self->dirtyRecords, (ListDoCallback *)SkipDBRecord_save);
}

void SkipDB_deleteRecordsToRemove(SkipDB *self)
{
	UDB *udb = SkipDB_udb(self);	
	LIST_FOREACH(self->pidsToRemove, i, pid, UDB_removeAt_(udb, (PID_TYPE)pid));
}

// ops -------------------------------------------------- 

void SkipDB_readRootRecord(SkipDB *self)
{
	SkipDBRecord *r = SkipDB_recordAtPid_(self, self->headerPid);
	
	if (!r) 
	{
		r = SkipDBRecord_newWithDB_(self);
		SkipDBRecord_keyDatum_(r, Datum_Empty());
		SkipDBRecord_markAsDirty(r);
		self->headerPid = SkipDBRecord_pid(r);
	}
	
	self->header = r;
}

int SkipDB_level(SkipDB *self)
{
	return SkipDBRecord_level(self->header);
}

void SkipDB_level_(SkipDB *self, int level)
{
	SkipDBRecord_level_(self->header, level);
	SkipDBRecord_markAsDirty(self->header);
}

int SkipDB_pickRandomRecordLevel(SkipDB *self)
{
	float r = (float)RandomGen_randomDouble(self->randomGen);
	int level = 1;
	
	while (r < self->p && level < SKIPDB_MAX_LEVEL)
	{
		level ++;
		r = (float)RandomGen_randomDouble(self->randomGen);
	}
	
	if (level > SkipDB_level(self)) 
	{
		level = SkipDB_level(self) + 1;
	}
	
	return level;
}

SkipDBRecord *SkipDB_recordAtPid_(SkipDB *self, PID_TYPE pid)
{
	if (pid)
	{
		SkipDBRecord *r = Hash_at_(self->pidToRecord, (void *)pid);
		
		if (r)
		{
			return r;
		}
		else
		{
			Datum d = UDB_at_(SkipDB_udb(self), pid);
			
			if (d.size)
			{
				SkipDBRecord *r = SkipDBRecord_newWithDB_(self);
				BStream_setData_length_(self->stream, d.data, d.size); // need to optimize this out 
				SkipDBRecord_fromStream_(r, self->stream);
				SkipDBRecord_pid_(r, pid);
				return r;
			}
		}
		
		if (pid != 1) 
		{
			UDB *udb = SkipDB_udb(self);
			printf("MISSING SKIP RECORD WITH PID: %" PID_FORMAT "\n", pid);
			UDB_at_(udb, pid);
		}
	}
	return NULL;
}

// lookups ----------------------------- 

void SkipDB_updateAt_put_(SkipDB *self, int level, SkipDBRecord *r)
{
	self->update[level] = r;
	//if (r) SkipDB_noteAccessedRecord_(self, r);
}

void SkipDB_clearUpdate(SkipDB *self)
{
	int i;
	
	for (i = 0; i < SKIPDB_MAX_LEVEL; i ++)
	{
		SkipDB_updateAt_put_(self, i, NULL);
	} 
}

// Record API ------------------------------------- 

SkipDBRecord *SkipDB_recordAt_(SkipDB *self, Datum k)
{
	SkipDB_clearUpdate(self);
	return SkipDBRecord_find_quick_(self->header, k, 0);
}

SkipDBRecord *SkipDB_recordAt_put_(SkipDB *self, Datum k, Datum v)
{
	SkipDBRecord *r = SkipDB_recordAt_(self, k);
	//SkipDB_showUpdate(self);
	
	if (r)
	{
		// update record 
		SkipDBRecord_valueDatum_(r, v);
		SkipDBRecord_markAsDirty(r);
	}
	else
	{
		// create new record  
		r = SkipDBRecord_newWithDB_(self);
		//printf("%p = SkipDBRecord_newWithDB_(self);\n", (void *)r);
		
		SkipDBRecord_keyDatum_(r, k);
		SkipDBRecord_valueDatum_(r, v);
		
		// pick a level for it, and update header level if needed 
		{
			int level = SkipDB_pickRandomRecordLevel(self);
			
			//printf("picked level = %i\n", level);
			SkipDBRecord_level_(r, level);
			
			if (level > SkipDB_level(self))
			{
				int i; 
				
				for (i = SkipDB_level(self); i < level; i ++)
				{
					SkipDB_updateAt_put_(self, i, self->header);
				}
				
				SkipDB_level_(self, level);
			}
		}
		
		SkipDBRecord_markAsDirty(r); // need to do this to get a pid 
		//SkipDBRecord_show(r);
		
		// set the records in the update vector to point to the inserted record 
		{
			int level = SkipDBRecord_level(r);
			
			while (level --)
			{
				SkipDBRecord *updateRecord = self->update[level];
				
				if (updateRecord) 
				{
					SkipDBRecord_copyLevel_from_(r, level, updateRecord);
					/*
					 printf("after: %s insert: %s\n", 
						   ByteArray_asCString(updateRecord->key), 
						   ByteArray_asCString(r->key));
					*/
					
					// update previous pointer 
					// (r is to the right of the update records)
					if (level == 0)
					{
						SkipDBRecord *nextRecord = SkipDBRecord_nextRecord(updateRecord);
						SkipDBRecord_previousRecord_(r, updateRecord);
						
						if(nextRecord)
						{
							SkipDBRecord_previousRecord_(nextRecord, r);
							SkipDBRecord_markAsDirty(nextRecord);
						}
					}
					
					SkipDBRecord_atLevel_setRecord_(updateRecord, level, r);
					SkipDBRecord_markAsDirty(updateRecord);
				}
			}
		}
		
		SkipDBRecord_markAsDirty(r);
	}
	
	return r;
}

// Datum API ------------------------------------- 

Datum SkipDB_at_(SkipDB *self, Datum k)
{
	SkipDBRecord *r;
	
	SkipDB_clearUpdate(self);
	
	r = SkipDBRecord_find_quick_(self->header, k, 1);
	
	//SkipDBRecord *r = SkipDB_recordAt_(self, k);
	
	if (r)
	{
		return Datum_FromByteArray_(SkipDBRecord_value(r));
	}
	
	return Datum_Empty();
}

void SkipDB_at_put_(SkipDB *self, Datum k, Datum v)
{
	SkipDB_recordAt_put_(self, k, v);
}

void SkipDB_removeAt_(SkipDB *self, Datum k)
{
	SkipDBRecord *r = SkipDB_recordAt_(self, k);
	SkipDBRecord *lastUr = NULL;
	
	if (r)
	{
		int i;
		
		for (i = 0; i < SKIPDB_MAX_LEVEL; i ++)
		{
			SkipDBRecord *ur = self->update[i];
			
			if (ur == r) 
			{
				break;
			}
			
			if (ur && ur != lastUr) 
			{
				SkipDBRecord_willRemove_(ur, r);
			}
			
			// update previous pointer 
			
			if (i == 0)
			{
				SkipDBRecord *nextRecord = SkipDBRecord_nextRecord(r);
				SkipDBRecord *previousRecord = SkipDBRecord_previousRecord(r);
				
				if (nextRecord)
				{
					SkipDBRecord_previousRecord_(nextRecord, previousRecord);
				}
			}
			lastUr = ur;
		}
		
		if (SkipDB_isOpen(self))
		{
			PID_TYPE pid = SkipDBRecord_pid(r);
			
			if (pid)
			{
				List_append_(self->pidsToRemove, (void *)pid);
			}
			//UDB_removeAt_(SkipDB_udb(self), SkipDBRecord_pid(r));
		}
		
		// nothing should be pointing to this record, so we can dealloc it 
		
		//SkipDBRecord_object_(r, NULL);
		SkipDBRecord_dealloc(r);
		
#ifdef DEBUG
		if (SkipDB_recordAt_(self, k))
		{
			printf("SkipDB_removeAt_ error - not removed\n");
			exit(-1);
		}
#endif
	}
}

void SkipDB_showUpdate(SkipDB *self)
{
	int i = self->header->level;
	
	printf("SkipDB update vector:\n");
	
	while (i --)
	{
		printf("  %i : %s\n", i, ByteArray_asCString(SkipDBRecord_key(self->update[i])));
	}
	
	printf("\n");
}

void SkipDB_show(SkipDB *self)
{
	SkipDBRecord *r = self->header;
	printf("\nSkipDB:\n");
	while (r)
	{
		SkipDBRecord_show(r);
		r = SkipDBRecord_recordAtLevel_(r, 0);
	}
	printf("\n\n");
}

// objects -------------------------------------------------- 

void SkipDB_objectMarkFunc_(SkipDB *self, SkipDBObjectMarkFunc *func)
{
	self->objectMarkFunc = func;
}

void SkipDB_freeObjectCallback_(SkipDB *self, SkipDBFreeObjectFunc *func)
{
	self->objectFreeFunc = func;
}

// cursor --------------------------------- 

int SkipDB_count(SkipDB *self)
{
	int count = 0;
	SkipDBRecord *r = SkipDB_firstRecord(self);
	
	if (!r) 
	{
		return 0;
	}
	
	count = 1;
	
	while ((r = SkipDBRecord_nextRecord(r)))
	{
		count ++;
	}
	return count;
}

SkipDBRecord *SkipDB_firstRecord(SkipDB *self)
{
	if (SkipDBRecord_level(self->header) == 0) return NULL;
	return SkipDBRecord_nextRecord(self->header);
}

SkipDBRecord *SkipDB_lastRecord(SkipDB *self)
{
	return SkipDBRecord_findLastRecord(self->header);
}

SkipDBRecord *SkipDB_goto_(SkipDB *self, Datum key)
{
	SkipDBRecord *r = SkipDB_recordAt_(self, key);
	
	if (!r)
	{
		r = self->update[0];
	}
	
	return r;
}

SkipDBCursor *SkipDB_createCursor(SkipDB *self)
{
	SkipDBCursor *cursor = SkipDBCursor_newWithDB_(self);
	List_append_(self->cursors, cursor);
	return cursor;
}

void SkipDB_removeCursor_(SkipDB *self, SkipDBCursor *cursor)
{
	SkipDBCursor_release(cursor);
	List_remove_(self->cursors, cursor);
	//SkipDBCursor_free(cursor);
}

// moving from in-memory to on-disk --------------------------------- 

void SkipDB_mergeInto_(SkipDB *self, SkipDB *other)
{
	SkipDBRecord *r = self->header;
	
	while ((r = SkipDBRecord_nextRecord(r)))
	{
		Datum k = SkipDBRecord_keyDatum(r);
		Datum v = SkipDBRecord_valueDatum(r);
		SkipDB_at_put_(other, k, v);
	}
}


syntax highlighted by Code2HTML, v. 0.9.1