/*-GNU-GPL-BEGIN-*
nepim - network pipemeter
Copyright (C) 2005 Everton da Silva Marques

nepim is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.

nepim is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with nepim; see the file COPYING.  If not, write to
the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*-GNU-GPL-END-*/


/* $Id: udp_client.c,v 1.45 2005/09/23 12:55:13 evertonm Exp $ */


#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <assert.h>

#include "conf.h"
#include "sock.h"
#include "pipe.h"
#include "common.h"
#include "usock.h"
#include "udp_header.h"
#include "str.h"

extern nepim_slot_set_t slots;    /* from server.c */
extern nepim_usock_set_t udp_tab; /* from server.c */

static void *on_udp_rate_delay(oop_source *src, struct timeval tv, void *user);
static void *on_udp_write(oop_source *src, int sd,
			  oop_event event, void *user);
static void udp_slot_cancel_stat_timers(nepim_slot_t *slot);
static void schedule_stat_interval(nepim_slot_t *slot);
static void udp_slot_kill(nepim_slot_t *slot);
static void *on_udp_duration(oop_source *src, struct timeval tv, void *user);
static void *on_udp_time_greet(oop_source *src, struct timeval tv,
			       void *user);
static void soft_cancel_slot_greet_writer(nepim_slot_t *slot);
static void cancel_slot_greet_writer(nepim_slot_t *slot);
static void cancel_slot_greet_timer(nepim_slot_t *slot);
static void cancel_slot_segment_read(nepim_slot_t *slot);

static void will_slot_write(nepim_slot_t *slot)
{
  nepim_session_t *session = &slot->session;
  int sd = slot->udp_sd;

  assert(session->max_bit_rate < 1);
  assert(session->max_pkt_rate < 1);

  assert(!slot->want_write);
  ++slot->want_write;

  if (nepim_usock_writer_add(&udp_tab, sd))
    return;

  nepim_global.oop_src->on_fd(nepim_global.oop_src, sd,
			      OOP_WRITE, on_udp_write, 0);
}

static void *on_udp_keepalive_require(oop_source *src, struct timeval tv, void *user)
{
  nepim_slot_t *slot = user;
  nepim_session_t *session = &slot->session;

  assert(session->udp_keepalive_require);
  assert(slot->keepalives_recv >= 0);

  if (slot->keepalives_recv < 1) {
    fprintf(stderr, 
	    "%d %d-%d: broken slot: peer not sending keepalives?\n",
	    slot->udp_sd, slot->index, slot->index_remote);

    report_broken_slot_stat(stdout, slot);

    udp_slot_kill(slot);

    return OOP_CONTINUE;
  }

  slot->keepalives_recv = 0;

  nepim_schedule_keepalive_timer(slot, on_udp_keepalive_require);

  return OOP_CONTINUE;
}

static void *on_udp_keepalive_time(oop_source *src, struct timeval tv, void *user)
{
  nepim_slot_t *slot = user;
  nepim_session_t *session = &slot->session;

  assert(timercmp(&tv, &session->tv_keepalive, ==));
  assert(!session->must_send);
  assert(session->udp_keepalive_send);

  /* save next scheduling time */
  {
    int result = gettimeofday(&session->tv_keepalive, 0);
    assert(!result);
  }
  nepim_timer_usec_add(&session->tv_keepalive, nepim_global.udp_keepalive_delay);

  /* start to write */
  nepim_will_slot_keepalive(slot, on_udp_write);

  return OOP_CONTINUE;
}

static void udp_slot_start(nepim_slot_t *slot)
{
  nepim_session_t *session;

  session = &slot->session;

  if (session->must_send) {
    if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
      session->tv_send_rate = OOP_TIME_NOW;
      nepim_global.oop_src->on_time(nepim_global.oop_src,
				    session->tv_send_rate,
				    on_udp_rate_delay, slot);
    }
    else {
      will_slot_write(slot);
    }
  }
  else {
    if (session->udp_keepalive_send) {

      fprintf(stderr, 
	      "DEBUG %s %s: %d %d-%d: scheduling sending of keepalives\n",
	      __FILE__, __PRETTY_FUNCTION__,
	      slot->udp_sd, slot->index, slot->index_remote);
      
      session->tv_keepalive = OOP_TIME_NOW;
      nepim_global.oop_src->on_time(nepim_global.oop_src,
				    session->tv_keepalive,
				    on_udp_keepalive_time, slot);
    }
  }

  {
    int result = gettimeofday(&session->tv_start, 0);
    assert(!result);
    session->tv_duration = session->tv_start;
  }
  session->tv_duration.tv_sec += session->test_duration;
  nepim_global.oop_src->on_time(nepim_global.oop_src,
				session->tv_duration,
				on_udp_duration, slot);

  nepim_sock_show_opt(stderr, slot->udp_sd);

  schedule_stat_interval(slot);

  if (session->udp_keepalive_require)
    nepim_schedule_keepalive_timer(slot, on_udp_keepalive_require);
}

static void udp_consume(nepim_slot_t *slot, size_t len, uint32_t seq)
{
  nepim_session_t *session = &slot->session;

  /* any valid traffic accounted as keepalive */
  ++slot->keepalives_recv;

  nepim_session_read_add(session, len);

  if (nepim_global.udp_exp_stats)
    nepim_win_add(&slot->win_recv_seq, seq,
		  &slot->interval_pkt_lost, &slot->interval_pkt_dup);
}

static void *on_udp_read_segment(oop_source *src, int sd,
				 oop_event event, void *user)
{
  int rd;
  char buf[nepim_global.udp_read_size];
  union {
    struct sockaddr_in inet;
    struct sockaddr_in6 inet6;
  } from;
  socklen_t fromlen = sizeof(from);
  nepim_udp_hdr_t hdr;

  assert(sizeof(buf) == nepim_global.udp_read_size);
  assert(event == OOP_READ);
  assert(!user);

  rd = recvfrom(sd, buf, nepim_global.udp_read_size, 0, 
		(struct sockaddr *) &from, &fromlen);
  if (rd < 0) {
    switch (errno) {
    case EINTR:
    case EAGAIN:
    case ECONNREFUSED:
      nepim_usock_read_error(&udp_tab, sd, errno);
      return OOP_CONTINUE;
    }

    fprintf(stderr, 
	    "%d: recvfrom: unexpected failure: errno=%d: %s\n",
	    sd, errno, strerror(errno));

    assert(0);

    return OOP_CONTINUE;
  }

  assert(rd >= 0);
  assert(rd <= sizeof(buf));

  nepim_usock_read_good(&udp_tab, sd);

  if (nepim_udp_hdr_parse(&hdr, buf, rd)) {
    fprintf(stderr, 
	    "%d: recvfrom: could not parse header from UDP segment: %d/%d bytes\n",
	    sd, rd, nepim_global.udp_read_size);
    return OOP_CONTINUE;
  }

  if (hdr.version) {
    fprintf(stderr, "%d: recvfrom: ignoring unknown UDP header version: %d\n",
	    sd, hdr.version);
    return OOP_CONTINUE;
  }

  {
    nepim_slot_t *slot = nepim_slot_set_search(&slots, hdr.dst_slot);

    if (!slot) {
      fprintf(stderr, "%d: recvfrom: unknown local slot: %d\n",
	      sd, hdr.dst_slot);
      return OOP_CONTINUE;
    }

#if 0
    fprintf(stderr, 
	    "DEBUG FIXME %s %s: %d %d-%d in_seq=%d\n",
	    __FILE__, __PRETTY_FUNCTION__,
	    slot->udp_sd, hdr.dst_slot, hdr.src_slot, hdr.seq);
#endif

    if (slot->session.remote_len != fromlen) {
      fprintf(stderr, 
	      "%d %d-%d: recvfrom: bad address length: expected=%d got=%d\n",
	      sd, hdr.dst_slot, hdr.src_slot, 
	      slot->session.remote_len, fromlen);
      return OOP_CONTINUE;
    }

    if (memcmp(&slot->session.remote, &from, fromlen)) {
      fprintf(stderr, 
	      "%d %d-%d: recvfrom: address mismatch\n",
	      sd, hdr.dst_slot, hdr.src_slot);
      return OOP_CONTINUE;
    }

    if (slot->client_writer_status == NEPIM_SLOT_CLIENT_GREET) {

      assert(slot->index_remote == 0xFFFF);
      slot->index_remote = hdr.src_slot;

      cancel_slot_greet_timer(slot);
      soft_cancel_slot_greet_writer(slot);

      slot->client_writer_status = NEPIM_SLOT_CLIENT_SEND;
    
      udp_slot_start(slot);
    
      return OOP_CONTINUE;
    }

    assert(slot->index_remote < 0xFFFF);

    if (slot->index_remote != hdr.src_slot) {
      fprintf(stderr, 
	      "%d %d-%d: recvfrom: remote index mismatch: expected=%d got=%d\n",
	      sd, hdr.dst_slot, hdr.src_slot, slot->index_remote, hdr.src_slot);
      return OOP_CONTINUE;
    }

    udp_consume(slot, rd, hdr.seq);
  }

  return OOP_CONTINUE;
}

static int slot_greet_write(nepim_slot_t *slot)
{
  char buf[1024];
  nepim_greet_t opt;
  int pr;
  int wr;
  int buf_avail_len;
  int write_len;
  char *tmp = "";

  assert(sizeof(buf) > UDP_HEADER_LEN);

  opt.must_send             = nepim_global.duplex_mode ||
    !nepim_global.simplex_client_send;
  opt.bit_rate              = nepim_global.bit_rate;
  opt.pkt_rate              = nepim_global.pkt_rate;
  opt.stat_interval         = nepim_global.stat_interval;
  opt.test_duration         = nepim_global.test_duration;
  opt.write_delay           = nepim_global.write_delay;
  opt.udp_keepalive_send    = nepim_global.udp_keepalive_require;
  opt.udp_keepalive_require = nepim_global.udp_keepalive_send;

  if (nepim_global.password) {
    opt.password_buf        = nepim_global.password;
    opt.password_buf_size   = strlen(nepim_global.password) + 1;
  }
  else {
    opt.password_buf        = tmp;
    opt.password_buf_size   = strlen(tmp) + 1;
  }

  buf_avail_len = sizeof(buf) - UDP_HEADER_LEN;
  assert(buf_avail_len > 0);

  pr = nepim_write_greetings(&opt, buf + UDP_HEADER_LEN, buf_avail_len);
  if (pr < 0)
    return -1;
  if (pr >= buf_avail_len)
    return -2;
  assert(pr > 0);

  write_len = UDP_HEADER_LEN + pr;

  wr = nepim_slot_buf_write(slot, buf, write_len, UDP_TYPE_HELLO);
  if (wr < 0) {
    switch (errno) {
    case EINTR:
    case EAGAIN:
    case ECONNREFUSED:
      nepim_usock_write_error(&udp_tab, slot->udp_sd, 
			      slot->index, slot->index_remote,
			      errno);
      return 0; /* ignore as soft error (caller always resend) */
    default:
      fprintf(stderr, 
	      "greet_write: error on UDP socket %d (%d-%d): %d: %s\n", 
	      slot->udp_sd, slot->index, slot->index_remote, errno,
	      strerror(errno));
    }
    assert(0); /* treat unexpected failures as fatal */
    return -3;
  }

#if 0
  fprintf(stderr, 
	  "%d %d-%d: sending: hdr_len=%d greet_len=%d total=%d wrote=%d\n", 
	  slot->udp_sd, slot->index, slot->index_remote,
	  UDP_HEADER_LEN, pr, write_len, wr);
#endif

  if (wr != write_len)
    return -4;

  nepim_usock_write_good(&udp_tab, slot->udp_sd);

  return 0;
}

static void *on_udp_interval(oop_source *src, struct timeval tv, void *user)
{
  nepim_slot_t *slot = user;
  nepim_session_t *session = &slot->session;

  nepim_slot_stat(stdout, NEPIM_LABEL_PARTIAL, 
		  slot->udp_sd,
		  slot->index,
		  slot->index_remote,
		  session->byte_interval_recv,
		  session->byte_interval_sent,
		  session->stat_interval,
		  session->tv_start.tv_sec,
		  session->test_duration,
		  session->interval_reads,
		  session->interval_writes,
		  slot->interval_pkt_lost,
		  slot->interval_pkt_dup);

  session->byte_interval_recv = 0;
  session->byte_interval_sent = 0;
  session->interval_reads     = 0;
  session->interval_writes    = 0;

  slot->total_pkt_lost += slot->interval_pkt_lost;
  slot->total_pkt_dup += slot->interval_pkt_dup;
  slot->interval_pkt_lost = 0;
  slot->interval_pkt_dup = 0;

  schedule_stat_interval(slot);

  return OOP_CONTINUE;
}

static void schedule_stat_interval(nepim_slot_t *slot)
{
  nepim_session_t *session = &slot->session;

  {
    int result = gettimeofday(&session->tv_interval, 0);
    assert(!result);
  }

  session->tv_interval.tv_sec += session->stat_interval;

  nepim_global.oop_src->on_time(nepim_global.oop_src,
				session->tv_interval,
				on_udp_interval, slot);
}

static void *on_udp_duration(oop_source *src, struct timeval tv, void *user)
{
  nepim_slot_t *slot = user;
  nepim_session_t *session = &slot->session;
  int sd = slot->udp_sd;

  slot->total_pkt_lost += slot->interval_pkt_lost;
  slot->total_pkt_dup += slot->interval_pkt_dup;

  nepim_slot_stat(stdout, NEPIM_LABEL_TOTAL, 
		  sd, 
		  slot->index,
		  slot->index_remote,
		  session->byte_total_recv,
		  session->byte_total_sent,
		  session->test_duration,
		  session->tv_start.tv_sec,
		  session->test_duration,
		  session->total_reads,
		  session->total_writes,
		  slot->total_pkt_lost,
		  slot->total_pkt_dup);

  session->byte_total_recv = 0;
  session->byte_total_sent = 0;
  session->total_reads     = 0;
  session->total_writes    = 0;
  session->duration_done   = 1;

  udp_slot_kill(slot);

  return OOP_CONTINUE;
}

static void udp_slot_cancel_stat_timers(nepim_slot_t *slot)
{
  nepim_session_t *session = &slot->session;

  nepim_global.oop_src->cancel_time(nepim_global.oop_src,
				    session->tv_duration,
				    on_udp_duration, slot);

  nepim_global.oop_src->cancel_time(nepim_global.oop_src,
				    session->tv_interval,
				    on_udp_interval, slot);
}

static void cancel_slot_rate_write(nepim_slot_t *slot)
{
  nepim_session_t *session = &slot->session;
  int sd = slot->udp_sd;

#ifndef NDEBUG
  {
    if (session->max_bit_rate <= 0) {
      assert(session->max_pkt_rate > 0);
    }
    if (session->max_pkt_rate <= 0) {
      assert(session->max_bit_rate > 0);
    }
  }
#endif /* NDEBUG */

  assert(slot->want_write == 1);
  --slot->want_write;

  if (nepim_usock_writer_del(&udp_tab, sd))
    return;

  nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
				  sd, OOP_WRITE);
}

static void udp_write_greet(nepim_slot_t *slot)
{
  nepim_session_t *session;
  int result;
  int sd = slot->udp_sd;

  assert(slot->client_writer_status == NEPIM_SLOT_CLIENT_GREET);

  /* stop writing */
  assert(slot->want_write == 1);
  cancel_slot_greet_writer(slot);

  if (slot->greetings_sent >= nepim_global.max_greetings) {

    if (!nepim_global.udp_require_greet_reply) {

      cancel_slot_greet_timer(slot);
      soft_cancel_slot_greet_writer(slot);

      slot->client_writer_status = NEPIM_SLOT_CLIENT_SEND;
    
      udp_slot_start(slot);

      return;
    }

    fprintf(stderr, 
	    "%d %d-%d: max. greetings reached (%d/%d) - server not responding\n",
	    sd, slot->index, slot->index_remote,
	    slot->greetings_sent, nepim_global.max_greetings);

    udp_slot_kill(slot);

    return;
  }

  fprintf(stderr, "%d %d-%d: sending greetings\n",
	  sd, slot->index, slot->index_remote);

  result = slot_greet_write(slot);
  if (result) {
    fprintf(stderr, 
	    "%d %d-%d: greet writing failed: %d\n", 
	    sd, slot->index, slot->index_remote, result);

    udp_slot_kill(slot);

    return;
  }

  ++slot->greetings_sent;

  session = &slot->session;

  /* schedule for next time saved by on_udp_time_greet() */
  nepim_global.oop_src->on_time(nepim_global.oop_src,
				session->tv_greet_rate,
				on_udp_time_greet, slot);
}

static void udp_write_rate(nepim_slot_t *slot) 
{
  char buf[nepim_global.udp_write_size];
  int wr;
  nepim_session_t *session;
  int to_write;
  int sd;

  assert(sizeof(buf) == nepim_global.udp_write_size);
  assert(slot);

  sd = slot->udp_sd;

  session = &slot->session;

#ifndef NDEBUG
  {
    if (session->max_bit_rate <= 0) {
      assert(session->max_pkt_rate > 0);
    }
    if (session->max_pkt_rate <= 0) {
      assert(session->max_bit_rate > 0);
    }
  }
#endif /* NDEBUG */
  
#ifndef NDEBUG
  {
    if (session->rate_remaining <= 0) {
      assert(session->pkt_remaining > 0);
    }
    if (session->pkt_remaining <= 0) {
      assert(session->rate_remaining > 0);
    }
  }
#endif /* NDEBUG */

  if (session->rate_remaining > 0)
    to_write = NEPIM_RANGE(session->rate_remaining, 
			   UDP_HEADER_LEN, 
			   sizeof(buf));
  else
    to_write = sizeof(buf);

  assert(to_write >= 0);
  assert(to_write >= UDP_HEADER_LEN);
  assert(to_write <= sizeof(buf));

  wr = nepim_slot_buf_write(slot, buf, to_write, UDP_TYPE_DATA);
  if (wr < 0) {
    switch (errno) {
    case EINTR:
    case EAGAIN:
    case ECONNREFUSED:
      nepim_usock_write_error(&udp_tab, sd, 
			      slot->index, slot->index_remote,
			      errno);
      return;

    case EPIPE:
      fprintf(stderr, 
	      "rate_write: EPIPE on UDP socket %d (%d-%d)\n", 
	      sd, slot->index, slot->index_remote);
      break;
    default:
      fprintf(stderr, 
	      "rate_write: error on UDP socket %d (%d-%d): %d: %s\n", 
	      sd, slot->index, slot->index_remote, errno,
	      strerror(errno));
    }

    fprintf(stderr,
	    "rate_write: connection lost on UDP socket %d (%d-%d)\n",
	    sd, slot->index, slot->index_remote);

    if (!session->duration_done)
      report_broken_slot_stat(stdout, slot);

    udp_slot_kill(slot);

    return;
  }

  assert(wr >= 0);
  assert(wr <= to_write);

  nepim_usock_write_good(&udp_tab, slot->udp_sd);

  session->rate_remaining -= wr;
  --session->pkt_remaining;

  nepim_session_write_add(session, wr);

  /* finished ? */
  if ( 
      ((session->max_bit_rate > 0) && (session->rate_remaining < 1))
      ||
      ((session->max_pkt_rate > 0) && (session->pkt_remaining < 1))
      ){

    /* stop writing */
    cancel_slot_rate_write(slot);

    /* schedule for next time saved by on_udp_rate_delay() */
    nepim_global.oop_src->on_time(nepim_global.oop_src,
				  session->tv_send_rate,
				  on_udp_rate_delay, slot);
  }
}

static void udp_write_full(nepim_slot_t *slot) 
{
  char buf[nepim_global.udp_write_size];
  int wr;
  nepim_session_t *session;
  int sd;

  assert(nepim_global.udp_write_size == sizeof(buf));
  assert(slot);

  sd = slot->udp_sd;

  session = &slot->session;

  assert(session->max_bit_rate < 1);
  assert(session->max_pkt_rate < 1);

  wr = nepim_slot_buf_write(slot, buf, sizeof(buf), UDP_TYPE_DATA);
  if (wr < 0) {
    switch (errno) {
    case EINTR:
    case EAGAIN:
    case ECONNREFUSED:
      nepim_usock_write_error(&udp_tab, sd, 
			      slot->index, slot->index_remote,
			      errno);
      return;

    case EPIPE:
      fprintf(stderr, 
	      "write: EPIPE on UDP socket %d (%d-%d)\n", 
	      sd, slot->index, slot->index_remote);
      break;
    default:
      fprintf(stderr, 
	      "write: error on UDP socket %d (%d-%d): %d: %s\n", 
	      sd, slot->index, slot->index_remote, errno,
	      strerror(errno));
    }

    fprintf(stderr,
	    "write: connection lost on UDP socket %d (%d-%d)\n",
	    sd, slot->index, slot->index_remote);

    if (!session->duration_done)
      report_broken_slot_stat(stdout, slot);

    udp_slot_kill(slot);

    return;
  }

  assert(wr >= 0);
  assert(wr <= sizeof(buf));

  nepim_usock_write_good(&udp_tab, sd);

  nepim_session_write_add(session, wr);
}

static void *on_udp_write(oop_source *src, int sd,
			  oop_event event, void *user)
{
  char buf[nepim_global.udp_write_size];
  nepim_slot_t *slot;
  nepim_session_t *session;

  assert(nepim_global.udp_write_size == sizeof(buf));
  assert(!user);
  assert(event == OOP_WRITE);

  slot = nepim_slot_find_next_writer(&slots, sd);
  assert(slot);

  assert(sd == slot->udp_sd);

  if (slot->client_writer_status == NEPIM_SLOT_CLIENT_GREET) {
    udp_write_greet(slot);
    return OOP_CONTINUE;
  }

  session = &slot->session;

  if (!session->must_send) {
    if (session->udp_keepalive_send) {
      nepim_udp_write_keepalive(slot, nepim_slot_buf_write,
				on_udp_keepalive_time, udp_slot_kill);
      return OOP_CONTINUE;
    }
  }

  if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
    udp_write_rate(slot);
    return OOP_CONTINUE;
  }

  udp_write_full(slot);

  return OOP_CONTINUE;
}

static void will_slot_rate_write(nepim_slot_t *slot)
{
  nepim_session_t *session = &slot->session;
  int sd = slot->udp_sd;

#ifndef NDEBUG
  {
    if (session->max_bit_rate <= 0) {
      assert(session->max_pkt_rate > 0);
    }
    if (session->max_pkt_rate <= 0) {
      assert(session->max_bit_rate > 0);
    }
  }
#endif /* NDEBUG */

  assert(!slot->want_write);
  ++slot->want_write;

  if (nepim_usock_writer_add(&udp_tab, sd))
    return;

  nepim_global.oop_src->on_fd(nepim_global.oop_src, sd,
			      OOP_WRITE, on_udp_write, 0);
}

static void *on_udp_rate_delay(oop_source *src, struct timeval tv, void *user)
{
  nepim_slot_t *slot = user;
  nepim_session_t *session = &slot->session;

  assert(timercmp(&tv, &session->tv_send_rate, ==));
#ifndef NDEBUG
  {
    if (session->max_bit_rate <= 0) {
      assert(session->max_pkt_rate > 0);
    }
    if (session->max_pkt_rate <= 0) {
      assert(session->max_bit_rate > 0);
    }
  }
#endif /* NDEBUG */

  /* save next scheduling time */
  {
    int result = gettimeofday(&session->tv_send_rate, 0);
    assert(!result);
  }
  nepim_timer_usec_add(&session->tv_send_rate, session->write_delay);

  /* calculate bytes to be written from rate */
  session->rate_remaining = nepim_bps2bytes(session->max_bit_rate,
					    session->write_delay);
  session->pkt_remaining = nepim_pps2packets(session->max_pkt_rate,
					     session->write_delay);

#ifndef NDEBUG
 {
   if (session->rate_remaining <= 0) {
     assert(session->pkt_remaining > 0);
   }
   if (session->pkt_remaining <= 0) {
     assert(session->rate_remaining > 0);
   }
 }
#endif /* NDEBUG */

  /* start to write */
  will_slot_rate_write(slot);

  return OOP_CONTINUE;
}

static void cancel_slot_write(nepim_slot_t *slot)
{
  nepim_session_t *session = &slot->session;
  int sd = slot->udp_sd;

  assert(session->max_bit_rate < 1);
  assert(session->max_pkt_rate < 1);

  assert(slot->want_write == 1);
  --slot->want_write;

  if (nepim_usock_writer_del(&udp_tab, sd))
    return;

  nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
				  sd, OOP_WRITE);
}

static void udp_slot_cancel_io(nepim_slot_t *slot)
{
  nepim_session_t *session = &slot->session;

  if (slot->client_writer_status == NEPIM_SLOT_CLIENT_GREET) {
    cancel_slot_greet_timer(slot);
    soft_cancel_slot_greet_writer(slot);

    return;
  }

  if (session->must_send) {
    if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
      /* stop current writing, if any */
      if (slot->want_write) {
	assert(slot->want_write == 1);
	cancel_slot_rate_write(slot);
      }

      /* stop periodic write scheduler */
      nepim_global.oop_src->cancel_time(nepim_global.oop_src,
					session->tv_send_rate,
					on_udp_rate_delay, slot);
    }
    else {
      cancel_slot_write(slot);
    }
  }
  else {
    if (session->udp_keepalive_send) {
      if (slot->want_write) {
	assert(slot->want_write == 1);
	nepim_cancel_slot_keepalive(slot); /* keepalive write */
      }
      
      /* stop periodic keepalive scheduler */
      nepim_global.oop_src->cancel_time(nepim_global.oop_src,
					session->tv_keepalive,
					on_udp_keepalive_time,
					slot);
    }
  }

  assert(!slot->want_write);
}

static void udp_slot_kill(nepim_slot_t *slot)
{
  nepim_sock_show_opt(stderr, slot->udp_sd);

  udp_slot_cancel_stat_timers(slot);
  udp_slot_cancel_io(slot);

  if (slot->session.udp_keepalive_require)
    nepim_global.oop_src->cancel_time(nepim_global.oop_src,
				      slot->session.tv_keepalive_timer,
				      on_udp_keepalive_require, slot);

  cancel_slot_segment_read(slot);

  nepim_slot_set_del(&slots, slot->index);
}

static void will_slot_greet_writer(nepim_slot_t *slot)
{
  int sd = slot->udp_sd;

  assert(!slot->want_write);
  ++slot->want_write;

  if (nepim_usock_writer_add(&udp_tab, sd))
    return;

  nepim_global.oop_src->on_fd(nepim_global.oop_src, sd,
			      OOP_WRITE, on_udp_write, 0);
}

static void cancel_slot_greet_writer(nepim_slot_t *slot)
{
  int sd = slot->udp_sd;

  assert(slot->want_write == 1);
  --slot->want_write;

  if (nepim_usock_writer_del(&udp_tab, sd))
    return;

  nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
				  sd, OOP_WRITE);
}

static void soft_cancel_slot_greet_writer(nepim_slot_t *slot)
{
  if (slot->want_write) {
    assert(slot->want_write == 1);
    cancel_slot_greet_writer(slot);
  }
}

static void cancel_slot_greet_timer(nepim_slot_t *slot)
{
  nepim_global.oop_src->cancel_time(nepim_global.oop_src,
				    slot->session.tv_greet_rate,
				    on_udp_time_greet, slot);
}

static void will_slot_segment_read(nepim_slot_t *slot)
{
  int sd = slot->udp_sd;

  if (nepim_usock_reader_add(&udp_tab, sd))
    return;

  nepim_global.oop_src->on_fd(nepim_global.oop_src, sd,
			      OOP_READ, on_udp_read_segment, 0);
}

static void cancel_slot_segment_read(nepim_slot_t *slot)
{
  int sd = slot->udp_sd;

  if (nepim_usock_reader_del(&udp_tab, sd))
    return;

  nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
				  sd, OOP_READ);
}

static void *on_udp_time_greet(oop_source *src, struct timeval tv,
			       void *user)
{
  nepim_slot_t *slot = user;
  nepim_session_t *session;
  assert(slot);
  
  session = &slot->session;

  assert(timercmp(&tv, &session->tv_greet_rate, ==));

  /* save next scheduling time */
  {
    int result = gettimeofday(&session->tv_greet_rate, 0);
    assert(!result);
  }
  nepim_timer_usec_add(&session->tv_greet_rate, nepim_global.udp_greet_delay);

  /* wait opportunity to write on shared socket */
  will_slot_greet_writer(slot);

  return OOP_CONTINUE;
}

static int spawn_udp_connection(const char *hostname,
				const char *portname,
				struct sockaddr *remote, 
				socklen_t *remote_len)
{
  struct addrinfo hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai;
  int result;
  int sd = -1;

  memset(&hints, 0, sizeof(hints));

  hints.ai_socktype = SOCK_DGRAM;
  hints.ai_protocol = IPPROTO_UDP;
  hints.ai_flags = AI_CANONNAME;
  hints.ai_family = PF_UNSPEC;
  hints.ai_addrlen = 0;
  hints.ai_addr = 0;
  hints.ai_canonname = 0;

  fprintf(stderr, 
	  "UDP socket solving %s,%s\n",
	  hostname, portname);

  result = getaddrinfo(hostname, portname,
		       &hints, &ai_res);
  if (result) {
    fprintf(stderr, "%s: getaddrinfo(%s,%s): %s\n",
            __PRETTY_FUNCTION__, 
	    hostname, portname,
	    gai_strerror(result));
    return -1;
  }

  for (ai = ai_res; ai; ai = ai->ai_next) {

    fprintf(stderr, 
	    "UDP socket trying %s,%s\n",
	    hostname, portname);

    if (nepim_global.no_inet6 && (ai->ai_family == PF_INET6))
      continue;

    if (nepim_global.no_inet4 && (ai->ai_family == PF_INET))
      continue;

    sd = nepim_connect_client_socket(ai->ai_addr, ai->ai_addrlen,
				     ai->ai_family, ai->ai_socktype, 
				     ai->ai_protocol,
				     nepim_global.pmtu_mode,
				     nepim_global.ttl,
				     nepim_global.win_recv,
				     nepim_global.win_send);
    if (sd < 0) {
      fprintf(stderr, 
	      "could not connect UDP socket to %s,%s: %d\n",
	      hostname, portname, sd);
      continue;
    }

    result = nepim_socket_mcast_ttl(sd, nepim_global.mcast_ttl);
    if (result) {
      fprintf(stderr,
	      "%d: failure setting mcast_ttl=%d: %d\n",
	      sd, nepim_global.mcast_ttl, result);
    }

    /* save remote address */
    assert(*remote_len >= ai->ai_addrlen);
    memcpy(remote, ai->ai_addr, ai->ai_addrlen);
    *remote_len = ai->ai_addrlen;

    {
      union {
	struct sockaddr_in inet;
	struct sockaddr_in6 inet6;
      } local_sockaddr;
      socklen_t local_sockaddr_len = sizeof(local_sockaddr);
      int result;
      char local[100];
      
      result = getsockname(sd, (struct sockaddr *) &local_sockaddr,
			   &local_sockaddr_len);
      assert(!result);

      nepim_sock_dump_addr(local, sizeof(local),
			   (const struct sockaddr *) &local_sockaddr);

      fprintf(stderr, 
	      "%d: UDP socket (%s,%d) connected to: "
	      "host=%s,%s(%d) len=%d family=%d type=%d proto=%d\n",
	      sd, 
	      local,
	      nepim_sock_get_port((struct sockaddr *) &local_sockaddr),
	      hostname, portname, 
	      nepim_sock_get_port(ai->ai_addr),
	      ai->ai_addrlen, ai->ai_family, 
	      ai->ai_socktype, ai->ai_protocol);
    }

    break;
  }

  freeaddrinfo(ai_res);

  return sd;
}

static void parse_hosts(const char *host_list)
{
  int size = addr_list_size(host_list);
  int i, j;

  /* scan host_list */
  for (j = 0; j < size; ++j) {
    char hostname[100];
    const char *portname;

    /* get hostname */
    if (addr_list_get(host_list, j, hostname, sizeof(hostname))) {
      fprintf(stderr, 
	      "%s: failure parsing address %d/%d from list: %s\n",
	      nepim_global.prog_name, j, size, host_list);
      continue;
    }

    /* split host/port */
    if (addr_split_port(hostname, sizeof(hostname), &portname))
      portname = nepim_global.portname;

    /* create sockets */
    for (i = 0; i < nepim_global.pipes; ++i) {
      union {
	struct sockaddr_in inet;
	struct sockaddr_in6 inet6;
      } remote;
      int remote_len = sizeof(remote);
      int sd;
      nepim_greet_t opt;

      sd = spawn_udp_connection(hostname, portname, 
				(struct sockaddr *) &remote, 
				&remote_len);
      if (sd < 0)
	continue;

      /* add socket to table of readers/writers */
      nepim_usock_set_add(&udp_tab, sd);

      opt.must_send             = nepim_global.duplex_mode
	|| nepim_global.simplex_client_send;
      opt.bit_rate              = nepim_global.bit_rate;
      opt.pkt_rate              = nepim_global.pkt_rate;
      opt.stat_interval         = nepim_global.stat_interval;
      opt.test_duration         = nepim_global.test_duration;
      opt.write_delay           = nepim_global.write_delay;
      opt.udp_keepalive_send    = nepim_global.udp_keepalive_send;
      opt.udp_keepalive_require = nepim_global.udp_keepalive_require;
      
      /* create UDP slots */
      for (i = 0; i < nepim_global.pipes; ++i) {
	int local_slot = nepim_slot_find_free(&slots);
	int remote_slot = 0xFFFF;

	nepim_slot_set_add(&slots, sd, 
			   local_slot, remote_slot,
			   (const struct sockaddr *) &remote,
			   remote_len, &opt);
      }
    }

  } /* scan host_list */
}

void nepim_udp_clients(const char *host_list)
{
  int i;

  /* spawn sockets */
  parse_hosts(host_list);

  /* activate monitoring for UDP socket */

  for (i = 0; i < slots.array.capacity; ++i) {
    nepim_slot_t *slot = nepim_slot_set_get(&slots, i);
    nepim_session_t *session;

    if (!slot)
      continue;

    session = &slot->session;

    /* 
       schedule periodic greetings 
       (until answer from server)
    */
    session->tv_greet_rate = OOP_TIME_NOW;
    nepim_global.oop_src->on_time(nepim_global.oop_src,
				  session->tv_greet_rate,
				  on_udp_time_greet, slot);

    /* wait answers */
    will_slot_segment_read(slot);
  }
}


syntax highlighted by Code2HTML, v. 0.9.1