// -*- mode: c++; c-basic-offset: 4 -*-
/*
 * fromflandump.{cc,hh} -- element reads packets from Chuck Blake's Flan file
 * Eddie Kohler
 *
 * Copyright (c) 2002 International Computer Science Institute
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, subject to the conditions
 * listed in the Click LICENSE file. These conditions include: you must
 * preserve this copyright notice, and you cannot mention the copyright
 * holders in advertising related to the Software without their permission.
 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
 * notice is a summary of the Click LICENSE file; the license in that file is
 * legally binding.
 */

#include <click/config.h>
#include "fromflandump.hh"
#include <click/confparse.hh>
#include <click/router.hh>
#include <click/standard/scheduleinfo.hh>
#include <click/error.hh>
#include <click/glue.hh>
#include <click/handlercall.hh>
#include <click/packet_anno.hh>
#include <clicknet/rfc1483.h>
#include <click/userutils.hh>
#include "elements/userlevel/fakepcap.hh"
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#ifdef ALLOW_MMAP
#include <sys/mman.h>
#endif

#define	SWAPLONG(y) \
	((((y)&0xff)<<24) | (((y)&0xff00)<<8) | (((y)&0xff0000)>>8) | (((y)>>24)&0xff))
#define	SWAPSHORT(y) \
	( (((y)&0xff)<<8) | ((u_short)((y)&0xff00)>>8) )

FromFlanDump::FromFlanDump()
    : _task(this)
{
    for (int i = 0; i < FF_LAST; i++)
	_ff[i] = 0;
}

FromFlanDump::~FromFlanDump()
{
}

int
FromFlanDump::configure(Vector<String> &conf, ErrorHandler *errh)
{
    bool stop = false, active = true;
    bool have_packets = false, packets, have_flows = false, flows;
    
    if (cp_va_parse(conf, this, errh,
		    cpFilename, "dump directory name", &_dirname,
		    cpKeywords,
		    "STOP", cpBool, "stop driver when done?", &stop,
		    "ACTIVE", cpBool, "start active?", &active,
		    cpConfirmKeywords,
		    "PACKETS", cpBool, "output packets?", &have_packets, &packets,
		    "FLOWS", cpBool, "output flows?", &have_flows, &flows,
		    cpEnd) < 0)
	return -1;

    // check packets vs. flows
    if (have_packets && have_flows && packets == flows)
	return errh->error("can't specify both PACKETS and FLOWS");
    else if (have_packets)
	flows = !packets;
    else if (!have_packets && !have_flows)
	flows = true;
    _flows = flows;

    // set other variables
    _stop = stop;
    _active = active;
    return 0;
}

void
FromFlanDump::FlanFile::FlanFile()
    : _fd(-1), _buffer(0), _offset(0), _len(0), _pipe(0), _my_buffer(true)
{
}

void
FromFlanDump::FlanFile::~FlanFile()
{
    if (_pipe)
	pclose(_pipe);
    else if (_fd >= 0)
	close(_fd);
    if (_my_buffer)
	delete[] _buffer;
}

int
FromFlanDump::FlanFile::open(const String &basename, const String &filename, int record_size, ErrorHandler *errh)
{
    assert(!_pipe && _fd < 0);
    _record_size = record_size;
    
    String path = basename;
    if (basename.back() != '/')
	path += "/";
    path += filename;

    // look for compressed versions
    if (access(path.c_str(), R_OK) >= 0) {
	_fd = open(path.c_str(), O_RDONLY);
	if (_fd < 0)
	    return errh->error("%s: %s", path.c_str(), strerror(errno));
    } else if (access((path + ".gz").c_str(), R_OK) >= 0) {
	char buf[3] = "\037\213";
	_pipe = open_uncompress_pipe(path + ".gz", (unsigned char *)buf, 2, errh);
	return (_fd = (_pipe ? fileno(_pipe) : -1));
    } else if (access((path + ".bz2").c_str(), R_OK) >= 0) {
	char buf[3] = "BZh";
	_pipe = open_uncompress_pipe(path + ".bz2", (unsigned char *)buf, 3, errh);
	return (_fd = (_pipe ? fileno(_pipe) : -1));
    } else
	return errh->error("%s: no such file", path.c_str());
}

int
FromFlanDump::FlanFile::read_more(off_t start_off)
{
    if (_offset + _len != start_off)
	if (lseek(_fd, start_off, SEEK_SET) == (off_t)-1) {
	    _offset = _len = 0;
	    // XXX
	    return -1;
	}

    // read data
    if (!_buffer && !(_buffer = new uint8_t[BUFFER_SIZE]))
	return -ENOMEM;

    _offset += _len;
    _len = 0;
    
    while (_len < BUFFER_SIZE) {
	ssize_t got = read(_fd, _buffer + _len, BUFFER_SIZE - _len);
	if (got > 0)
	    _len += got;
	else if (got == 0)	// premature end of file
	    return 0;
	else if (got < 0 && errno != EINTR && errno != EAGAIN)
	    return -1; //XXX error_helper(errh, strerror(errno));
    }

    return 0;
}

int
FromFlanDump::error_helper(ErrorHandler *errh, const char *x)
{
    if (errh)
	errh->error("%s: %s", _dirname.c_str(), x);
    else
	click_chatter("%s: %s", declaration().c_str(), x);
    return -1;
}

int
FromFlanDump::initialize(ErrorHandler *errh)
{
    
    if (_filename == "-") {
	_fd = STDIN_FILENO;
	_filename = "<stdin>";
    } else
	_fd = open(_filename.c_str(), O_RDONLY);
    if (_fd < 0)
	return errh->error("%s: %s", _filename.c_str(), strerror(errno));

  retry_file:
#ifdef ALLOW_MMAP
    _mmap_unit = 0;
#endif
    _file_offset = 0;
    int result = read_buffer(errh);
    if (result < 0)
	return -1;
    else if (result == 0)
	return errh->error("%s: empty file", _filename.c_str());

    // check for a gziped or bzip2d dump
    if (_fd == STDIN_FILENO || _pipe)
	/* cannot handle gzip or bzip2 */;
    else if (compressed_data(_buffer, _len)
	     && (_len < DAGCell::PAYLOAD_OFFSET + RFC1483_SNAP_EXPECTED_LEN
		 || memcmp(_buffer + DAGCell::PAYLOAD_OFFSET, RFC1483_SNAP_EXPECTED, RFC1483_SNAP_EXPECTED_LEN) != 0)) {
	close(_fd);
	_fd = -1;
	if (!(_pipe = open_uncompress_pipe(_filename, _buffer, _len, errh)))
	    return -1;
	_fd = fileno(_pipe);
	goto retry_file;
    }
    
    // if forcing IP packets, check datalink type to ensure we understand it
    if (_force_ip) {
	if (!fake_pcap_dlt_force_ipable(_linktype))
	    return errh->error("%s: unknown linktype %d; can't force IP packets", _filename.c_str(), _linktype);
	if (_timing)
	    return errh->error("FORCE_IP and TIMING options are incompatible");
    }

    // check handler call
    if (_last_time_h && _last_time_h->initialize_write(this, errh) < 0)
	return -1;
    
    // try reading a packet
    _pos = 0;
    if (read_packet(errh)) {
	struct timeval now;
	click_gettimeofday(&now);
	timersub(&now, &_packet->timestamp_anno(), &_time_offset);
    }

    if (output_is_push(0))
	ScheduleInfo::initialize_task(this, &_task, _active, errh);
    return 0;
}

void
FromFlanDump::cleanup(CleanupStage)
{
    for (int i = 0; i < FF_LAST; i++)
	delete _ff[i];
}

void
FromFlanDump::set_active(bool active)
{
    if (_active != active) {
	_active = active;
	if (active && output_is_push(0) && !_task.scheduled())
	    _task.reschedule();
    }
}


Packet *
FromFlanDump::read_flow_packet()
{
    if (_record >= _last_record) {
	assert(_record == _last_record);
	_last_record = 0;
	for (int i = FF_FIRST_FLOW; i < FF_LAST_FLOW; i++)
	    if (_ff[i]) {
		_ff[i]->read_more(_record);
		if (_last_record == 0 || _ff[i]->last_record() < _last_record)
		    _last_record = _ff[i]->last_record();
	    }
	if (_record >= _last_record)
	    return 0;
    }

    WritablePacket *q = Packet::make(0, sizeof(click_ip) + sizeof(click_tcp));
    if (!q) {
	error_helper("out of memory!");
	return 0;
    }
    
    q->set_network_header(q->data(), sizeof(click_ip));
    click_ip *iph = q->ip_header();
    iph->ip_v = 4;
    iph->ip_hl = (sizeof(click_ip) >> 2);
    if (_ff[FF_SADDR])
	iph->ip_src.s_addr = _ff[FF_SADDR]->read_uint32(_record);
    if (_ff[FF_DADDR])
	iph->ip_dst.s_addr = _ff[FF_DADDR]->read_uint32(_record);

    click_tcp *tcph = q->tcp_header();
    if (_ff[FF_SPORT])
	tcph->th_sport = _ff[FF_SPORT]->read_uint16(_record);
    if (_ff[FF_DPORT])
	tcph->th_dport = _ff[FF_DPORT]->read_uint16(_record);
    
}

bool
FromFlanDump::read_packet(ErrorHandler *errh)
{
    const DAGCell *cell;
    static DAGCell static_cell;
    struct timeval tv;
    Packet *p;
    bool more = true;
    _packet = 0;

  retry:
    // quit if we sampled or force_ip failed, but we are no longer active
    if (!more)
	return false;
    
    // we may need to read bits of the file
    if (_pos + sizeof(DAGCell) <= _len) {
	cell = reinterpret_cast<const DAGCell *>(_buffer + _pos);
	_pos += sizeof(DAGCell);
    } else {
	cell = &static_cell;
	if (read_into(&static_cell, sizeof(DAGCell), errh) < (int)sizeof(DAGCell))
	    return false;
    }

    // check times
  check_times:
    stamp_to_timeval(swapq(cell->timestamp), tv);
    if (!_have_any_times)
	prepare_times(tv);
    if (_have_first_time) {
	if (timercmp(&tv, &_first_time, <))
	    goto retry;
	else
	    _have_first_time = false;
    }
    if (_have_last_time && !timercmp(&tv, &_last_time, <)) {
	_have_last_time = false;
	(void) _last_time_h->call_write(this, errh);
	if (!_active)
	    more = false;
	// retry _last_time in case someone changed it
	goto check_times;
    }
    
    // checking sampling probability
    if (_sampling_prob < (1 << SAMPLING_SHIFT)
	&& (uint32_t)(random() & ((1<<SAMPLING_SHIFT)-1)) >= _sampling_prob)
	goto retry;
    
    // create packet
    if (cell != &static_cell) {
	p = _data_packet->clone();
	if (!p) {
	    error_helper(errh, "out of memory!");
	    return false;
	}
	p->shrink_packet(_pos - sizeof(DAGCell) + DAGCell::PAYLOAD_OFFSET, sizeof(DAGCell) - DAGCell::PAYLOAD_OFFSET);
	p->set_timestamp_anno(tv);
	
    } else {
	WritablePacket *wp = Packet::make(0, 0, sizeof(DAGCell) - DAGCell::PAYLOAD_OFFSET, 0);
	if (!wp) {
	    error_helper(errh, "out of memory!");
	    return false;
	}
	memcpy(wp->data(), &cell->payload, sizeof(cell->payload));
	wp->set_timestamp_anno(tv);
	
	p = wp;
    }

    if (_force_ip && !fake_pcap_force_ip(p, _linktype)) {
	checked_output_push(1, p);
	goto retry;
    }

    _packet = p;
    return more;
}

bool
FromFlanDump::run_task()
{
    if (!_active)
	return;

    bool more;
    if (_packet || read_packet(0)) {
	if (_timing) {
	    struct timeval now;
	    click_gettimeofday(&now);
	    timersub(&now, &_time_offset, &now);
	    if (timercmp(&_packet->timestamp_anno(), &now, >)) {
		_task.fast_reschedule();
		return;
	    }
	}
	output(0).push(_packet);
	more = read_packet(0);
    } else
	more = false;

    if (more)
	_task.fast_reschedule();
    else if (_stop)
	router()->please_stop_driver();
}

Packet *
FromFlanDump::pull(int)
{
    if (!_active)
	return 0;

    bool more;
    Packet *p;
    if (_packet || read_packet(0)) {
	if (_timing) {
	    struct timeval now;
	    click_gettimeofday(&now);
	    timersub(&now, &_time_offset, &now);
	    if (timercmp(&_packet->timestamp_anno(), &now, >))
		return 0;
	}
	p = _packet;
	more = read_packet(0);
    } else {
	p = 0;
	more = false;
    }

    if (!more && _stop)
	router()->please_stop_driver();
    return p;
}

enum {
    ACTIVE_THUNK, STOP_THUNK, FILESIZE_THUNK, FILEPOS_THUNK
};

String
FromFlanDump::read_handler(Element *e, void *thunk)
{
    FromFlanDump *fd = static_cast<FromFlanDump *>(e);
    switch ((intptr_t)thunk) {
      case ACTIVE_THUNK:
	return cp_unparse_bool(fd->_active);
      case FILESIZE_THUNK: {
	  struct stat s;
	  if (fd->_fd >= 0 && fstat(fd->_fd, &s) >= 0 && S_ISREG(s.st_mode))
	      return String(s.st_size);
	  else
	      return "-";
      }
      case FILEPOS_THUNK:
	return String(fd->_file_offset + fd->_pos);
      default:
	return "<error>";
    }
}

int
FromFlanDump::write_handler(const String &s_in, Element *e, void *thunk, ErrorHandler *errh)
{
    FromFlanDump *fd = static_cast<FromFlanDump *>(e);
    String s = cp_uncomment(s_in);
    switch ((intptr_t)thunk) {
      case ACTIVE_THUNK: {
	  bool active;
	  if (cp_bool(s, &active)) {
	      fd->set_active(active);
	      return 0;
	  } else
	      return errh->error("`active' should be Boolean");
      }
      case STOP_THUNK:
	fd->set_active(false);
	fd->router()->please_stop_driver();
	return 0;
      default:
	return -EINVAL;
    }
}

void
FromFlanDump::add_handlers()
{
    add_read_handler("active", read_handler, (void *)ACTIVE_THUNK);
    add_write_handler("active", write_handler, (void *)ACTIVE_THUNK);
    add_write_handler("stop", write_handler, (void *)STOP_THUNK);
    add_read_handler("filesize", read_handler, (void *)FILESIZE_THUNK);
    add_read_handler("filepos", read_handler, (void *)FILEPOS_THUNK);
    if (output_is_push(0))
	add_task_handlers(&_task);
}

ELEMENT_REQUIRES(userlevel int64 false)
EXPORT_ELEMENT(FromFlanDump)


syntax highlighted by Code2HTML, v. 0.9.1