/*-GNU-GPL-BEGIN-*
nepim - network pipemeter
Copyright (C) 2005 Everton da Silva Marques
nepim is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
nepim is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with nepim; see the file COPYING. If not, write to
the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*-GNU-GPL-END-*/
/* $Id: udp_client.c,v 1.45 2005/09/23 12:55:13 evertonm Exp $ */
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <assert.h>
#include "conf.h"
#include "sock.h"
#include "pipe.h"
#include "common.h"
#include "usock.h"
#include "udp_header.h"
#include "str.h"
extern nepim_slot_set_t slots; /* from server.c */
extern nepim_usock_set_t udp_tab; /* from server.c */
static void *on_udp_rate_delay(oop_source *src, struct timeval tv, void *user);
static void *on_udp_write(oop_source *src, int sd,
oop_event event, void *user);
static void udp_slot_cancel_stat_timers(nepim_slot_t *slot);
static void schedule_stat_interval(nepim_slot_t *slot);
static void udp_slot_kill(nepim_slot_t *slot);
static void *on_udp_duration(oop_source *src, struct timeval tv, void *user);
static void *on_udp_time_greet(oop_source *src, struct timeval tv,
void *user);
static void soft_cancel_slot_greet_writer(nepim_slot_t *slot);
static void cancel_slot_greet_writer(nepim_slot_t *slot);
static void cancel_slot_greet_timer(nepim_slot_t *slot);
static void cancel_slot_segment_read(nepim_slot_t *slot);
static void will_slot_write(nepim_slot_t *slot)
{
nepim_session_t *session = &slot->session;
int sd = slot->udp_sd;
assert(session->max_bit_rate < 1);
assert(session->max_pkt_rate < 1);
assert(!slot->want_write);
++slot->want_write;
if (nepim_usock_writer_add(&udp_tab, sd))
return;
nepim_global.oop_src->on_fd(nepim_global.oop_src, sd,
OOP_WRITE, on_udp_write, 0);
}
static void *on_udp_keepalive_require(oop_source *src, struct timeval tv, void *user)
{
nepim_slot_t *slot = user;
nepim_session_t *session = &slot->session;
assert(session->udp_keepalive_require);
assert(slot->keepalives_recv >= 0);
if (slot->keepalives_recv < 1) {
fprintf(stderr,
"%d %d-%d: broken slot: peer not sending keepalives?\n",
slot->udp_sd, slot->index, slot->index_remote);
report_broken_slot_stat(stdout, slot);
udp_slot_kill(slot);
return OOP_CONTINUE;
}
slot->keepalives_recv = 0;
nepim_schedule_keepalive_timer(slot, on_udp_keepalive_require);
return OOP_CONTINUE;
}
static void *on_udp_keepalive_time(oop_source *src, struct timeval tv, void *user)
{
nepim_slot_t *slot = user;
nepim_session_t *session = &slot->session;
assert(timercmp(&tv, &session->tv_keepalive, ==));
assert(!session->must_send);
assert(session->udp_keepalive_send);
/* save next scheduling time */
{
int result = gettimeofday(&session->tv_keepalive, 0);
assert(!result);
}
nepim_timer_usec_add(&session->tv_keepalive, nepim_global.udp_keepalive_delay);
/* start to write */
nepim_will_slot_keepalive(slot, on_udp_write);
return OOP_CONTINUE;
}
static void udp_slot_start(nepim_slot_t *slot)
{
nepim_session_t *session;
session = &slot->session;
if (session->must_send) {
if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
session->tv_send_rate = OOP_TIME_NOW;
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_send_rate,
on_udp_rate_delay, slot);
}
else {
will_slot_write(slot);
}
}
else {
if (session->udp_keepalive_send) {
fprintf(stderr,
"DEBUG %s %s: %d %d-%d: scheduling sending of keepalives\n",
__FILE__, __PRETTY_FUNCTION__,
slot->udp_sd, slot->index, slot->index_remote);
session->tv_keepalive = OOP_TIME_NOW;
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_keepalive,
on_udp_keepalive_time, slot);
}
}
{
int result = gettimeofday(&session->tv_start, 0);
assert(!result);
session->tv_duration = session->tv_start;
}
session->tv_duration.tv_sec += session->test_duration;
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_duration,
on_udp_duration, slot);
nepim_sock_show_opt(stderr, slot->udp_sd);
schedule_stat_interval(slot);
if (session->udp_keepalive_require)
nepim_schedule_keepalive_timer(slot, on_udp_keepalive_require);
}
static void udp_consume(nepim_slot_t *slot, size_t len, uint32_t seq)
{
nepim_session_t *session = &slot->session;
/* any valid traffic accounted as keepalive */
++slot->keepalives_recv;
nepim_session_read_add(session, len);
if (nepim_global.udp_exp_stats)
nepim_win_add(&slot->win_recv_seq, seq,
&slot->interval_pkt_lost, &slot->interval_pkt_dup);
}
static void *on_udp_read_segment(oop_source *src, int sd,
oop_event event, void *user)
{
int rd;
char buf[nepim_global.udp_read_size];
union {
struct sockaddr_in inet;
struct sockaddr_in6 inet6;
} from;
socklen_t fromlen = sizeof(from);
nepim_udp_hdr_t hdr;
assert(sizeof(buf) == nepim_global.udp_read_size);
assert(event == OOP_READ);
assert(!user);
rd = recvfrom(sd, buf, nepim_global.udp_read_size, 0,
(struct sockaddr *) &from, &fromlen);
if (rd < 0) {
switch (errno) {
case EINTR:
case EAGAIN:
case ECONNREFUSED:
nepim_usock_read_error(&udp_tab, sd, errno);
return OOP_CONTINUE;
}
fprintf(stderr,
"%d: recvfrom: unexpected failure: errno=%d: %s\n",
sd, errno, strerror(errno));
assert(0);
return OOP_CONTINUE;
}
assert(rd >= 0);
assert(rd <= sizeof(buf));
nepim_usock_read_good(&udp_tab, sd);
if (nepim_udp_hdr_parse(&hdr, buf, rd)) {
fprintf(stderr,
"%d: recvfrom: could not parse header from UDP segment: %d/%d bytes\n",
sd, rd, nepim_global.udp_read_size);
return OOP_CONTINUE;
}
if (hdr.version) {
fprintf(stderr, "%d: recvfrom: ignoring unknown UDP header version: %d\n",
sd, hdr.version);
return OOP_CONTINUE;
}
{
nepim_slot_t *slot = nepim_slot_set_search(&slots, hdr.dst_slot);
if (!slot) {
fprintf(stderr, "%d: recvfrom: unknown local slot: %d\n",
sd, hdr.dst_slot);
return OOP_CONTINUE;
}
#if 0
fprintf(stderr,
"DEBUG FIXME %s %s: %d %d-%d in_seq=%d\n",
__FILE__, __PRETTY_FUNCTION__,
slot->udp_sd, hdr.dst_slot, hdr.src_slot, hdr.seq);
#endif
if (slot->session.remote_len != fromlen) {
fprintf(stderr,
"%d %d-%d: recvfrom: bad address length: expected=%d got=%d\n",
sd, hdr.dst_slot, hdr.src_slot,
slot->session.remote_len, fromlen);
return OOP_CONTINUE;
}
if (memcmp(&slot->session.remote, &from, fromlen)) {
fprintf(stderr,
"%d %d-%d: recvfrom: address mismatch\n",
sd, hdr.dst_slot, hdr.src_slot);
return OOP_CONTINUE;
}
if (slot->client_writer_status == NEPIM_SLOT_CLIENT_GREET) {
assert(slot->index_remote == 0xFFFF);
slot->index_remote = hdr.src_slot;
cancel_slot_greet_timer(slot);
soft_cancel_slot_greet_writer(slot);
slot->client_writer_status = NEPIM_SLOT_CLIENT_SEND;
udp_slot_start(slot);
return OOP_CONTINUE;
}
assert(slot->index_remote < 0xFFFF);
if (slot->index_remote != hdr.src_slot) {
fprintf(stderr,
"%d %d-%d: recvfrom: remote index mismatch: expected=%d got=%d\n",
sd, hdr.dst_slot, hdr.src_slot, slot->index_remote, hdr.src_slot);
return OOP_CONTINUE;
}
udp_consume(slot, rd, hdr.seq);
}
return OOP_CONTINUE;
}
static int slot_greet_write(nepim_slot_t *slot)
{
char buf[1024];
nepim_greet_t opt;
int pr;
int wr;
int buf_avail_len;
int write_len;
char *tmp = "";
assert(sizeof(buf) > UDP_HEADER_LEN);
opt.must_send = nepim_global.duplex_mode ||
!nepim_global.simplex_client_send;
opt.bit_rate = nepim_global.bit_rate;
opt.pkt_rate = nepim_global.pkt_rate;
opt.stat_interval = nepim_global.stat_interval;
opt.test_duration = nepim_global.test_duration;
opt.write_delay = nepim_global.write_delay;
opt.udp_keepalive_send = nepim_global.udp_keepalive_require;
opt.udp_keepalive_require = nepim_global.udp_keepalive_send;
if (nepim_global.password) {
opt.password_buf = nepim_global.password;
opt.password_buf_size = strlen(nepim_global.password) + 1;
}
else {
opt.password_buf = tmp;
opt.password_buf_size = strlen(tmp) + 1;
}
buf_avail_len = sizeof(buf) - UDP_HEADER_LEN;
assert(buf_avail_len > 0);
pr = nepim_write_greetings(&opt, buf + UDP_HEADER_LEN, buf_avail_len);
if (pr < 0)
return -1;
if (pr >= buf_avail_len)
return -2;
assert(pr > 0);
write_len = UDP_HEADER_LEN + pr;
wr = nepim_slot_buf_write(slot, buf, write_len, UDP_TYPE_HELLO);
if (wr < 0) {
switch (errno) {
case EINTR:
case EAGAIN:
case ECONNREFUSED:
nepim_usock_write_error(&udp_tab, slot->udp_sd,
slot->index, slot->index_remote,
errno);
return 0; /* ignore as soft error (caller always resend) */
default:
fprintf(stderr,
"greet_write: error on UDP socket %d (%d-%d): %d: %s\n",
slot->udp_sd, slot->index, slot->index_remote, errno,
strerror(errno));
}
assert(0); /* treat unexpected failures as fatal */
return -3;
}
#if 0
fprintf(stderr,
"%d %d-%d: sending: hdr_len=%d greet_len=%d total=%d wrote=%d\n",
slot->udp_sd, slot->index, slot->index_remote,
UDP_HEADER_LEN, pr, write_len, wr);
#endif
if (wr != write_len)
return -4;
nepim_usock_write_good(&udp_tab, slot->udp_sd);
return 0;
}
static void *on_udp_interval(oop_source *src, struct timeval tv, void *user)
{
nepim_slot_t *slot = user;
nepim_session_t *session = &slot->session;
nepim_slot_stat(stdout, NEPIM_LABEL_PARTIAL,
slot->udp_sd,
slot->index,
slot->index_remote,
session->byte_interval_recv,
session->byte_interval_sent,
session->stat_interval,
session->tv_start.tv_sec,
session->test_duration,
session->interval_reads,
session->interval_writes,
slot->interval_pkt_lost,
slot->interval_pkt_dup);
session->byte_interval_recv = 0;
session->byte_interval_sent = 0;
session->interval_reads = 0;
session->interval_writes = 0;
slot->total_pkt_lost += slot->interval_pkt_lost;
slot->total_pkt_dup += slot->interval_pkt_dup;
slot->interval_pkt_lost = 0;
slot->interval_pkt_dup = 0;
schedule_stat_interval(slot);
return OOP_CONTINUE;
}
static void schedule_stat_interval(nepim_slot_t *slot)
{
nepim_session_t *session = &slot->session;
{
int result = gettimeofday(&session->tv_interval, 0);
assert(!result);
}
session->tv_interval.tv_sec += session->stat_interval;
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_interval,
on_udp_interval, slot);
}
static void *on_udp_duration(oop_source *src, struct timeval tv, void *user)
{
nepim_slot_t *slot = user;
nepim_session_t *session = &slot->session;
int sd = slot->udp_sd;
slot->total_pkt_lost += slot->interval_pkt_lost;
slot->total_pkt_dup += slot->interval_pkt_dup;
nepim_slot_stat(stdout, NEPIM_LABEL_TOTAL,
sd,
slot->index,
slot->index_remote,
session->byte_total_recv,
session->byte_total_sent,
session->test_duration,
session->tv_start.tv_sec,
session->test_duration,
session->total_reads,
session->total_writes,
slot->total_pkt_lost,
slot->total_pkt_dup);
session->byte_total_recv = 0;
session->byte_total_sent = 0;
session->total_reads = 0;
session->total_writes = 0;
session->duration_done = 1;
udp_slot_kill(slot);
return OOP_CONTINUE;
}
static void udp_slot_cancel_stat_timers(nepim_slot_t *slot)
{
nepim_session_t *session = &slot->session;
nepim_global.oop_src->cancel_time(nepim_global.oop_src,
session->tv_duration,
on_udp_duration, slot);
nepim_global.oop_src->cancel_time(nepim_global.oop_src,
session->tv_interval,
on_udp_interval, slot);
}
static void cancel_slot_rate_write(nepim_slot_t *slot)
{
nepim_session_t *session = &slot->session;
int sd = slot->udp_sd;
#ifndef NDEBUG
{
if (session->max_bit_rate <= 0) {
assert(session->max_pkt_rate > 0);
}
if (session->max_pkt_rate <= 0) {
assert(session->max_bit_rate > 0);
}
}
#endif /* NDEBUG */
assert(slot->want_write == 1);
--slot->want_write;
if (nepim_usock_writer_del(&udp_tab, sd))
return;
nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
sd, OOP_WRITE);
}
static void udp_write_greet(nepim_slot_t *slot)
{
nepim_session_t *session;
int result;
int sd = slot->udp_sd;
assert(slot->client_writer_status == NEPIM_SLOT_CLIENT_GREET);
/* stop writing */
assert(slot->want_write == 1);
cancel_slot_greet_writer(slot);
if (slot->greetings_sent >= nepim_global.max_greetings) {
if (!nepim_global.udp_require_greet_reply) {
cancel_slot_greet_timer(slot);
soft_cancel_slot_greet_writer(slot);
slot->client_writer_status = NEPIM_SLOT_CLIENT_SEND;
udp_slot_start(slot);
return;
}
fprintf(stderr,
"%d %d-%d: max. greetings reached (%d/%d) - server not responding\n",
sd, slot->index, slot->index_remote,
slot->greetings_sent, nepim_global.max_greetings);
udp_slot_kill(slot);
return;
}
fprintf(stderr, "%d %d-%d: sending greetings\n",
sd, slot->index, slot->index_remote);
result = slot_greet_write(slot);
if (result) {
fprintf(stderr,
"%d %d-%d: greet writing failed: %d\n",
sd, slot->index, slot->index_remote, result);
udp_slot_kill(slot);
return;
}
++slot->greetings_sent;
session = &slot->session;
/* schedule for next time saved by on_udp_time_greet() */
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_greet_rate,
on_udp_time_greet, slot);
}
static void udp_write_rate(nepim_slot_t *slot)
{
char buf[nepim_global.udp_write_size];
int wr;
nepim_session_t *session;
int to_write;
int sd;
assert(sizeof(buf) == nepim_global.udp_write_size);
assert(slot);
sd = slot->udp_sd;
session = &slot->session;
#ifndef NDEBUG
{
if (session->max_bit_rate <= 0) {
assert(session->max_pkt_rate > 0);
}
if (session->max_pkt_rate <= 0) {
assert(session->max_bit_rate > 0);
}
}
#endif /* NDEBUG */
#ifndef NDEBUG
{
if (session->rate_remaining <= 0) {
assert(session->pkt_remaining > 0);
}
if (session->pkt_remaining <= 0) {
assert(session->rate_remaining > 0);
}
}
#endif /* NDEBUG */
if (session->rate_remaining > 0)
to_write = NEPIM_RANGE(session->rate_remaining,
UDP_HEADER_LEN,
sizeof(buf));
else
to_write = sizeof(buf);
assert(to_write >= 0);
assert(to_write >= UDP_HEADER_LEN);
assert(to_write <= sizeof(buf));
wr = nepim_slot_buf_write(slot, buf, to_write, UDP_TYPE_DATA);
if (wr < 0) {
switch (errno) {
case EINTR:
case EAGAIN:
case ECONNREFUSED:
nepim_usock_write_error(&udp_tab, sd,
slot->index, slot->index_remote,
errno);
return;
case EPIPE:
fprintf(stderr,
"rate_write: EPIPE on UDP socket %d (%d-%d)\n",
sd, slot->index, slot->index_remote);
break;
default:
fprintf(stderr,
"rate_write: error on UDP socket %d (%d-%d): %d: %s\n",
sd, slot->index, slot->index_remote, errno,
strerror(errno));
}
fprintf(stderr,
"rate_write: connection lost on UDP socket %d (%d-%d)\n",
sd, slot->index, slot->index_remote);
if (!session->duration_done)
report_broken_slot_stat(stdout, slot);
udp_slot_kill(slot);
return;
}
assert(wr >= 0);
assert(wr <= to_write);
nepim_usock_write_good(&udp_tab, slot->udp_sd);
session->rate_remaining -= wr;
--session->pkt_remaining;
nepim_session_write_add(session, wr);
/* finished ? */
if (
((session->max_bit_rate > 0) && (session->rate_remaining < 1))
||
((session->max_pkt_rate > 0) && (session->pkt_remaining < 1))
){
/* stop writing */
cancel_slot_rate_write(slot);
/* schedule for next time saved by on_udp_rate_delay() */
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_send_rate,
on_udp_rate_delay, slot);
}
}
static void udp_write_full(nepim_slot_t *slot)
{
char buf[nepim_global.udp_write_size];
int wr;
nepim_session_t *session;
int sd;
assert(nepim_global.udp_write_size == sizeof(buf));
assert(slot);
sd = slot->udp_sd;
session = &slot->session;
assert(session->max_bit_rate < 1);
assert(session->max_pkt_rate < 1);
wr = nepim_slot_buf_write(slot, buf, sizeof(buf), UDP_TYPE_DATA);
if (wr < 0) {
switch (errno) {
case EINTR:
case EAGAIN:
case ECONNREFUSED:
nepim_usock_write_error(&udp_tab, sd,
slot->index, slot->index_remote,
errno);
return;
case EPIPE:
fprintf(stderr,
"write: EPIPE on UDP socket %d (%d-%d)\n",
sd, slot->index, slot->index_remote);
break;
default:
fprintf(stderr,
"write: error on UDP socket %d (%d-%d): %d: %s\n",
sd, slot->index, slot->index_remote, errno,
strerror(errno));
}
fprintf(stderr,
"write: connection lost on UDP socket %d (%d-%d)\n",
sd, slot->index, slot->index_remote);
if (!session->duration_done)
report_broken_slot_stat(stdout, slot);
udp_slot_kill(slot);
return;
}
assert(wr >= 0);
assert(wr <= sizeof(buf));
nepim_usock_write_good(&udp_tab, sd);
nepim_session_write_add(session, wr);
}
static void *on_udp_write(oop_source *src, int sd,
oop_event event, void *user)
{
char buf[nepim_global.udp_write_size];
nepim_slot_t *slot;
nepim_session_t *session;
assert(nepim_global.udp_write_size == sizeof(buf));
assert(!user);
assert(event == OOP_WRITE);
slot = nepim_slot_find_next_writer(&slots, sd);
assert(slot);
assert(sd == slot->udp_sd);
if (slot->client_writer_status == NEPIM_SLOT_CLIENT_GREET) {
udp_write_greet(slot);
return OOP_CONTINUE;
}
session = &slot->session;
if (!session->must_send) {
if (session->udp_keepalive_send) {
nepim_udp_write_keepalive(slot, nepim_slot_buf_write,
on_udp_keepalive_time, udp_slot_kill);
return OOP_CONTINUE;
}
}
if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
udp_write_rate(slot);
return OOP_CONTINUE;
}
udp_write_full(slot);
return OOP_CONTINUE;
}
static void will_slot_rate_write(nepim_slot_t *slot)
{
nepim_session_t *session = &slot->session;
int sd = slot->udp_sd;
#ifndef NDEBUG
{
if (session->max_bit_rate <= 0) {
assert(session->max_pkt_rate > 0);
}
if (session->max_pkt_rate <= 0) {
assert(session->max_bit_rate > 0);
}
}
#endif /* NDEBUG */
assert(!slot->want_write);
++slot->want_write;
if (nepim_usock_writer_add(&udp_tab, sd))
return;
nepim_global.oop_src->on_fd(nepim_global.oop_src, sd,
OOP_WRITE, on_udp_write, 0);
}
static void *on_udp_rate_delay(oop_source *src, struct timeval tv, void *user)
{
nepim_slot_t *slot = user;
nepim_session_t *session = &slot->session;
assert(timercmp(&tv, &session->tv_send_rate, ==));
#ifndef NDEBUG
{
if (session->max_bit_rate <= 0) {
assert(session->max_pkt_rate > 0);
}
if (session->max_pkt_rate <= 0) {
assert(session->max_bit_rate > 0);
}
}
#endif /* NDEBUG */
/* save next scheduling time */
{
int result = gettimeofday(&session->tv_send_rate, 0);
assert(!result);
}
nepim_timer_usec_add(&session->tv_send_rate, session->write_delay);
/* calculate bytes to be written from rate */
session->rate_remaining = nepim_bps2bytes(session->max_bit_rate,
session->write_delay);
session->pkt_remaining = nepim_pps2packets(session->max_pkt_rate,
session->write_delay);
#ifndef NDEBUG
{
if (session->rate_remaining <= 0) {
assert(session->pkt_remaining > 0);
}
if (session->pkt_remaining <= 0) {
assert(session->rate_remaining > 0);
}
}
#endif /* NDEBUG */
/* start to write */
will_slot_rate_write(slot);
return OOP_CONTINUE;
}
static void cancel_slot_write(nepim_slot_t *slot)
{
nepim_session_t *session = &slot->session;
int sd = slot->udp_sd;
assert(session->max_bit_rate < 1);
assert(session->max_pkt_rate < 1);
assert(slot->want_write == 1);
--slot->want_write;
if (nepim_usock_writer_del(&udp_tab, sd))
return;
nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
sd, OOP_WRITE);
}
static void udp_slot_cancel_io(nepim_slot_t *slot)
{
nepim_session_t *session = &slot->session;
if (slot->client_writer_status == NEPIM_SLOT_CLIENT_GREET) {
cancel_slot_greet_timer(slot);
soft_cancel_slot_greet_writer(slot);
return;
}
if (session->must_send) {
if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
/* stop current writing, if any */
if (slot->want_write) {
assert(slot->want_write == 1);
cancel_slot_rate_write(slot);
}
/* stop periodic write scheduler */
nepim_global.oop_src->cancel_time(nepim_global.oop_src,
session->tv_send_rate,
on_udp_rate_delay, slot);
}
else {
cancel_slot_write(slot);
}
}
else {
if (session->udp_keepalive_send) {
if (slot->want_write) {
assert(slot->want_write == 1);
nepim_cancel_slot_keepalive(slot); /* keepalive write */
}
/* stop periodic keepalive scheduler */
nepim_global.oop_src->cancel_time(nepim_global.oop_src,
session->tv_keepalive,
on_udp_keepalive_time,
slot);
}
}
assert(!slot->want_write);
}
static void udp_slot_kill(nepim_slot_t *slot)
{
nepim_sock_show_opt(stderr, slot->udp_sd);
udp_slot_cancel_stat_timers(slot);
udp_slot_cancel_io(slot);
if (slot->session.udp_keepalive_require)
nepim_global.oop_src->cancel_time(nepim_global.oop_src,
slot->session.tv_keepalive_timer,
on_udp_keepalive_require, slot);
cancel_slot_segment_read(slot);
nepim_slot_set_del(&slots, slot->index);
}
static void will_slot_greet_writer(nepim_slot_t *slot)
{
int sd = slot->udp_sd;
assert(!slot->want_write);
++slot->want_write;
if (nepim_usock_writer_add(&udp_tab, sd))
return;
nepim_global.oop_src->on_fd(nepim_global.oop_src, sd,
OOP_WRITE, on_udp_write, 0);
}
static void cancel_slot_greet_writer(nepim_slot_t *slot)
{
int sd = slot->udp_sd;
assert(slot->want_write == 1);
--slot->want_write;
if (nepim_usock_writer_del(&udp_tab, sd))
return;
nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
sd, OOP_WRITE);
}
static void soft_cancel_slot_greet_writer(nepim_slot_t *slot)
{
if (slot->want_write) {
assert(slot->want_write == 1);
cancel_slot_greet_writer(slot);
}
}
static void cancel_slot_greet_timer(nepim_slot_t *slot)
{
nepim_global.oop_src->cancel_time(nepim_global.oop_src,
slot->session.tv_greet_rate,
on_udp_time_greet, slot);
}
static void will_slot_segment_read(nepim_slot_t *slot)
{
int sd = slot->udp_sd;
if (nepim_usock_reader_add(&udp_tab, sd))
return;
nepim_global.oop_src->on_fd(nepim_global.oop_src, sd,
OOP_READ, on_udp_read_segment, 0);
}
static void cancel_slot_segment_read(nepim_slot_t *slot)
{
int sd = slot->udp_sd;
if (nepim_usock_reader_del(&udp_tab, sd))
return;
nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
sd, OOP_READ);
}
static void *on_udp_time_greet(oop_source *src, struct timeval tv,
void *user)
{
nepim_slot_t *slot = user;
nepim_session_t *session;
assert(slot);
session = &slot->session;
assert(timercmp(&tv, &session->tv_greet_rate, ==));
/* save next scheduling time */
{
int result = gettimeofday(&session->tv_greet_rate, 0);
assert(!result);
}
nepim_timer_usec_add(&session->tv_greet_rate, nepim_global.udp_greet_delay);
/* wait opportunity to write on shared socket */
will_slot_greet_writer(slot);
return OOP_CONTINUE;
}
static int spawn_udp_connection(const char *hostname,
const char *portname,
struct sockaddr *remote,
socklen_t *remote_len)
{
struct addrinfo hints;
struct addrinfo *ai_res;
struct addrinfo *ai;
int result;
int sd = -1;
memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
hints.ai_flags = AI_CANONNAME;
hints.ai_family = PF_UNSPEC;
hints.ai_addrlen = 0;
hints.ai_addr = 0;
hints.ai_canonname = 0;
fprintf(stderr,
"UDP socket solving %s,%s\n",
hostname, portname);
result = getaddrinfo(hostname, portname,
&hints, &ai_res);
if (result) {
fprintf(stderr, "%s: getaddrinfo(%s,%s): %s\n",
__PRETTY_FUNCTION__,
hostname, portname,
gai_strerror(result));
return -1;
}
for (ai = ai_res; ai; ai = ai->ai_next) {
fprintf(stderr,
"UDP socket trying %s,%s\n",
hostname, portname);
if (nepim_global.no_inet6 && (ai->ai_family == PF_INET6))
continue;
if (nepim_global.no_inet4 && (ai->ai_family == PF_INET))
continue;
sd = nepim_connect_client_socket(ai->ai_addr, ai->ai_addrlen,
ai->ai_family, ai->ai_socktype,
ai->ai_protocol,
nepim_global.pmtu_mode,
nepim_global.ttl,
nepim_global.win_recv,
nepim_global.win_send);
if (sd < 0) {
fprintf(stderr,
"could not connect UDP socket to %s,%s: %d\n",
hostname, portname, sd);
continue;
}
result = nepim_socket_mcast_ttl(sd, nepim_global.mcast_ttl);
if (result) {
fprintf(stderr,
"%d: failure setting mcast_ttl=%d: %d\n",
sd, nepim_global.mcast_ttl, result);
}
/* save remote address */
assert(*remote_len >= ai->ai_addrlen);
memcpy(remote, ai->ai_addr, ai->ai_addrlen);
*remote_len = ai->ai_addrlen;
{
union {
struct sockaddr_in inet;
struct sockaddr_in6 inet6;
} local_sockaddr;
socklen_t local_sockaddr_len = sizeof(local_sockaddr);
int result;
char local[100];
result = getsockname(sd, (struct sockaddr *) &local_sockaddr,
&local_sockaddr_len);
assert(!result);
nepim_sock_dump_addr(local, sizeof(local),
(const struct sockaddr *) &local_sockaddr);
fprintf(stderr,
"%d: UDP socket (%s,%d) connected to: "
"host=%s,%s(%d) len=%d family=%d type=%d proto=%d\n",
sd,
local,
nepim_sock_get_port((struct sockaddr *) &local_sockaddr),
hostname, portname,
nepim_sock_get_port(ai->ai_addr),
ai->ai_addrlen, ai->ai_family,
ai->ai_socktype, ai->ai_protocol);
}
break;
}
freeaddrinfo(ai_res);
return sd;
}
static void parse_hosts(const char *host_list)
{
int size = addr_list_size(host_list);
int i, j;
/* scan host_list */
for (j = 0; j < size; ++j) {
char hostname[100];
const char *portname;
/* get hostname */
if (addr_list_get(host_list, j, hostname, sizeof(hostname))) {
fprintf(stderr,
"%s: failure parsing address %d/%d from list: %s\n",
nepim_global.prog_name, j, size, host_list);
continue;
}
/* split host/port */
if (addr_split_port(hostname, sizeof(hostname), &portname))
portname = nepim_global.portname;
/* create sockets */
for (i = 0; i < nepim_global.pipes; ++i) {
union {
struct sockaddr_in inet;
struct sockaddr_in6 inet6;
} remote;
int remote_len = sizeof(remote);
int sd;
nepim_greet_t opt;
sd = spawn_udp_connection(hostname, portname,
(struct sockaddr *) &remote,
&remote_len);
if (sd < 0)
continue;
/* add socket to table of readers/writers */
nepim_usock_set_add(&udp_tab, sd);
opt.must_send = nepim_global.duplex_mode
|| nepim_global.simplex_client_send;
opt.bit_rate = nepim_global.bit_rate;
opt.pkt_rate = nepim_global.pkt_rate;
opt.stat_interval = nepim_global.stat_interval;
opt.test_duration = nepim_global.test_duration;
opt.write_delay = nepim_global.write_delay;
opt.udp_keepalive_send = nepim_global.udp_keepalive_send;
opt.udp_keepalive_require = nepim_global.udp_keepalive_require;
/* create UDP slots */
for (i = 0; i < nepim_global.pipes; ++i) {
int local_slot = nepim_slot_find_free(&slots);
int remote_slot = 0xFFFF;
nepim_slot_set_add(&slots, sd,
local_slot, remote_slot,
(const struct sockaddr *) &remote,
remote_len, &opt);
}
}
} /* scan host_list */
}
void nepim_udp_clients(const char *host_list)
{
int i;
/* spawn sockets */
parse_hosts(host_list);
/* activate monitoring for UDP socket */
for (i = 0; i < slots.array.capacity; ++i) {
nepim_slot_t *slot = nepim_slot_set_get(&slots, i);
nepim_session_t *session;
if (!slot)
continue;
session = &slot->session;
/*
schedule periodic greetings
(until answer from server)
*/
session->tv_greet_rate = OOP_TIME_NOW;
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_greet_rate,
on_udp_time_greet, slot);
/* wait answers */
will_slot_segment_read(slot);
}
}
syntax highlighted by Code2HTML, v. 0.9.1