/* * aggcounter.{cc,hh} -- count packets/bytes with given aggregate * Eddie Kohler after Greg Minshall * * Copyright (c) 2001 International Computer Science Institute * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, subject to the conditions * listed in the Click LICENSE file. These conditions include: you must * preserve this copyright notice, and you cannot mention the copyright * holders in advertising related to the Software without their permission. * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This * notice is a summary of the Click LICENSE file; the license in that file is * legally binding. */ #include #include "aggcounter.hh" #include #include #include #include #include // for first_bit_set #include CLICK_DECLS AggregateCounter::AggregateCounter() : _root(0), _free(0), _call_nnz_h(0), _call_count_h(0) { } AggregateCounter::~AggregateCounter() { } AggregateCounter::Node * AggregateCounter::new_node_block() { assert(!_free); int block_size = 1024; Node *block = new Node[block_size]; if (!block) return 0; _blocks.push_back(block); for (int i = 1; i < block_size - 1; i++) block[i].child[0] = &block[i+1]; block[block_size - 1].child[0] = 0; _free = &block[1]; return &block[0]; } int AggregateCounter::configure(Vector &conf, ErrorHandler *errh) { bool bytes = false; bool ip_bytes = false; bool packet_count = true; bool extra_length = true; uint32_t freeze_nnz, stop_nnz; uint64_t freeze_count, stop_count; String call_nnz, call_count; freeze_nnz = stop_nnz = _call_nnz = (uint32_t)(-1); freeze_count = stop_count = _call_count = (uint64_t)(-1); if (cp_va_parse(conf, this, errh, cpKeywords, "BYTES", cpBool, "count bytes?", &bytes, "IP_BYTES", cpBool, "do not count link header bytes?", &ip_bytes, "MULTIPACKET", cpBool, "use packet count annotation?", &packet_count, "EXTRA_LENGTH", cpBool, "use extra length annotation?", &extra_length, "AGGREGATE_FREEZE", cpUnsigned, "freeze after N nonzero aggregates", &freeze_nnz, "COUNT_FREEZE", cpUnsigned64, "freeze after count reaches N", &freeze_count, "AGGREGATE_STOP", cpUnsigned, "stop router after N nonzero aggregates", &stop_nnz, "COUNT_STOP", cpUnsigned64, "stop router after count reaches N", &stop_count, "AGGREGATE_CALL", cpArgument, "call handler after N nonzero aggregates", &call_nnz, "COUNT_CALL", cpArgument, "call handler after count reaches N", &call_count, "BANNER", cpString, "output banner", &_output_banner, cpEnd) < 0) return -1; _bytes = bytes; _ip_bytes = ip_bytes; _use_packet_count = packet_count; _use_extra_length = extra_length; if ((freeze_nnz != (uint32_t)(-1)) + (stop_nnz != (uint32_t)(-1)) + ((bool)call_nnz) > 1) return errh->error("'AGGREGATE_FREEZE', 'AGGREGATE_STOP', and 'AGGREGATE_CALL' are mutually exclusive"); else if (freeze_nnz != (uint32_t)(-1)) { _call_nnz = freeze_nnz; _call_nnz_h = new HandlerCall(name() + ".freeze true"); } else if (stop_nnz != (uint32_t)(-1)) { _call_nnz = stop_nnz; _call_nnz_h = new HandlerCall(name() + ".stop"); } else if (call_nnz) { if (!cp_unsigned(cp_pop_spacevec(call_nnz), &_call_nnz)) return errh->error("'AGGREGATE_CALL' first word should be unsigned (number of aggregates)"); _call_nnz_h = new HandlerCall(call_nnz); } if ((freeze_count != (uint64_t)(-1)) + (stop_count != (uint64_t)(-1)) + ((bool)call_count) > 1) return errh->error("'COUNT_FREEZE', 'COUNT_STOP', and 'COUNT_CALL' are mutually exclusive"); else if (freeze_count != (uint64_t)(-1)) { _call_count = freeze_count; _call_count_h = new HandlerCall(name() + ".freeze true"); } else if (stop_count != (uint64_t)(-1)) { _call_count = stop_count; _call_count_h = new HandlerCall(name() + ".stop"); } else if (call_count) { if (!cp_unsigned64(cp_pop_spacevec(call_count), &_call_count)) return errh->error("'COUNT_CALL' first word should be unsigned (count)"); _call_count_h = new HandlerCall(call_count); } return 0; } int AggregateCounter::initialize(ErrorHandler *errh) { if (_call_nnz_h && _call_nnz_h->initialize_write(this, errh) < 0) return -1; if (_call_count_h && _call_count_h->initialize_write(this, errh) < 0) return -1; if (clear(errh) < 0) return -1; _frozen = false; _active = true; return 0; } void AggregateCounter::cleanup(CleanupStage) { for (int i = 0; i < _blocks.size(); i++) delete[] _blocks[i]; _blocks.clear(); delete _call_nnz_h; delete _call_count_h; _call_nnz_h = _call_count_h = 0; } AggregateCounter::Node * AggregateCounter::make_peer(uint32_t a, Node *n, bool frozen) { /* * become a peer * algo: create two nodes, the two peers. leave orig node as * the parent of the two new ones. */ if (frozen) return 0; Node *down[2]; if (!(down[0] = new_node())) return 0; if (!(down[1] = new_node())) { free_node(down[0]); return 0; } // swivel is first bit 'a' and 'old->input' differ int swivel = ffs_msb(a ^ n->aggregate); // bitvalue is the value of that bit of 'a' int bitvalue = (a >> (32 - swivel)) & 1; // mask masks off all bits before swivel uint32_t mask = (swivel == 1 ? 0 : (0xFFFFFFFFU << (33 - swivel))); down[bitvalue]->aggregate = a; down[bitvalue]->count = 0; down[bitvalue]->child[0] = down[bitvalue]->child[1] = 0; *down[1 - bitvalue] = *n; /* copy orig node down one level */ n->aggregate = (down[0]->aggregate & mask); if (down[0]->aggregate == n->aggregate) { n->count = down[0]->count; down[0]->count = 0; } else n->count = 0; n->child[0] = down[0]; /* point to children */ n->child[1] = down[1]; return (n->aggregate == a ? n : down[bitvalue]); } AggregateCounter::Node * AggregateCounter::find_node(uint32_t a, bool frozen) { // straight outta tcpdpriv Node *n = _root; while (n) { if (n->aggregate == a) return (n->count || !frozen ? n : 0); if (!n->child[0]) n = make_peer(a, n, frozen); else { // swivel is the first bit in which the two children differ int swivel = ffs_msb(n->child[0]->aggregate ^ n->child[1]->aggregate); if (ffs_msb(a ^ n->aggregate) < swivel) // input differs earlier n = make_peer(a, n, frozen); else if (a & (1 << (32 - swivel))) n = n->child[1]; else n = n->child[0]; } } if (!frozen) click_chatter("AggregateCounter: out of memory!"); return 0; } inline bool AggregateCounter::update(Packet *p, bool frozen) { if (!_active) return false; // AGGREGATE_ANNO is already in host byte order! uint32_t agg = AGGREGATE_ANNO(p); Node *n = find_node(agg, frozen); if (!n) return false; uint32_t amount; if (!_bytes) amount = 1 + (_use_packet_count ? EXTRA_PACKETS_ANNO(p) : 0); else { amount = p->length() + (_use_extra_length ? EXTRA_LENGTH_ANNO(p) : 0); if (_ip_bytes && p->network_header()) amount -= p->network_header_offset(); } // update _num_nonzero; possibly call handler if (amount && !n->count) { if (_num_nonzero >= _call_nnz) { _call_nnz = (uint32_t)(-1); _call_nnz_h->call_write(); // handler may have changed our state; reupdate return update(p, frozen || _frozen); } _num_nonzero++; } n->count += amount; _count += amount; if (_count >= _call_count) { _call_count = (uint64_t)(-1); _call_count_h->call_write(); } return true; } void AggregateCounter::push(int port, Packet *p) { port = !update(p, _frozen || (port == 1)); output(noutputs() == 1 ? 0 : port).push(p); } Packet * AggregateCounter::pull(int port) { Packet *p = input(ninputs() == 1 ? 0 : port).pull(); if (p && _active) update(p, _frozen || (port == 1)); return p; } // CLEAR, REAGGREGATE void AggregateCounter::clear_node(Node *n) { if (n->child[0]) { clear_node(n->child[0]); clear_node(n->child[1]); } free_node(n); } int AggregateCounter::clear(ErrorHandler *errh) { if (_root) clear_node(_root); if (!(_root = new_node())) { if (errh) errh->error("out of memory!"); return -1; } _root->aggregate = 0; _root->count = 0; _root->child[0] = _root->child[1] = 0; _num_nonzero = 0; _count = 0; return 0; } void AggregateCounter::reaggregate_node(Node *n) { Node *l = n->child[0], *r = n->child[1]; uint32_t count = n->count; free_node(n); if ((n = find_node(count, false))) { if (!n->count) _num_nonzero++; n->count++; _count++; } if (l) { reaggregate_node(l); reaggregate_node(r); } } void AggregateCounter::reaggregate_counts() { Node *old_root = _root; _root = 0; clear(); reaggregate_node(old_root); } // HANDLERS static void write_batch(FILE *f, AggregateCounter::WriteFormat format, uint32_t *buffer, int pos, double count, ErrorHandler *) { if (format == AggregateCounter::WR_BINARY) fwrite(buffer, sizeof(uint32_t), pos, f); else if (format == AggregateCounter::WR_TEXT_IP) for (int i = 0; i < pos; i += 2) fprintf(f, "%d.%d.%d.%d %u\n", (buffer[i] >> 24) & 255, (buffer[i] >> 16) & 255, (buffer[i] >> 8) & 255, buffer[i] & 255, buffer[i+1]); else if (format == AggregateCounter::WR_TEXT_PDF) for (int i = 0; i < pos; i += 2) fprintf(f, "%u %.12g\n", buffer[i], buffer[i+1] / count); else if (format == AggregateCounter::WR_TEXT) for (int i = 0; i < pos; i += 2) fprintf(f, "%u %u\n", buffer[i], buffer[i+1]); } void AggregateCounter::write_nodes(Node *n, FILE *f, WriteFormat format, uint32_t *buffer, int &pos, int len, ErrorHandler *errh) const { if (n->count > 0) { buffer[pos++] = n->aggregate; buffer[pos++] = n->count; if (pos == len) { write_batch(f, format, buffer, pos, _count, errh); pos = 0; } } if (n->child[0]) write_nodes(n->child[0], f, format, buffer, pos, len, errh); if (n->child[1]) write_nodes(n->child[1], f, format, buffer, pos, len, errh); } int AggregateCounter::write_file(String where, WriteFormat format, ErrorHandler *errh) const { FILE *f; if (where == "-") f = stdout; else f = fopen(where.c_str(), (format == WR_BINARY ? "wb" : "w")); if (!f) return errh->error("%s: %s", where.c_str(), strerror(errno)); fprintf(f, "!IPAggregate 1.0\n"); fwrite(_output_banner.data(), 1, _output_banner.length(), f); if (_output_banner.length() && _output_banner.back() != '\n') fputc('\n', f); fprintf(f, "!num_nonzero %u\n", _num_nonzero); if (format == WR_BINARY) { #if CLICK_BYTE_ORDER == CLICK_BIG_ENDIAN fprintf(f, "!packed_be\n"); #elif CLICK_BYTE_ORDER == CLICK_LITTLE_ENDIAN fprintf(f, "!packed_le\n"); #else format = WR_TEXT; #endif } else if (format == WR_TEXT_IP) fprintf(f, "!ip\n"); uint32_t buf[1024]; int pos = 0; write_nodes(_root, f, format, buf, pos, 1024, errh); if (pos) write_batch(f, format, buf, pos, _count, errh); bool had_err = ferror(f); if (f != stdout) fclose(f); if (had_err) return errh->error("%s: file error", where.c_str()); else return 0; } int AggregateCounter::write_file_handler(const String &data, Element *e, void *thunk, ErrorHandler *errh) { AggregateCounter *ac = static_cast(e); String fn; if (!cp_filename(cp_uncomment(data), &fn)) return errh->error("argument should be filename"); int int_thunk = (intptr_t)thunk; return ac->write_file(fn, (WriteFormat)int_thunk, errh); } enum { AC_FROZEN, AC_ACTIVE, AC_BANNER, AC_STOP, AC_REAGGREGATE, AC_CLEAR, AC_AGGREGATE_CALL, AC_COUNT_CALL, AC_NAGG, AC_COUNT }; String AggregateCounter::read_handler(Element *e, void *thunk) { AggregateCounter *ac = static_cast(e); switch ((intptr_t)thunk) { case AC_FROZEN: return cp_unparse_bool(ac->_frozen); case AC_ACTIVE: return cp_unparse_bool(ac->_active); case AC_BANNER: return ac->_output_banner; case AC_AGGREGATE_CALL: if (ac->_call_nnz == (uint32_t)(-1)) return ""; else return String(ac->_call_nnz) + " " + ac->_call_nnz_h->unparse(); case AC_COUNT_CALL: if (ac->_call_count == (uint64_t)(-1)) return ""; else return String(ac->_call_count) + " " + ac->_call_count_h->unparse(); case AC_COUNT: return String(ac->_count); case AC_NAGG: return String(ac->_num_nonzero); default: return ""; } } int AggregateCounter::write_handler(const String &data, Element *e, void *thunk, ErrorHandler *errh) { AggregateCounter *ac = static_cast(e); String s = cp_uncomment(data); switch ((intptr_t)thunk) { case AC_FROZEN: { bool val; if (!cp_bool(s, &val)) return errh->error("argument to 'frozen' should be bool"); ac->_frozen = val; return 0; } case AC_ACTIVE: { bool val; if (!cp_bool(s, &val)) return errh->error("argument to 'active' should be bool"); ac->_active = val; return 0; } case AC_STOP: ac->_active = false; ac->router()->please_stop_driver(); return 0; case AC_REAGGREGATE: ac->reaggregate_counts(); return 0; case AC_BANNER: ac->_output_banner = data; if (data && data.back() != '\n') ac->_output_banner += '\n'; else if (data && data.length() == 1) ac->_output_banner = ""; return 0; case AC_CLEAR: return ac->clear(errh); case AC_AGGREGATE_CALL: { uint32_t new_nnz = (uint32_t)(-1); if (s) { if (!cp_unsigned(cp_pop_spacevec(s), &new_nnz)) return errh->error("argument to 'aggregate_call' should be 'N HANDLER [VALUE]'"); else if (HandlerCall::reset_write(ac->_call_nnz_h, s, ac, errh) < 0) return -1; } ac->_call_nnz = new_nnz; return 0; } case AC_COUNT_CALL: { uint64_t new_count = (uint64_t)(-1); if (s) { if (!cp_unsigned64(cp_pop_spacevec(s), &new_count)) return errh->error("argument to 'count_call' should be 'N HANDLER [VALUE]'"); else if (HandlerCall::reset_write(ac->_call_count_h, s, ac, errh) < 0) return -1; } ac->_call_count = new_count; return 0; } default: return errh->error("internal error"); } } void AggregateCounter::add_handlers() { add_write_handler("write_text_file", write_file_handler, (void *)WR_TEXT); add_write_handler("write_ascii_file", write_file_handler, (void *)WR_TEXT); add_write_handler("write_file", write_file_handler, (void *)WR_BINARY); add_write_handler("write_ip_file", write_file_handler, (void *)WR_TEXT_IP); add_write_handler("write_pdf_file", write_file_handler, (void *)WR_TEXT_PDF); add_read_handler("freeze", read_handler, (void *)AC_FROZEN); add_write_handler("freeze", write_handler, (void *)AC_FROZEN); add_read_handler("active", read_handler, (void *)AC_ACTIVE); add_write_handler("active", write_handler, (void *)AC_ACTIVE); add_write_handler("stop", write_handler, (void *)AC_STOP); add_write_handler("reaggregate_counts", write_handler, (void *)AC_REAGGREGATE); add_write_handler("counts_pdf", write_handler, (void *)AC_REAGGREGATE); add_read_handler("banner", read_handler, (void *)AC_BANNER); add_write_handler("banner", write_handler, (void *)AC_BANNER); set_handler_flags("banner", Handler::RAW); add_write_handler("clear", write_handler, (void *)AC_CLEAR); add_read_handler("aggregate_call", read_handler, (void *)AC_AGGREGATE_CALL); add_write_handler("aggregate_call", write_handler, (void *)AC_AGGREGATE_CALL); add_read_handler("count_call", read_handler, (void *)AC_COUNT_CALL); add_write_handler("count_call", write_handler, (void *)AC_COUNT_CALL); add_read_handler("count", read_handler, (void *)AC_COUNT); add_read_handler("nagg", read_handler, (void *)AC_NAGG); } ELEMENT_REQUIRES(userlevel) EXPORT_ELEMENT(AggregateCounter) CLICK_ENDDECLS