/*-GNU-GPL-BEGIN-*
nepim - network pipemeter
Copyright (C) 2005 Everton da Silva Marques

nepim is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.

nepim is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with nepim; see the file COPYING.  If not, write to
the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*-GNU-GPL-END-*/


/* $Id: client.c,v 1.37 2005/09/22 10:24:56 evertonm Exp $ */

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <assert.h>

#include "conf.h"
#include "sock.h"
#include "pipe.h"
#include "common.h"
#include "usock.h"
#include "str.h"

/*
 * Tables shared with server.c.  `pipes` holds the TCP pipes and is looked
 * up by socket descriptor throughout this file; `slots` and `udp_tab` are
 * only initialized here when the client runs in UDP mode.
 */
extern nepim_pipe_set_t pipes;    /* from server.c */
extern nepim_slot_set_t slots;    /* from server.c */
extern nepim_usock_set_t udp_tab; /* from server.c */

/*
 * Forward declarations: each of these is referenced by a function that
 * appears in this file before the corresponding definition.
 */
static void *on_tcp_interval(oop_source *src, struct timeval tv, void *user);
static void tcp_pipe_stop(int sd);
static void *on_tcp_rate_delay(oop_source *src, struct timeval tv, void *user);

static void schedule_stat_interval(int sd)
{
  nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd);
  nepim_session_t *session;
  assert(pipe);
  session = &pipe->session;

  {
    int result = gettimeofday(&session->tv_interval, 0);
    assert(!result);
  }

  session->tv_interval.tv_sec += session->stat_interval;

  nepim_global.oop_src->on_time(nepim_global.oop_src,
				session->tv_interval,
				on_tcp_interval, pipe);
}

/*
 * Event-loop callback: TCP socket `sd` is readable.
 *
 * Reads up to nepim_global.tcp_read_size bytes into a scratch buffer and
 * credits the byte count to the session; the payload itself is not
 * inspected.  EINTR and EAGAIN are logged and otherwise ignored.
 * End-of-file or any other read error is treated as a lost connection:
 * the broken-pipe statistics are printed to stdout, the pipe is torn
 * down with tcp_pipe_stop() and the socket is closed.
 *
 * `user` is the nepim_pipe_t registered together with this callback.
 * Always returns OOP_CONTINUE.
 */
static void *on_tcp_read(oop_source *src, int sd,
                         oop_event event, void *user)
{
  nepim_pipe_t *pipe = user;
  nepim_session_t *session;
  char buf[nepim_global.tcp_read_size];
  int nread;

  assert(event == OOP_READ);
  assert(sd == pipe->sd);
  assert(sizeof(buf) == nepim_global.tcp_read_size);

  errno = 0;
  nread = read(sd, buf, nepim_global.tcp_read_size);

  if (nread > 0) {
    /* common case: account for what was received */
    assert(nread <= sizeof(buf));

    session = &pipe->session;
    nepim_session_read_add(session, nread);

    return OOP_CONTINUE;
  }

  if (nread < 0) {
    /* transient conditions: keep the pipe alive */
    if (errno == EINTR) {
      fprintf(stderr, "read: EINTR on TCP socket %d\n", sd);
      return OOP_CONTINUE;
    }
    if (errno == EAGAIN) {
      fprintf(stderr, "read: EAGAIN on TCP socket %d\n", sd);
      return OOP_CONTINUE;
    }

    fprintf(stderr, "read: errno=%d: %s\n",
            errno, strerror(errno));
  }

  /* end-of-file (nread == 0) or hard error: give up on this pipe */
  fprintf(stderr, "read: connection lost on TCP socket %d\n", sd);

  report_broken_pipe_stat(stdout, pipe);

  tcp_pipe_stop(sd);
  close(sd);

  return OOP_CONTINUE;
}

/*
 * Event-loop callback: TCP socket `sd` is writable (unlimited-rate sender).
 *
 * Writes one buffer of nepim_global.tcp_write_size bytes and credits the
 * number of bytes actually written to the session.  This handler is only
 * meant for sessions without a bit-rate or packet-rate limit (asserted).
 *
 * EINTR and EAGAIN are logged and otherwise ignored.  Any other write
 * error is treated as a lost connection: the broken-pipe statistics are
 * printed to stdout, the pipe is torn down with tcp_pipe_stop() and the
 * socket is closed.
 *
 * `user` is the nepim_pipe_t registered together with this callback.
 * Always returns OOP_CONTINUE.
 */
static void *on_tcp_write(oop_source *src, int sd,
                          oop_event event, void *user)
{
  /*
   * NOTE(review): buf is never initialized, so whatever happens to be
   * on the stack is sent to the peer.  Left as-is to keep the write
   * path free of extra work; confirm this is acceptable.
   */
  char buf[nepim_global.tcp_write_size];
  int wr;
  nepim_pipe_t *pipe = user;
  nepim_session_t *session = &pipe->session;

  assert(event == OOP_WRITE);
  assert(sd == pipe->sd);
  assert(session->max_bit_rate < 1);
  assert(session->max_pkt_rate < 1);

  assert(sizeof(buf) == nepim_global.tcp_write_size);

  wr = write(sd, buf, nepim_global.tcp_write_size);
  if (wr < 0) {
    const int err = errno; /* captured before any stdio call can change it */

    switch (err) {
    case EINTR:
      fprintf(stderr, "write: EINTR on TCP socket %d\n", sd);
      return OOP_CONTINUE;
    case EAGAIN:
      fprintf(stderr, "write: EAGAIN on TCP socket %d\n", sd);
      return OOP_CONTINUE;
    case EPIPE:
      fprintf(stderr, "write: EPIPE on TCP socket %d\n", sd);
      break;
    default:
      /* report the actual cause, as on_tcp_read() does */
      fprintf(stderr, "write: errno=%d: %s\n",
              err, strerror(err));
      break;
    }

    fprintf(stderr, "write: connection lost on TCP socket %d\n", sd);

    report_broken_pipe_stat(stdout, pipe);

    tcp_pipe_stop(sd);
    close(sd);

    return OOP_CONTINUE;
  }

  assert(wr >= 0);
  assert(wr <= sizeof(buf));

  nepim_session_write_add(session, wr);

  return OOP_CONTINUE;
}

/*
 * Event-loop callback: TCP socket `sd` is writable (rate-limited sender).
 *
 * Writes at most one buffer per invocation, capped by the byte budget
 * left for the current period (session->rate_remaining) when a byte
 * budget is in force.  Each successful write consumes the written bytes
 * from rate_remaining and one unit from pkt_remaining.  Once the budget
 * of an active limit (max_bit_rate or max_pkt_rate) is exhausted, the
 * write watch is cancelled and on_tcp_rate_delay() is scheduled at
 * session->tv_send_rate to start the next period.
 *
 * EINTR and EAGAIN are logged and otherwise ignored.  Any other write
 * error is treated as a lost connection: the broken-pipe statistics are
 * printed to stdout, the pipe is torn down with tcp_pipe_stop() and the
 * socket is closed.
 *
 * `user` is the nepim_pipe_t registered together with this callback.
 * Always returns OOP_CONTINUE.
 */
static void *on_tcp_rate_write(oop_source *src, int sd,
                               oop_event event, void *user)
{
  /*
   * NOTE(review): buf is never initialized, so whatever happens to be
   * on the stack is sent to the peer.  Left as-is to keep the write
   * path free of extra work; confirm this is acceptable.
   */
  char buf[nepim_global.tcp_write_size];
  int wr;
  nepim_pipe_t *pipe = user;
  nepim_session_t *session = &pipe->session;
  int to_write;

  assert(event == OOP_WRITE);
  assert(sd == pipe->sd);

  /* this handler is only used when at least one rate limit is set */
  assert((session->max_bit_rate > 0) || (session->max_pkt_rate > 0));

  assert(sizeof(buf) == nepim_global.tcp_write_size);

  /* there must be some budget left, otherwise we should not be writing */
  assert((session->rate_remaining > 0) || (session->pkt_remaining > 0));

  /* with a byte budget, never write more than what is left of it */
  if (session->rate_remaining > 0)
    to_write = NEPIM_MIN(session->rate_remaining, sizeof(buf));
  else
    to_write = sizeof(buf);

  assert(to_write >= 0);
  assert(to_write <= sizeof(buf));

  wr = write(sd, buf, to_write);
  if (wr < 0) {
    const int err = errno; /* captured before any stdio call can change it */

    switch (err) {
    case EINTR:
      fprintf(stderr, "rate_write: EINTR on TCP socket %d\n", sd);
      return OOP_CONTINUE;
    case EAGAIN:
      fprintf(stderr, "rate_write: EAGAIN on TCP socket %d\n", sd);
      return OOP_CONTINUE;
    case EPIPE:
      fprintf(stderr, "rate_write: EPIPE on TCP socket %d\n", sd);
      break;
    default:
      /* report the actual cause, as on_tcp_read() does */
      fprintf(stderr, "rate_write: errno=%d: %s\n",
              err, strerror(err));
      break;
    }

    fprintf(stderr, "rate_write: connection lost on TCP socket %d\n", sd);

    report_broken_pipe_stat(stdout, pipe);

    tcp_pipe_stop(sd);
    close(sd);

    return OOP_CONTINUE;
  }

  assert(wr >= 0);
  assert(wr <= sizeof(buf));
  assert(wr <= to_write);

  /* consume the budgets of the current period */
  session->rate_remaining -= wr;
  --session->pkt_remaining;

  nepim_session_write_add(session, wr);

  /* finished ? */
  if (
      ((session->max_bit_rate > 0) && (session->rate_remaining < 1))
      ||
      ((session->max_pkt_rate > 0) && (session->pkt_remaining < 1))
      ){

    /* stop writing */
    nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
                                    sd, OOP_WRITE);

    /* schedule for next time saved by on_tcp_rate_delay() */
    nepim_global.oop_src->on_time(nepim_global.oop_src,
                                  session->tv_send_rate,
                                  on_tcp_rate_delay, pipe);
  }

  return OOP_CONTINUE;
}

/*
 * Timer callback opening a new sending period for a rate-limited pipe.
 *
 * Records the start of the following period (now + session->write_delay)
 * in session->tv_send_rate, refills the byte and packet budgets from the
 * configured limits, and registers on_tcp_rate_write() so the socket is
 * written to as soon as it becomes writable.
 *
 * `tv` must equal the previously saved session->tv_send_rate (asserted).
 * `user` is the nepim_pipe_t registered together with this callback.
 * Always returns OOP_CONTINUE.
 */
static void *on_tcp_rate_delay(oop_source *src, struct timeval tv, void *user)
{
  oop_source *loop = nepim_global.oop_src;
  nepim_pipe_t *pipe = user;
  nepim_session_t *session = &pipe->session;
  int rc;

  assert(timercmp(&tv, &session->tv_send_rate, ==));

  /* at least one of the two limits must be configured */
  assert((session->max_bit_rate > 0) || (session->max_pkt_rate > 0));

  /* remember when the next period begins */
  rc = gettimeofday(&session->tv_send_rate, 0);
  assert(!rc);
  nepim_timer_usec_add(&session->tv_send_rate, session->write_delay);

  /* refill the budgets for this period */
  session->rate_remaining = nepim_bps2bytes(session->max_bit_rate,
                                            session->write_delay);
  session->pkt_remaining = nepim_pps2packets(session->max_pkt_rate,
                                             session->write_delay);

  /* a period with nothing to send would never finish */
  assert((session->rate_remaining > 0) || (session->pkt_remaining > 0));

  /* begin writing */
  loop->on_fd(loop, pipe->sd, OOP_WRITE, on_tcp_rate_write, pipe);

  return OOP_CONTINUE;
}

/*
 * Timer callback fired when the test duration of a pipe has elapsed.
 *
 * Prints the total statistics of the session to stdout, clears the total
 * counters, tears the pipe down with tcp_pipe_stop() and closes the
 * socket.
 *
 * `tv` must equal session->tv_duration (asserted).
 * `user` is the nepim_pipe_t registered together with this callback.
 * Always returns OOP_CONTINUE.
 */
static void *on_tcp_duration(oop_source *src, struct timeval tv, void *user)
{
  nepim_pipe_t *pipe = user;
  nepim_session_t *session = &pipe->session;

  /*
   * Keep our own copy of the descriptor: tcp_pipe_stop() removes the
   * pipe from `pipes` through nepim_pipe_set_del(), so `pipe` must not
   * be dereferenced once it has been called.
   */
  const int sd = pipe->sd;

  assert(timercmp(&tv, &session->tv_duration, ==));

  nepim_pipe_stat(stdout, NEPIM_LABEL_TOTAL, sd,
                  session->byte_total_recv,
                  session->byte_total_sent,
                  session->test_duration,
                  session->tv_start.tv_sec,
                  session->test_duration,
                  session->total_reads,
                  session->total_writes);

  session->byte_total_recv = 0;
  session->byte_total_sent = 0;
  session->total_reads     = 0;
  session->total_writes    = 0;

  tcp_pipe_stop(sd);
  close(sd);

  return OOP_CONTINUE;
}

/*
 * Timer callback fired at the end of every statistics interval.
 *
 * Prints the partial (per-interval) statistics of the session to stdout,
 * clears the per-interval counters and re-arms the timer through
 * schedule_stat_interval().
 *
 * `tv` must equal session->tv_interval (asserted).
 * `user` is the nepim_pipe_t registered together with this callback.
 * Always returns OOP_CONTINUE.
 */
static void *on_tcp_interval(oop_source *src, struct timeval tv, void *user)
{
  nepim_pipe_t *pipe = user;
  nepim_session_t *s = &pipe->session;

  assert(timercmp(&tv, &s->tv_interval, ==));

  nepim_pipe_stat(stdout, NEPIM_LABEL_PARTIAL, pipe->sd,
                  s->byte_interval_recv, s->byte_interval_sent,
                  s->stat_interval,
                  s->tv_start.tv_sec, s->test_duration,
                  s->interval_reads, s->interval_writes);

  /* open a fresh measurement window */
  s->interval_reads     = 0;
  s->interval_writes    = 0;
  s->byte_interval_recv = 0;
  s->byte_interval_sent = 0;

  schedule_stat_interval(pipe->sd);

  return OOP_CONTINUE;
}

/*
 * Send the greeting message carrying the test parameters over `sd`.
 *
 * The options are taken from nepim_global.  must_send is set when the
 * client runs in duplex mode or is not the sender in simplex mode; both
 * UDP keepalive options are set to -1.  When no password is configured
 * an empty string is sent in its place.  The formatted greeting is
 * echoed to stderr before it is written to the socket.
 *
 * Returns 0 on success, -1 if the greeting could not be formatted or
 * was not written completely by a single write().
 */
static int greet(int sd)
{
  char *no_password = "";
  char greeting[1024];
  nepim_greet_t opt;
  int len;
  int sent;

  if (!nepim_global.password) {
    opt.password_buf        = no_password;
    opt.password_buf_size   = strlen(no_password) + 1;
  }
  else {
    opt.password_buf        = nepim_global.password;
    opt.password_buf_size   = strlen(nepim_global.password) + 1;
  }

  opt.udp_keepalive_require = -1;
  opt.udp_keepalive_send    = -1;
  opt.write_delay           = nepim_global.write_delay;
  opt.test_duration         = nepim_global.test_duration;
  opt.stat_interval         = nepim_global.stat_interval;
  opt.pkt_rate              = nepim_global.pkt_rate;
  opt.bit_rate              = nepim_global.bit_rate;
  opt.must_send             = nepim_global.duplex_mode ||
    !nepim_global.simplex_client_send;

  len = nepim_write_greetings(&opt, greeting, sizeof(greeting));
  if (len < 0)
    return -1;
  assert(len > 0);

  fprintf(stderr, "%d: sending: %s", sd, greeting);

  /* a short write counts as failure; no retry is attempted */
  sent = write(sd, greeting, len);

  return (sent == len) ? 0 : -1;
}

static void tcp_pipe_start(int sd)
{
  nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd);
  nepim_session_t *session;
  assert(pipe);

  session = &pipe->session;

  nepim_global.oop_src->on_fd(nepim_global.oop_src,
			      sd, OOP_READ,
			      on_tcp_read, pipe);

  if (session->must_send) {
    if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
      session->tv_send_rate = OOP_TIME_NOW;
      nepim_global.oop_src->on_time(nepim_global.oop_src,
				    session->tv_send_rate,
				    on_tcp_rate_delay, pipe);
    }
    else {
      nepim_global.oop_src->on_fd(nepim_global.oop_src,
				  sd, OOP_WRITE,
				  on_tcp_write, pipe);
    }
  }

  {
    int result = gettimeofday(&session->tv_start, 0);
    assert(!result);
    session->tv_duration = session->tv_start;
  }
  session->tv_duration.tv_sec += session->test_duration;
  nepim_global.oop_src->on_time(nepim_global.oop_src,
				session->tv_duration,
				on_tcp_duration, pipe);

  nepim_sock_show_opt(stderr, sd);

  schedule_stat_interval(sd);
}

static void tcp_pipe_stop(int sd)
{
  nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd);
  nepim_session_t *session;
  assert(pipe);

  session = &pipe->session;

  nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
				  sd, OOP_READ);

  if (session->must_send) {
    if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
      /* stop current writing, if any */
      nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
				      sd, OOP_WRITE);

      /* stop periodic write scheduler */
      nepim_global.oop_src->cancel_time(nepim_global.oop_src,
					session->tv_send_rate,
					on_tcp_rate_delay, pipe);
    }
    else {
      nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
				      sd, OOP_WRITE);
    }
  }

  nepim_global.oop_src->cancel_time(nepim_global.oop_src,
				    session->tv_duration,
				    on_tcp_duration, pipe);

  nepim_global.oop_src->cancel_time(nepim_global.oop_src,
				    session->tv_interval,
				    on_tcp_interval, pipe);

  nepim_pipe_set_del(&pipes, sd);

  nepim_sock_show_opt(stderr, sd);
}

/*
 * Resolve `hostname`/`portname` and connect one TCP client socket.
 *
 * The addresses returned by getaddrinfo() are tried in order, skipping
 * the families disabled through nepim_global.no_inet6 / no_inet4.  The
 * first socket that connects is added to `pipes` together with the
 * local session options, and the search stops.  Progress and failures
 * are logged to stderr; nothing is added when resolution fails or no
 * address can be connected.
 */
static void spawn_one_tcp_client(const char *hostname, const char *portname)
{
  struct addrinfo hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai;
  int result;

  /* every field not set below stays zero/null */
  memset(&hints, 0, sizeof(hints));

  hints.ai_socktype = SOCK_STREAM;
  hints.ai_protocol = IPPROTO_TCP;
  hints.ai_flags = AI_CANONNAME;
  hints.ai_family = PF_UNSPEC;

  fprintf(stderr,
          "TCP socket solving %s,%s\n",
          hostname, portname);

  result = getaddrinfo(hostname, portname,
                       &hints, &ai_res);
  if (result) {
    fprintf(stderr, "%s: getaddrinfo(%s,%s): %s\n",
            __func__, hostname, portname,
            gai_strerror(result));
    return;
  }

  for (ai = ai_res; ai; ai = ai->ai_next) {
    int sd;

    fprintf(stderr,
            "TCP socket trying %s,%s\n",
            hostname, portname);

    if (nepim_global.no_inet6 && (ai->ai_family == PF_INET6))
      continue;

    if (nepim_global.no_inet4 && (ai->ai_family == PF_INET))
      continue;

    sd = nepim_connect_client_socket(ai->ai_addr, ai->ai_addrlen,
                                     ai->ai_family, ai->ai_socktype,
                                     ai->ai_protocol,
                                     nepim_global.pmtu_mode,
                                     nepim_global.ttl,
                                     nepim_global.win_recv,
                                     nepim_global.win_send);
    if (sd < 0) {
      fprintf(stderr,
              "could not connect TCP socket to %s,%s: %d\n",
              hostname, portname, sd);
      continue;
    }

    /* ai_addrlen is a socklen_t; cast so it matches the %d conversion */
    fprintf(stderr,
            "%d: TCP socket connected to: "
            "host=%s,%s len=%d family=%d type=%d proto=%d\n",
            sd, hostname, portname,
            (int) ai->ai_addrlen, ai->ai_family,
            ai->ai_socktype, ai->ai_protocol);

    {
      nepim_greet_t opt;

      /*
       * Start from an all-zero structure: the password fields are not
       * assigned here and must not reach nepim_pipe_set_add() holding
       * indeterminate values.
       */
      memset(&opt, 0, sizeof(opt));

      /* local view: this side sends in duplex mode or as simplex sender */
      opt.must_send             = nepim_global.duplex_mode ||
        nepim_global.simplex_client_send;
      opt.bit_rate              = nepim_global.bit_rate;
      opt.pkt_rate              = nepim_global.pkt_rate;
      opt.stat_interval         = nepim_global.stat_interval;
      opt.test_duration         = nepim_global.test_duration;
      opt.write_delay           = nepim_global.write_delay;
      opt.udp_keepalive_send    = nepim_global.udp_keepalive_send;
      opt.udp_keepalive_require = nepim_global.udp_keepalive_require;

      nepim_pipe_set_add(&pipes, sd, ai->ai_addr, ai->ai_addrlen, &opt);
    }

    /* one connected socket per call is enough */
    break;
  }

  freeaddrinfo(ai_res);
}

/*
 * Walk the address list `host_list` and create the TCP client sockets.
 *
 * Every entry is split into host and port; entries for which
 * addr_split_port() reports failure use nepim_global.portname instead.
 * nepim_global.pipes sockets are spawned per entry.  Entries that cannot
 * be extracted from the list are reported on stderr and skipped.
 */
static void parse_hosts(const char *host_list)
{
  const int entries = addr_list_size(host_list);
  int entry;

  for (entry = 0; entry < entries; ++entry) {
    char hostname[100];
    const char *portname;
    int n;

    /* extract the entry */
    if (addr_list_get(host_list, entry, hostname, sizeof(hostname))) {
      fprintf(stderr,
              "%s: failure parsing address %d/%d from list: %s\n",
              nepim_global.prog_name, entry, entries, host_list);
      continue;
    }

    /* separate the port, falling back to the configured one */
    if (addr_split_port(hostname, sizeof(hostname), &portname))
      portname = nepim_global.portname;

    /* one socket per requested pipe */
    for (n = 0; n < nepim_global.pipes; ++n)
      spawn_one_tcp_client(hostname, portname);
  }
}

/*
 * Create and activate all TCP client pipes for `host_list`.
 *
 * The sockets are first connected by parse_hosts().  Then every pipe
 * present in `pipes` is greeted: on success the pipe is started, on
 * failure it is removed from `pipes` and its socket is closed.  The
 * outcome of each greeting is logged to stderr.
 */
static void spawn_tcp_clients(const char *host_list)
{
  int sd;

  /* connect */
  parse_hosts(host_list);

  /* greet and activate */
  for (sd = 0; sd < pipes.array.capacity; ++sd) {
    nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd);
    const struct sockaddr *remote;
    char peer[100];

    if (!pipe)
      continue;

    remote = (const struct sockaddr *) &pipe->session.remote;

    nepim_sock_dump_addr(peer, sizeof(peer), remote);

    if (!greet(sd)) {
      fprintf(stderr,
              "%d: greetings sent to %s,%d\n",
              sd, peer,
              nepim_sock_get_port(remote));

      tcp_pipe_start(sd);

      continue;
    }

    /* greeting failed: report it, then drop the pipe */
    fprintf(stderr,
            "%d: could not greet %s,%d\n",
            sd, peer,
            nepim_sock_get_port(remote));

    nepim_pipe_set_del(&pipes, sd);
    close(sd);
  }
}

/* UDP counterpart of spawn_tcp_clients(); not defined in this file. */
void nepim_udp_clients(const char *host_list);

/*
 * Client entry point.
 *
 * Initializes the pipe table, spawns the clients for
 * nepim_global.hostname (UDP clients, after initializing the slot and
 * UDP socket tables, when nepim_global.udp_mode is set; TCP clients
 * otherwise) and then runs the event loop until it returns.
 */
void nepim_client_run()
{
  nepim_pipe_set_init(&pipes);

  if (!nepim_global.udp_mode) {
    spawn_tcp_clients(nepim_global.hostname);
  }
  else {
    nepim_slot_set_init(&slots);
    nepim_usock_set_init(&udp_tab);

    nepim_udp_clients(nepim_global.hostname);
  }

  /* the value returned by the event loop is not used */
  (void) oop_sys_run(nepim_global.oop_sys);
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */