/*-GNU-GPL-BEGIN-*
nepim - network pipemeter
Copyright (C) 2005 Everton da Silva Marques
nepim is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
nepim is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with nepim; see the file COPYING. If not, write to
the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*-GNU-GPL-END-*/
/* $Id: slot.c,v 1.27 2005/09/30 11:55:40 evertonm Exp $ */
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include "slot.h"
#include "conf.h"
#include "usock.h"
#include "udp_header.h"
#include "common.h"
/* Table of UDP sockets; declared here, defined in server.c.
   Used below to register/unregister socket writers and to
   record write results. */
extern nepim_usock_set_t udp_tab; /* from server.c */
/*
  Initialize a slot set by initializing its underlying array.
*/
void nepim_slot_set_init(nepim_slot_set_t *set)
{
  nepim_array_init(&set->array);
}
/*
  Return the slot stored at position 'index' of the set's array,
  as reported by nepim_array_get().
*/
nepim_slot_t *nepim_slot_set_get(const nepim_slot_set_t *set, int index)
{
  nepim_slot_t *found = nepim_array_get(&set->array, index);

  return found;
}
/*
  Look up the slot at position 'index' through nepim_array_search().
  NOTE(review): how this differs from nepim_slot_set_get() depends on
  nepim_array_search() vs nepim_array_get() -- confirm in the array module.
*/
nepim_slot_t *nepim_slot_set_search(const nepim_slot_set_t *set, int index)
{
  nepim_slot_t *found = nepim_array_search(&set->array, index);

  return found;
}
/*
  Search context handed to match_remote_addr() through
  nepim_array_find(): the remote address being looked for.
  Field order matters: nepim_slot_set_search_remote() fills this
  struct with a positional initializer.
*/
struct ctx_t {
const struct sockaddr *sa; /* address to compare against each slot */
socklen_t len; /* number of bytes of 'sa' to compare */
};
/*
  Predicate for nepim_array_find(): tell whether the slot 'element'
  has the remote address described by 'context' (a struct ctx_t).

  Returns non-zero on match, 0 otherwise (including for empty,
  i.e. NULL, array elements).

  The stored address length must equal the wanted length before the
  bytes are compared; this mirrors the check done by
  nepim_slot_find_addr() and avoids comparing bytes beyond the
  length recorded for the slot's address.
*/
static int match_remote_addr(const void *context, const void *element)
{
  const struct ctx_t *ctx = context;
  const nepim_slot_t *slot = element;

  assert(ctx);

  if (!slot)
    return 0;

  if (slot->session.remote_len != ctx->len)
    return 0;

  return !memcmp(&slot->session.remote, ctx->sa, ctx->len);
}
nepim_slot_t *nepim_slot_set_search_remote(const nepim_slot_set_t *set,
const struct sockaddr *addr,
socklen_t len)
{
struct ctx_t ctx = { addr, len };
return nepim_array_find(&set->array, match_remote_addr, &ctx);
}
/*
  Return the index of a free position in the set's array,
  as reported by nepim_array_find_free().
*/
int nepim_slot_find_free(nepim_slot_set_t *set)
{
  int free_index = nepim_array_find_free(&set->array);

  return free_index;
}
/*
  Allocate a new slot, initialize it and store it in the set.

  set:          slot set receiving the new slot
  sd:           UDP socket descriptor used by the slot
  index:        local slot index (also the position in the set's array)
  index_remote: slot index used by the remote peer
  remote:       remote peer address
  remote_len:   length in bytes of 'remote'
  opt:          greeting options forwarded to nepim_session_init()

  The slot is owned by the set until nepim_slot_set_del() releases it.

  Allocation failure is fatal: it is reported on stderr and the
  process aborts. This is checked explicitly (not only by assert)
  so that builds with NDEBUG do not dereference a NULL pointer.
*/
void nepim_slot_set_add(nepim_slot_set_t *set, int sd,
                        int index, int index_remote,
                        const struct sockaddr *remote,
                        socklen_t remote_len,
                        const nepim_greet_t *opt)
{
  nepim_slot_t *slot = malloc(sizeof(*slot));

  if (!slot) {
    fprintf(stderr,
            "nepim_slot_set_add: could not allocate slot %d: out of memory\n",
            index);
    abort();
  }

  /* identity */
  slot->index = index;
  slot->index_remote = index_remote;
  slot->udp_sd = sd;

  /* writer state */
  slot->seq = 0;
  slot->want_write = 0; /* boolean */
  slot->greetings_sent = 0;
  slot->client_writer_status = NEPIM_SLOT_CLIENT_GREET;

  /* receive-side counters */
  slot->keepalives_recv = 0;
  slot->interval_pkt_lost = 0;
  slot->interval_pkt_dup = 0;
  slot->total_pkt_lost = 0;
  slot->total_pkt_dup = 0;

  /* samplers consulted when writing data packet headers */
  slot->exp_loss_sampler = 0;
  slot->exp_dup_sampler = 0;

  nepim_session_init(&slot->session, opt,
                     remote, remote_len,
                     SESSION_SLOT, index);

  nepim_win_init(&slot->win_recv_seq, nepim_global.udp_win_max);

  nepim_array_add(&set->array, index, slot);
}
/*
  Remove the slot stored at 'index': release its receive window,
  free the slot itself, then delete its entry from the set's array.
  The slot must exist (asserted).
*/
void nepim_slot_set_del(nepim_slot_set_t *set, int index)
{
  nepim_slot_t *victim = nepim_slot_set_get(set, index);

  assert(victim);

  nepim_win_del(&victim->win_recv_seq);
  free(victim);

  nepim_array_del(&set->array, index);
}
/*
  Round-robin cursor used by nepim_slot_find_next_writer() to
  find the next slot wanting to write on a shared file descriptor.

  It holds the array position where the next search begins.
  Being a file-scope variable, the same cursor is used for every
  slot set passed to that function.
*/
static int write_cursor = 0;
/*
  Tell whether 'slot' is a pending writer on socket 'sd'.

  Returns -1 (true) when the slot exists, uses socket 'sd' and
  has its want_write flag set; returns 0 otherwise.
*/
static int slot_writer(nepim_slot_t *slot, int sd)
{
  if (!slot || slot->udp_sd != sd || !slot->want_write)
    return 0;

  /* want_write is used as a boolean: only 0 or 1 expected */
  assert(slot->want_write == 1);

  return -1;
}
/*
  Return the next slot, in round-robin order, that wants to write
  on socket 'sd'.

  The search starts at the position saved in write_cursor and visits
  every array position once, wrapping around at the array capacity.

  When the slot at the starting position matches, the cursor is left
  one position past it. When a later slot matches, the cursor is left
  on that slot, so it is the first one examined by the next call.

  The caller is expected to invoke this only when some slot wants to
  write on 'sd': finding none trips an assertion (and yields 0 when
  assertions are compiled out).
*/
nepim_slot_t *nepim_slot_find_next_writer(const nepim_slot_set_t *set, int sd)
{
  const int start = write_cursor;
  nepim_slot_t *candidate;

  /* advance past the starting position before examining it */
  write_cursor = (write_cursor + 1) % set->array.capacity;

  candidate = nepim_slot_set_get(set, start);
  if (slot_writer(candidate, sd))
    return candidate;

  /* walk the remaining positions until back at the start */
  while (write_cursor != start) {
    candidate = nepim_slot_set_get(set, write_cursor);
    if (slot_writer(candidate, sd))
      return candidate;

    write_cursor = (write_cursor + 1) % set->array.capacity;
  }

  assert(0);

  return 0;
}
/*
  Tell whether the set already holds a slot for a given remote peer.

  set:         slot set to scan
  remote_slot: slot index used by the remote peer
  remote:      remote peer address
  remote_len:  length in bytes of 'remote'

  A slot matches when its remote index, its stored address length
  and the address bytes are all equal to the arguments.

  Returns -1 (true) when a matching slot exists, 0 otherwise.
*/
int nepim_slot_find_addr(const nepim_slot_set_t *set,
                         int remote_slot,
                         const struct sockaddr *remote,
                         socklen_t remote_len)
{
  int pos;

  for (pos = 0; pos < set->array.capacity; ++pos) {
    const nepim_slot_t *cur = nepim_slot_set_get(set, pos);

    if (!cur)
      continue; /* empty position */

    if (cur->index_remote != remote_slot)
      continue;

    if (cur->session.remote_len != remote_len)
      continue;

    if (memcmp(&cur->session.remote, remote, remote_len))
      continue;

    return -1;
  }

  return 0;
}
/*
  Build the UDP header for a packet of the given 'type' and write it
  into 'buf' through nepim_udp_hdr_write().

  slot:     slot sending the packet (provides slot indexes and sequence)
  buf:      output buffer; must hold at least UDP_HEADER_LEN bytes
  buf_size: size in bytes of 'buf'
  type:     UDP_TYPE_DATA, UDP_TYPE_HELLO or UDP_TYPE_KEEPALIVE

  Data packets carry the slot's sequence number, which is then
  incremented. Hello and keepalive packets carry sequence 0.
  Any other type trips an assertion.
*/
static void slot_buf_write(nepim_slot_t *slot, char *buf, size_t buf_size, uint8_t type)
{
  nepim_udp_hdr_t header;

  assert(buf_size >= UDP_HEADER_LEN);

  header.version = UDP_VERSION;
  header.type = type;
  header.src_slot = slot->index;
  header.dst_slot = slot->index_remote;

  switch (type) {
  case UDP_TYPE_HELLO:
  case UDP_TYPE_KEEPALIVE:
    header.seq = 0;
    break;

  case UDP_TYPE_DATA:
    /* every udp_exp_loss-th data packet: skip one sequence number
       (presumably to simulate a lost packet) */
    if (nepim_global.udp_exp_loss > 0
        && !(++slot->exp_loss_sampler % nepim_global.udp_exp_loss))
      ++slot->seq;

    /* every udp_exp_dup-th data packet: step the sequence number back
       by one (presumably to simulate a duplicated packet) */
    if (nepim_global.udp_exp_dup > 0
        && !(++slot->exp_dup_sampler % nepim_global.udp_exp_dup))
      --slot->seq;

    header.seq = slot->seq++;
    break;

  default:
    assert(0);
  }

  nepim_udp_hdr_write(&header, buf, buf_size);
}
/*
  Write the UDP header of the given 'type' into 'buf', then send
  'buf_size' bytes of 'buf' to the slot's remote address with sendto().

  Returns the result of sendto() converted to int.
*/
int nepim_slot_buf_sendto(nepim_slot_t *slot, char *buf, size_t buf_size, uint8_t type)
{
  const struct sockaddr *peer = (const struct sockaddr *) &slot->session.remote;

  slot_buf_write(slot, buf, buf_size, type);

  return (int) sendto(slot->udp_sd, buf, buf_size, 0,
                      peer, slot->session.remote_len);
}
/*
  Write the UDP header of the given 'type' into 'buf', then send
  'buf_size' bytes of 'buf' on the slot's socket with write().

  Returns the result of write() converted to int.
*/
int nepim_slot_buf_write(nepim_slot_t *slot, char *buf, size_t buf_size, uint8_t type)
{
  slot_buf_write(slot, buf, buf_size, type);

  return (int) write(slot->udp_sd, buf, buf_size);
}
/*
  Mark the slot as wanting to write a keepalive and make sure the
  event loop watches its socket for writability.

  slot:         slot that wants to send a keepalive; must not already
                be flagged as wanting to write (asserted)
  on_udp_write: callback registered for OOP_WRITE on the slot's socket

  The callback is registered only when nepim_usock_writer_add()
  returns zero.
  NOTE(review): presumably a non-zero return means the socket already
  has a registered writer -- confirm in the usock module.
*/
void nepim_will_slot_keepalive(nepim_slot_t *slot, oop_call_fd *on_udp_write)
{
  const int sd = slot->udp_sd;
  const nepim_session_t *session = &slot->session;

  assert(!session->must_send);
  assert(session->udp_keepalive_send);
  assert(!slot->want_write);

  ++slot->want_write;

  if (!nepim_usock_writer_add(&udp_tab, sd))
    nepim_global.oop_src->on_fd(nepim_global.oop_src, sd,
                                OOP_WRITE, on_udp_write, 0);
}
/*
  Clear the slot's want-to-write flag and, when appropriate, stop
  watching its socket for writability.

  The slot must currently be flagged as wanting to write (asserted).

  The OOP_WRITE watch is cancelled only when nepim_usock_writer_del()
  returns zero.
  NOTE(review): presumably a non-zero return means other writers
  remain on the socket -- confirm in the usock module.
*/
void nepim_cancel_slot_keepalive(nepim_slot_t *slot)
{
  const int sd = slot->udp_sd;
  const nepim_session_t *session = &slot->session;

  assert(!session->must_send);
  assert(session->udp_keepalive_send);
  assert(slot->want_write == 1);

  --slot->want_write;

  if (!nepim_usock_writer_del(&udp_tab, sd))
    nepim_global.oop_src->cancel_fd(nepim_global.oop_src,
                                    sd, OOP_WRITE);
}
/*
  Send one keepalive packet for 'slot' and schedule the next one.

  slot:                  slot sending the keepalive
  nepim_slot_buf_send:   function that actually sends the buffer
                         (e.g. nepim_slot_buf_sendto or
                         nepim_slot_buf_write)
  on_udp_keepalive_time: timer callback scheduled for the next
                         keepalive after a successful send
  udp_slot_kill:         called to dispose of the slot after an
                         unexpected send failure

  The slot's pending-write state is cancelled first. The packet
  consists of the UDP header only.

  On failure (send result < 1):
    - EINTR/EAGAIN: the error is reported to the socket table via
      nepim_usock_write_error() and the function returns;
    - anything else: the failure is logged, broken-slot statistics
      are reported unless the session duration is done, and the slot
      is killed.

  errno is captured once, right after the failed send, because the
  logging calls made afterwards may overwrite it.
*/
void nepim_udp_write_keepalive(nepim_slot_t *slot,
                               int (*nepim_slot_buf_send)(nepim_slot_t*,
                                                          char*,size_t,uint8_t),
                               oop_call_time *on_udp_keepalive_time,
                               void (*udp_slot_kill)(nepim_slot_t *))
{
  char buf[nepim_global.udp_write_size];
  int wr;
  nepim_session_t *session;
  int to_write;
  int sd;

  assert(nepim_global.udp_write_size == sizeof(buf));
  assert(slot);

  /* stop writing */
  nepim_cancel_slot_keepalive(slot);

  sd = slot->udp_sd;
  session = &slot->session;

  assert(!session->must_send);
  assert(session->udp_keepalive_send);

  /* a keepalive carries the header only */
  to_write = UDP_HEADER_LEN;

  assert(to_write > 0);
  assert(to_write >= UDP_HEADER_LEN);
  assert(to_write <= nepim_global.udp_write_size);

  wr = nepim_slot_buf_send(slot, buf, to_write, UDP_TYPE_KEEPALIVE);
  if (wr < 1) {
    /* snapshot errno before any other call can change it */
    const int send_errno = errno;

    switch (send_errno) {
    case EINTR:
    case EAGAIN:
      nepim_usock_write_error(&udp_tab, sd,
                              slot->index, slot->index_remote,
                              send_errno);
      return;
    case EPIPE:
      fprintf(stderr, "keepalive_write: EPIPE on UDP socket %d\n", sd);
      break;
    default:
      break;
    }

    fprintf(stderr,
            "%d: keepalive_write: unexpected failure: errno=%d: %s\n",
            sd, send_errno, strerror(send_errno));

    if (!session->duration_done)
      report_broken_slot_stat(stdout, slot);

    udp_slot_kill(slot);

    return;
  }

  assert(wr > 0);
  assert(wr <= to_write);

  nepim_usock_write_good(&udp_tab, sd);

  /* account for the bytes and the write just performed */
  session->byte_total_sent += wr;
  session->byte_interval_sent += wr;
  ++session->total_writes;
  ++session->interval_writes;

  /* schedule for next time saved by on_udp_rate_delay() */
  nepim_global.oop_src->on_time(nepim_global.oop_src,
                                session->tv_keepalive,
                                on_udp_keepalive_time, slot);
}
/*
  (Re)arm the timer used to require keepalives from the peer.

  slot:                     slot whose session requires keepalives
                            (asserted)
  on_udp_keepalive_require: timer callback, invoked with 'slot' as
                            its argument

  The deadline is the current time plus
  nepim_global.udp_keepalive_timer, stored in the session's
  tv_keepalive_timer field.
*/
void nepim_schedule_keepalive_timer(nepim_slot_t *slot,
                                    oop_call_time *on_udp_keepalive_require)
{
  nepim_session_t *session = &slot->session;
  int rc;

  assert(session->udp_keepalive_require);

  /* start from the current time... */
  rc = gettimeofday(&session->tv_keepalive_timer, 0);
  assert(!rc);

  /* ...and push the deadline forward by the configured interval */
  nepim_timer_usec_add(&session->tv_keepalive_timer,
                       nepim_global.udp_keepalive_timer);

  nepim_global.oop_src->on_time(nepim_global.oop_src,
                                session->tv_keepalive_timer,
                                on_udp_keepalive_require, slot);
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */