/* Web Polygraph       http://www.web-polygraph.org/
 * (C) 2003-2006 The Measurement Factory
 * Licensed under the Apache License, Version 2.0 */

#include "base/polygraph.h"

#include "client/Client.h"
#include "cache/CacheEntry.h"
#include "cache/Cache.h"
#include "cache/DistrPoint.h"

// Queue of distribution points that have a deferred call pending.
// WillCall() appends to it, WillNotCall() zeroes a slot, and CallAll()
// walks it and removes the processed slots.
// NOTE(review): 1024 is presumably an initial capacity -- confirm against
// the Array constructor.
Array<DistrPoint*> DistrPoint::TheWaits(1024);


// Builds a distribution point for anEntry on behalf of aCache.
// The new point has no reader, no writer, no slot in TheWaits, no pending
// to-do flags, and has not handed out its reply header yet.
// NOTE(review): theReadySize is not initialized here although addReader()
// reads it; this presumably relies on the default constructor of its
// type -- confirm.
DistrPoint::DistrPoint(Cache *aCache, CacheEntry *anEntry):
	theCache(aCache),
	theEntry(anEntry),
	theReader(0),
	theWriter(0),
	theWaitIdx(-1), // no TheWaits slot assigned
	theToDo(0),     // nothing to do yet
	gotReply(false)
{
}

// Requires that both the reader and the writer have been removed already.
// Clears the entry's theDistrPoint pointer and withdraws any deferred
// call that is still queued.
DistrPoint::~DistrPoint() {
	Assert(!theReader && !theWriter);

	if (theEntry) {
		theEntry->theDistrPoint = 0;
	}

	// a queued slot must not outlive this object
	if (waiting()) {
		willNotCall();
	}
}

// Attaches r as the reader of this point; only one reader is allowed.
// If a non-negative ready size is already recorded, a deferred
// toPushData call is scheduled.
void DistrPoint::addReader(CacheReader *r) {
	Assert(theEntry && !theReader);
	theReader = r;

	const bool haveReadySize = theReadySize >= 0;
	if (haveReadySize) {
		willCall(toPushData);
	}
}

// Attaches w as the writer of this point; only one writer is allowed.
// The ready size is reset to -1 and that value is also passed to the
// entry's objSize().
void DistrPoint::addWriter(CacheWriter *w) {
	Assert(theEntry && !theWriter);

	theWriter = w;

	// the object size is not known at this stage
	theReadySize = -1;
	theEntry->objSize(theReadySize);
}

// Detaches reader r, which must be the currently attached reader.
// When no writer is attached either, the point stops distributing.
void DistrPoint::delReader(CacheReader *r) {
	Assert(r && r == theReader);

	// break the link in both directions
	r->theDistrPoint = 0;
	theReader = 0;

	if (theWriter)
		return; // the writer is still attached

	stop(); // must be the last statement; see stop()
}

// Detaches writer w, which must be the currently attached writer.
// When no reader is attached either, the point stops distributing.
void DistrPoint::delWriter(CacheWriter *w) {
	Assert(w && w == theWriter);

	// break the link in both directions
	w->theDistrPoint = 0;
	theWriter = 0;

	if (theReader) {
		// XXX: should queue theReader->noteWriterLeft()
		return;
	}

	stop(); // must be the last statement; see stop()
}

// Read-only access to the stored reply header.
// The caller must make sure hasRepHdr() holds.
const RepHdr &DistrPoint::cachedRepHdr() const
{
	Assert(hasRepHdr());

	return theRepHdr;
}

// Writable access to the stored reply header; records that the header
// has been handed out. Expected to be called at most once per point.
RepHdr &DistrPoint::repHdr()
{
	// a second call would be harmless here, but the client should not
	// ask twice
	Assert(!gotReply);

	gotReply = true;
	return theRepHdr;
}


// Records a new ready size, which must not be smaller than the one
// already recorded, and schedules a deferred toPushData call.
void DistrPoint::noteDataReady(Size readySize)
{
	Assert(theReadySize <= readySize);

	theReadySize = readySize;

	willCall(toPushData);
}

// Schedules a deferred toProvideWriter call.
// Requires a cache, an entry, an attached reader and no attached writer.
void DistrPoint::mustProvideWriter()
{
	Assert(theCache);
	Assert(theReader && !theWriter);
	Assert(theEntry);

	// the actual work happens later, in provideWriter()
	willCall(toProvideWriter);
}

// Asks the cache's client to fetch the entry for this point.
// If the fetch call reports failure, the reader is notified through
// noteWriterLeft().
void DistrPoint::provideWriter()
{
	Assert(theCache);
	Assert(theCache->client());
	Assert(theReader && !theWriter);
	Assert(theEntry);

	if (theCache->client()->fetch(theEntry->id(), this))
		return; // it is unsafe to do anything here

	// no fetch: there must be no writer attached at this point
	Assert(!theWriter);
	theReader->noteWriterLeft(); // will call delReader
	// it is unsafe to do anything here
}

// Notifies the attached reader, if any, through noteCacheReady().
void DistrPoint::pushData()
{
	if (!theReader)
		return;

	theReader->noteCacheReady();
	// it is unsafe to do anything here
}

// Notifies the attached reader, if any, through noteWriterLeft().
void DistrPoint::callWriterLeft()
{
	if (!theReader)
		return;

	theReader->noteWriterLeft();
}

// Ends distribution once both the reader and the writer are gone:
// withdraws any queued deferred call and hands this point to the entry's
// stopDistributing().
void DistrPoint::stop()
{
	Assert(theEntry);
	Assert(!theReader && !theWriter);

	if (waiting()) {
		willNotCall();
	}

	// stopDistributing destroys us, so nothing may follow this call
	theEntry->stopDistributing(this);
}

// Adds the toDo flags to the pending set and, unless this point is
// already queued, queues it in TheWaits. A zero toDo is a no-op.
void DistrPoint::willCall(unsigned int toDo)
{
	//cerr << here << "will do " << toDo << " at " << theWaitIdx << endl;
	if (!toDo)
		return;

	theToDo |= toDo;

	if (waiting())
		return; // already queued; the existing slot covers the new flags

	theWaitIdx = WillCall(this);
}

// Withdraws this point from TheWaits and forgets all pending to-do
// flags. May only be called while the point is waiting.
void DistrPoint::willNotCall()
{
	Assert(waiting());

	WillNotCall(theWaitIdx);

	// back to the "not queued, nothing to do" state
	theWaitIdx = -1;
	theToDo = 0;
}

void DistrPoint::callWaiting() {
	//cerr << here << "doing " << theToDo << " at " << theWaitIdx << endl;
	Assert(waiting());
	theWaitIdx = -1;

	// one to-do call at a time!

	if (theToDo & toPushData) {
		theToDo &= ~toPushData;
		willCall(theToDo);
		pushData();
		return;
	} 

	if (theToDo & toProvideWriter) {
		theToDo &= ~toProvideWriter;
		willCall(theToDo);
		provideWriter();
		return;
	}

	if (theToDo & toCallWriterLeft) {
		theToDo &= ~toCallWriterLeft;
		willCall(theToDo);
		callWriterLeft();
		return;
	}

	Assert(false);
}

// Appends dp to TheWaits and returns the index of its slot.
int DistrPoint::WillCall(DistrPoint *dp)
{
	// the appended item lands at the current end of the queue
	const int slot = TheWaits.count();
	TheWaits.append(dp);
	return slot;
}

// Cancels the wait stored at slot widx by zeroing the slot.
// An index at or beyond the current end of TheWaits is ignored.
void DistrPoint::WillNotCall(int widx)
{
	Assert(widx >= 0);

	if (widx >= TheWaits.count())
		return; // no such slot

	TheWaits[widx] = 0;
}

// Runs every pending deferred call, including calls queued while earlier
// ones are being processed, in the order they were queued (be fair: SCAN
// approach, not "shortest seek").
//
// The queue is scanned with a single growing index and is emptied only
// after the scan completes. Removing processed slots in the middle of the
// work would move the remaining entries to lower positions while the
// points that own them still hold their old theWaitIdx; a later
// willNotCall() from such a point would then miss its own slot (leaving a
// pointer to a possibly destroyed point in the queue) or zero a slot that
// belongs to a different point.
void DistrPoint::CallAll() {
	// count() is re-read on every iteration so that waits appended by
	// callWaiting() are picked up by this same scan
	for (int i = 0; i < TheWaits.count(); ++i) {
		if (DistrPoint *dp = TheWaits[i]) // zeroed slots were cancelled
			dp->callWaiting();
	}

	// every slot has been visited by now, so no point still refers to an
	// index in the queue; drop all slots at once
	if (const int visited = TheWaits.count())
		TheWaits.shift(visited);
}


// syntax highlighted by Code2HTML, v. 0.9.1