/* -*- Mode:C++; c-basic-offset:4; tab-width:4; indent-tabs-mode:t -*- */
/*
* Copyright (c) 2000 International Computer Science Institute
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. All advertising materials mentioning features or use of this software
* must display the following acknowledgement:
* This product includes software developed by ACIRI, the AT&T
* Center for Internet Research at ICSI (the International Computer
* Science Institute).
* 4. Neither the name of ACIRI nor of ICSI may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY ICSI AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL ICSI OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "rate-limit.h"
#include "random.h"
#include "ident-tree.h"
#include "pushback.h"
// ######################### RateLmitSessionList Methods #####################
int
RateLimitSessionList::filter(Packet * pkt, int lowDemand) {
RateLimitSession * next = first_;
double dropP = -1;
while (next != NULL) {
double p = next->log(pkt, lowDemand);
if (p >= dropP) dropP = p;
next = next->next_;
}
#ifdef DEBUG_RLSL
if (dropP == -1) {
printf("RLSList: Found a non-member packet at %g\n", Scheduler::instance().clock());
fflush(stdout);
}
#endif
double u = Random::uniform();
if (u <= dropP) {
#ifdef DEBUG_RLSL
printf("RLSList:%d Dropping Packet in filter. \n", first_->logData_->myID_);
fflush(stdout);
#endif
return 1;
}
//not dropped
return 0;
}
int
RateLimitSessionList::insert(RateLimitSession * session) {
RateLimitSession * listItem = first_;
while (listItem != NULL) {
if ( listItem->aggSpec_->equals(session->aggSpec_) ) {
return 0;
}
listItem = listItem->next_;
}
session->setSucc(first_);
first_ = session;
session->localID_ = IDCounter_++;
noSessions_++;
return 1;
}
RateLimitSession *
RateLimitSessionList::containsLocalAggSpec(AggSpec * spec, int myID) {
RateLimitSession * listItem = first_;
while (listItem != NULL) {
if ( listItem->origin_== myID ) {
if ( listItem->aggSpec_->contains(spec) ) {
return listItem;
}
// else if ( spec->contains(listItem->aggSpec_) ) {
// return 2;
// }
}
listItem = listItem->next_;
}
return NULL;
}
int
RateLimitSessionList::containsAggSpec(AggSpec * spec) {
RateLimitSession * listItem = first_;
while (listItem != NULL) {
if ( listItem->aggSpec_->contains(spec) ) {
return 1;
}
listItem = listItem->next_;
}
return 0;
}
//merger will only take place for aggregates started at this node.
void
RateLimitSessionList::mergeSessions(int myID) {
RateLimitSession * session1 = first_;
while (session1 != NULL) {
AggSpec * agg1 = session1->aggSpec_;
RateLimitSession * session2 = session1->next_;
while (session2 != NULL) {
AggSpec * agg2 = session2->aggSpec_;
if (session1->origin_== myID && session2->origin_ == myID &&
agg1->dstON_ && agg2->dstON_) {
int bits = AggSpec::prefixBitsForMerger(agg1, agg2);
if (bits==0) {
//a measure of how much are we broadening.
int bitsDiff = ((agg1->dstBits_<agg2->dstBits_)? agg1->dstBits_: agg2->dstBits_) - bits;
int prefix = PrefixTree::getPrefixBits(agg1->dstPrefix_, bits);
int count = getMySubsetCount(prefix, bits, myID);
if (count <2) {
printf("Error: Anomaly \n");
exit(-1);
}
if (PushbackAgent::mergerAccept(count, bits, bitsDiff)) {
merge(prefix, bits, myID);
}
}
}
session2 = session2->next_;
}
session1 = session1->next_;
}
}
void
RateLimitSessionList::merge(int prefix, int bits, int myID) {
RateLimitSession * listItem = first_;
RateLimitSession * firstItem = NULL;
while (listItem != NULL) {
if ( listItem->origin_== myID &&
listItem->aggSpec_->subsetOfDst(prefix, bits) &&
!listItem->merged_) {
if (firstItem == NULL) {
firstItem = listItem;
} else {
//merge here with firstItem
firstItem = RateLimitSession::merge(firstItem, listItem, bits);
}
}
listItem = listItem->next_;
}
if (firstItem == NULL) {
printf("Error: Anomaly no 2\n");
exit(-1);
}
firstItem->aggSpec_->expand(prefix, bits);
}
int
RateLimitSessionList::getMySubsetCount(int prefix, int bits, int myID) {
RateLimitSession * listItem = first_;
int count=0;
while (listItem != NULL) {
if ( listItem->origin_== myID &&
listItem->aggSpec_->subsetOfDst(prefix, bits))
count++;
listItem = listItem->next_;
}
return count;
}
int
RateLimitSessionList::noMySessions(int myID) {
RateLimitSession * listItem = first_;
int count=0;
while (listItem != NULL) {
if ( listItem->origin_==myID &&
!listItem->merged_ ) {
count++;
}
listItem = listItem->next_;
}
return count;
}
RateLimitSession *
RateLimitSessionList::getSessionByLocalID(int localID) {
RateLimitSession * listItem = first_;
while (listItem != NULL) {
if (listItem->localID_ == localID) {
return listItem;
}
listItem = listItem->next_;
}
return NULL;
}
RateLimitSession *
RateLimitSessionList::getSessionByRemoteID(int remoteID) {
RateLimitSession * listItem = first_;
while (listItem != NULL) {
if (listItem->remoteID_ == remoteID) {
return listItem;
}
listItem = listItem->next_;
}
return NULL;
}
void
RateLimitSessionList::endSession(RateLimitSession * rls) {
if (first_==NULL) {
printf("RLSL: Error. No session in progress\n");
exit(-1);
}
if (first_==rls) {
first_=rls->next_;
noSessions_--;
delete(rls);
return;
}
RateLimitSession * previous = first_;
RateLimitSession * current = first_->next_;
while (current!=NULL) {
if (current == rls) {
previous->next_=current->next_;
noSessions_--;
delete(rls);
return;
}
previous = current;
current=current->next_;
}
printf("RLSL: Error. The correct RLS not found\n");
exit(-1);
}
//descending order
int
RateLimitSessionList::rankRate(int myID, double rate) {
int rank=0;
RateLimitSession * listItem = first_;
while (listItem != NULL) {
if (listItem->origin_ == myID && listItem->getArrivalRateForStatus() > rate) {
rank++;
}
listItem = listItem->next_;
}
return rank;
}
//ascending order
int
RateLimitSessionList::rankSession(int myID, RateLimitSession * session) {
int rank=0;
RateLimitSession * listItem = first_;
while (listItem != NULL) {
if (listItem->origin_ == myID) {
if (listItem->getArrivalRateForStatus() < session->getArrivalRateForStatus()) {
rank++;
}
//to enforce deterministic ordering between sessions with same rate
else if (listItem->getArrivalRateForStatus() == session->getArrivalRateForStatus() &&
listItem < session) {
rank++;
}
}
listItem = listItem->next_;
}
return rank;
}
// ############################# RateLmitSession Methods #####################
//local constructor
RateLimitSession::RateLimitSession(AggSpec * aggSpec, double rateEstimate, int initial,
double limit, int origin, int locQID,
double delay, double lowerBound, Node * node, RouteLogic * rtLogic):
pushbackON_(0), merged_(0), next_(NULL) {
aggSpec_ = aggSpec;
origin_ = origin;
remoteID_ = -1;
localQID_ = locQID;
remoteQID_ = -1;
heightInPTree_ = 0; //always begin as a leaf.
depthInPTree_ = 0;
startTime_ = Scheduler::instance().clock();
expiryTime_ = startTime_ + delay;
refreshTime_ = startTime_;
lowerBound_ = lowerBound;
initialPhase_=initial;
rlStrategy_ = new RateLimitStrategy(limit, aggSpec_->ptype_, aggSpec->ptypeShare_, rateEstimate);
logData_ = new LoggingDataStruct(node, rtLogic, aggSpec->getSampleAddress(), rateEstimate);
}
//remote constructor
RateLimitSession::RateLimitSession(AggSpec * aggSpec, double limit, int origin, int locQID,
int remoteQID, int remoteID, int depth, double delay,
double lowerBound, Node * node, RouteLogic * rtLogic):
pushbackON_(0), merged_(0), initialPhase_(0), next_(NULL) {
aggSpec_ = aggSpec;
origin_ = origin;
remoteID_ = remoteID;
localQID_ = locQID;
remoteQID_ = remoteQID;
heightInPTree_ = 0; //always begin as a leaf.
depthInPTree_ = depth;
startTime_ = Scheduler::instance().clock();
expiryTime_ = startTime_ + delay;
lowerBound_ = lowerBound;
rlStrategy_ = new RateLimitStrategy(limit, aggSpec_->ptype_, aggSpec->ptypeShare_, 0);
logData_ = new LoggingDataStruct(node, rtLogic, aggSpec->getSampleAddress(), 0);
}
double
RateLimitSession::log(Packet *pkt, int lowDemand) {
int member = aggSpec_->member(pkt);
if (member == 0) {
//printf("RLS: Found a non-member packet at %g\n", Scheduler::instance().clock());
return 0;
}
//expired session
// if (expiryTime_ < Scheduler::instance().clock()) {
// printf("RLS: Session expired at %g. expiryTime = %g\n",
// Scheduler::instance().clock(), expiryTime_);
// return 0;
// }
logData_->log(pkt); //log the packet
int mine = (origin_ == logData_->myID_);
double prob = rlStrategy_->process(pkt, mine, lowDemand); //rate limit it.
return prob;
}
double
RateLimitSession::getDropRate() {
return rlStrategy_->getDropRate();
}
void
RateLimitSession::pushbackOn() {
pushbackON_ = 1;
rlStrategy_->reset();
}
void
RateLimitSession::refreshed() {
refreshTime_ = Scheduler::instance().clock();
}
void
RateLimitSession::setAggSpec(AggSpec * aggSpec) {
aggSpec_->dstON_ = aggSpec->dstON_;
aggSpec_->dstPrefix_ = aggSpec->dstPrefix_;
aggSpec_->dstBits_ = aggSpec->dstBits_;
}
void
RateLimitSession::setLimit(double limit) {
rlStrategy_->target_rate_=limit;
}
double
RateLimitSession::getArrivalRateForStatus() {
// for a leaf PBA, this is the rate seen at the rlStrategy_;
// for non-leaf PBAs it is the sum of the rates reported by upstream PBAs
// in their status messages.
double rate;
if (pushbackON_) {
logData_->consolidateStatus();
rate = logData_->statusArrivalRateAll_;
}
else {
rate = rlStrategy_->getArrivalRate();
}
return rate;
}
RateLimitSession *
RateLimitSession::merge(RateLimitSession * session1, RateLimitSession * session2, int bits) {
RateLimitSession *winner, *loser;
if (session1->pushbackON_) {
winner = session1;
loser = session2;
} else {
winner = session2;
loser = session1;
}
loser->merged_=1;
int envelope;
if (session1->aggSpec_->dstBits_==bits)
envelope = 1;
else if (session2->aggSpec_->dstBits_==bits)
envelope=2;
else
envelope=0;
double lowerBound = pick4merge(session1->lowerBound_, session2->lowerBound_, envelope);
winner->lowerBound_=lowerBound;
double target_rate = pick4merge(session1->rlStrategy_->target_rate_,
session2->rlStrategy_->target_rate_,
envelope);
winner->setLimit(target_rate);
double estRate = pick4merge(session1->rlStrategy_->rateEstimator_->estRate_,
session2->rlStrategy_->rateEstimator_->estRate_,
envelope);
winner->rlStrategy_->rateEstimator_->estRate_=estRate;
LoggingDataStruct * log1 = session1->logData_;
LoggingDataStruct * log2 = session2->logData_;
if (log1->count_ != log2->count_ || log1->myID_ != log2->myID_) {
printf("RLS: Error: logdata count or ID anomaly\n");
exit(-1);
}
estRate = pick4merge(log1->rateEstimator_->estRate_,
log2->rateEstimator_->estRate_,
envelope);
winner->logData_->rateEstimator_->estRate_ = estRate;
LoggingDataStructNode * node1 = log1->first_;
LoggingDataStructNode * node2 = log2->first_;
LoggingDataStructNode * nodew = winner->logData_->first_;
while (node1 != NULL && node2!= NULL && nodew != NULL) {
if (node1->nid_ != node2->nid_) {
printf("RLS: Error: Out of order log nodes. Or something more sinister\n");
exit(-1);
}
estRate = pick4merge(node1->rateEstimator_->estRate_,
node2->rateEstimator_->estRate_,
envelope);
nodew->rateEstimator_->estRate_ = estRate;
double statusArrivalRate = pick4merge(node1->statusArrivalRate_,
node2->statusArrivalRate_,
envelope);
nodew->statusArrivalRate_ = statusArrivalRate;
node1=node1->next_;
node2=node2->next_;
nodew=nodew->next_;
}
if (node1 != NULL || node2 !=NULL || nodew != NULL) {
printf("RLS: Error: Different chains\n");
exit(-1);
}
return winner;
}
double
RateLimitSession::pick4merge(double q1, double q2, int envelope) {
if (envelope == 1) {
return q1;
} else if (envelope == 2) {
return q2;
}
return q1+q2;
}
RateLimitSession::~RateLimitSession() {
delete(aggSpec_);
delete(rlStrategy_);
delete(logData_);
}
syntax highlighted by Code2HTML, v. 0.9.1