/*-GNU-GPL-BEGIN-*
nepim - network pipemeter
Copyright (C) 2005 Everton da Silva Marques
nepim is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
nepim is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with nepim; see the file COPYING. If not, write to
the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*-GNU-GPL-END-*/
/* $Id: client.c,v 1.37 2005/09/22 10:24:56 evertonm Exp $ */
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <assert.h>
#include "conf.h"
#include "sock.h"
#include "pipe.h"
#include "common.h"
#include "usock.h"
#include "str.h"
extern nepim_pipe_set_t pipes; /* from server.c */
extern nepim_slot_set_t slots; /* from server.c */
extern nepim_usock_set_t udp_tab; /* from server.c */
static void *on_tcp_interval(oop_source *src, struct timeval tv, void *user);
static void tcp_pipe_stop(int sd);
static void *on_tcp_rate_delay(oop_source *src, struct timeval tv, void *user);
static void schedule_stat_interval(int sd)
{
nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd);
nepim_session_t *session;
assert(pipe);
session = &pipe->session;
{
int result = gettimeofday(&session->tv_interval, 0);
assert(!result);
}
session->tv_interval.tv_sec += session->stat_interval;
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_interval,
on_tcp_interval, pipe);
}
static void *on_tcp_read(oop_source *src, int sd,
oop_event event, void *user)
{
char buf[nepim_global.tcp_read_size];
int rd;
nepim_pipe_t *pipe = user;
nepim_session_t *session;
assert(event == OOP_READ);
assert(sd == pipe->sd);
assert(sizeof(buf) == nepim_global.tcp_read_size);
errno = 0;
rd = read(sd, buf, nepim_global.tcp_read_size);
if (rd <= 0) {
if (rd) {
switch (errno) {
case EINTR:
fprintf(stderr, "read: EINTR on TCP socket %d\n", sd);
return OOP_CONTINUE;
case EAGAIN:
fprintf(stderr, "read: EAGAIN on TCP socket %d\n", sd);
return OOP_CONTINUE;
default:
fprintf(stderr, "read: errno=%d: %s\n",
errno, strerror(errno));
}
}
fprintf(stderr, "read: connection lost on TCP socket %d\n", sd);
report_broken_pipe_stat(stdout, pipe);
tcp_pipe_stop(sd);
close(sd);
return OOP_CONTINUE;
}
assert(rd >= 0);
assert(rd <= sizeof(buf));
session = &pipe->session;
nepim_session_read_add(session, rd);
return OOP_CONTINUE;
}
static void *on_tcp_write(oop_source *src, int sd,
oop_event event, void *user)
{
char buf[nepim_global.tcp_write_size];
int wr;
nepim_pipe_t *pipe = user;
nepim_session_t *session = &pipe->session;
assert(event == OOP_WRITE);
assert(sd == pipe->sd);
assert(session->max_bit_rate < 1);
assert(session->max_pkt_rate < 1);
assert(sizeof(buf) == nepim_global.tcp_write_size);
wr = write(sd, buf, nepim_global.tcp_write_size);
if (wr < 0) {
switch (errno) {
case EINTR:
fprintf(stderr, "write: EINTR on TCP socket %d\n", sd);
return OOP_CONTINUE;
case EAGAIN:
fprintf(stderr, "write: EAGAIN on TCP socket %d\n", sd);
return OOP_CONTINUE;
case EPIPE:
fprintf(stderr, "write: EPIPE on TCP socket %d\n", sd);
break;
}
fprintf(stderr, "write: connection lost on TCP socket %d\n", sd);
report_broken_pipe_stat(stdout, pipe);
tcp_pipe_stop(sd);
close(sd);
return OOP_CONTINUE;
}
assert(wr >= 0);
assert(wr <= sizeof(buf));
nepim_session_write_add(session, wr);
return OOP_CONTINUE;
}
static void *on_tcp_rate_write(oop_source *src, int sd,
oop_event event, void *user)
{
char buf[nepim_global.tcp_write_size];
int wr;
nepim_pipe_t *pipe = user;
nepim_session_t *session = &pipe->session;
int to_write;
assert(event == OOP_WRITE);
assert(sd == pipe->sd);
#ifndef NDEBUG
{
if (session->max_bit_rate <= 0) {
assert(session->max_pkt_rate > 0);
}
if (session->max_pkt_rate <= 0) {
assert(session->max_bit_rate > 0);
}
}
#endif /* NDEBUG */
assert(sizeof(buf) == nepim_global.tcp_write_size);
#ifndef NDEBUG
{
if (session->rate_remaining <= 0) {
assert(session->pkt_remaining > 0);
}
if (session->pkt_remaining <= 0) {
assert(session->rate_remaining > 0);
}
}
#endif /* NDEBUG */
if (session->rate_remaining > 0)
to_write = NEPIM_MIN(session->rate_remaining, sizeof(buf));
else
to_write = sizeof(buf);
assert(to_write >= 0);
assert(to_write <= sizeof(buf));
wr = write(sd, buf, to_write);
if (wr < 0) {
switch (errno) {
case EINTR:
fprintf(stderr, "rate_write: EINTR on TCP socket %d\n", sd);
return OOP_CONTINUE;
case EAGAIN:
fprintf(stderr, "rate_write: EAGAIN on TCP socket %d\n", sd);
return OOP_CONTINUE;
case EPIPE:
fprintf(stderr, "rate_write: EPIPE on TCP socket %d\n", sd);
break;
}
fprintf(stderr, "rate_write: connection lost on TCP socket %d\n", sd);
report_broken_pipe_stat(stdout, pipe);
tcp_pipe_stop(sd);
close(sd);
return OOP_CONTINUE;
}
assert(wr >= 0);
assert(wr <= sizeof(buf));
assert(wr <= to_write);
session->rate_remaining -= wr;
--session->pkt_remaining;
nepim_session_write_add(session, wr);
/* finished ? */
if (
((session->max_bit_rate > 0) && (session->rate_remaining < 1))
||
((session->max_pkt_rate > 0) && (session->pkt_remaining < 1))
){
/* stop writing */
nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
sd, OOP_WRITE);
/* schedule for next time saved by on_tcp_rate_delay() */
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_send_rate,
on_tcp_rate_delay, pipe);
}
return OOP_CONTINUE;
}
static void *on_tcp_rate_delay(oop_source *src, struct timeval tv, void *user)
{
nepim_pipe_t *pipe = user;
nepim_session_t *session = &pipe->session;
assert(timercmp(&tv, &session->tv_send_rate, ==));
#ifndef NDEBUG
{
if (session->max_bit_rate <= 0) {
assert(session->max_pkt_rate > 0);
}
if (session->max_pkt_rate <= 0) {
assert(session->max_bit_rate > 0);
}
}
#endif /* NDEBUG */
/* save next scheduling time */
{
int result = gettimeofday(&session->tv_send_rate, 0);
assert(!result);
}
nepim_timer_usec_add(&session->tv_send_rate, session->write_delay);
/* calculate bytes to be written from rate */
session->rate_remaining = nepim_bps2bytes(session->max_bit_rate,
session->write_delay);
session->pkt_remaining = nepim_pps2packets(session->max_pkt_rate,
session->write_delay);
#ifndef NDEBUG
{
if (session->rate_remaining <= 0) {
assert(session->pkt_remaining > 0);
}
if (session->pkt_remaining <= 0) {
assert(session->rate_remaining > 0);
}
}
#endif /* NDEBUG */
/* start to write */
nepim_global.oop_src->on_fd(nepim_global.oop_src,
pipe->sd, OOP_WRITE,
on_tcp_rate_write, pipe);
return OOP_CONTINUE;
}
static void *on_tcp_duration(oop_source *src, struct timeval tv, void *user)
{
nepim_pipe_t *pipe = user;
nepim_session_t *session = &pipe->session;
assert(timercmp(&tv, &session->tv_duration, ==));
nepim_pipe_stat(stdout, NEPIM_LABEL_TOTAL, pipe->sd,
session->byte_total_recv,
session->byte_total_sent,
session->test_duration,
session->tv_start.tv_sec,
session->test_duration,
session->total_reads,
session->total_writes);
session->byte_total_recv = 0;
session->byte_total_sent = 0;
session->total_reads = 0;
session->total_writes = 0;
tcp_pipe_stop(pipe->sd);
close(pipe->sd);
return OOP_CONTINUE;
}
static void *on_tcp_interval(oop_source *src, struct timeval tv, void *user)
{
nepim_pipe_t *pipe = user;
nepim_session_t *session = &pipe->session;
assert(timercmp(&tv, &session->tv_interval, ==));
nepim_pipe_stat(stdout, NEPIM_LABEL_PARTIAL, pipe->sd,
session->byte_interval_recv,
session->byte_interval_sent,
session->stat_interval,
session->tv_start.tv_sec,
session->test_duration,
session->interval_reads,
session->interval_writes);
session->byte_interval_recv = 0;
session->byte_interval_sent = 0;
session->interval_reads = 0;
session->interval_writes = 0;
schedule_stat_interval(pipe->sd);
return OOP_CONTINUE;
}
static int greet(int sd)
{
char buf[1024];
nepim_greet_t opt;
int pr;
int wr;
char *tmp = "";
opt.must_send = nepim_global.duplex_mode ||
!nepim_global.simplex_client_send;
opt.bit_rate = nepim_global.bit_rate;
opt.pkt_rate = nepim_global.pkt_rate;
opt.stat_interval = nepim_global.stat_interval;
opt.test_duration = nepim_global.test_duration;
opt.write_delay = nepim_global.write_delay;
opt.udp_keepalive_send = -1;
opt.udp_keepalive_require = -1;
if (nepim_global.password) {
opt.password_buf = nepim_global.password;
opt.password_buf_size = strlen(nepim_global.password) + 1;
}
else {
opt.password_buf = tmp;
opt.password_buf_size = strlen(tmp) + 1;
}
pr = nepim_write_greetings(&opt, buf, sizeof(buf));
if (pr < 0)
return -1;
assert(pr > 0);
fprintf(stderr, "%d: sending: %s", sd, buf);
wr = write(sd, buf, pr);
if (wr != pr)
return -1;
return 0;
}
static void tcp_pipe_start(int sd)
{
nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd);
nepim_session_t *session;
assert(pipe);
session = &pipe->session;
nepim_global.oop_src->on_fd(nepim_global.oop_src,
sd, OOP_READ,
on_tcp_read, pipe);
if (session->must_send) {
if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
session->tv_send_rate = OOP_TIME_NOW;
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_send_rate,
on_tcp_rate_delay, pipe);
}
else {
nepim_global.oop_src->on_fd(nepim_global.oop_src,
sd, OOP_WRITE,
on_tcp_write, pipe);
}
}
{
int result = gettimeofday(&session->tv_start, 0);
assert(!result);
session->tv_duration = session->tv_start;
}
session->tv_duration.tv_sec += session->test_duration;
nepim_global.oop_src->on_time(nepim_global.oop_src,
session->tv_duration,
on_tcp_duration, pipe);
nepim_sock_show_opt(stderr, sd);
schedule_stat_interval(sd);
}
static void tcp_pipe_stop(int sd)
{
nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd);
nepim_session_t *session;
assert(pipe);
session = &pipe->session;
nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
sd, OOP_READ);
if (session->must_send) {
if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) {
/* stop current writing, if any */
nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
sd, OOP_WRITE);
/* stop periodic write scheduler */
nepim_global.oop_src->cancel_time(nepim_global.oop_src,
session->tv_send_rate,
on_tcp_rate_delay, pipe);
}
else {
nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
sd, OOP_WRITE);
}
}
nepim_global.oop_src->cancel_time(nepim_global.oop_src,
session->tv_duration,
on_tcp_duration, pipe);
nepim_global.oop_src->cancel_time(nepim_global.oop_src,
session->tv_interval,
on_tcp_interval, pipe);
nepim_pipe_set_del(&pipes, sd);
nepim_sock_show_opt(stderr, sd);
}
static void spawn_one_tcp_client(const char *hostname, const char *portname)
{
struct addrinfo hints;
struct addrinfo *ai_res;
struct addrinfo *ai;
int result;
memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_CANONNAME;
hints.ai_family = PF_UNSPEC;
hints.ai_addrlen = 0;
hints.ai_addr = 0;
hints.ai_canonname = 0;
fprintf(stderr,
"TCP socket solving %s,%s\n",
hostname, portname);
result = getaddrinfo(hostname, portname,
&hints, &ai_res);
if (result) {
fprintf(stderr, "%s: getaddrinfo(%s,%s): %s\n",
__PRETTY_FUNCTION__, hostname, portname,
gai_strerror(result));
return;
}
for (ai = ai_res; ai; ai = ai->ai_next) {
int sd;
fprintf(stderr,
"TCP socket trying %s,%s\n",
hostname, portname);
if (nepim_global.no_inet6 && (ai->ai_family == PF_INET6))
continue;
if (nepim_global.no_inet4 && (ai->ai_family == PF_INET))
continue;
sd = nepim_connect_client_socket(ai->ai_addr, ai->ai_addrlen,
ai->ai_family, ai->ai_socktype,
ai->ai_protocol,
nepim_global.pmtu_mode,
nepim_global.ttl,
nepim_global.win_recv,
nepim_global.win_send);
if (sd < 0) {
fprintf(stderr,
"could not connect TCP socket to %s,%s: %d\n",
hostname, portname, sd);
continue;
}
fprintf(stderr,
"%d: TCP socket connected to: "
"host=%s,%s len=%d family=%d type=%d proto=%d\n",
sd, hostname, portname,
ai->ai_addrlen, ai->ai_family,
ai->ai_socktype, ai->ai_protocol);
{
nepim_greet_t opt;
opt.must_send = nepim_global.duplex_mode ||
nepim_global.simplex_client_send;
opt.bit_rate = nepim_global.bit_rate;
opt.pkt_rate = nepim_global.pkt_rate;
opt.stat_interval = nepim_global.stat_interval;
opt.test_duration = nepim_global.test_duration;
opt.write_delay = nepim_global.write_delay;
opt.udp_keepalive_send = nepim_global.udp_keepalive_send;
opt.udp_keepalive_require = nepim_global.udp_keepalive_require;
nepim_pipe_set_add(&pipes, sd, ai->ai_addr, ai->ai_addrlen, &opt);
}
break;
}
freeaddrinfo(ai_res);
}
static void parse_hosts(const char *host_list)
{
int size = addr_list_size(host_list);
int i, j;
/* scan host_list */
for (j = 0; j < size; ++j) {
char hostname[100];
const char *portname;
/* get hostname */
if (addr_list_get(host_list, j, hostname, sizeof(hostname))) {
fprintf(stderr,
"%s: failure parsing address %d/%d from list: %s\n",
nepim_global.prog_name, j, size, host_list);
continue;
}
/* split host/port */
if (addr_split_port(hostname, sizeof(hostname), &portname))
portname = nepim_global.portname;
/* create sockets */
for (i = 0; i < nepim_global.pipes; ++i)
spawn_one_tcp_client(hostname, portname);
} /* scan host_list */
}
static void spawn_tcp_clients(const char *host_list)
{
int i;
/* spawn sockets */
parse_hosts(host_list);
/* activate sockets */
for (i = 0; i < pipes.array.capacity; ++i) {
char peer[100];
nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, i);
if (!pipe)
continue;
nepim_sock_dump_addr(peer, sizeof(peer),
(const struct sockaddr *) &pipe->session.remote);
if (greet(i)) {
fprintf(stderr,
"%d: could not greet %s,%d\n",
i, peer,
nepim_sock_get_port((const struct sockaddr *) &pipe->session.remote));
nepim_pipe_set_del(&pipes, i);
close(i);
continue;
}
fprintf(stderr,
"%d: greetings sent to %s,%d\n",
i, peer,
nepim_sock_get_port((const struct sockaddr *) &pipe->session.remote));
tcp_pipe_start(i);
}
}
void nepim_udp_clients(const char *host_list);
void nepim_client_run()
{
void *result;
nepim_pipe_set_init(&pipes);
if (nepim_global.udp_mode) {
nepim_slot_set_init(&slots);
nepim_usock_set_init(&udp_tab);
nepim_udp_clients(nepim_global.hostname);
}
else
spawn_tcp_clients(nepim_global.hostname);
result = oop_sys_run(nepim_global.oop_sys);
}
syntax highlighted by Code2HTML, v. 0.9.1