/* -*- Mode:C++; c-basic-offset:4; tab-width:4; indent-tabs-mode:t -*- */
/*
* Copyright (c) 2000 International Computer Science Institute
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. All advertising materials mentioning features or use of this software
* must display the following acknowledgement:
* This product includes software developed by ACIRI, the AT&T
* Center for Internet Research at ICSI (the International Computer
* Science Institute).
* 4. Neither the name of ACIRI nor of ICSI may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY ICSI AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL ICSI OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "pushback.h"
#include "ident-tree.h"
#include "pushback-queue.h"
#include "rate-limit.h"
#include "pushback-message.h"
//#define DEBUG
int hdr_pushback::offset_;
static class PushbackHeaderClass : public PacketHeaderClass {
public:
PushbackHeaderClass() : PacketHeaderClass("PacketHeader/Pushback",
sizeof(hdr_pushback)) {
bind_offset(&hdr_pushback::offset_);
}
} class_Pushback_hdr;
static class PushbackClass : public TclClass {
public:
PushbackClass() : TclClass("Agent/Pushback") {}
TclObject* create(int, const char*const*) {
return (new PushbackAgent());
}
} class_Pushback;
PushbackAgent::PushbackAgent() : Agent(PT_PUSHBACK), last_index_(0), intResult_(-1) {
bind("last_index_", &last_index_);
bind("intResult_", &intResult_);
bind_bool("enable_pushback_", &enable_pushback_);
bind_bool("verbose_", &verbose_);
timer_ = new PushbackTimer(this);
debugLevel = 3;
// debugLevel = 0;
}
int
PushbackAgent::command(int argc, const char*const* argv) {
Tcl& tcl = Tcl::instance();
if (argc == 4 ) {
if (strcmp(argv[1], "initialize") == 0) {
//get the node and routeLogic object
node_ = (Node *)TclObject::lookup(argv[2]);
rtLogic_ = (RouteLogic *)TclObject::lookup(argv[3]);
if (node_ == NULL || rtLogic_ == NULL) {
if (verbose_) printf("Improper Initialization for Pushback Agent\n");
return(TCL_ERROR);
}
sprintf(prnMsg, "node=%s rtLogic=%s id=%d address=%d\n", node_->name(),
rtLogic_->name(), node_->nodeid(), node_->address());
printMsg(prnMsg,0);
return(TCL_OK);
}
}
else if (argc == 3) {
//$pba add-queue $queue
if (strcmp(argv[1], "add-queue") == 0) {
if (last_index_==MAX_QUEUES) {
printf("queue list size exhausted - recompile with a bigger MAX_QUEUES\n");
exit(-1);
}
PushbackQueue * queue = (PushbackQueue *) TclObject::lookup(argv[2]);
if (queue == NULL) {
printf("NULL queue passed \n");
exit(-1);
}
int index = last_index_++;
queue_list_[index].pbq_ = queue;
queue_list_[index].idTree_ = new IdentStruct();
tcl.resultf("%d", index);
return (TCL_OK);
}
}
return (Agent::command(argc, argv));
}
void
PushbackAgent::reportDrop(int qid, Packet * p) {
if (!checkQID(qid)) {
sprintf(prnMsg,"Got invalid qid %d\n", qid);
printMsg(prnMsg,0);
exit(-1);
}
hdr_ip * iph = hdr_ip::access(p);
ns_addr_t src = iph->src();
ns_addr_t dst = iph->dst();
int fid = iph->flowid();
sprintf(prnMsg,"DropDetails from queue %d: %d.%d -> %d.%d (%d)\n", qid,
src.addr_, src.port_, dst.addr_, dst.port_, fid);
printMsg(prnMsg, 5);
queue_list_[qid].idTree_->registerDrop(p);
}
void
PushbackAgent::calculateLowerBound(int qid, double arrRate) {
if (!checkQID(qid)) {
sprintf(prnMsg, "Got invalid id from queue in identifyAggregate\n");
printMsg(prnMsg,0);
exit(-1);
}
AggReturn * aggReturn = queue_list_[qid].idTree_->calculateLowerBound();
if (aggReturn == NULL) {
//not sure what to do here.
//maybe lower bound should be left as it is
return;
}
double lowerBound = 0;
int i = 0;
for (; i <= aggReturn->finalIndex_; i++) {
cluster currCluster = aggReturn->clusterList_[i];
AggSpec * aggSpec = new AggSpec(1, currCluster.prefix_, currCluster.bits_);
RateLimitSession * rls1 =
queue_list_[qid].pbq_->rlsList_->containsLocalAggSpec(aggSpec, node_->nodeid());
if (rls1 !=NULL) continue;
lowerBound = (currCluster.count_)*(arrRate/aggReturn->totalCount_);
sprintf(prnMsg, "LB: count: %d totalCount_: %d arrRate: %g\n", currCluster.count_, aggReturn->totalCount_, arrRate);
printMsg(prnMsg,0);
break;
}
if (i == aggReturn->finalIndex_+1) {
sprintf(prnMsg, "Warning: All clusters being rate limited\n");
printMsg(prnMsg,0);
//exit(-1);
}
queue_list_[qid].idTree_->setLowerBound(lowerBound, 1);
delete(aggReturn);
}
void
PushbackAgent::identifyAggregate(int qid, double arrRate, double linkBW) {
//set up refresh timer for this queue, if this is the firstime you come here.
if (!timer_->containsRefresh(qid)) {
PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
timer_->insert(event);
}
// if (debug_)
sprintf(prnMsg, "identifyAggregate for %d\n", qid);
printMsg(prnMsg,0);
if (!checkQID(qid)) {
sprintf(prnMsg, "Got invalid id from queue in identifyAggregate\n");
printMsg(prnMsg,0);
exit(-1);
}
if (verbose_) queue_list_[qid].idTree_->traverse();
//this is a quick way of achieving this.
//but it can be justified on some grounds. will do a check with Sally later.
int noSessions = queue_list_[qid].pbq_->rlsList_->noMySessions(node_->nodeid());
// if (noSessions >= MAX_SESSIONS) {
// sprintf(prnMsg, "My hands are full\n");
// printMsg(prnMsg,0);
// return;
// }
AggReturn * aggReturn = queue_list_[qid].idTree_->identifyAggregate(arrRate, linkBW);
if (aggReturn == NULL) return;
for (int i=0; i<=aggReturn->finalIndex_; i++) {
cluster currCluster = aggReturn->clusterList_[i];
AggSpec * aggSpec = new AggSpec(1, currCluster.prefix_, currCluster.bits_);
//don't insert the same aggregate again.
RateLimitSession * rls1 =
queue_list_[qid].pbq_->rlsList_->containsLocalAggSpec(aggSpec, node_->nodeid());
if (rls1 != NULL) {
sprintf(prnMsg, "got subset aggregate. Lowerbound = %g. agg = ", aggReturn->limit_);
printMsg(prnMsg,0);
aggSpec->print(); fflush(stdout);
delete(aggSpec);
//this could keep the lowerbound unnecessarily down.
//but don't be sympathetic with aggregates, which have been identified again.
if (aggReturn->limit_ < rls1->lowerBound_) {
rls1->lowerBound_ = aggReturn->limit_;
}
//set the last misbehavior signal.
rls1->refreshed();
continue;
}
double estimate = (currCluster.count_)*(arrRate/aggReturn->totalCount_);
if (noSessions >= MAX_SESSIONS) {
int rank = queue_list_[qid].pbq_->rlsList_->rankRate(node_->nodeid(), estimate);
if (rank >= MAX_SESSIONS) {
sprintf(prnMsg, "got rate <= minRate. agg = ");
printMsg(prnMsg,0);aggSpec->print(); fflush(stdout);
delete(aggSpec);
continue;
}
}
sprintf(prnMsg, "starting rate-limiting lower=%g estimate=%g agg ",
aggReturn->limit_, estimate);
printMsg(prnMsg,0);
aggSpec->print(); fflush(stdout);
double initialLimit = estimate; //*(1 - ambientDropRate);
RateLimitSession * rls = new RateLimitSession(aggSpec, estimate, 1, initialLimit,
node_->nodeid(), qid,
RATE_LIMIT_TIME_DEFAULT, aggReturn->limit_,
node_, rtLogic_);
queue_list_[qid].pbq_->rlsList_->insert(rls);
PushbackEvent * event = new PushbackEvent(INITIAL_UPDATE_TIME, INITIAL_UPDATE_EVENT, rls);
timer_->insert(event);
// }
noSessions++;
}
queue_list_[qid].idTree_->setLowerBound(aggReturn->limit_, 0);
delete(aggReturn);
}
void
PushbackAgent::resetDropLog(int qid) {
sprintf(prnMsg, " drop log reset for qid %d\n", qid);
printMsg(prnMsg,5);
if (!checkQID(qid)) {
printf("Got invalid id from queue in resetDropLog\n");
exit(-1);
}
queue_list_[qid].idTree_->reset();
}
void
PushbackAgent::timeout(PushbackEvent * event) {
sprintf(prnMsg, " %s event for qid %d\n", PushbackEvent::type(event), event->qid_);
printMsg(prnMsg,0);
switch (event->eventID_) {
case PUSHBACK_CHECK_EVENT: pushbackCheck(event->rls_);
break;
case PUSHBACK_REFRESH_EVENT: pushbackRefresh(event->qid_);
break;
case PUSHBACK_STATUS_EVENT: pushbackStatus(event->rls_);
break;
case INITIAL_UPDATE_EVENT: initialUpdate(event->rls_);
break;
default: sprintf(prnMsg, " Unrecognized event %d\n", event->eventID_);
printMsg(prnMsg,0);
break;
}
}
void
PushbackAgent::initialUpdate(RateLimitSession * rls) {
if ( !rls->initialPhase_ ) {
sprintf(prnMsg, " Error: Update when not in initialphase\n");
printMsg(prnMsg,0);
exit(-1);
}
double qdrop = queue_list_[rls->localQID_].pbq_->getDropRate();
double dropRate = rls->getDropRate();
double arrRate = rls->getArrivalRateForStatus();
double newLimit = arrRate*(1 - 2*(dropRate+qdrop));
sprintf(prnMsg,"Initial-Update: qdrop=%g dr=%g newL=%g oldTarget=%g lowerBound=%g arr=%g\n",
qdrop, dropRate, newLimit, rls->rlStrategy_->target_rate_, rls->lowerBound_, arrRate);
printMsg(prnMsg,0);
//cancel right now, if arrRate is significantly less than lower bound.
if (arrRate < 0.75*rls->lowerBound_) {
#ifdef DEBUG
double now = Scheduler::instance().clock();
printf("Cancel pushback A time: %5.3f\n", now);
#endif
pushbackCancel(rls);
return;
}
if (newLimit > rls->lowerBound_) {
rls->setLimit(newLimit);
PushbackEvent * event = new PushbackEvent(INITIAL_UPDATE_TIME, INITIAL_UPDATE_EVENT, rls);
timer_->insert(event);
}
else {
rls->setLimit(rls->lowerBound_);
rls->initialPhase_ = 0;
if (rls->logData_->count_!=0 && enable_pushback_) {
PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
timer_->insert(event);
}
}
}
void
PushbackAgent::pushbackCheck(RateLimitSession * rls) {
double dropRate = rls->getDropRate();
if (dropRate >= DROP_RATE_FOR_PUSHBACK) {
rls->pushbackOn();
rls->heightInPTree_++;
double totalRate = rls->rlStrategy_->target_rate_;
int count = rls->logData_->count_;
double fairShare = totalRate/count;
int done = count;
//max-min allocation of limit.
while (done != 0) {
LoggingDataStructNode * lgdsNode = rls->logData_->first_;
int countThisRound=0;
while (lgdsNode != NULL) {
double rate = lgdsNode->rateEstimator_->estRate_;
if (rate <= fairShare && !lgdsNode->pushbackSent_) {
AggSpec * aggSpec = rls->aggSpec_->clone();
PushbackMessage * msg;
if (rate < fairShare/2.0) {
msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
rls->localID_, aggSpec, INFINITE_LIMIT,
rls->depthInPTree_);
lgdsNode->pushbackSent(INFINITE_LIMIT, rate);
}
else {
msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
rls->localID_, aggSpec, rate, rls->depthInPTree_);
lgdsNode->pushbackSent(rate, rate);
}
sendMsg(msg);
countThisRound++;
done--;
totalRate -= rate;
}
lgdsNode = lgdsNode->next_;
}
if (done == 0) break;
if (countThisRound==0) {
//allocate fairshare to everyone and end.
LoggingDataStructNode * lgdsNode = rls->logData_->first_;
while (lgdsNode != NULL) {
if (!lgdsNode->pushbackSent_) {
AggSpec * aggSpec = rls->aggSpec_->clone();
PushbackMessage * msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_,
rls->localQID_, rls->localID_,
aggSpec, fairShare,
rls->depthInPTree_);
lgdsNode->pushbackSent(fairShare,lgdsNode->rateEstimator_->estRate_);
sendMsg(msg);
done--;
totalRate-=fairShare;
}
lgdsNode = lgdsNode->next_;
}
}
else {
fairShare= totalRate/done;
}
}
}
else {
//set up pushback check for later.
PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
timer_->insert(event);
}
}
void
PushbackAgent::pushbackStatus(RateLimitSession * rls) {
if (rls->pushbackON_) {
sprintf(prnMsg, " Warning: status timer expired for non-leaf node\n");
printMsg(prnMsg,0);
//exit(-1);
}
double rate = rls->getArrivalRateForStatus();
rls->logData_->resetStatus();
PushbackMessage * msg = new PushbackStatusMessage(node_->nodeid(), rls->origin_,
rls->remoteQID_, rls->remoteID_,
rate, rls->heightInPTree_);
sendMsg(msg);
}
void
PushbackAgent::pushbackRefresh(int qid) {
PushbackQueue * pbq = queue_list_[qid].pbq_;
int oldSessions = pbq->rlsList_->noMySessions(node_->nodeid());
if (!oldSessions) {
//set up refresh timers for a later time and return.
// PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
// timer_->insert(event);
return;
}
int noSessions = oldSessions;
if (MERGER_MODE == 1) {
pbq->rlsList_->mergeSessions(node_->nodeid());
noSessions = pbq->rlsList_->noMySessions(node_->nodeid());
if (noSessions!=oldSessions) {
sprintf(prnMsg, " Some sessions merged. old = %d new = %d\n", oldSessions, noSessions);
printMsg(prnMsg,0);
//get rid of merged RLS's
RateLimitSession * listItem = pbq->rlsList_->first_;
while (listItem != NULL) {
if (listItem->origin_ == node_->nodeid() && listItem->merged_) {
pushbackCancel(listItem);
listItem = listItem->next_;
}
}
} else {
sprintf(prnMsg, " No sessions merged. number = %d\n", noSessions);
printMsg(prnMsg,0);
}
} else {
sprintf(prnMsg, "Number of sessions = %d\n", noSessions);
printMsg(prnMsg,0);
}
double now = Scheduler::instance().clock();
//check if some sessions need to be discarded because of rate-limiting too many sessions
RateLimitSession * listItem1 = pbq->rlsList_->first_;
while (noSessions > MAX_SESSIONS && listItem1 != NULL) {
int rank = pbq->rlsList_->rankRate(node_->nodeid(), listItem1->getArrivalRateForStatus());
if (listItem1->origin_ == node_->nodeid() &&
rank >= MAX_SESSIONS && (now - listItem1->startTime_) >= EARLIEST_TIME_TO_FREE) {
sprintf(prnMsg,"Releasing because of too many being rate-limited\n");
printMsg(prnMsg,0);
if (LOWER_BOUND_MODE == 1 &&
queue_list_[qid].idTree_->lowerBound_ < listItem1->getArrivalRateForStatus()) {
queue_list_[qid].idTree_->lowerBound_ = listItem1->getArrivalRateForStatus();
}
pushbackCancel(listItem1);
noSessions--;
}
listItem1 = listItem1->next_;
}
double linkBW = pbq->getBW();
double arrRate = pbq->getRate();
double targetRate = linkBW/(1 - TARGET_DROPRATE);
double totalRateLimitedArrivalRate = 0;
double totalLimit=0;
double lowerBound=-1;
RateLimitSession * listItem = pbq->rlsList_->first_;
while (listItem != NULL) {
if (listItem->origin_ == node_->nodeid() && !listItem->merged_) {
double sessionArrRate = listItem->getArrivalRateForStatus();
double sessionLimit = listItem->rlStrategy_->target_rate_;
totalRateLimitedArrivalRate+= sessionArrRate;
totalLimit+= (sessionArrRate > sessionLimit)? sessionLimit: sessionArrRate;
if (listItem->lowerBound_ < lowerBound || lowerBound == -1) {
lowerBound = listItem->lowerBound_;
}
}
listItem = listItem->next_;
}
if (LOWER_BOUND_MODE == 1) {
lowerBound = queue_list_[qid].idTree_->lowerBound_;
}
double excessRate = (arrRate - totalLimit + totalRateLimitedArrivalRate) - targetRate;
sprintf(prnMsg,"arr=%g totalLimit=%g totalRateLimit=%g excess=%g\n", arrRate, totalLimit,
totalRateLimitedArrivalRate, excessRate);
printMsg(prnMsg,0);
if (excessRate < 0) {
sprintf(prnMsg, "Negative Excess Rate. Things maybe fine now.\n");
printMsg(prnMsg,0);
//this would make all sessions go away after a while.
#ifdef DEBUG
printf("Negative Excess Rate - time: %5.3f\n", now);
#endif
requiredLimit_ = 2*totalRateLimitedArrivalRate;
} else {
//Should we allow such an abrupt increase when the number of sessions
// changes?
// How about: Let L be the requiredLimit.
// We need Sum (session arrival rate - L ) = excessRate
requiredLimit_ = (totalRateLimitedArrivalRate - excessRate)/noSessions;
if (requiredLimit_ < lowerBound) {
requiredLimit_ = lowerBound;
}
#ifdef DEBUG
printf("New requiredLimit - time: %5.3f limit: %5.3f lowerBound:%5.3f \n", now, requiredLimit_, lowerBound);
#endif
}
sprintf(prnMsg,"Refresh. target=%g limit=%g floor=%g\n", targetRate, requiredLimit_,
lowerBound);
printMsg(prnMsg,0);
//consider all sessions in ascending order of their arrival rate
for (int i=0; i<noSessions; i++) {
listItem = pbq->rlsList_->first_;
while (listItem != NULL ) {
if (listItem->origin_ == node_->nodeid() &&
pbq->rlsList_->rankSession(node_->nodeid(),listItem) == i)
break;
listItem = listItem->next_;
}
if (listItem == NULL) {
printf("Error: Rank %d not found\n", i);
exit(0);
}
double oldLimit = listItem->rlStrategy_->target_rate_;
double sendRate = listItem->getArrivalRateForStatus();
#ifdef DEBUG
printf("time: %5.3f ID: %d sendRate %5.3f oldLimit %5.3f requiredLimit %5.3f\n", now,
listItem->localID_, sendRate, oldLimit, requiredLimit_);
#endif
//Session sending less than the limit.
if (sendRate < requiredLimit_) {
//if it has been sending less for "some" time.
if (now - listItem->refreshTime_ >= MIN_TIME_TO_FREE) {
#ifdef DEBUG
printf("time: %5.3f ID: %d refreshTime %5.3f MIN %d Cancel pushback B \n",
now, listItem->localID_, listItem->refreshTime_, MIN_TIME_TO_FREE);
#endif
pushbackCancel(listItem); //cancel rate-limiting
requiredLimit_+= (requiredLimit_ - sendRate)/(noSessions - i - 1);
i--; noSessions--;
}
else {
//refresh upstream with double of max(sending rate, old limit)
//just using sending rate, limits the amount an aggregate can grow till next refresh
//using just old limit is tricky when different aggregates have different limits.
//at the same time, we would prefer not to loosen the hold too much in one step.
#ifdef DEBUG
printf("time: %5.3f ID: %d double limit\n", now, listItem->localID_);
#endif
double maxR = sendRate>oldLimit? sendRate: oldLimit;
if (now - listItem->refreshTime_ <= PRIMARY_WAITING_ZONE) {
sprintf(prnMsg,"Waiting Zone 1: sendRate=%g oldLimit=%g\n", sendRate, oldLimit);
printMsg(prnMsg,0);
}
else {
sprintf(prnMsg,"Waiting Zone 2: sendRate=%g oldLimit=%g\n", sendRate, oldLimit);
printMsg(prnMsg,0);
maxR *= 1.5;
}
if (maxR < requiredLimit_) {
listItem->setLimit(maxR);
requiredLimit_ += (requiredLimit_ - maxR)/(noSessions - i - 1);
}
else {
listItem->setLimit(requiredLimit_);
}
if (listItem->pushbackON_)
refreshUpstreamLimits(listItem);
}
}
else {
//change the rate limit most half way.
double newLimit;
if (oldLimit > 1.25 * requiredLimit_ || oldLimit ==0)
newLimit = requiredLimit_;
else
newLimit = 0.5*requiredLimit_ + 0.5*oldLimit;
if (newLimit < lowerBound)
newLimit = lowerBound;
listItem->refreshed();
listItem->setLimit(newLimit);
if (listItem->pushbackON_)
refreshUpstreamLimits(listItem);
#ifdef DEBUG
printf("time: %5.3f ID: %d newLimit %5.3f oldLimit %5.3f requiredLimit %5.3f\n",
now, listItem->localID_, newLimit, oldLimit, requiredLimit_);
#endif
}
}
//setup refresh timer again
noSessions = pbq->rlsList_->noMySessions(node_->nodeid());
if (noSessions) {
PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
timer_->insert(event);
}
}
void
PushbackAgent::pushbackCancel(RateLimitSession * rls) {
sprintf(prnMsg,"Stopping rate-limiting for aggregate: ");
printMsg(prnMsg,0);
rls->aggSpec_->print();
fflush(stdout);
#ifdef DEBUG
double now = Scheduler::instance().clock();
printf("time: %5.3f ID: %d Cancel pushback C\n", now, rls->localID_);
#endif
if (rls->pushbackON_) {
LoggingDataStructNode * lgdsNode = rls->logData_->first_;
while (lgdsNode != NULL) {
PushbackMessage * msg = new PushbackCancelMessage(node_->nodeid(), lgdsNode->nid_,
rls->localQID_, rls->localID_);
sendMsg(msg);
lgdsNode = lgdsNode->next_;
}
}
//remove all events that point to this rls.
timer_->removeEvents(rls);
//local cancellation here.
queue_list_[rls->localQID_].pbq_->rlsList_->endSession(rls);
}
//######################## Message Receiving Code #####################
void
PushbackAgent::recv(Packet * pkt, Handler * h) {
hdr_pushback * hdr_push = ((hdr_pushback*)pkt)->access(pkt);
PushbackMessage * msg = hdr_push->msg_;
sprintf(prnMsg, " %s msg from %d\n", PushbackMessage::type(msg), msg->senderID_);
printMsg(prnMsg,0);
switch (msg->msgID_) {
case PUSHBACK_REQUEST_MSG : processPushbackRequest((PushbackRequestMessage *)msg);
break;
case PUSHBACK_STATUS_MSG : processPushbackStatus((PushbackStatusMessage *) msg);
break;
case PUSHBACK_REFRESH_MSG : processPushbackRefresh((PushbackRefreshMessage *) msg);
break;
case PUSHBACK_CANCEL_MSG : processPushbackCancel((PushbackCancelMessage *) msg);
break;
default: fprintf(stderr,"PBA: %s Undefined Message ID %d\n", name(),msg->msgID_);
}
delete(msg);
}
void
PushbackAgent::processPushbackRequest(PushbackRequestMessage * msg) {
int qid = getQID(msg->senderID_);
sprintf(prnMsg, " pushback request from %d for qid=%d limit=%g\n", msg->senderID_,
qid, msg->limit_);
printMsg(prnMsg,0);
AggSpec * aggSpec = msg->aggSpec_;
if (queue_list_[qid].pbq_->rlsList_->containsAggSpec(aggSpec)) {
fprintf(stdout,"PBA: %s got a pushback req for agg I already rate-limit. \
Feature not yet Implemented\n",name());
exit(-1);
}
RateLimitSession * rls = new RateLimitSession(aggSpec, msg->limit_, msg->senderID_, qid,
msg->qid_, msg->rlsID_, msg->depth_+1,
RATE_LIMIT_TIME_DEFAULT, -1, node_, rtLogic_);
queue_list_[qid].pbq_->rlsList_->insert(rls);
//pushback propagation check if there are valid upstream neighbors && enable_pushback_
if (rls->logData_->count_ && enable_pushback_) {
PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
timer_->insert(event);
}
}
void
PushbackAgent::processPushbackStatus(PushbackStatusMessage * msg) {
int qid = msg->qid_;
if (!checkQID(qid)) {
sprintf(prnMsg, " Got invalid qid from %d in status message\n", msg->senderID_);
printMsg(prnMsg,0);
exit(-1);
}
RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByLocalID(msg->rlsID_);
if (rls == NULL) {
sprintf(prnMsg, " session %d not found\n", msg->rlsID_);
printMsg(prnMsg,0);
exit(-1);
}
//increase your height if you need to.
if (msg->height_ + 1 > rls->heightInPTree_) {
rls->heightInPTree_ = msg->height_ + 1;
sprintf(prnMsg, " height increased to %d\n", rls->heightInPTree_);
printMsg(prnMsg,0);
}
rls->logData_->registerStatus(msg->senderID_, msg->arrivalRate_);
sprintf(prnMsg, " got rate %g\n", msg->arrivalRate_);
printMsg(prnMsg,0);
//send status if you are not root.
if (rls->origin_!= node_->nodeid()) {
// 1. check to see if status from all the upstream neighbors has arrived.
// 2. if yes, then send status downstream.
int gotAll = rls->logData_->consolidateStatus();
if (gotAll==1) {
//send status down
double rate = rls->logData_->statusArrivalRateAll_;
PushbackMessage * msg = new PushbackStatusMessage(node_->nodeid(), rls->origin_,
rls->remoteQID_, rls->remoteID_,
rate, rls->heightInPTree_);
sendMsg(msg);
timer_->cancelStatus(rls);
//reset status arrivals.
rls->logData_->resetStatus();
}
}
}
void
PushbackAgent::processPushbackRefresh(PushbackRefreshMessage *msg) {
int qid = getQID(msg->senderID_);
sprintf(prnMsg, " pushback refresh from %d for qid=%d with limit=%g\n", msg->senderID_, qid, msg->limit_);
printMsg(prnMsg,0);
RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByRemoteID(msg->rlsID_);
if (rls == NULL) {
sprintf(prnMsg, " session %d not found\n", msg->rlsID_);
printMsg(prnMsg,0);
exit(-1);
}
//1. change your own rate limit
rls->setAggSpec(msg->aggSpec_);
delete(msg->aggSpec_);
double newLimit = msg->limit_;
rls->setLimit(newLimit);
//2. if pushback has been propagated send out refreshes upstream with new limits
if (rls->pushbackON_) {
refreshUpstreamLimits(rls);
}
//3. set up status timer.
PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME - 0.1*rls->depthInPTree_,
PUSHBACK_STATUS_EVENT, rls);
timer_->insert(event);
}
void
PushbackAgent::processPushbackCancel(PushbackCancelMessage *msg) {
int qid = getQID(msg->senderID_);
sprintf(prnMsg, " pushback cancel from %d for queue index %d\n", msg->senderID_, qid);
printMsg(prnMsg,0);
RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByRemoteID(msg->rlsID_);
if (rls == NULL) {
sprintf(prnMsg, " session %d not found\n", msg->rlsID_);
printMsg(prnMsg,0);
exit(-1);
}
pushbackCancel(rls);
}
void
PushbackAgent::refreshUpstreamLimits(RateLimitSession * rls) {
double totalRate = rls->rlStrategy_->target_rate_;
int count = rls->logData_->count_;
double fairShare = totalRate/count;
int done = count;
double arrRate = rls->getArrivalRateForStatus();
sprintf(prnMsg, "Sending refresh messages to %d nodes. Limit = %g arrRate = %g\n", count, totalRate, arrRate);
printMsg(prnMsg,0);
int excess = 0;
if (totalRate > arrRate) {
excess = 1;
}
//max-min allocation of limit.
while (done != 0) {
LoggingDataStructNode * lgdsNode = rls->logData_->first_;
int countThisRound=0;
while (lgdsNode != NULL) {
double rate;
rate = lgdsNode->statusArrivalRate_;
if (rate <= fairShare && !lgdsNode->sentRefresh_) {
AggSpec * aggSpec = rls->aggSpec_->clone();
PushbackMessage * msg;
if (rate < fairShare/2.0) {
msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
rls->localID_, aggSpec, INFINITE_LIMIT);
lgdsNode->sentRefresh(INFINITE_LIMIT);
}
else if (!excess) {
msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
rls->localID_, aggSpec, rate);
lgdsNode->sentRefresh(rate);
}
else {
msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
rls->localID_, aggSpec, fairShare);
lgdsNode->sentRefresh(fairShare);
rate = fairShare;
}
sendMsg(msg);
countThisRound++;
done--;
totalRate -= rate;
}
lgdsNode = lgdsNode->next_;
}
if (done == 0) break;
if (countThisRound==0) {
//allocate fairshare to everyone and end.
LoggingDataStructNode * lgdsNode = rls->logData_->first_;
while (lgdsNode != NULL) {
if (!lgdsNode->sentRefresh_) {
AggSpec * aggSpec = rls->aggSpec_->clone();
PushbackMessage * msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_,
rls->localQID_, rls->localID_,
aggSpec, fairShare);
lgdsNode->sentRefresh(fairShare);
sendMsg(msg);
done--;
totalRate-=fairShare;
}
lgdsNode = lgdsNode->next_;
}
}
else {
fairShare = totalRate/done;
}
}
//reset all the sentRefresh bits
LoggingDataStructNode * lgdsNode = rls->logData_->first_;
while (lgdsNode != NULL) {
lgdsNode->sentRefresh_ = 0;
lgdsNode = lgdsNode->next_;
}
}
int
PushbackAgent::getQID(int sender) {
Tcl& tcl = Tcl::instance();
intResult_ = -1;
int index = 0;
// there gotta be better ways of doing this; todoLater.
// like make Tcl call you back and set a variable using command.
for (; index <last_index_; index++) {
tcl.evalf("%s set intResult_ [%s check-queue %d %d %s]", name(), name(),
node_->nodeid(), sender , queue_list_[index].pbq_->name());
if (intResult_ == 1) break;
}
if (index == last_index_) {
sprintf(prnMsg, " right queue not found\n");
printMsg(prnMsg,0);
exit(-1);
}
return index;
}
void
PushbackAgent::sendMsg(PushbackMessage * msg) {
Tcl& tcl = Tcl::instance();
dst_.addr_ = msg->targetID_;
//this assumes that all pushback agents have port zero.
tcl.evalf("%s set intResult_ [%s get-pba-port %d]", name(), name(),dst_.addr_ );
if ( intResult_ == -1 ) {
fprintf(stderr,"PBA: %s Pushback Agent not found on Node %d\n", name(), dst_.addr_);
return;
}
dst_.port_ = intResult_;
Packet *pkt = allocpkt();
hdr_pushback * hdr_push = ((hdr_pushback*)pkt)->access(pkt);
hdr_push->msg_ = msg;
sprintf(prnMsg, " sent %s message to %d.%d\n", PushbackMessage::type(msg), dst_.addr_, dst_.port_);
printMsg(prnMsg,4);
send(pkt,0);
}
void
PushbackAgent::printMsg(char * msg, int msgLevel) {
if (msgLevel < debugLevel) {
if (verbose_) printf("PBA:%d (%g) %s", node_->nodeid(), Scheduler::instance().clock(), msg);
fflush(stdout);
}
}
int
PushbackAgent::checkQID(int qid) {
if (qid < 0 || qid >= last_index_)
return 0;
else
return 1;
}
//decide whether to accept a merger involving "count" aggregates,
//the number of bits in the resultant aggregate would be "bits"
//the aggregate is being broadended by "bitsDiff" (measured from shortest prefix)
int
PushbackAgent::mergerAccept(int count, int bits, int bitsDiff) {
//todo: think of a smarter way.
//currently merge if bits < some value.
//return (bits <= MIN_BITS_FOR_MERGER);
return 0;
}
// ############################### PushbackTimer Methods ############################
void
PushbackTimer::expire(Event *e) {
if (firstEvent_ == NULL) {
printf("PushbackTimer: No event found on expiry\n");
exit(-1);
}
PushbackEvent * event = firstEvent_;
firstEvent_= firstEvent_->next_;
schedule();
agent_->timeout(event);
delete(event);
}
void
PushbackTimer::insert(PushbackEvent * event) {
sprintf(agent_->prnMsg,"%s timer set\n", PushbackEvent::type(event));
agent_->printMsg(agent_->prnMsg,4);
if (firstEvent_ == NULL) {
firstEvent_ = event;
schedule();
return;
}
if (event->time_ < firstEvent_->time_) {
event->setSucc(firstEvent_);
firstEvent_=event;
schedule();
return;
}
PushbackEvent * listItem = firstEvent_;
while (listItem->next_!=NULL && listItem->next_->time_ <= event->time_) {
listItem = listItem->next_;
}
event->setSucc(listItem->next_);
listItem->setSucc(event);
//comment the sanity check out later
sanityCheck();
return;
}
void
PushbackTimer::removeEvents(RateLimitSession * rls) {
if (firstEvent_==NULL) return;
while (firstEvent_!= NULL && firstEvent_->rls_==rls) {
cancel();
PushbackEvent * event = firstEvent_;
firstEvent_=firstEvent_->next_;
delete(event);
schedule();
}
if (firstEvent_==NULL) return;
PushbackEvent * previous = firstEvent_;
PushbackEvent * current = firstEvent_->next_;
while (current!=NULL) {
if (current->rls_==rls) {
previous->next_=current->next_;
delete(current);
current = previous->next_;
continue;
}
previous=current;
current=current->next_;
}
}
void
PushbackTimer::schedule() {
if (firstEvent_== NULL) {
sprintf(agent_->prnMsg,"Timer: Nothing to schedule\n");
agent_->printMsg(agent_->prnMsg, 0);
return;
}
resched(firstEvent_->time_ - Scheduler::instance().clock());
}
void
PushbackTimer::cancelStatus(RateLimitSession * rls) {
if (firstEvent_==NULL) {
sprintf(agent_->prnMsg, " Error timer list empty\n");
agent_->printMsg(agent_->prnMsg, 0);
//return;
exit(-1);
}
if (firstEvent_->eventID_==PUSHBACK_STATUS_EVENT && firstEvent_->rls_==rls) {
cancel();
PushbackEvent * event = firstEvent_;
firstEvent_=firstEvent_->next_;
delete(event);
schedule();
return;
}
PushbackEvent * previous = firstEvent_;
PushbackEvent * current = firstEvent_->next_;
while (current!=NULL) {
if (current->eventID_ == PUSHBACK_STATUS_EVENT && current->rls_==rls) {
previous->next_=current->next_;
delete(current);
return;
}
previous=current;
current=current->next_;
}
sprintf(agent_->prnMsg, "Error status timer not found\n");
agent_->printMsg(agent_->prnMsg, 0);
exit(-1);
}
int
PushbackTimer::containsRefresh(int qid) {
PushbackEvent * listItem = firstEvent_;
while (listItem!=NULL) {
if (listItem->eventID_ == PUSHBACK_REFRESH_EVENT && listItem->qid_==qid)
return 1;
listItem = listItem->next_;
}
return 0;
}
void
PushbackTimer::sanityCheck() {
if (firstEvent_==NULL || firstEvent_->next_ == NULL) return;
PushbackEvent * listItem = firstEvent_;
while (listItem->next_!=NULL) {
if (listItem->time_ > listItem->next_->time_) {
sprintf(agent_->prnMsg, "Sanity Check Failed\n");
agent_->printMsg(agent_->prnMsg, 0);
exit(-1);
}
listItem = listItem->next_;
}
}
syntax highlighted by Code2HTML, v. 0.9.1