// -*- c-basic-offset: 4 -*-
/*
* aggpktcounter.{cc,hh} -- element counts packets per packet number and
* aggregate annotation
* Eddie Kohler
*
* Copyright (c) 2002 International Computer Science Institute
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, subject to the conditions
* listed in the Click LICENSE file. These conditions include: you must
* preserve this copyright notice, and you cannot mention the copyright
* holders in advertising related to the Software without their permission.
* The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
* notice is a summary of the Click LICENSE file; the license in that file is
* legally binding.
*/
#include <click/config.h>
#include "aggpktcounter.hh"
#include <click/confparse.hh>
#include <click/error.hh>
#include <click/packet_anno.hh>
#include <click/router.hh>
#include <click/straccum.hh>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
CLICK_DECLS
AggregatePacketCounter::Flow::Flow(uint32_t aggregate, int columns)
: _aggregate(aggregate), _next(0), _counts(new Vector<uint32_t>[columns])
{
}
void
AggregatePacketCounter::Flow::add(uint32_t packetno, int column)
{
if (_counts[column].size() <= (int) packetno)
_counts[column].resize(packetno + 1, 0);
_counts[column].at_u(packetno)++;
}
AggregatePacketCounter::packetctr_t
AggregatePacketCounter::Flow::column_count(int column) const
{
packetctr_t count = 0;
for (const uint32_t *v = _counts[column].begin(); v < _counts[column].end(); v++)
count += *v;
return count;
}
void
AggregatePacketCounter::Flow::received(Vector<uint32_t> &v, const AggregatePacketCounter *apc) const
{
int n = 0;
for (int port = 0; port < apc->noutputs(); port++)
if (_counts[port].size() > n)
n = _counts[port].size();
for (int packetno = 0; packetno < n; packetno++)
for (int port = 0; port < apc->noutputs(); port++)
if (packetno < _counts[port].size() && _counts[port].at_u(packetno)) {
v.push_back(packetno);
break;
}
}
void
AggregatePacketCounter::Flow::undelivered(Vector<uint32_t> &undelivered, const AggregatePacketCounter *apc) const
{
assert(apc->noutputs() >= 2);
int packetno;
int min_n = (_counts[0].size() < _counts[1].size() ? _counts[0].size() : _counts[1].size());
for (packetno = 0; packetno < min_n; packetno++)
if (_counts[0].at_u(packetno) > _counts[1].at_u(packetno))
undelivered.push_back(packetno);
for (; packetno < _counts[0].size(); packetno++)
if (_counts[0].at_u(packetno))
undelivered.push_back(packetno);
}
AggregatePacketCounter::AggregatePacketCounter()
{
for (int i = 0; i < NFLOWMAP; i++)
_flowmap[i] = 0;
}
AggregatePacketCounter::~AggregatePacketCounter()
{
}
int
AggregatePacketCounter::configure(Vector<String> &conf, ErrorHandler *errh)
{
Element *e = 0;
_packetno = 0;
if (cp_va_parse(conf, this, errh,
cpKeywords,
"NOTIFIER", cpElement, "aggregate deletion notifier", &e,
"PACKETNO", cpInteger, "packet number annotation (-1..1)", &_packetno,
cpEnd) < 0)
return -1;
if (_packetno > 1)
return errh->error("'PACKETNO' cannot be bigger than 1");
/*if (e && !(_agg_notifier = (AggregateNotifier *)e->cast("AggregateNotifier")))
return errh->error("%s is not an AggregateNotifier", e->name().c_str()); */
return 0;
}
int
AggregatePacketCounter::initialize(ErrorHandler *)
{
_total_flows = _total_packets = 0;
//if (_agg_notifier)
//_agg_notifier->add_listener(this);
//_gc_timer.initialize(this);
return 0;
}
void
AggregatePacketCounter::end_flow(Flow *f, ErrorHandler *)
{
/* if (f->npackets() >= _mincount) {
f->output(errh);
if (_gzip && f->filename() != "-")
if (add_compressable(f->filename(), errh) < 0)
_gzip = false;
} else
f->unlink(errh);*/
delete f;
}
void
AggregatePacketCounter::cleanup(CleanupStage)
{
ErrorHandler *errh = ErrorHandler::default_handler();
for (int i = 0; i < NFLOWMAP; i++)
while (Flow *f = _flowmap[i]) {
_flowmap[i] = f->next();
end_flow(f, errh);
}
if (_total_packets > 0 && _total_flows == 0)
errh->lwarning(declaration(), "saw no packets with aggregate annotations");
}
AggregatePacketCounter::Flow *
AggregatePacketCounter::find_flow(uint32_t agg)
{
if (agg == 0)
return 0;
int bucket = (agg & (NFLOWMAP - 1));
Flow *prev = 0, *f = _flowmap[bucket];
while (f && f->aggregate() != agg) {
prev = f;
f = f->next();
}
if (f)
/* nada */;
else if ((f = new Flow(agg, ninputs()))) {
prev = f;
_total_flows++;
} else
return 0;
if (prev) {
prev->set_next(f->next());
f->set_next(_flowmap[bucket]);
_flowmap[bucket] = f;
}
return f;
}
inline void
AggregatePacketCounter::smaction(int port, Packet *p)
{
_total_packets++;
if (Flow *f = find_flow(AGGREGATE_ANNO(p))) {
if (_packetno >= 0)
f->add(PACKET_NUMBER_ANNO(p, _packetno), port);
else
f->add(0, port);
}
}
void
AggregatePacketCounter::push(int port, Packet *p)
{
smaction(port, p);
output(port).push(p);
}
Packet *
AggregatePacketCounter::pull(int port)
{
if (Packet *p = input(port).pull()) {
smaction(port, p);
return p;
} else
return 0;
}
/*
void
AggregatePacketCounter::aggregate_notify(uint32_t agg, AggregateEvent event, const Packet *)
{
if (event == DELETE_AGG && find_aggregate(agg, 0)) {
_gc_aggs.push_back(agg);
_gc_aggs.push_back(click_jiffies());
if (!_gc_timer.scheduled())
_gc_timer.schedule_after_msec(250);
}
}
void
AggregatePacketCounter::gc_hook(Timer *t, void *thunk)
{
AggregatePacketCounter *td = static_cast<AggregatePacketCounter *>(thunk);
uint32_t limit_jiff = click_jiffies() - (CLICK_HZ / 4);
int i;
for (i = 0; i < td->_gc_aggs.size() && SEQ_LEQ(td->_gc_aggs[i+1], limit_jiff); i += 2)
if (Flow *f = td->find_aggregate(td->_gc_aggs[i], 0)) {
int bucket = (f->aggregate() & (NFLOWMAP - 1));
assert(td->_flowmap[bucket] == f);
td->_flowmap[bucket] = f->next();
td->end_flow(f, ErrorHandler::default_handler());
}
if (i < td->_gc_aggs.size()) {
td->_gc_aggs.erase(td->_gc_aggs.begin(), td->_gc_aggs.begin() + i);
t->schedule_after_msec(250);
}
}
*/
enum { H_CLEAR, H_COUNT };
String
AggregatePacketCounter::read_handler(Element *e, void *thunk)
{
AggregatePacketCounter *ac = static_cast<AggregatePacketCounter *>(e);
switch ((intptr_t)thunk) {
case H_COUNT: {
packetctr_t count = 0;
for (int i = 0; i < NFLOWMAP; i++)
for (const Flow *f = ac->_flowmap[i]; f; f = f->next())
for (int col = 0; col < ac->ninputs(); col++)
count += f->column_count(col);
return String(count);
}
default:
return "<error>";
}
}
int
AggregatePacketCounter::write_handler(const String &, Element *e, void *thunk, ErrorHandler *errh)
{
AggregatePacketCounter *td = static_cast<AggregatePacketCounter *>(e);
switch ((intptr_t)thunk) {
case H_CLEAR:
for (int i = 0; i < NFLOWMAP; i++)
while (Flow *f = td->_flowmap[i]) {
td->_flowmap[i] = f->next();
td->end_flow(f, errh);
}
return 0;
default:
return -1;
}
}
String
AggregatePacketCounter::flow_handler(uint32_t aggregate, FlowFunc func)
{
Vector<uint32_t> v;
if (Flow *f = find_flow(aggregate))
(f->*func)(v, this);
StringAccum sa;
for (int i = 0; i < v.size(); i++)
sa << v[i] << '\n';
return sa.take_string();
}
int
AggregatePacketCounter::thing_read_handler(int, String& s, Element* e, const Handler* h, ErrorHandler* errh)
{
uint32_t aggregate;
if (!s)
aggregate = 0;
else if (!cp_unsigned(cp_uncomment(s), &aggregate))
return errh->error("argument should be aggregate number");
FlowFunc ff = (h->thunk1() ? &Flow::undelivered : &Flow::received);
AggregatePacketCounter *apc = static_cast<AggregatePacketCounter *>(e);
s = apc->flow_handler(aggregate, ff);
return 0;
}
void
AggregatePacketCounter::add_handlers()
{
add_write_handler("clear", write_handler, (void *)H_CLEAR);
add_read_handler("count", read_handler, (void *)H_COUNT);
set_handler("received", Handler::OP_READ | Handler::READ_PARAM, thing_read_handler, 0);
set_handler("undelivered", Handler::OP_READ | Handler::READ_PARAM, thing_read_handler, (void*) 1);
}
CLICK_ENDDECLS
ELEMENT_REQUIRES(userlevel)
EXPORT_ELEMENT(AggregatePacketCounter)
syntax highlighted by Code2HTML, v. 0.9.1