/* -*- c++ -*-
priqueue.cc
A simple priority queue with a remove packet function
$Id: dsr-priqueue.cc,v 1.2 2002/09/11 20:11:09 buchheim Exp $
*/
#include <object.h>
#include <packet.h>
#include <cmu-trace.h>
#include <dsr-priqueue.h>
#define PRIQUEUE_DEBUG 0
static class CMUPriQueueClass : public TclClass {
public:
CMUPriQueueClass() : TclClass("CMUPriQueue") {}
TclObject* create(int, const char*const*) {
return (new CMUPriQueue);
}
} class_CMUPriQueue;
/* ======================================================================
Priority Queue Handler
====================================================================== */
void CMUPriQueueHandler::handle(Event*)
{
qh_ifq->prq_resume();
}
/* ======================================================================
CMUPriQueue - public routines
====================================================================== */
CMUPriQueue::CMUPriQueue() : Connector(), prq_qh_(this)
{
int i;
for(i = 0; i < IFQ_MAX; i++) {
prq_snd_[i].ifq_head = prq_snd_[i].ifq_tail = 0;
prq_snd_[i].ifq_len = 0;
prq_snd_[i].ifq_maxlen = IFQ_MAXLEN;
prq_snd_[i].ifq_drops = 0;
last_ifqlen_[i] = 0;
}
prq_logtarget_ = 0; // no logging target by default
prq_ipaddr_ = 0;
prq_blocked_ = 0;
bind("qlen_logthresh_", &qlen_logthresh_);
bind("fw_logthresh_", &fw_logthresh_);
assert(qlen_logthresh_ > 0 && fw_logthresh_ > 0);
bzero(last_ifqlen_, sizeof(last_ifqlen_));
stat_send_ = stat_recv_ = stat_blocked_ = 0;
}
int
CMUPriQueue::command(int argc, const char*const* argv)
{
if (argc == 2 && strcasecmp(argv[1], "reset") == 0) {
Terminate();
//FALL-THROUGH to give parents a chance to reset
} else if(argc == 3) {
if(strcmp(argv[1], "logtarget") == 0) {
prq_logtarget_ = (Trace*) TclObject::lookup(argv[2]);
assert(prq_logtarget_);
return (TCL_OK);
}
else if(strcmp(argv[1], "ipaddr") == 0) {
prq_ipaddr_ = atoi(argv[2]);
assert(prq_ipaddr_ > 0);
return (TCL_OK);
}
}
return Connector::command(argc, argv);
}
void
CMUPriQueue::log_stats()
{
int q, s = 0, d = 0;
assert(IFQ_MAX == 4);
/*
* avoid logging useless information
*/
for(q = 0; q < IFQ_MAX; q++) {
s += prq_snd_[q].ifq_len;
d += (prq_snd_[q].ifq_len - last_ifqlen_[q]);
}
if(d < 0)
d = -d;
/*
* Only write an entry to the log file is the queue length
* or "delta change" over the last second meets the threshold
* level OR, if this node is generally busy (forwarding more
* than a certain number of packets per second).
*/
if(s >= qlen_logthresh_ || d >= qlen_logthresh_ ||
stat_recv_ > fw_logthresh_) {
if (prq_logtarget_->pt_->tagged()) {
// using the new tagged trace format
trace("I "TIME_FORMAT" -prq:adr %d -prq:rx %d -prq:tx %d -prq:bl %d -prq:snd0 {%d %d} -prq:snd1 {%d %d} -prq:snd2 {%d %d} -prq:snd3 {%d %d}",
Scheduler::instance().clock(),
prq_ipaddr_,
stat_recv_, stat_send_, stat_blocked_,
prq_snd_[0].ifq_len, prq_snd_[0].ifq_len-last_ifqlen_[0],
prq_snd_[1].ifq_len, prq_snd_[1].ifq_len-last_ifqlen_[1],
prq_snd_[2].ifq_len, prq_snd_[2].ifq_len-last_ifqlen_[2],
prq_snd_[3].ifq_len, prq_snd_[3].ifq_len-last_ifqlen_[3]);
} else {
trace("I %.9f _%d_ ifq rx %d tx %d bl %d [%d %d] [%d %d] [%d %d] [%d %d]",
Scheduler::instance().clock(),
prq_ipaddr_,
stat_recv_, stat_send_, stat_blocked_,
prq_snd_[0].ifq_len, prq_snd_[0].ifq_len-last_ifqlen_[0],
prq_snd_[1].ifq_len, prq_snd_[1].ifq_len-last_ifqlen_[1],
prq_snd_[2].ifq_len, prq_snd_[2].ifq_len-last_ifqlen_[2],
prq_snd_[3].ifq_len, prq_snd_[3].ifq_len-last_ifqlen_[3]);
}
}
for(q = 0; q < IFQ_MAX; q++) {
last_ifqlen_[q] = prq_snd_[q].ifq_len;
}
stat_send_ = stat_recv_ = stat_blocked_ = 0;
}
void
CMUPriQueue::trace(char* fmt, ...)
{
va_list ap;
assert(prq_logtarget_);
va_start(ap, fmt);
vsprintf(prq_logtarget_->pt_->buffer(), fmt, ap);
prq_logtarget_->pt_->dump();
va_end(ap);
}
void
CMUPriQueue::recv(Packet *p, Handler *)
{
#if PRIQUEUE_DEBUG > 0
prq_validate();
#endif
stat_recv_++;
if(prq_blocked_ == 1) {
stat_blocked_++;
} else {
assert(prq_length() == 0);
}
prq_enqueue(p);
#if PRIQUEUE_DEBUG > 0
prq_validate();
#endif
}
void
CMUPriQueue::prq_resume()
{
Packet *p;
assert(prq_blocked_);
#if PRIQUEUE_DEBUG > 0
prq_validate();
#endif
p = prq_dequeue();
if (p != 0) {
stat_send_++;
target_->recv(p, &prq_qh_);
} else {
prq_blocked_ = 0;
}
}
/*
* Called at the end of the simulation to purge the IFQ.
*/
void
CMUPriQueue::Terminate()
{
Packet *p;
while((p = prq_dequeue())) {
drop(p, DROP_END_OF_SIMULATION);
}
}
Packet*
CMUPriQueue::prq_get_nexthop(nsaddr_t id)
{
int q;
Packet *p, *pprev = 0;
struct ifqueue *ifq;
#if PRIQUEUE_DEBUG > 0
prq_validate();
#endif
for(q = 0; q < IFQ_MAX; q++) {
ifq = &prq_snd_[q];
pprev = 0;
for(p = ifq->ifq_head; p; p = p->next_) {
struct hdr_cmn *ch = HDR_CMN(p);
if(ch->next_hop() == id)
break;
pprev = p;
}
if(p) {
if(p == ifq->ifq_head) {
assert(pprev == 0);
IF_DEQUEUE(ifq, p);
/* don't increment drop counter */
#if PRIQUEUE_DEBUG > 0
prq_validate();
#endif
return p;
} else {
assert(pprev);
pprev->next_ = p->next_;
if(p == ifq->ifq_tail)
ifq->ifq_tail = pprev;
ifq->ifq_len--;
#if PRIQUEUE_DEBUG > 0
prq_validate();
#endif
p->next_ = 0;
return p;
}
}
}
return (Packet*) 0;
}
int
CMUPriQueue::prq_isfull(Packet *p)
{
int q = prq_assign_queue(p);
struct ifqueue *ifq = &prq_snd_[q];
if(IF_QFULL(ifq))
return 1;
else
return 0;
}
int
CMUPriQueue::prq_length()
{
int q, tlen = 0;
for(q = 0; q < IFQ_MAX; q++) {
tlen += prq_snd_[q].ifq_len;
}
return tlen;
}
/* ======================================================================
CMUPriQueue - private routines
====================================================================== */
void
CMUPriQueue::prq_enqueue(Packet *p)
{
int q = prq_assign_queue(p);
struct ifqueue *ifq = &prq_snd_[q];
if(IF_QFULL(ifq)) {
IF_DROP(ifq);
drop(p, DROP_IFQ_QFULL);
return;
}
IF_ENQUEUE(ifq, p);
/*
* Start queue if idle...
*/
if(prq_blocked_ == 0) {
p = prq_dequeue();
prq_blocked_ = 1;
stat_send_++;
target_->recv(p, &prq_qh_);
}
}
Packet*
CMUPriQueue::prq_dequeue(void)
{
Packet *p;
int q;
for(q = 0; q < IFQ_MAX; q++) {
if(prq_snd_[q].ifq_len > 0) {
IF_DEQUEUE(&prq_snd_[q], p);
return p;
} else {
assert(prq_snd_[q].ifq_head == 0);
}
}
return (Packet*) 0;
}
int
CMUPriQueue::prq_assign_queue(Packet *p)
{
struct hdr_cmn *ch = HDR_CMN(p);
switch(ch->ptype()) {
case PT_AODV:
case PT_DSR:
case PT_IMEP:
case PT_MESSAGE: /* used by DSDV */
case PT_TORA:
return IFQ_RTPROTO;
case PT_AUDIO:
case PT_VIDEO:
return IFQ_REALTIME;
case PT_ACK:
return IFQ_LOWDELAY;
default:
return IFQ_NORMAL;
}
}
void
CMUPriQueue::prq_validate()
{
int q, qlen;
Packet *p;
struct ifqueue *ifq;
for(q = 0; q < IFQ_MAX; q++) {
ifq = &prq_snd_[q];
qlen = 0;
if(ifq->ifq_head == 0) {
assert(ifq->ifq_len == 0);
assert(ifq->ifq_head == ifq->ifq_tail);
} else {
for(p = ifq->ifq_head; p; p = p->next_) {
if(p->next_ == 0)
assert(p == ifq->ifq_tail);
qlen++;
}
assert(qlen == ifq->ifq_len);
assert(qlen <= ifq->ifq_maxlen);
}
}
}
syntax highlighted by Code2HTML, v. 0.9.1