// -*- c-basic-offset: 4 -*-
/*
 * toipflowdumps.{cc,hh} -- creates separate trace files for each flow
 * Eddie Kohler
 *
 * Copyright (c) 2002 International Computer Science Institute
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, subject to the conditions
 * listed in the Click LICENSE file. These conditions include: you must
 * preserve this copyright notice, and you cannot mention the copyright
 * holders in advertising related to the Software without their permission.
 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
 * notice is a summary of the Click LICENSE file; the license in that file is
 * legally binding.
 */

#include <click/config.h>
#include "toipflowdumps.hh"
#include <click/confparse.hh>
#include <click/error.hh>
#include <click/packet_anno.hh>
#include <click/router.hh>
#include <click/standard/scheduleinfo.hh>
#include <clicknet/udp.h>
#include <clicknet/icmp.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include "elements/analysis/ipsumdumpinfo.hh"
CLICK_DECLS

#ifdef i386
# define PUT4(p, d)	*reinterpret_cast<uint32_t *>((p)) = htonl((d))
#else
# define PUT4(p, d)	do { (p)[0] = (d)>>24; (p)[1] = (d)>>16; (p)[2] = (d)>>8; (p)[3] = (d); } while (0)
#endif
#define PUT1(p, d)	((p)[0] = (d))

ToIPFlowDumps::Flow::Flow(const Packet *p, const String &filename,
			  bool absolute_time, bool absolute_seq, bool binary,
			  bool ip_id, int tcp_opt, bool tcp_window)
    : _next(0),
      _flowid(p), _ip_p(p->ip_header()->ip_p),
      _aggregate(AGGREGATE_ANNO(p)), _packet_count(0), _note_count(0),
      _filename(filename), _outputted(false), _binary(binary),
      _tcp_opt(tcp_opt), _npkt(0), _nnote(0)
{
    // use the encapsulated IP header for ICMP errors
    if (_ip_p == IP_PROTO_ICMP) {
	const click_icmp *icmph = p->icmp_header();
	// should assert some things here
	const click_ip *embedded_iph = reinterpret_cast<const click_ip *>(icmph + 1);
	_flowid = IPFlowID(embedded_iph);
	_ip_p = embedded_iph->ip_p;
    }

    if (PAINT_ANNO(p) & 1)	// reverse _flowid
	_flowid = _flowid.rev();
    
    _have_first_seq[0] = _have_first_seq[1] = absolute_seq;
    _first_seq[0] = _first_seq[1] = 0;

    if (absolute_time)
	_first_timestamp = Timestamp();
    else			// make first packet have timestamp .000001
	_first_timestamp = p->timestamp_anno() - Timestamp::epsilon();

    if (ip_id)
	_ip_ids = new uint16_t[NPKT];
    else
	_ip_ids = 0;

    if (tcp_window && _ip_p == IP_PROTO_TCP)
	_tcp_windows = new uint16_t[NPKT];
    else
	_tcp_windows = 0;
    
    // sanity checks
    assert(_aggregate && (_ip_p == IP_PROTO_TCP || _ip_p == IP_PROTO_UDP));
}

ToIPFlowDumps::Flow::~Flow()
{
    delete[] _ip_ids;
    delete[] _tcp_windows;
}

int
ToIPFlowDumps::Flow::create_directories(const String &n, ErrorHandler *errh)
{
    int slash = n.find_right('/');
    if (slash <= 0)
	return 0;
    String component = n.substring(0, slash);
    if (access(component.c_str(), F_OK) >= 0)
	return 0;
    else if (create_directories(component, errh) < 0)
	return -1;
    else if (mkdir(component.c_str(), 0777) < 0)
	return errh->error("making directory %s: %s", component.c_str(), strerror(errno));
    else
	return 0;
}

void
ToIPFlowDumps::Flow::output_binary(StringAccum &sa)
{
    union {
	uint32_t u[8];
	uint16_t us[16];
	char c[32];
    } buf;
    int pi = 0, ni = 0;
    const uint16_t *opt = reinterpret_cast<const uint16_t *>(_opt_info.data());
    const uint16_t *end_opt = opt + (_opt_info.length() / 2);
    
    while (pi < _npkt || ni < _nnote)
	if (ni >= _nnote || _note[ni].before_pkt > pi) {
	    int pos;

	    buf.u[1] = ntohl(_pkt[pi].timestamp.sec());
#if HAVE_NANOTIMESTAMP
	    buf.u[2] = ntohl(_pkt[pi].timestamp.nsec());
#else
	    buf.u[2] = ntohl(_pkt[pi].timestamp.usec());
#endif
	    if (_ip_p == IP_PROTO_TCP) {
		buf.u[3] = ntohl(_pkt[pi].th_seq);
		buf.u[4] = ntohl(_pkt[pi].payload_len);
		buf.u[5] = ntohl(_pkt[pi].th_ack);
		pos = 24;
	    } else {
		buf.u[3] = ntohl(_pkt[pi].payload_len);
		pos = 16;
	    }

	    if (_ip_ids)
		buf.us[pos>>1] = _ip_ids[pi], pos += 2;
	    if (_tcp_windows)
		buf.us[pos>>1] = _tcp_windows[pi], pos += 2;
	    if (_ip_p == IP_PROTO_TCP)
		buf.c[pos++] = _pkt[pi].th_flags;
	    buf.c[pos++] = _pkt[pi].direction;
	    
	    buf.u[0] = ntohl(pos);
	    sa.append(&buf.c[0], pos);

	    // handle TCP options specially
	    if (opt < end_opt && opt[0] == pi) {
		int original_pos = sa.length() - pos;
		IPSummaryDump::unparse_tcp_opt_binary(sa, reinterpret_cast<const uint8_t *>(opt + 2), opt[1], _tcp_opt);
		PUT4(sa.data() + original_pos, sa.length() - original_pos);
		opt += 2 + (opt[1] / 2);
	    }

	    pi++;
	} else {
	    int len = (ni == _nnote - 1 ? _note_text.length() : _note[ni+1].pos) - _note[ni].pos;
	    int record_len = (4 + len + 2);
	    buf.u[0] = ntohl(record_len | 0x80000000U);
	    buf.c[4] = '#';
	    sa.append(&buf.c[0], 5);
	    sa.append(_note_text.data() + _note[ni].pos, len);
	    sa.append("\n", 1);
	    ni++;
	}
}

int
ToIPFlowDumps::Flow::output(ErrorHandler *errh)
{
    static StringAccum sa;
    
    int fd;
    if (_filename == "-")
	fd = STDOUT_FILENO;
    else if (_outputted)
	fd = open(_filename.c_str(), O_WRONLY | O_APPEND);
    else if (create_directories(_filename, errh) < 0)
	return -1;
    else
	fd = open(_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
    if (fd < 0)
	return errh->error("%s: %s", _filename.c_str(), strerror(errno));

    // make a guess about how much data we'll need
    sa.clear();
    sa.reserve(_npkt * (_binary ? 28 : 40) + _note_text.length() + _nnote * 8 + _opt_info.length() + 16);
    
    if (!_outputted) {
	sa << "!IPSummaryDump 1.2\n!flowid "
	   << _flowid.saddr() << ' ' << ntohs(_flowid.sport()) << ' '
	   << _flowid.daddr() << ' ' << ntohs(_flowid.dport()) << ' '
	   << (_ip_p == IP_PROTO_TCP ? 'T' : 'U')
	   << "\n!aggregate " << _aggregate << '\n';

#if HAVE_NANOTIMESTAMP
	sa << "!data ntimestamp";
#else
	sa << "!data timestamp";
#endif
	if (_binary) {
	    if (_ip_p == IP_PROTO_TCP)
		sa << " tcp_seq payload_len tcp_ack";
	    else
		sa << " payload_len";
	    if (_ip_ids)
		sa << " ip_id";
	    if (_tcp_windows)
		sa << " tcp_window";
	    if (_ip_p == IP_PROTO_TCP)
		sa << " tcp_flags";
	    sa << " direction";
	    if (_ip_p == IP_PROTO_TCP) {
		if (_tcp_opt & IPSummaryDump::DO_TCPOPT_TIMESTAMP)
		    sa << " tcp_opt";
		else
		    sa << " tcp_ntopt";
	    }
	} else {
	    sa << " direction";
	    if (_ip_ids)
		sa << " ip_id";
	    if (_ip_p == IP_PROTO_TCP) {
		sa << " tcp_flags tcp_seq payload_len tcp_ack";
		if (_tcp_opt & IPSummaryDump::DO_TCPOPT_TIMESTAMP)
		    sa << " tcp_opt";
		else
		    sa << " tcp_ntopt";
	    } else
		sa << " payload_len";
	}
	sa << '\n';
	
	if (_have_first_seq[0] && _first_seq[0] && _ip_p == IP_PROTO_TCP)
	    sa << "!firstseq > " << _first_seq[0] << '\n';
	if (_have_first_seq[1] && _first_seq[1] && _ip_p == IP_PROTO_TCP)
	    sa << "!firstseq < " << _first_seq[1] << '\n';
	if (_first_timestamp)
	    sa << "!firsttime " << _first_timestamp << '\n';
	if (_binary)
	    sa << "!binary\n";
    }

    if (_binary)
	output_binary(sa);
    else {
	int pi = 0, ni = 0;
	const uint16_t *opt = reinterpret_cast<const uint16_t *>(_opt_info.data());
	const uint16_t *end_opt = opt + (_opt_info.length() / 2);
	
	while (pi < _npkt || ni < _nnote)
	    if (ni >= _nnote || _note[ni].before_pkt > pi) {
		int direction = _pkt[pi].direction;
		sa << _pkt[pi].timestamp << ' '
		   << (direction == 0 ? '>' : '<') << ' ';

		if (_ip_ids)
		    sa << _ip_ids[pi] << ' ';
		
		if (_ip_p == IP_PROTO_TCP) {
		    int flags = _pkt[pi].th_flags;
		    if (flags == TH_ACK)
			sa << 'A';
		    else if (flags == (TH_ACK | TH_PUSH))
			sa << 'P' << 'A';
		    else if (flags == 0)
			sa << '.';
		    else
			for (int flag = 0; flag < 7; flag++)
			    if (flags & (1 << flag))
				sa << IPSummaryDump::tcp_flags_word[flag];

		    sa << ' ' << _pkt[pi].th_seq
		       << ' ' << _pkt[pi].payload_len
		       << ' ' << _pkt[pi].th_ack;

		    if (_tcp_windows)
			sa << ' ' << ntohs(_tcp_windows[pi]);
		    
		    if (opt < end_opt && opt[0] == pi) {
			sa << ' ';
			IPSummaryDump::unparse_tcp_opt(sa, reinterpret_cast<const uint8_t *>(opt + 2), opt[1], _tcp_opt);
			opt += 2 + (opt[1] / 2);
		    }
		    
		    sa << '\n';
		} else
		    sa << _pkt[pi].payload_len << '\n';

		pi++;
	    } else {
		int len = (ni == _nnote - 1 ? _note_text.length() : _note[ni+1].pos) - _note[ni].pos;
		sa << '#';
		sa.append(_note_text.data() + _note[ni].pos, len);
		sa << '\n';
		ni++;
	    }
    }

    _npkt = 0;
    _opt_info.clear();
    
    _note_text.clear();
    _nnote = 0;

    // actually write data
    int pos = 0;
    while (pos < sa.length()) {
	int written = write(fd, sa.data() + pos, sa.length() - pos);
	if (written < 0 && errno != EINTR) {
	    errh->error("%s: %s", _filename.c_str(), strerror(errno));
	    break;
	}
	pos += written;
    }
    
    if (fd != STDOUT_FILENO)
	close(fd);
    _outputted = true;
    return 0;
}

inline void
ToIPFlowDumps::Flow::unlink(ErrorHandler *errh)
{
    if (_outputted && ::unlink(_filename.c_str()) < 0)
	errh->error("%s: %s", _filename.c_str(), strerror(errno));
}

void
ToIPFlowDumps::Flow::store_opt(const click_tcp *tcph, int direction)
{
    const uint8_t *opt = reinterpret_cast<const uint8_t *>(tcph + 1);
    const uint8_t *end_opt = opt + ((tcph->th_off << 2) - sizeof(click_tcp));
    bool any = false;
    int original_len = _opt_info.length();
    char *data;
    
    while (opt < end_opt)
	switch (*opt) {
	  case TCPOPT_EOL:
	    goto done;
	  case TCPOPT_NOP:
	    opt++;
	    break;
	  case TCPOPT_MAXSEG:
	    if (opt[1] != TCPOLEN_MAXSEG || opt + opt[1] > end_opt)
		goto bad_opt;
	    else
		goto good_opt;
	  case TCPOPT_WSCALE:
	    if (opt[1] != TCPOLEN_WSCALE || opt + opt[1] > end_opt)
		goto bad_opt;
	    else
		goto good_opt;
	  case TCPOPT_SACK_PERMITTED:
	    if (opt[1] != TCPOLEN_SACK_PERMITTED || opt + opt[1] > end_opt)
		goto bad_opt;
	    else
		goto good_opt;
	  case TCPOPT_TIMESTAMP:
	    if (opt[1] != TCPOLEN_TIMESTAMP || opt + opt[1] > end_opt)
		goto bad_opt;
	    else if (_tcp_opt & IPSummaryDump::DO_TCPOPT_TIMESTAMP)
		goto good_opt;
	    else
		goto ignore_opt;
	  good_opt:
	    if (!any && (data = _opt_info.extend(4)))
		*(reinterpret_cast<uint16_t *>(data)) = _npkt;
	    if ((data = _opt_info.extend(opt[1])))
		memcpy(data, opt, opt[1]);
	    opt += opt[1];
	    any = true;
	    break;
	  case TCPOPT_SACK:
	    if (opt[1] % 8 != 2 || opt + opt[1] > end_opt)
		goto bad_opt;
	    if (!any && (data = _opt_info.extend(4)))
		*(reinterpret_cast<uint16_t *>(data)) = _npkt;
	    if ((data = _opt_info.extend(opt[1]))) {
		// argh... must renumber sequence numbers in sack blocks 
		memcpy(data, opt, 2);
		const uint8_t *end_sack_opt = opt + opt[1];
		for (opt += 2, data += 2; opt < end_sack_opt; opt += 8, data += 8) {
		    uint32_t buf[2];
		    memcpy(buf, opt, 8);
		    if (!_have_first_seq[!direction]) {
			_first_seq[!direction] = ntohl(buf[0]);
			_have_first_seq[!direction] = true;
		    }
		    buf[0] = htonl(ntohl(buf[0]) - _first_seq[!direction]);
		    buf[1] = htonl(ntohl(buf[1]) - _first_seq[!direction]);
		    memcpy(data, buf, 8);
		}
	    } else
		opt += opt[1];
	    any = true;
	    break;
	  default:
	    if (opt[1] == 0 || opt + opt[1] > end_opt)
		goto bad_opt;
	    else
		goto ignore_opt;
	  ignore_opt:
	    opt += opt[1];
	    break;
	}

  done:
    if (any) {
	if (_opt_info.length() & 1)
	    _opt_info.append('\0');
	*(reinterpret_cast<uint16_t *>(_opt_info.data() + original_len) + 1) = _opt_info.length() - original_len - 4;
    }
    return;
    
  bad_opt:
    _opt_info.set_length(original_len);
}

int
ToIPFlowDumps::Flow::add_pkt(const Packet *p, ErrorHandler *errh)
{
    // ICMP errors are handled as notes, not packets
    if (PAINT_ANNO(p) >= 2) {
	assert(p->ip_header()->ip_p == IP_PROTO_ICMP);
	StringAccum sa;
	sa << p->timestamp_anno() << ' ' << (PAINT_ANNO(p) & 1 ? '>' : '<') << " ICMP_error";
	// this doesn't count as a note, really; it is a packet
	_note_count--;
	if (_packet_count < 0xFFFFFFFFU)
	    _packet_count++;
	return add_note(sa.take_string(), errh);
    }
    
    if (_npkt >= NPKT && output(errh) < 0)
	return -1;
    
    int direction = (PAINT_ANNO(p) & 1);
    const click_ip *iph = p->ip_header();
    assert(iph->ip_p == _ip_p);
    
    _pkt[_npkt].timestamp = p->timestamp_anno() - _first_timestamp;
    _pkt[_npkt].direction = direction;

    if (_ip_ids)
	_ip_ids[_npkt] = iph->ip_id;

    if (_ip_p == IP_PROTO_TCP) {
	const click_tcp *tcph = p->tcp_header();
	
	tcp_seq_t s = ntohl(tcph->th_seq);
	if (!_have_first_seq[direction]) {
	    _first_seq[direction] = s;
	    _have_first_seq[direction] = true;
	}
	tcp_seq_t a = ntohl(tcph->th_ack);
	if (!(tcph->th_flags & TH_ACK))
	    a = _first_seq[!direction];
	else if (!_have_first_seq[!direction]) {
	    _first_seq[!direction] = a;
	    _have_first_seq[!direction] = true;
	}
	
	_pkt[_npkt].th_seq = s - _first_seq[direction];
	_pkt[_npkt].th_ack = a - _first_seq[!direction];
	_pkt[_npkt].th_flags = tcph->th_flags;
	_pkt[_npkt].payload_len = ntohs(iph->ip_len) - (iph->ip_hl << 2) - (tcph->th_off << 2); // XXX check for correctness?

	if (_tcp_opt
	    && tcph->th_off > (sizeof(click_tcp) >> 2)
	    && (tcph->th_off != 8
		|| *(reinterpret_cast<const uint32_t *>(tcph + 1)) != htonl(0x0101080A)
		|| (_tcp_opt & IPSummaryDump::DO_TCPOPT_TIMESTAMP)))
	    store_opt(tcph, direction);

	if (_tcp_windows)
	    _tcp_windows[_npkt] = tcph->th_win;
	
    } else
	_pkt[_npkt].payload_len = ntohs(iph->ip_len) - sizeof(click_udp);
    
    _npkt++;
    if (_packet_count < 0xFFFFFFFFU)
	_packet_count++;
    
    return 0;
}

int
ToIPFlowDumps::Flow::add_note(const String &s, ErrorHandler *errh)
{
    if (_nnote >= NNOTE && output(errh) < 0)
	return -1;

    _note[_nnote].before_pkt = _npkt;
    _note[_nnote].pos = _note_text.length();
    _note_text << s;
    
    _nnote++;
    _note_count++;

    return 0;
}


ToIPFlowDumps::ToIPFlowDumps()
    : _nnoagg(0), _nagg(0), _agg_notifier(0), _task(this),
      _gc_timer(gc_hook, this), _compress_child(-1)
{
    for (int i = 0; i < NFLOWMAP; i++)
	_flowmap[i] = 0;
}

ToIPFlowDumps::~ToIPFlowDumps()
{
}

String
ToIPFlowDumps::output_pattern() const
{
    return (_gzip ? _filename_pattern + ".gz" : _filename_pattern);
}

int
ToIPFlowDumps::configure(Vector<String> &conf, ErrorHandler *errh)
{
    Element *e = 0;
    bool absolute_time = false, absolute_seq = false, binary = false, all_tcp_opt = false, tcp_opt = false, tcp_window = false, ip_id = false, gzip = false;
    _mincount = 0;
    
    if (cp_va_parse(conf, this, errh,
		    cpOptional,
		    cpFilename, "output filename pattern", &_filename_pattern,
		    cpKeywords,
		    "OUTPUT_PATTERN", cpFilename, "output filename pattern", &_filename_pattern,
		    "NOTIFIER", cpElement, "aggregate deletion notifier", &e,
		    "ABSOLUTE_TIME", cpBool, "print absolute timestamps?", &absolute_time,
		    "ABSOLUTE_SEQ", cpBool, "print absolute sequence numbers?", &absolute_seq,
		    "BINARY", cpBool, "output binary records?", &binary,
		    "ALL_TCP_OPT", cpBool, "output all TCP options?", &all_tcp_opt,
		    "TCP_OPT", cpBool, "output TCP options?", &tcp_opt,
		    "TCP_WINDOW", cpBool, "output TCP windows?", &tcp_window,
		    "GZIP", cpBool, "gzip output files?", &gzip,
		    "IP_ID", cpBool, "output IP IDs?", &ip_id,
		    "MINCOUNT", cpUnsigned, "output flows with at least this many packets", &_mincount,
		    cpEnd) < 0)
	return -1;

    if (!_filename_pattern)
	_filename_pattern = "-";
    if (find(_filename_pattern, '%') == _filename_pattern.end())
	errh->warning("OUTPUT_PATTERN has no %% escapes, so output files will get overwritten");
    
    if (e && !(_agg_notifier = (AggregateNotifier *)e->cast("AggregateNotifier")))
	return errh->error("%s is not an AggregateNotifier", e->name().c_str());

    _absolute_time = absolute_time;
    _absolute_seq = absolute_seq;
    _binary = binary;
    if (all_tcp_opt)
	_tcp_opt = IPSummaryDump::DO_TCPOPT_ALL_NOPAD;
    else if (tcp_opt)
	_tcp_opt = IPSummaryDump::DO_TCPOPT_MSS | IPSummaryDump::DO_TCPOPT_WSCALE | IPSummaryDump::DO_TCPOPT_SACK;
    else
	_tcp_opt = 0;
    _tcp_window = tcp_window;
    _ip_id = ip_id;
    _gzip = gzip;

    return 0;
}

extern "C" char **environ;

int
ToIPFlowDumps::add_compressable(const String &filename, ErrorHandler *errh)
{
    bool nowait = filename.length();
    static int arg_space = -1;

    // append current filename
    if (filename)
	_compressables.push_back(filename);
    if (_compressables.size() < 10 && nowait)
	return 0;

    // wait for current compression child
    if (_compress_child >= 0) {
	int status;
	int retval = waitpid(_compress_child, &status, (nowait ? WNOHANG : 0));
	if (retval == 0)
	    return 0;
	else if (retval < 0)
	    return errh->lerror(declaration(), "compressor: waitpid: %s;\ncancelling compression", strerror(errno));
	else {
	    _compress_child = -1;
	    if (WIFSIGNALED(status))
		return errh->lerror(declaration(), "compressor exited with signal %d;\ncancelling compression", WTERMSIG(status));
	    else if (!WIFEXITED(status) || WEXITSTATUS(status))
		return errh->lerror(declaration(), "compressor did not exit normally;\ncancelling compression");
	}
    }
    if (_compressables.size() == 0)
	return 0;

    // calculate maximum argument list size
    if (arg_space < 0) {
#ifdef _SC_ARG_MAX
      arg_space = sysconf(_SC_ARG_MAX);
#elif defined(ARG_MAX)
      arg_space = ARG_MAX;
#else
      arg_space = 1024;
#endif
      for (char **eptr = environ; *eptr; eptr++)
	arg_space -= strlen(*eptr) + 1;
      arg_space = (arg_space < 64 ? 1024 : arg_space - 32);
    }

    // fork child
    Vector<const char *> args;
    args.push_back("gzip");
    args.push_back("-f");
    int n = 0, my_arg_space = arg_space;
    for (int i = _compressables.size() - 1; i >= 0; i--, n++) {
	if (_compressables[i][0] == '-') // beware initial dashes
	    _compressables[i] = "./" + _compressables[i];
	my_arg_space -= _compressables[i].length() + 1; // not too long a line
	if (my_arg_space < 0)
	    break;
	args.push_back(_compressables[i].c_str());
    }
    args.push_back((const char *) 0);

    if ((_compress_child = fork()) == 0) {
	if (execvp("gzip", (char * const *) &args[0]) < 0) {
	    errh->lerror(declaration(), "gzip failed: %s", strerror(errno));
	    abort();
	}
    } else if (_compress_child < 0)
	errh->lerror(declaration(), "fork failed: %s;\ncancelling compression", strerror(errno));

    // remove old compressables
    _compressables.resize(_compressables.size() - n);

    // done
    if (n == 0)
	return errh->lerror(declaration(), "compressor failed: argument list too long; cancelling compression");
    else
	return 0;
}

void
ToIPFlowDumps::end_flow(Flow *f, ErrorHandler *errh)
{
    if (f->npackets() >= _mincount) {
	f->output(errh);
	if (_gzip && f->filename() != "-")
	    if (add_compressable(f->filename(), errh) < 0)
		_gzip = false;
    } else
	f->unlink(errh);
    delete f;
    _nflows--;
}

void
ToIPFlowDumps::cleanup(CleanupStage)
{
    ErrorHandler *errh = ErrorHandler::default_handler();
    for (int i = 0; i < NFLOWMAP; i++)
	while (Flow *f = _flowmap[i]) {
	    _flowmap[i] = f->next();
	    end_flow(f, errh);
	}
    if (_nnoagg > 0 && _nagg == 0)
	errh->lwarning(declaration(), "saw no packets with aggregate annotations");
    while ((_compress_child >= 0 || _compressables.size())
	   && add_compressable("", errh) >= 0)
	/* nada */;
}

int
ToIPFlowDumps::initialize(ErrorHandler *errh)
{
    if (input_is_pull(0) && noutputs() == 0) {
	ScheduleInfo::join_scheduler(this, &_task, errh);
	_signal = Notifier::upstream_empty_signal(this, 0, &_task);
    }
    if (_agg_notifier)
	_agg_notifier->add_listener(this);
    _gc_timer.initialize(this);
    return 0;
}

String
ToIPFlowDumps::expand_filename(const Packet *pkt, ErrorHandler *errh) const
{
    const char *data = _filename_pattern.data();
    int len = _filename_pattern.length();
    StringAccum sa;
    
    for (int p = 0; p < len; p++)
	if (data[p] == '%') {
	    p++;
	    bool zero_pad = false;
	    int field_width = -1;
	    int precision = -1;
	    if (p < len && data[p] == '0')
		zero_pad = true, p++;
	    if (p < len && isdigit(data[p])) {
		field_width = data[p] - '0';
		for (p++; p < len && isdigit(data[p]); p++)
		    field_width = (field_width * 10) + data[p] - '0';
	    }
	    if (p < len && data[p] == '.') {
		precision = 0;
		for (p++; p < len && isdigit(data[p]); p++)
		    precision = (precision * 10) + data[p] - '0';
	    }
	    
	    StringAccum subsa;
	    if (p >= len)
		errh->error("bad filename pattern");
	    else if (data[p] == 'n' || data[p] == 'x' || data[p] == 'X') {
		char format[3] = "%d";
		if (data[p] != 'n')
		    format[1] = data[p];
		uint32_t value = AGGREGATE_ANNO(pkt);
		if (precision >= 0 && precision <= 3)
		    value = (value >> ((3 - precision) * 8)) & 255;
		else if (precision >= 4 && precision <= 5)
		    value = (value >> ((5 - precision) * 16)) & 65535;
		subsa.snprintf(20, format, value);
	    } else if (data[p] == 's' && (precision < 0 || precision > 3))
		subsa << IPAddress(pkt->ip_header()->ip_src);
	    else if (data[p] == 's')
		subsa << ((ntohl(pkt->ip_header()->ip_src.s_addr) >> ((3 - precision) * 8)) & 255);
	    else if (data[p] == 'd' && (precision < 0 || precision > 3))
		subsa << IPAddress(pkt->ip_header()->ip_dst);
	    else if (data[p] == 'd')
		subsa << ((ntohl(pkt->ip_header()->ip_dst.s_addr) >> ((3 - precision) * 8)) & 255);
	    else if (data[p] == 'S')
		subsa << ntohs(pkt->tcp_header()->th_sport);
	    else if (data[p] == 'D')
		subsa << ntohs(pkt->tcp_header()->th_dport);
	    else if (data[p] == 'p')
		subsa << (pkt->ip_header()->ip_p == IP_PROTO_TCP ? 'T' : 'U');
	    else if (data[p] == '%')
		subsa << '%';
	    else
		errh->error("bad filename pattern `%%%c'", data[p]);
	    
	    if (field_width >= 0 && subsa.length() < field_width)
		for (int l = field_width - subsa.length(); l > 0; l--)
		    sa << (zero_pad ? '0' : '_');
	    sa << subsa;
	} else
	    sa << _filename_pattern[p];

    return sa.take_string();
}

ToIPFlowDumps::Flow *
ToIPFlowDumps::find_aggregate(uint32_t agg, const Packet *p)
{
    if (agg == 0)
	return 0;
    
    int bucket = (agg & (NFLOWMAP - 1));
    Flow *prev = 0, *f = _flowmap[bucket];
    while (f && f->aggregate() != agg) {
	prev = f;
	f = f->next();
    }

    if (f)
	/* nada */;
    else if (p && (f = new Flow(p, expand_filename(p, ErrorHandler::default_handler()), _absolute_time, _absolute_seq, _binary, _ip_id, _tcp_opt, _tcp_window))) {
	prev = f;
	_nflows++;
    } else
	return 0;

    if (prev) {
	prev->set_next(f->next());
	f->set_next(_flowmap[bucket]);
	_flowmap[bucket] = f;
    }
    
    return f;
}

inline void
ToIPFlowDumps::smaction(Packet *p)
{
    if (Flow *f = find_aggregate(AGGREGATE_ANNO(p), p)) {
	_nagg++;
	f->add_pkt(p, ErrorHandler::default_handler());
    } else
	_nnoagg++;
}

void
ToIPFlowDumps::push(int, Packet *p)
{
    smaction(p);
    checked_output_push(0, p);
}

Packet *
ToIPFlowDumps::pull(int)
{
    if (Packet *p = input(0).pull()) {
	smaction(p);
	return p;
    } else
	return 0;
}

bool
ToIPFlowDumps::run_task()
{
    Packet *p = input(0).pull();
    if (p) {
	smaction(p);
	p->kill();
    } else if (!_signal)
	return false;
    _task.fast_reschedule();
    return p != 0;
}

void
ToIPFlowDumps::add_note(uint32_t agg, const String &s, ErrorHandler *errh)
{
    if (Flow *f = find_aggregate(agg, 0))
	f->add_note(s, (errh ? errh : ErrorHandler::default_handler()));
    else if (errh)
	errh->warning("aggregate not found");
}

void
ToIPFlowDumps::aggregate_notify(uint32_t agg, AggregateEvent event, const Packet *)
{
    if (event == DELETE_AGG && find_aggregate(agg, 0)) {
	_gc_aggs.push_back(agg);
	_gc_aggs.push_back(click_jiffies());
	if (!_gc_timer.scheduled())
	    _gc_timer.schedule_after_msec(250);
    }
}

void
ToIPFlowDumps::gc_hook(Timer *t, void *thunk)
{
    ToIPFlowDumps *td = static_cast<ToIPFlowDumps *>(thunk);
    uint32_t limit_jiff = click_jiffies() - (CLICK_HZ / 4);
    int i;
    for (i = 0; i < td->_gc_aggs.size() && SEQ_LEQ(td->_gc_aggs[i+1], limit_jiff); i += 2)
	if (Flow *f = td->find_aggregate(td->_gc_aggs[i], 0)) {
	    int bucket = (f->aggregate() & (NFLOWMAP - 1));
	    assert(td->_flowmap[bucket] == f);
	    td->_flowmap[bucket] = f->next();
	    td->end_flow(f, ErrorHandler::default_handler());
	}
    if (i < td->_gc_aggs.size()) {
	td->_gc_aggs.erase(td->_gc_aggs.begin(), td->_gc_aggs.begin() + i);
	t->schedule_after_msec(250);
    }
}

enum { H_CLEAR };

int
ToIPFlowDumps::write_handler(const String &, Element *e, void *thunk, ErrorHandler *errh)
{
    ToIPFlowDumps *td = static_cast<ToIPFlowDumps *>(e);
    switch ((intptr_t)thunk) {
      case H_CLEAR:
	for (int i = 0; i < NFLOWMAP; i++)
	    while (Flow *f = td->_flowmap[i]) {
		td->_flowmap[i] = f->next();
		td->end_flow(f, errh);
	    }
	return 0;
      default:
	return -1;
    }
}

void
ToIPFlowDumps::add_handlers()
{
    add_write_handler("clear", write_handler, (void *)H_CLEAR);
}

CLICK_ENDDECLS
ELEMENT_REQUIRES(userlevel AggregateNotifier IPSummaryDump_TCP)
EXPORT_ELEMENT(ToIPFlowDumps)


syntax highlighted by Code2HTML, v. 0.9.1