/*-GNU-GPL-BEGIN-* nepim - network pipemeter Copyright (C) 2005 Everton da Silva Marques nepim is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2, or (at your option) any later version. nepim is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with nepim; see the file COPYING. If not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *-GNU-GPL-END-*/ /* $Id: slot.c,v 1.27 2005/09/30 11:55:40 evertonm Exp $ */ #include #include #include #include #include #include #include "slot.h" #include "conf.h" #include "usock.h" #include "udp_header.h" #include "common.h" extern nepim_usock_set_t udp_tab; /* from server.c */ void nepim_slot_set_init(nepim_slot_set_t *set) { nepim_array_init(&set->array); } nepim_slot_t *nepim_slot_set_get(const nepim_slot_set_t *set, int index) { return nepim_array_get(&set->array, index); } nepim_slot_t *nepim_slot_set_search(const nepim_slot_set_t *set, int index) { return nepim_array_search(&set->array, index); } struct ctx_t { const struct sockaddr *sa; socklen_t len; }; static int match_remote_addr(const void *context, const void *element) { const struct ctx_t *ctx = context; const nepim_slot_t *slot = element; assert(ctx); if (slot) return !memcmp(&slot->session.remote, ctx->sa, ctx->len); return 0; } nepim_slot_t *nepim_slot_set_search_remote(const nepim_slot_set_t *set, const struct sockaddr *addr, socklen_t len) { struct ctx_t ctx = { addr, len }; return nepim_array_find(&set->array, match_remote_addr, &ctx); } int nepim_slot_find_free(nepim_slot_set_t *set) { return nepim_array_find_free(&set->array); } void nepim_slot_set_add(nepim_slot_set_t *set, int sd, int index, int index_remote, const struct sockaddr *remote, socklen_t remote_len, const nepim_greet_t *opt) { nepim_slot_t *slot = malloc(sizeof(*slot)); assert(slot); slot->index = index; slot->index_remote = index_remote; slot->udp_sd = sd; slot->seq = 0; slot->want_write = 0; /* boolean */ slot->greetings_sent = 0; slot->client_writer_status = NEPIM_SLOT_CLIENT_GREET; slot->keepalives_recv = 0; slot->interval_pkt_lost = 0; slot->interval_pkt_dup = 0; slot->total_pkt_lost = 0; slot->total_pkt_dup = 0; slot->exp_loss_sampler = 0; slot->exp_dup_sampler = 0; nepim_session_init(&slot->session, opt, remote, remote_len, SESSION_SLOT, index); nepim_win_init(&slot->win_recv_seq, nepim_global.udp_win_max); nepim_array_add(&set->array, index, slot); } void nepim_slot_set_del(nepim_slot_set_t *set, int index) { nepim_slot_t *slot = nepim_slot_set_get(set, index); assert(slot); nepim_win_del(&slot->win_recv_seq); free(slot); nepim_array_del(&set->array, index); } /* find next round-robin slot wanting to write on a shared file descriptor */ static int write_cursor = 0; static int slot_writer(nepim_slot_t *slot, int sd) { if (slot) if (slot->udp_sd == sd) if (slot->want_write) { assert(slot->want_write == 1); return -1; } return 0; } nepim_slot_t *nepim_slot_find_next_writer(const nepim_slot_set_t *set, int sd) { nepim_slot_t *slot; int last = write_cursor; write_cursor = (write_cursor + 1) % set->array.capacity; slot = nepim_slot_set_get(set, last); if (slot_writer(slot, sd)) return slot; for (; write_cursor != last; write_cursor = (write_cursor + 1) % set->array.capacity) { slot = nepim_slot_set_get(set, write_cursor); if (slot_writer(slot, sd)) return slot; } assert(0); return 0; } int nepim_slot_find_addr(const nepim_slot_set_t *set, int remote_slot, const struct sockaddr *remote, socklen_t remote_len) { nepim_slot_t *slot; int i; for (i = 0; i < set->array.capacity; ++i) { slot = nepim_slot_set_get(set, i); if (slot) if (slot->index_remote == remote_slot) if (slot->session.remote_len == remote_len) if (!memcmp(&slot->session.remote, remote, remote_len)) return -1; } return 0; } static void slot_buf_write(nepim_slot_t *slot, char *buf, size_t buf_size, uint8_t type) { nepim_udp_hdr_t hdr; assert(buf_size >= UDP_HEADER_LEN); hdr.version = UDP_VERSION; hdr.dst_slot = slot->index_remote; hdr.src_slot = slot->index; hdr.type = type; switch (type) { case UDP_TYPE_DATA: if (nepim_global.udp_exp_loss > 0) if (!(++slot->exp_loss_sampler % nepim_global.udp_exp_loss)) ++slot->seq; if (nepim_global.udp_exp_dup > 0) if (!(++slot->exp_dup_sampler % nepim_global.udp_exp_dup)) --slot->seq; hdr.seq = slot->seq++; break; case UDP_TYPE_HELLO: case UDP_TYPE_KEEPALIVE: hdr.seq = 0; break; default: assert(0); } nepim_udp_hdr_write(&hdr, buf, buf_size); } int nepim_slot_buf_sendto(nepim_slot_t *slot, char *buf, size_t buf_size, uint8_t type) { ssize_t wr; slot_buf_write(slot, buf, buf_size, type); wr = sendto(slot->udp_sd, buf, buf_size, 0, (const struct sockaddr *) &slot->session.remote, slot->session.remote_len); return wr; } int nepim_slot_buf_write(nepim_slot_t *slot, char *buf, size_t buf_size, uint8_t type) { ssize_t wr; slot_buf_write(slot, buf, buf_size, type); wr = write(slot->udp_sd, buf, buf_size); return wr; } void nepim_will_slot_keepalive(nepim_slot_t *slot, oop_call_fd *on_udp_write) { nepim_session_t *session = &slot->session; int sd = slot->udp_sd; assert(!session->must_send); assert(session->udp_keepalive_send); assert(!slot->want_write); ++slot->want_write; if (nepim_usock_writer_add(&udp_tab, sd)) return; nepim_global.oop_src->on_fd(nepim_global.oop_src, sd, OOP_WRITE, on_udp_write, 0); } void nepim_cancel_slot_keepalive(nepim_slot_t *slot) { nepim_session_t *session = &slot->session; int sd = slot->udp_sd; assert(!session->must_send); assert(session->udp_keepalive_send); assert(slot->want_write == 1); --slot->want_write; if (nepim_usock_writer_del(&udp_tab, sd)) return; nepim_global.oop_src->cancel_fd(nepim_global.oop_src, sd, OOP_WRITE); } void nepim_udp_write_keepalive(nepim_slot_t *slot, int (*nepim_slot_buf_send)(nepim_slot_t*, char*,size_t,uint8_t), oop_call_time *on_udp_keepalive_time, void (*udp_slot_kill)(nepim_slot_t *)) { char buf[nepim_global.udp_write_size]; int wr; nepim_session_t *session; int to_write; int sd; assert(nepim_global.udp_write_size == sizeof(buf)); assert(slot); /* stop writing */ nepim_cancel_slot_keepalive(slot); sd = slot->udp_sd; session = &slot->session; assert(!session->must_send); assert(session->udp_keepalive_send); to_write = UDP_HEADER_LEN; assert(to_write > 0); assert(to_write >= UDP_HEADER_LEN); assert(to_write <= nepim_global.udp_write_size); wr = nepim_slot_buf_send(slot, buf, to_write, UDP_TYPE_KEEPALIVE); if (wr < 1) { switch (errno) { case EINTR: case EAGAIN: nepim_usock_write_error(&udp_tab, sd, slot->index, slot->index_remote, errno); return; case EPIPE: fprintf(stderr, "keepalive_write: EPIPE on UDP socket %d\n", sd); break; } fprintf(stderr, "%d: keepalive_write: unexpected failure: errno=%d: %s\n", sd, errno, strerror(errno)); if (!session->duration_done) report_broken_slot_stat(stdout, slot); udp_slot_kill(slot); return; } assert(wr > 0); assert(wr <= to_write); nepim_usock_write_good(&udp_tab, sd); session->byte_total_sent += wr; session->byte_interval_sent += wr; ++session->total_writes; ++session->interval_writes; /* schedule for next time saved by on_udp_rate_delay() */ nepim_global.oop_src->on_time(nepim_global.oop_src, session->tv_keepalive, on_udp_keepalive_time, slot); } void nepim_schedule_keepalive_timer(nepim_slot_t *slot, oop_call_time *on_udp_keepalive_require) { nepim_session_t *session = &slot->session; assert(session->udp_keepalive_require); { int result = gettimeofday(&session->tv_keepalive_timer, 0); assert(!result); } nepim_timer_usec_add(&session->tv_keepalive_timer, nepim_global.udp_keepalive_timer); nepim_global.oop_src->on_time(nepim_global.oop_src, session->tv_keepalive_timer, on_udp_keepalive_require, slot); }