/*
This file is part of pathload.
pathload is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
pathload is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pathload; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*-------------------------------------------------
pathload : an end-to-end available bandwidth
estimation tool
Author : Manish Jain ( jain@cc.gatech.edu.udel.edu )
Constantinos Dovrolis (dovrolis@cc.gatech.edu )
Release : Ver 1.3.2
Support : This work was supported by the SciDAC
program of the US department
--------------------------------------------------*/
/*
* $Header: /net/cvs/bwtest/pathload/pathload_snd_func.c,v 1.109 2006/05/19 19:12:27 jain Exp $
*/
#include "pathload_gbls.h"
#include "pathload_snd.h"
/*
  Send fleet for avail-bw estimation.
  pkt_id and fleet_id start with 0.

  Every probe packet starts with five network-order l_int32 words:
  fleet id, stream id, packet id, and the seconds / microseconds of the
  timestamp taken for that packet.  Packets of one stream are paced
  time_interval microseconds apart (coarse select() sleep followed by a
  gettimeofday() busy-wait).  After each stream a FINISHED_STREAM control
  word and the stream id are sent on the control channel; the receiver's
  reply selects the next step:
    CONTINUE_STREAM : move on to the next stream id
    ABORT_FLEET     : stop and return 0
    anything else   : the same stream id is sent again

  Returns 0 when the fleet finishes or is aborted by the receiver, -1 when
  a probe cannot be sent or a control message exchange fails.  Exits the
  process if the packet buffer cannot be allocated.
*/
int send_fleet()
{
    struct timeval tmp1 , tmp2 ;
    struct timeval sleep_time ;
    double t1 = 0 , t2 = 0 ;
    l_int32 ctr_code ;
    l_int32 pkt_cnt = 0 ;
    l_int32 stream_cnt = 0 ;
    l_int32 stream_id_n = 0 ;
    l_int32 usec_n , sec_n , pkt_id_n ;
    l_int32 sleep_tm_usec ;
    int ret_val ;
    int stream_duration ;
    int diff ;
    int i ;
    l_int32 tmp = 0 ;
    char *pkt_buf ;
    char ctr_buff[8] ;

    if ( (pkt_buf = malloc(cur_pkt_sz*sizeof(char)) ) == NULL )
    {
        printf("ERROR : send_fleet : unable to malloc %ld bytes \n",cur_pkt_sz);
        exit(-1);
    }
    srandom(getpid()); /* Create random payload; does it matter? */
    for (i=0; i<cur_pkt_sz-1; i++) pkt_buf[i]=(char)(random()&0x000000ff);

    if ( !quiet)
        printf("Sending fleet %ld ",fleet_id);

    while ( stream_cnt < num_stream )
    {
        if ( !quiet) printf("#");
        fflush(stdout);

        /* Header words that stay constant for the whole stream. */
        fleet_id_n = htonl(fleet_id) ;
        memcpy(pkt_buf, &fleet_id_n , sizeof(l_int32));
        stream_id_n = htonl(stream_cnt) ;
        memcpy(pkt_buf+sizeof(l_int32), &stream_id_n , sizeof(l_int32));

        gettimeofday(&tmp1 , NULL ) ;
        t1 = (double) tmp1.tv_sec * 1000000.0 +(double)tmp1.tv_usec ;
        for (pkt_cnt=0 ; pkt_cnt < stream_len ; pkt_cnt++)
        {
            /* Per-packet header: packet id and send timestamp. */
            pkt_id_n = htonl(pkt_cnt) ;
            memcpy(pkt_buf+2*sizeof(l_int32), &pkt_id_n , sizeof(l_int32));
            sec_n = htonl ( tmp1.tv_sec ) ;
            memcpy(pkt_buf+3*sizeof(l_int32), &sec_n , sizeof(l_int32));
            usec_n = htonl ( tmp1.tv_usec ) ;
            memcpy(pkt_buf+4*sizeof(l_int32), &usec_n , sizeof(l_int32));

            if ( send(sock_udp, pkt_buf, cur_pkt_sz,0 ) == -1 )
            {
                /* Report first (while errno is fresh), then release the
                   buffer so this error path does not leak it. */
                perror("send");
                free(pkt_buf);
                return -1;
            }

            gettimeofday(&tmp2, NULL) ;
            t2 = (double) tmp2.tv_sec * 1000000.0 +(double)tmp2.tv_usec ;
            tmp = (l_int32) (t2-t1);

            /* No pacing delay is needed after the last packet. */
            if ( pkt_cnt < ( stream_len - 1 ) )
            {
                l_int32 tm_remaining = time_interval - tmp;

                if ( tm_remaining > min_sleep_interval )
                {
                    /* NOTE(review): the original expression was written
                       without parentheses around the conditional; because
                       ?: binds more loosely than '-' and '<', it evaluates
                       as spelled out below, i.e. the sleep is always
                       min_timer_intr or 2*min_timer_intr.  Presumably the
                       intent was to subtract
                       (min_timer_intr<200 ? 2*min_timer_intr : min_timer_intr)
                       from the rounded-down remaining time -- confirm
                       before changing, since it alters probe pacing. */
                    sleep_tm_usec =
                        ( tm_remaining - (tm_remaining%min_timer_intr)
                          - min_timer_intr < 200 )
                        ? 2*min_timer_intr : min_timer_intr ;
                    sleep_time.tv_sec = (int)(sleep_tm_usec / 1000000) ;
                    sleep_time.tv_usec = sleep_tm_usec - sleep_time.tv_sec*1000000 ;
                    select(1,NULL,NULL,NULL,&sleep_time);
                }

                /* Busy-wait for the rest of the inter-packet interval,
                   shortened by (gettimeofday_latency - 1) when positive. */
                gettimeofday(&tmp2,NULL) ;
                t2 = (double) tmp2.tv_sec * 1000000.0 +(double)tmp2.tv_usec ;
                diff = gettimeofday_latency>0?gettimeofday_latency-1:0;
                while((t2 - t1) < (time_interval-diff) )
                {
                    gettimeofday(&tmp2, NULL) ;
                    t2 = (double) tmp2.tv_sec * 1000000.0 +(double)tmp2.tv_usec ;
                }
                /* The instant just read becomes the next packet's timestamp. */
                tmp1 = tmp2 ;
                t1 = t2 ;
            }
        }

        /* Busy-wait for 8000 usec and send End of
           stream message along with streamid. */
        gettimeofday(&tmp2,NULL) ;
        t1 = (double) tmp2.tv_sec * 1000000.0 +(double)tmp2.tv_usec ;
        do
        {
            gettimeofday(&tmp2, NULL) ;
            t2 = (double) tmp2.tv_sec * 1000000.0 +(double)tmp2.tv_usec ;
        }while((t2 - t1) < 8000 ) ;

        ctr_code = FINISHED_STREAM | CTR_CODE ;
        if ( send_ctr_mesg(ctr_buff, ctr_code ) == -1 )
        {
            perror("send_ctr_mesg : FINISHED_STREAM");
            free(pkt_buf);
            return -1;
        }
        if ( send_ctr_mesg(ctr_buff, stream_cnt ) == -1 )
        {
            free(pkt_buf);
            return -1;
        }

        /* Wait for continue/cancel message from receiver.*/
        if( (ret_val = recv_ctr_mesg (ctr_buff )) == -1 )
        {
            free(pkt_buf);
            return -1;
        }
        if ( (((ret_val & CTR_CODE) >> 31) == 1) &&
             ((ret_val & 0x7fffffff) == CONTINUE_STREAM) )
            stream_cnt++ ;
        else if ((((ret_val & CTR_CODE) >> 31) == 1 )&&
                 ((ret_val & 0x7fffffff) == ABORT_FLEET) )
        {
            free(pkt_buf);
            return 0 ;
        }

        /* inter-stream latency is max (RTT,9*stream_duration)*/
        stream_duration = stream_len * time_interval ;
        if ( t2 - t1 < stream_duration * 9 )
        {
            /* release cpu if inter-stream gap is longer than min_sleep_time.
               NOTE(review): inside this branch (t2 - t1 - stream_duration*9)
               is negative, so for a non-negative min_sleep_interval the
               sleep below never runs and the whole gap is busy-waited;
               the sleep length is also derived from the per-packet values
               time_interval and tmp.  Left as is -- confirm the intent. */
            if ( t2 - t1 - stream_duration * 9 > min_sleep_interval )
            {
                sleep_tm_usec = time_interval - tmp -
                    ((time_interval-tmp) % min_sleep_interval) - min_sleep_interval;
                sleep_time.tv_sec = (int)(sleep_tm_usec / 1000000) ;
                sleep_time.tv_usec = sleep_tm_usec - sleep_time.tv_sec*1000000 ;
                select(1,NULL,NULL,NULL,&sleep_time);
                gettimeofday(&tmp2,NULL) ;
                t2 = (double) tmp2.tv_sec * 1000000.0 +(double)tmp2.tv_usec ;
            }
            /* busy wait for the remaining time */
            do
            {
                gettimeofday(&tmp2 , NULL );
                t2 = (double) tmp2.tv_sec * 1000000.0 +(double)tmp2.tv_usec ;
            }while((t2 - t1) < stream_duration * 9 ) ;
        }

        /* A hack for slow links */
        if ( stream_duration >= 500000 )
            break ;
    }

    free(pkt_buf);
    return 0 ;
}
/*
  Send one control word through the control stream.

  ctr_code is converted to network byte order, staged in ctr_buff (which
  must hold at least sizeof(l_int32) bytes) and written to ctr_strm.
  Returns 0 when the whole word was written, -1 otherwise.
*/
int send_ctr_mesg(char *ctr_buff, l_int32 ctr_code)
{
    const l_int32 wire_code = htonl(ctr_code);
    int status = 0;

    memcpy(ctr_buff, &wire_code, sizeof wire_code);
    if (write(ctr_strm, ctr_buff, sizeof wire_code) != sizeof(l_int32))
        status = -1;

    return status;
}
/*
  Receive a message from the control stream.

  Waits up to 50 seconds for ctr_strm to become readable, then reads one
  l_int32 into ctr_buff (which must hold at least sizeof(l_int32) bytes)
  and returns it converted to host byte order.

  Returns -1 when nothing arrives in time (after printing a notice), when
  read() fails, or when read() reports end-of-file because the peer closed
  the control connection.  Previously end-of-file was returned as control
  code 0, which callers could not tell apart from a real message.
*/
l_int32 recv_ctr_mesg(char *ctr_buff)
{
    struct timeval select_tv;
    fd_set readset ;
    l_int32 ctr_code;
    ssize_t nread ;

    select_tv.tv_sec = 50 ; /* if no ctrl mesg for 50 sec, terminate */
    select_tv.tv_usec = 0 ;
    FD_ZERO(&readset);
    FD_SET(ctr_strm,&readset);
    memset(ctr_buff, 0, 4);

    if ( select(ctr_strm+1,&readset,NULL,NULL,&select_tv) <= 0 )
    {
        printf("Receiver is not responding.\n");
        return -1;
    }

    nread = read(ctr_strm, ctr_buff, sizeof(l_int32));
    if ( nread <= 0 )   /* error, or peer closed the control stream */
        return -1 ;

    memcpy(&ctr_code, ctr_buff, sizeof(l_int32));
    return(ntohl(ctr_code));
}
/*
  Compute the time difference in microseconds between two timeval
  measurements, as tv2 - tv1 (negative when tv2 is earlier than tv1).

  The arithmetic is carried out in double: the earlier integer form
  multiplied the seconds difference by 1000000 before converting, which
  overflows the integer type for large intervals (about 36 minutes when
  the seconds field is 32 bits wide).
*/
double time_to_us_delta(struct timeval tv1, struct timeval tv2)
{
    double sec_delta  = (double) tv2.tv_sec  - (double) tv1.tv_sec ;
    double usec_delta = (double) tv2.tv_usec - (double) tv1.tv_usec ;

    return sec_delta * 1000000.0 + usec_delta ;
}
/*
  Copy num_elems doubles, beginning at unord_arr[start], into ord_arr and
  sort that copy into ascending order (insertion sort).
  unord_arr is only read; ord_arr must have room for num_elems values.
*/
void order_dbl(double unord_arr[], double ord_arr[],int start, int num_elems)
{
    int pos, slot;

    for (pos = 0; pos < num_elems; pos++)
        ord_arr[pos] = unord_arr[start + pos];

    for (pos = 1; pos < num_elems; pos++)
    {
        double key = ord_arr[pos];

        /* Slide larger entries one place up until key's slot is found. */
        slot = pos;
        while (slot > 0 && key < ord_arr[slot - 1])
        {
            ord_arr[slot] = ord_arr[slot - 1];
            slot--;
        }
        ord_arr[slot] = key;
    }
}
/*
  Copy num_elems floats, beginning at unord_arr[start], into ord_arr and
  sort that copy into ascending order (insertion sort).
  unord_arr is only read; ord_arr must have room for num_elems values.
*/
void order_float(float unord_arr[], float ord_arr[],int start, int num_elems)
{
    int pos, slot;

    for (pos = 0; pos < num_elems; pos++)
        ord_arr[pos] = unord_arr[start + pos];

    for (pos = 1; pos < num_elems; pos++)
    {
        float key = ord_arr[pos];

        /* Slide larger entries one place up until key's slot is found. */
        slot = pos;
        while (slot > 0 && key < ord_arr[slot - 1])
        {
            ord_arr[slot] = ord_arr[slot - 1];
            slot--;
        }
        ord_arr[slot] = key;
    }
}
/*
  Measure how long send() takes for one max_pkt_sz datagram.

  A datagram socket is bound to the loopback address on a system-chosen
  port and connected to its own address, so every datagram sent comes
  straight back to the same socket.  Fifty send() calls are each timed
  with a pair of gettimeofday() calls, and the datagram is read back
  before the next round.

  Returns entry [25] of the 50 sorted samples (used as the median to
  avoid outliers), in microseconds, converted to l_int32.
  Exits the process if the buffer or the socket cannot be set up.
*/
l_int32 send_latency()
{
    char *pack_buf ;
    float min_OSdelta[50], ord_min_OSdelta[50];
    int i ;
    socklen_t len ;   /* getsockname() takes a socklen_t *, not an int * */
    int sock_udp ;    /* private loopback socket; hides the sock_udp used by
                         the other send functions in this file */
    struct timeval first_time ,current_time;
    struct sockaddr_in snd_udp_addr, rcv_udp_addr ;

    if ( max_pkt_sz == 0 || (pack_buf = malloc(max_pkt_sz*sizeof(char)) ) == NULL )
    {
        printf("ERROR : send_latency : unable to malloc %ld bytes \n",max_pkt_sz);
        exit(-1);
    }
    if ((sock_udp=socket(AF_INET, SOCK_DGRAM, 0)) < 0)
    {
        perror("socket(AF_INET,SOCK_DGRAM,0):");
        exit(-1);
    }

    bzero((char*)&snd_udp_addr, sizeof(snd_udp_addr));
    snd_udp_addr.sin_family = AF_INET;
    snd_udp_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    snd_udp_addr.sin_port = 0 ;
    if (bind(sock_udp, (struct sockaddr*)&snd_udp_addr, sizeof(snd_udp_addr)) < 0)
    {
        perror("bind(sock_udp):");
        exit(-1);
    }

    /* Look up the address/port the socket was bound to and connect the
       socket to it. */
    len = sizeof(rcv_udp_addr);
    if (getsockname(sock_udp, (struct sockaddr *)&rcv_udp_addr, &len ) < 0 )
    {
        perror("getsockname");
        exit(-1);
    }
    if(connect(sock_udp,(struct sockaddr *)&rcv_udp_addr, sizeof(rcv_udp_addr)) < 0 )
    {
        perror("connect(sock_udp)");
        exit(-1);
    }

    srandom(getpid()); /* Create random payload; does it matter? */
    for (i=0; i<max_pkt_sz-1; i++) pack_buf[i]=(char)(random()&0x000000ff);

    for (i=0; i<50; i++)
    {
        int sent_ok ;

        gettimeofday(&first_time, NULL);
        sent_ok = ( send(sock_udp, pack_buf, max_pkt_sz, 0) != -1 ) ;
        if ( !sent_ok ) perror("sendto");
        gettimeofday(&current_time, NULL);

        /* Drain the looped-back datagram; if send() failed there is
           nothing queued and a blocking recv() would never return. */
        if ( sent_ok ) recv(sock_udp, pack_buf, max_pkt_sz, 0);

        min_OSdelta[i] = time_to_us_delta(first_time, current_time);
    }

    /* Use median of measured latencies to avoid outliers */
    order_float(min_OSdelta, ord_min_OSdelta,0, 50);

    close(sock_udp);   /* the measurement socket is not needed afterwards */
    free(pack_buf);
    return (ord_min_OSdelta[25]);
}
/*
  Send back-to-back packet trains on sock_udp until the receiver accepts
  one.

  Each packet starts with two network-order l_int32 words: train id and
  packet id.  A train consists of packet ids 0..train_len inclusive, where
  train_len is TRAIN_LEN - train_id*15 (or 3 when the previous train_len
  was 5).  After a train, the function pauses 1000 usec, sends
  FINISHED_TRAIN on the control channel and waits for the reply: BAD_TRAIN
  makes it try the next train id (up to MAX_TRAIN trains); any other
  reply ends the function.

  Returns 0 when a train is accepted or MAX_TRAIN trains have been sent,
  -1 when a control message cannot be sent or received.  Exits the process
  if the packet buffer cannot be allocated.
*/
int send_train()
{
    struct timeval select_tv;
    char *pack_buf ;
    int train_id ;
    int pack_id ;
    l_int32 train_id_n , pack_id_n ;  /* same width as the memcpy() below */
    l_int32 ctr_code ;
    int ret_val ;
    int i ;
    int train_len=0;
    char ctr_buff[8];

    if ( (pack_buf = malloc(max_pkt_sz*sizeof(char)) ) == NULL )
    {
        printf("ERROR : send_train : unable to malloc %ld bytes \n",max_pkt_sz);
        exit(-1);
    }
    srandom(getpid()); /* Create random payload; does it matter? */
    for (i=0; i<max_pkt_sz-1; i++) pack_buf[i]=(char)(random()&0x000000ff);

    for ( train_id = 0 ; train_id < MAX_TRAIN ; train_id++ )
    {
        if ( train_len == 5)
            train_len = 3;
        else
            train_len = TRAIN_LEN - train_id*15;

        train_id_n = htonl(train_id) ;
        memcpy(pack_buf, &train_id_n, sizeof(l_int32));
        for (pack_id=0; pack_id <= train_len; pack_id++)
        {
            pack_id_n = htonl(pack_id);
            memcpy(pack_buf+sizeof(l_int32), &pack_id_n, sizeof(l_int32));
            /* Best effort: a probe that fails to go out is not retried. */
            send(sock_udp, pack_buf, max_pkt_sz,0 ) ;
        }

        /* Short pause before announcing the end of the train. */
        select_tv.tv_sec=0;select_tv.tv_usec=1000;
        select(0,NULL,NULL,NULL,&select_tv);

        ctr_code = FINISHED_TRAIN | CTR_CODE ;
        if ( send_ctr_mesg(ctr_buff, ctr_code ) == -1 )
        {
            /* Without the notification the receiver cannot answer; fail
               now instead of waiting for the reply timeout. */
            free(pack_buf);
            return -1;
        }
        if (( ret_val = recv_ctr_mesg (ctr_buff)) == -1 )
        {
            free(pack_buf);   /* this path used to leak the buffer */
            return -1;
        }

        /* Only BAD_TRAIN asks for another train. */
        if ( !( (((ret_val & CTR_CODE) >> 31) == 1) &&
                ((ret_val & 0x7fffffff) == BAD_TRAIN) ) )
            break ;
    }

    free(pack_buf);
    return 0 ;
}
#define NUM_SELECT_CALL 31

/*
  Take NUM_SELECT_CALL timestamps with a select() sleep of sleep_usec
  microseconds after each but the first, sort the NUM_SELECT_CALL-1 gaps
  between consecutive timestamps, and return the mean of sorted entries
  NUM_SELECT_CALL/2 and NUM_SELECT_CALL/2+1 (used as the median gap, in
  microseconds).
*/
static int median_select_gap(l_int32 sleep_usec)
{
    struct timeval stamp[NUM_SELECT_CALL] ;
    struct timeval sleep_time ;
    int gap[NUM_SELECT_CALL] , sorted_gap[NUM_SELECT_CALL] ;
    int n ;

    gettimeofday(&stamp[0], NULL);
    for (n = 1; n < NUM_SELECT_CALL; n++)
    {
        /* select() may modify the timeout, so it is reloaded each round. */
        sleep_time.tv_sec = 0 ;
        sleep_time.tv_usec = sleep_usec ;
        gettimeofday(&stamp[n], NULL);
        select(0,NULL,NULL,NULL,&sleep_time);
    }

    for (n = 0; n < NUM_SELECT_CALL-1; n++)
    {
        gap[n] = (stamp[n+1].tv_sec-stamp[n].tv_sec)*1000000+
                 stamp[n+1].tv_usec-stamp[n].tv_usec ;
#ifdef DEBUG
        printf("DEBUG :: %.2f ",(stamp[n+1].tv_sec-stamp[n].tv_sec)*1000000.+
               stamp[n+1].tv_usec-stamp[n].tv_usec);
        printf("DEBUG :: %d \n",gap[n]);
#endif
    }

    order_int(gap,sorted_gap,NUM_SELECT_CALL-1);
    return (sorted_gap[NUM_SELECT_CALL/2]+sorted_gap[NUM_SELECT_CALL/2+1])/2;
}

/*
  Calibrate the two sleep-related globals:
    min_sleep_interval : median gap observed when select() is asked to
                         sleep for 1 usec
    min_timer_intr     : median gap observed when select() is asked to
                         sleep for 1.25 * min_sleep_interval, minus
                         min_sleep_interval
*/
void min_sleeptime()
{
    l_int32 probe_usec ;

    min_sleep_interval = median_select_gap(1);

    probe_usec = min_sleep_interval+min_sleep_interval/4;
    min_timer_intr = median_select_gap(probe_usec)-min_sleep_interval;
#ifdef DEBUG
    printf("DEBUG :: min_sleep_interval %ld\n",min_sleep_interval);
    printf("DEBUG :: min_timer_intr %ld\n",min_timer_intr);
#endif
}
/*
  Copy the first num_elems ints of unord_arr into ord_arr and sort that
  copy into ascending order (insertion sort).
  unord_arr is only read; ord_arr must have room for num_elems values.
*/
void order_int(int unord_arr[], int ord_arr[], int num_elems)
{
    int pos, slot;

    for (pos = 0; pos < num_elems; pos++)
        ord_arr[pos] = unord_arr[pos];

    for (pos = 1; pos < num_elems; pos++)
    {
        int key = ord_arr[pos];

        /* Slide larger entries one place up until key's slot is found. */
        slot = pos;
        while (slot > 0 && key < ord_arr[slot - 1])
        {
            ord_arr[slot] = ord_arr[slot - 1];
            slot--;
        }
        ord_arr[slot] = key;
    }
}
/*
  Print the sender's command-line usage on stderr and terminate the
  process with exit status 0.
*/
void help()
{
    fprintf(stderr,
            "usage: pathload_snd [-q] [-h|-H] [-i]\n"
            "-q : quiet mode\n"
            "-h|-H : print this help and exit\n"
            "-i : run sender in iterative \"persistent\" mode\n");
    exit(0);
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */