/*-GNU-GPL-BEGIN-* nepim - network pipemeter Copyright (C) 2005 Everton da Silva Marques nepim is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2, or (at your option) any later version. nepim is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with nepim; see the file COPYING. If not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *-GNU-GPL-END-*/ /* $Id: server.c,v 1.44 2005/10/13 19:02:20 evertonm Exp $ */ #include #include #include #include #include #include #include #include #include #include #include "conf.h" #include "sock.h" #include "pipe.h" #include "common.h" #include "slot.h" #include "usock.h" #include "str.h" #include "tcpwrap.h" nepim_pipe_set_t pipes; /* tcp socket table */ nepim_slot_set_t slots; /* udp pseudo-sockets */ nepim_usock_set_t udp_tab; /* udp socket table */ const char *INET_ANY = "0.0.0.0"; const char *INET6_ANY = "::"; static void schedule_stat_interval(int sd); static void tcp_pipe_cancel_timers(int sd); static void tcp_pipe_cancel_io(int sd); static void tcp_pipe_kill(int sd); static void *on_tcp_rate_delay(oop_source *src, struct timeval tv, void *user); static void *on_tcp_read(oop_source *src, int sd, oop_event event, void *user) { char buf[nepim_global.tcp_read_size]; int rd; nepim_pipe_t *pipe = user; nepim_session_t *session = &pipe->session; assert(nepim_global.tcp_read_size = sizeof(buf)); errno = 0; rd = read(sd, buf, nepim_global.tcp_read_size); if (rd <= 0) { if (rd) { switch (errno) { case EINTR: fprintf(stderr, "read: EINTR on TCP socket %d\n", sd); return OOP_CONTINUE; case EAGAIN: fprintf(stderr, "read: EAGAIN on TCP socket %d\n", sd); return OOP_CONTINUE; default: fprintf(stderr, "read: errno=%d: %s\n", errno, strerror(errno)); } } fprintf(stderr, "read: connection lost on TCP socket %d\n", sd); if (!session->duration_done) report_broken_pipe_stat(stdout, pipe); tcp_pipe_kill(sd); return OOP_CONTINUE; } assert(rd >= 0); assert(rd <= sizeof(buf)); nepim_session_read_add(session, rd); return OOP_CONTINUE; } static void *on_tcp_write(oop_source *src, int sd, oop_event event, void *user) { char buf[nepim_global.tcp_write_size]; int wr; nepim_pipe_t *pipe = user; nepim_session_t *session = &pipe->session; assert(event == OOP_WRITE); assert(sd == pipe->sd); assert(session->max_bit_rate < 1); assert(session->max_pkt_rate < 1); assert(sizeof(buf) == nepim_global.tcp_write_size); wr = write(sd, buf, nepim_global.tcp_write_size); if (wr < 0) { switch (errno) { case EINTR: fprintf(stderr, "write: EINTR on TCP socket %d\n", sd); return OOP_CONTINUE; case EAGAIN: fprintf(stderr, "write: EAGAIN on TCP socket %d\n", sd); return OOP_CONTINUE; case EPIPE: fprintf(stderr, "write: EPIPE on TCP socket %d\n", sd); break; } fprintf(stderr, "write: connection lost on TCP socket %d\n", sd); if (!session->duration_done) report_broken_pipe_stat(stdout, pipe); tcp_pipe_kill(sd); return OOP_CONTINUE; } assert(wr >= 0); assert(wr <= sizeof(buf)); nepim_session_write_add(session, wr); return OOP_CONTINUE; } static void *on_tcp_rate_write(oop_source *src, int sd, oop_event event, void *user) { char buf[nepim_global.tcp_write_size]; int wr; nepim_pipe_t *pipe = user; nepim_session_t *session = &pipe->session; int to_write; assert(event == OOP_WRITE); assert(sd == pipe->sd); #ifndef NDEBUG { if (session->max_bit_rate <= 0) { assert(session->max_pkt_rate > 0); } if (session->max_pkt_rate <= 0) { assert(session->max_bit_rate > 0); } } #endif /* NDEBUG */ assert(sizeof(buf) == nepim_global.tcp_write_size); #ifndef NDEBUG { if (session->rate_remaining <= 0) { assert(session->pkt_remaining > 0); } if (session->pkt_remaining <= 0) { assert(session->rate_remaining > 0); } } #endif /* NDEBUG */ if (session->rate_remaining > 0) to_write = NEPIM_MIN(session->rate_remaining, sizeof(buf)); else to_write = sizeof(buf); assert(to_write >= 0); assert(to_write <= sizeof(buf)); wr = write(sd, buf, to_write); if (wr < 0) { switch (errno) { case EINTR: fprintf(stderr, "rate_write: EINTR on TCP socket %d\n", sd); return OOP_CONTINUE; case EAGAIN: fprintf(stderr, "rate_write: EAGAIN on TCP socket %d\n", sd); return OOP_CONTINUE; case EPIPE: fprintf(stderr, "rate_write: EPIPE on TCP socket %d\n", sd); break; } fprintf(stderr, "rate_write: connection lost on TCP socket %d\n", sd); if (!session->duration_done) report_broken_pipe_stat(stdout, pipe); tcp_pipe_kill(sd); return OOP_CONTINUE; } assert(wr >= 0); assert(wr <= sizeof(buf)); assert(wr <= to_write); session->rate_remaining -= wr; --session->pkt_remaining; nepim_session_write_add(session, wr); /* finished ? */ if ( ((session->max_bit_rate > 0) && (session->rate_remaining < 1)) || ((session->max_pkt_rate > 0) && (session->pkt_remaining < 1)) ){ /* stop writing */ nepim_global.oop_src->cancel_fd(nepim_global.oop_src, sd, OOP_WRITE); /* schedule for next time saved by on_tcp_rate_delay() */ nepim_global.oop_src->on_time(nepim_global.oop_src, session->tv_send_rate, on_tcp_rate_delay, pipe); } return OOP_CONTINUE; } static void *on_tcp_rate_delay(oop_source *src, struct timeval tv, void *user) { nepim_pipe_t *pipe = user; nepim_session_t *session = &pipe->session; assert(timercmp(&tv, &session->tv_send_rate, ==)); #ifndef NDEBUG { if (session->max_bit_rate <= 0) { assert(session->max_pkt_rate > 0); } if (session->max_pkt_rate <= 0) { assert(session->max_bit_rate > 0); } } #endif /* NDEBUG */ /* save next scheduling time */ { int result = gettimeofday(&session->tv_send_rate, 0); assert(!result); } nepim_timer_usec_add(&session->tv_send_rate, session->write_delay); /* calculate bytes to be written from rate */ session->rate_remaining = nepim_bps2bytes(session->max_bit_rate, session->write_delay); session->pkt_remaining = nepim_pps2packets(session->max_pkt_rate, session->write_delay); #ifndef NDEBUG { if (session->rate_remaining <= 0) { assert(session->pkt_remaining > 0); } if (session->pkt_remaining <= 0) { assert(session->rate_remaining > 0); } } #endif /* NDEBUG */ /* start to write */ nepim_global.oop_src->on_fd(nepim_global.oop_src, pipe->sd, OOP_WRITE, on_tcp_rate_write, pipe); return OOP_CONTINUE; } static void *on_tcp_duration(oop_source *src, struct timeval tv, void *user) { nepim_pipe_t *pipe = user; nepim_session_t *session = &pipe->session; nepim_pipe_stat(stdout, NEPIM_LABEL_TOTAL, pipe->sd, session->byte_total_recv, session->byte_total_sent, session->test_duration, session->tv_start.tv_sec, session->test_duration, session->total_reads, session->total_writes); session->byte_total_recv = 0; session->byte_total_sent = 0; session->total_reads = 0; session->total_writes = 0; session->duration_done = 1; tcp_pipe_cancel_timers(pipe->sd); return OOP_CONTINUE; } static void *on_tcp_interval(oop_source *src, struct timeval tv, void *user) { nepim_pipe_t *pipe = user; nepim_session_t *session = &pipe->session; nepim_pipe_stat(stdout, NEPIM_LABEL_PARTIAL, pipe->sd, session->byte_interval_recv, session->byte_interval_sent, session->stat_interval, session->tv_start.tv_sec, session->test_duration, session->interval_reads, session->interval_writes); session->byte_interval_recv = 0; session->byte_interval_sent = 0; session->interval_reads = 0; session->interval_writes = 0; schedule_stat_interval(pipe->sd); return OOP_CONTINUE; } static int parse_greetings(int sd, const struct sockaddr *remote, socklen_t remote_len, const char *buf, const char *past_end) { nepim_greet_t opt; int result; char password_buf[100]; opt.password_buf = password_buf; opt.password_buf_size = sizeof(password_buf); result = nepim_parse_greetings(&opt, buf, past_end); if (result) return result; fprintf(stderr, "%d: send=%d bit_rate=%lld pkt_rate=%d " "interval=%d duration=%d delay=%ld ka_send=%d ka_req=%d " "password=%s\n", sd, opt.must_send, opt.bit_rate, opt.pkt_rate, opt.stat_interval, opt.test_duration, opt.write_delay, opt.udp_keepalive_send, opt.udp_keepalive_require, password_buf); if (nepim_global.password) if (strcmp(nepim_global.password, password_buf)) { fprintf(stderr, "%d: bad client password=%s\n", sd, password_buf); return 1; } nepim_pipe_set_add(&pipes, sd, remote, remote_len, &opt); return 0; } static int read_greetings(int sd, const struct sockaddr *remote, socklen_t remote_len) { char buf[1024]; int len = 0; char *eos; int result; for (;;) { char *curr = buf + len; int left = sizeof(buf) - len; int rd; assert(left > 0); rd = read(sd, curr, left); if (!rd) { fprintf(stderr, "%s: incoming connection lost\n", __PRETTY_FUNCTION__); return -1; } if (rd < 0) { if (errno == EAGAIN) continue; } assert(rd > 0); assert(rd <= left); len += rd; eos = memchr(curr, '\n', rd); if (eos) break; } *eos = '\0'; result = parse_greetings(sd, remote, remote_len, buf, eos); if (result) { fprintf(stderr, "%d: bad client greetings: %d [%s]\n", sd, result, buf); return result; } return 0; } static void schedule_stat_interval(int sd) { nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd); nepim_session_t *session; assert(pipe); session = &pipe->session; { int result = gettimeofday(&session->tv_interval, 0); assert(!result); } session->tv_interval.tv_sec += session->stat_interval; nepim_global.oop_src->on_time(nepim_global.oop_src, session->tv_interval, on_tcp_interval, pipe); } static void tcp_pipe_start(int sd) { nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd); nepim_session_t *session; assert(pipe); session = &pipe->session; nepim_global.oop_src->on_fd(nepim_global.oop_src, sd, OOP_READ, on_tcp_read, pipe); if (session->must_send) { if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) { session->tv_send_rate = OOP_TIME_NOW; nepim_global.oop_src->on_time(nepim_global.oop_src, session->tv_send_rate, on_tcp_rate_delay, pipe); } else { nepim_global.oop_src->on_fd(nepim_global.oop_src, sd, OOP_WRITE, on_tcp_write, pipe); } } { int result = gettimeofday(&session->tv_start, 0); assert(!result); session->tv_duration = session->tv_start; } session->tv_duration.tv_sec += session->test_duration; nepim_global.oop_src->on_time(nepim_global.oop_src, session->tv_duration, on_tcp_duration, pipe); nepim_sock_show_opt(stderr, sd); schedule_stat_interval(sd); } static void tcp_pipe_cancel_timers(int sd) { nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd); nepim_session_t *session; assert(pipe); session = &pipe->session; nepim_global.oop_src->cancel_time(nepim_global.oop_src, session->tv_duration, on_tcp_duration, pipe); nepim_global.oop_src->cancel_time(nepim_global.oop_src, session->tv_interval, on_tcp_interval, pipe); } static void tcp_pipe_cancel_io(int sd) { nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd); nepim_session_t *session; assert(pipe); session = &pipe->session; nepim_global.oop_src->cancel_fd(nepim_global.oop_src, sd, OOP_READ); if (session->must_send) { if ((session->max_bit_rate > 0) || (session->max_pkt_rate > 0)) { /* stop current writing, if any */ nepim_global.oop_src->cancel_fd(nepim_global.oop_src, sd, OOP_WRITE); /* stop periodic write scheduler */ nepim_global.oop_src->cancel_time(nepim_global.oop_src, session->tv_send_rate, on_tcp_rate_delay, pipe); } else { nepim_global.oop_src->cancel_fd(nepim_global.oop_src, sd, OOP_WRITE); } } } static void tcp_pipe_kill(int sd) { nepim_pipe_t *pipe = nepim_pipe_set_get(&pipes, sd); assert(pipe); tcp_pipe_cancel_timers(sd); tcp_pipe_cancel_io(sd); nepim_sock_show_opt(stderr, sd); if (close(sd)) fprintf(stderr, "%d: close failed\n", sd); nepim_pipe_set_del(&pipes, sd); } static void *on_tcp_connect(oop_source *src, int sd, oop_event event, void *unnused) { int conn_sd; union { struct sockaddr_in inet; struct sockaddr_in6 inet6; } sa; int len = sizeof(sa); int result; char addr_buf[100]; conn_sd = accept(sd, (struct sockaddr *) &sa, &len); if (conn_sd < 0) { fprintf(stderr, "%s: %s: could not accept connection: errno=%d: %s\n", __FILE__, __PRETTY_FUNCTION__, errno, strerror(errno)); return OOP_CONTINUE; } nepim_sock_dump_addr(addr_buf, sizeof(addr_buf), (const struct sockaddr *) &sa); fprintf(stderr, "%d: TCP incoming: %s,%d\n", conn_sd, addr_buf, nepim_sock_get_port((const struct sockaddr *) &sa)); if (nepim_global.tcpwrap) { if (!nepim_hosts_ctl(nepim_global.tcpwrap, nepim_global.prog_name, addr_buf)) { fprintf(stderr, "%d: %s: %s: TCP wrapper denied access from client=[%s] to service=[%s]\n", conn_sd, __FILE__, __PRETTY_FUNCTION__, addr_buf, nepim_global.prog_name); close(conn_sd); return OOP_CONTINUE; } } result = nepim_socket_opt(conn_sd, nepim_global.pmtu_mode, nepim_global.ttl); if (result) { fprintf(stderr, "%d: %s: %s: could not set socket options: %d\n", conn_sd, __FILE__, __PRETTY_FUNCTION__, result); close(conn_sd); return OOP_CONTINUE; } result = nepim_socket_tcp_opt(conn_sd); if (result) { fprintf(stderr, "%d: %s: %s: could not set tcp options: %d\n", conn_sd, __FILE__, __PRETTY_FUNCTION__, result); close(conn_sd); return OOP_CONTINUE; } if (nepim_socket_block(conn_sd)) { fprintf(stderr, "%d: %s: %s: could not set blocking socket mode\n", conn_sd, __FILE__, __PRETTY_FUNCTION__); close(conn_sd); return OOP_CONTINUE; } if (read_greetings(conn_sd, (const struct sockaddr *) &sa, len)) { fprintf(stderr, "%d: %s: %s: could not parse client greetings\n", conn_sd, __FILE__, __PRETTY_FUNCTION__); close(conn_sd); return OOP_CONTINUE; } if (nepim_socket_nonblock(conn_sd)) { fprintf(stderr, "%d: %s: %s: could not set non-blocking socket mode\n", conn_sd, __FILE__, __PRETTY_FUNCTION__); close(conn_sd); return OOP_CONTINUE; } tcp_pipe_start(conn_sd); return OOP_CONTINUE; } static int spawn_tcp_listener(const char *hostname, const char *portname) { struct addrinfo hints; struct addrinfo *ai_res; struct addrinfo *ai; int result; int tcp_listeners = 0; memset(&hints, 0, sizeof(hints)); hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_PASSIVE | AI_CANONNAME; hints.ai_family = PF_UNSPEC; hints.ai_addrlen = 0; hints.ai_addr = 0; hints.ai_canonname = 0; result = getaddrinfo(hostname, portname, &hints, &ai_res); if (result) { fprintf(stderr, "%s: getaddrinfo(%s,%s): %s\n", __PRETTY_FUNCTION__, hostname, portname, gai_strerror(result)); return 0; } for (ai = ai_res; ai; ai = ai->ai_next) { int sd; #if 0 fprintf(stderr, "TCP listener: host=%s,%s(%d) len=%d family=%d type=%d proto=%d\n", hostname, portname, nepim_sock_get_port(ai->ai_addr), ai->ai_addrlen, ai->ai_family, ai->ai_socktype, ai->ai_protocol); #endif if (nepim_global.no_inet6 && (ai->ai_family == PF_INET6)) continue; if (nepim_global.no_inet4 && (ai->ai_family == PF_INET)) continue; sd = nepim_create_listener_socket(ai->ai_addr, ai->ai_addrlen, ai->ai_family, ai->ai_socktype, ai->ai_protocol, nepim_global.listen_backlog, nepim_global.pmtu_mode, nepim_global.ttl, nepim_global.win_recv, nepim_global.win_send); if (sd < 0) { fprintf(stderr, "%s: TCP listener socket failed for %s,%s(%d): %d\n", __PRETTY_FUNCTION__, hostname, portname, nepim_sock_get_port(ai->ai_addr), sd); break; } nepim_global.oop_src->on_fd(nepim_global.oop_src, sd, OOP_READ, on_tcp_connect, 0); ++tcp_listeners; fprintf(stderr, "%d: TCP socket listening on %s,%s(%d)\n", sd, hostname, portname, nepim_sock_get_port(ai->ai_addr)); } freeaddrinfo(ai_res); return tcp_listeners; } extern int nepim_udp_listener(const char *hostname, const char *portname, int join); static void parse_listeners(const char *list, int *udp, int *tcp, int join) { int i; int size = addr_list_size(list); for (i = 0; i < size; ++i) { char buf[100]; const char *portname; if (addr_list_get(list, i, buf, sizeof(buf))) { fprintf(stderr, "%s: failure parsing address %d/%d from list: %s\n", nepim_global.prog_name, i, size, list); continue; } if (addr_split_port(buf, sizeof(buf), &portname)) portname = nepim_global.portname; *tcp += spawn_tcp_listener(buf, portname); *udp += nepim_udp_listener(buf, portname, join); } } void nepim_server_run() { void *result; int tcp_listeners = 0; int udp_listeners = 0; /* these must be initialized before spawning listeners */ nepim_pipe_set_init(&pipes); nepim_slot_set_init(&slots); nepim_usock_set_init(&udp_tab); if (nepim_global.bind_list) parse_listeners(nepim_global.bind_list, &udp_listeners, &tcp_listeners, 0 /* no join */); if (nepim_global.join_list) parse_listeners(nepim_global.join_list, &udp_listeners, &tcp_listeners, 1 /* do join */); if (!nepim_global.bind_list && !nepim_global.join_list) { if (!nepim_global.no_inet6) tcp_listeners += spawn_tcp_listener(INET6_ANY, nepim_global.portname); tcp_listeners += spawn_tcp_listener(INET_ANY, nepim_global.portname); if (!nepim_global.no_inet6) udp_listeners += nepim_udp_listener(INET6_ANY, nepim_global.portname, 0 /* no join */); udp_listeners += nepim_udp_listener(INET_ANY, nepim_global.portname, 0 /* no join */); } if (tcp_listeners || udp_listeners) fprintf(stderr, "%s: server ready\n", nepim_global.prog_name); result = oop_sys_run(nepim_global.oop_sys); }