/* This file is part of pathload. pathload is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. pathload is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with pathload; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /*------------------------------------------------- pathload : an end-to-end available bandwidth estimation tool Author : Manish Jain ( jain@cc.gatech.edu ) Constantinos Dovrolis (dovrolis@cc.gatech.edu ) Release : Ver 1.3.2 Support : This work was supported by the SciDAC program of the US department --------------------------------------------------*/ /* * $Header: /net/cvs/bwtest/pathload/pathload_rcv_func.c,v 1.294 2006/05/19 20:21:15 jain Exp $ */ #include "pathload_gbls.h" #include "pathload_rcv.h" l_int32 recvfrom_latency(struct sockaddr_in rcv_udp_addr) { char *random_data; float min_OSdelta[50], ord_min_OSdelta[50]; l_int32 j ; struct timeval current_time, first_time ; if ( (random_data = malloc(max_pkt_sz*sizeof(char)) ) == NULL ) { printf("ERROR : unable to malloc %ld bytes \n",max_pkt_sz); exit(-1); } srandom(getpid()); /* Create random payload; does it matter? */ for (j=0; j0)putchar(' '); printf(":: "); } fputc(']',pathload_fp); while(--spacecnt>0)fputc(' ',pathload_fp); fprintf(pathload_fp,":: "); if ( !bad_train) { if(Verbose) printf("%.2fMbps\n", bw_msr ) ; fprintf(pathload_fp,"%.2fMbps\n", bw_msr ) ; } else { for ( i=0;i 0 ) { if (FD_ISSET(sock_udp,&readset) ) { #endif if (recvfrom(sock_udp, pack_buf, max_pkt_sz, 0, NULL, NULL) != -1) { gettimeofday(¤t_time, NULL); memcpy(&train_id, pack_buf, sizeof(l_int32)); train_id = ntohl(train_id) ; memcpy(&pack_id, pack_buf+sizeof(l_int32), sizeof(l_int32)); pack_id=ntohl(pack_id); if (train_id == exp_train_id && pack_id==exp_pack_id ) { rcvd++; time[pack_id] = current_time ; exp_pack_id++; } else bad_train=1; } #ifndef THRLIB } // end of FD_ISSET if ( FD_ISSET(sock_tcp,&readset) ) { /* check the control connection.*/ if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 ) { if ( (((ret_val & CTR_CODE) >> 31) == 1) && ((ret_val & 0x7fffffff) == FINISHED_TRAIN ) ) { break; } } } } // end of select } while (1); #else } while (!arg.finished_stream); #endif if ( rcvd != train_len+1 ) bad_train=1; gettimeofday(&time[pack_id+1], NULL); sigstruct.sa_handler = SIG_DFL ; sigemptyset(&sigstruct.sa_mask); sigstruct.sa_flags = 0 ; sigaction(SIGUSR1 , &sigstruct,NULL ); free(pack_buf); return bad_train ; } l_int32 check_intr_coalescence(struct timeval time[],l_int32 len, l_int32 *burst) { double delta[MAX_STREAM_LEN]; l_int32 b2b=0,tmp=0; l_int32 i; l_int32 min_gap; min_gap = MIN_TIME_INTERVAL > 3*rcv_latency ? MIN_TIME_INTERVAL : 3*rcv_latency ; //printf("---%d\n",len); for (i=2;i=3 ) { (*burst)++; tmp=0; } } } //fprintf(stderr,"\tNumber of b2b %d, Number of burst %d\n",b2b,*burst); if ( b2b > .6*len ) { return 1; } else return 0; } /* Receive N streams . After each stream, compute the loss rate. Mark a stream "lossy" , if losss rate in that stream is more than a threshold. */ l_int32 recv_fleet() { struct sigaction sigstruct ; struct timeval snd_tv[MAX_STREAM_LEN], arrv_tv[MAX_STREAM_LEN]; struct timeval current_time, first_time; double pkt_loss_rate ; double owd[MAX_STREAM_LEN] ; double snd_tm[MAX_STREAM_LEN] ; double arrv_tm[MAX_STREAM_LEN]; l_int32 ctr_code ; l_int32 pkt_lost = 0 ; l_int32 stream_id_n , stream_id=0 ; l_int32 total_pkt_rcvd=0 ,pkt_rcvd = 0 ; l_int32 pkt_id = 0 ; l_int32 pkt_id_n = 0 ; l_int32 exp_pkt_id = 0 ; l_int32 stream_cnt = 0 ; /* 0->n*/ l_int32 fleet_id , fleet_id_n = 0 ; l_int32 lossy_stream = 0 ; l_int32 return_val = 0 ; l_int32 finished_stream = 0 ; l_int32 stream_duration ; l_int32 num_sndr_cs[20],num_rcvr_cs[20]; char ctr_buff[8]; char *pkt_buf ; double owdfortd[MAX_STREAM_LEN]; l_int32 num_substream,substream[MAX_STREAM_LEN]; l_int32 low,high,len,j; l_int32 b2b_pkt_per_stream[20]; l_int32 tmp_b2b; #ifdef THRLIB pthread_t tid ; thr_arg arg ; pthread_attr_t attr ; #endif l_int32 num_bursts; l_int32 abort_fleet=0; l_int32 p=0; struct timeval select_tv; fd_set readset; l_int32 ret_val ; if ( (pkt_buf = malloc(cur_pkt_sz*sizeof(char)) ) == NULL ) { printf("ERROR : unable to malloc %ld bytes \n",cur_pkt_sz); exit(-1); } trend_idx=0; ic_flag = 0; if(verbose&&!Verbose) printf("Receiving Fleet %ld, Rate %.2fMbps\n",exp_fleet_id,tr); if(Verbose) { printf("\nReceiving Fleet %ld\n",exp_fleet_id); printf(" Fleet Parameter(req) :: R=%.2fMbps, L=%ldB, K=%ldpackets, \ T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval) ; } fprintf(pathload_fp,"\nReceiving Fleet %ld\n",exp_fleet_id); fprintf(pathload_fp," Fleet Parameter(req) :: R=%.2fMbps, L=%ldB, \ K=%ldpackets, T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval); if(Verbose) printf(" Lossrate per stream :: "); fprintf(pathload_fp," Lossrate per stream :: "); sigstruct.sa_handler = sig_sigusr1 ; sigemptyset(&sigstruct.sa_mask); sigstruct.sa_flags = 0 ; #ifdef SA_INTERRUPT sigstruct.sa_flags |= SA_INTERRUPT ; #endif sigaction(SIGUSR1 , &sigstruct,NULL ); while ( stream_cnt < num_stream ) { #ifdef THRLIB arg.finished_stream=0; arg.ptid = pthread_self() ; arg.stream_cnt = stream_cnt ; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); if (pthread_create(&tid,&attr,ctrl_listen, &arg ) != 0 ) { perror("recv_fleet::pthread_create"); exit(-1); } #endif pkt_lost = 0 ; first_time.tv_sec = 0 ; for (j=0; j < stream_len; j++ ) { snd_tv[j].tv_sec=0 ; snd_tv[j].tv_usec=0 ; arrv_tv[j].tv_sec=0; arrv_tv[j].tv_usec=0; } /* Receive K packets of ith stream */ #ifdef THRLIB while(!arg.finished_stream) #else while(1) #endif { #ifndef THRLIB FD_ZERO(&readset); FD_SET(sock_tcp,&readset); FD_SET(sock_udp,&readset); select_tv.tv_sec=1000;select_tv.tv_usec=0; if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 ) { if (FD_ISSET(sock_udp,&readset) ) { #endif if( recvfrom(sock_udp,pkt_buf,cur_pkt_sz,0,NULL,NULL) > 0 ) { gettimeofday(¤t_time,NULL); memcpy(&fleet_id_n,pkt_buf , sizeof(l_int32)); fleet_id = ntohl(fleet_id_n) ; memcpy(&stream_id_n,pkt_buf+sizeof(l_int32) , sizeof(l_int32)); stream_id = ntohl(stream_id_n) ; memcpy(&pkt_id_n, pkt_buf+2*sizeof(l_int32), sizeof(l_int32)); pkt_id = ntohl(pkt_id_n) ; if ( fleet_id == exp_fleet_id && stream_id == stream_cnt && pkt_id >= exp_pkt_id ) { if ( first_time.tv_sec == 0 ) first_time = current_time; arrv_tv[pkt_id] = current_time ; memcpy(&(snd_tv[pkt_id].tv_sec) , pkt_buf+3*sizeof(l_int32), sizeof(l_int32)); memcpy(&(snd_tv[pkt_id].tv_usec), pkt_buf+4*sizeof(l_int32), sizeof(l_int32)); if ( pkt_id > exp_pkt_id ) /* reordered are considered as lost */ { pkt_lost += ( pkt_id - exp_pkt_id ) ; exp_pkt_id = pkt_id ; } ++exp_pkt_id ; ++pkt_rcvd; } } // end of recvfrom #ifndef THRLIB } // end of FD_ISSET if ( FD_ISSET(sock_tcp,&readset) ) { /* check the control connection.*/ if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 ) { if ( (((ret_val & CTR_CODE) >> 31) == 1) && ((ret_val & 0x7fffffff) == FINISHED_STREAM ) ) { while((ret_val = recv_ctr_mesg(sock_tcp,ctr_buff ))== -1) ; if ( ret_val == stream_cnt ) { break ; } } } } } // end of select else { perror("select"); exit(0); } #endif } for (j=0; j < stream_len; j++ ) { snd_tv[j].tv_sec = ntohl(snd_tv[j].tv_sec); snd_tv[j].tv_usec = ntohl(snd_tv[j].tv_usec); snd_tm[j]= snd_tv[j].tv_sec * 1000000.0 + snd_tv[j].tv_usec ; arrv_tm[j] = arrv_tv[j].tv_sec * 1000000.0 + arrv_tv[j].tv_usec ; owd[j] = arrv_tm[j] - snd_tm[j] ; } total_pkt_rcvd += pkt_rcvd ; finished_stream = 0 ; pkt_lost += stream_len - exp_pkt_id ; pkt_loss_rate = (double )pkt_lost * 100. / stream_len ; if(Verbose) printf(":%.1f",pkt_loss_rate ) ; fprintf(pathload_fp,":%.1f",pkt_loss_rate ) ; exp_pkt_id = 0 ; stream_cnt++ ; num_bursts=0; if ( interrupt_coalescence ) ic_flag=check_intr_coalescence(arrv_tv,pkt_rcvd,&num_bursts); if ( pkt_loss_rate < HIGH_LOSS_RATE && pkt_loss_rate >= MEDIUM_LOSS_RATE ) lossy_stream++ ; if ( pkt_loss_rate >= HIGH_LOSS_RATE || ( stream_cnt >= num_stream && lossy_stream*100./stream_cnt >= MAX_LOSSY_STREAM_FRACTION )) { if ( increase_stream_len ) { increase_stream_len=0; lower_bound=1; } if(Verbose) printf("\n Fleet aborted due to high lossrate"); fprintf(pathload_fp,"\n Fleet aborted due to high lossrate"); abort_fleet=1; break ; } else { /* analyze trend in stream */ num += get_sndr_time_interval(snd_tm,&snd_time_interval) ; adjust_offset_to_zero(owd, stream_len); num_substream = eliminate_sndr_side_CS(snd_tm,substream); num_sndr_cs[stream_cnt-1] = num_substream ; substream[num_substream++]=stream_len-1; low=0; num_rcvr_cs[stream_cnt-1]=0; tmp_b2b=0; for (j=0;j MIN_STREAM_LEN ) { get_trend(owdfortd,len); } } low=high+1; } if ( abort_fleet ) break; else { b2b_pkt_per_stream[stream_cnt-1] = tmp_b2b ; ctr_code = CONTINUE_STREAM | CTR_CODE; send_ctr_mesg(ctr_buff, ctr_code); } } pkt_rcvd = 0 ; /* A hack for slow links */ stream_duration = stream_len * time_interval ; if ( stream_duration >= 500000 ) { slow=1; break ; } } /*end of while (stream_cnt < num_stream ). */ if ( Verbose ) printf("\n"); fprintf(pathload_fp,"\n"); if ( abort_fleet ) { printf("\tAborting fleet. Stream_cnt %d\n",stream_cnt); ctr_code = ABORT_FLEET | CTR_CODE; send_ctr_mesg(ctr_buff , ctr_code ) ; return_val = -1 ; } else print_contextswitch_info(num_sndr_cs,num_rcvr_cs,b2b_pkt_per_stream,stream_cnt); exp_fleet_id++ ; free(pkt_buf); return return_val ; } void print_contextswitch_info(l_int32 num_sndr_cs[], l_int32 num_rcvr_cs[],l_int32 discard[],l_int32 stream_cnt) { l_int32 j; if (Verbose) printf(" # of CS @ sndr :: "); fprintf(pathload_fp," # of CS @ sndr :: "); for(j=0;j 0 ) { /* check ... mesg received */ if ( FD_ISSET(sock_tcp,&readset) ) { /* check the control connection.*/ if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 ) { if ( (((ret_val & CTR_CODE) >> 31) == 1) && ((ret_val & 0x7fffffff) == FINISHED_STREAM ) ) { while((ret_val = recv_ctr_mesg(sock_tcp,ctr_buff ))== -1) ; if ( ret_val == ((thr_arg *)arg)->stream_cnt ) { ((thr_arg *)arg)->finished_stream =1 ; pthread_kill(((thr_arg *)arg)->ptid,SIGUSR1); pthread_exit(NULL); } } else if ( (((ret_val & CTR_CODE) >> 31) == 1) && ((ret_val & 0x7fffffff) == FINISHED_TRAIN ) ) { select_tv.tv_usec = 2000 ; select_tv.tv_sec = 0 ; select(1,NULL,NULL,NULL,&select_tv); ((thr_arg *)arg)->finished_stream =1 ; pthread_kill(((thr_arg *)arg)->ptid,SIGUSR1); pthread_exit(NULL); } } } } #endif return NULL; } void get_trend(double owdfortd[],l_int32 pkt_cnt ) { double median_owd[MAX_STREAM_LEN]; l_int32 median_owd_len=0; double ordered[MAX_STREAM_LEN]; l_int32 j,count,pkt_per_min; //pkt_per_min = 5 ; pkt_per_min = (int)floor(sqrt((double)pkt_cnt)); count = 0 ; for ( j = 0 ; j < pkt_cnt ; j=j+pkt_per_min ) { if ( j+pkt_per_min >= pkt_cnt ) count = pkt_cnt - j ; else count = pkt_per_min; order_dbl(owdfortd , ordered ,j,count ) ; if ( count % 2 == 0 ) median_owd[median_owd_len++] = ( ordered[(int)(count*.5) -1] + ordered[(int)(count*0.5)] )/2 ; else median_owd[median_owd_len++] = ordered[(int)(count*0.5)] ; } pct_metric[trend_idx]= pairwise_comparision_test(median_owd , 0 , median_owd_len ); pdt_metric[trend_idx]= pairwise_diff_test(median_owd , 0, median_owd_len ); trend_idx+=1; } /* Order an array of doubles using bubblesort */ void order_dbl(double unord_arr[], double ord_arr[],l_int32 start, l_int32 num_elems) { l_int32 i,j,k; double temp; for (i=start,k=0;i=0;j--) if (ord_arr[j+1] < ord_arr[j]) { temp=ord_arr[j]; ord_arr[j]=ord_arr[j+1]; ord_arr[j+1]=temp; } else break; } } /* Order an array of float using bubblesort */ void order_float(float unord_arr[], float ord_arr[],l_int32 start, l_int32 num_elems) { l_int32 i,j,k; double temp; for (i=start,k=0;i=0;j--) if (ord_arr[j+1] < ord_arr[j]) { temp=ord_arr[j]; ord_arr[j]=ord_arr[j+1]; ord_arr[j+1]=temp; } else break; } } /* Order an array of l_int32 using bubblesort */ void order_int(l_int32 unord_arr[], l_int32 ord_arr[], l_int32 num_elems) { l_int32 i,j; l_int32 temp; for (i=0;i=0;j--) if (ord_arr[j+1] < ord_arr[j]) { temp=ord_arr[j]; ord_arr[j]=ord_arr[j+1]; ord_arr[j+1]=temp; } else break; } } /* Send a message through the control stream */ void send_ctr_mesg(char *ctr_buff, l_int32 ctr_code) { l_int32 ctr_code_n = htonl(ctr_code); memcpy((void*)ctr_buff, &ctr_code_n, sizeof(l_int32)); if (write(sock_tcp, ctr_buff, sizeof(l_int32)) != sizeof(l_int32)) { fprintf(stderr, "send control message failed:\n"); exit(-1); } } /* Receive message from the control stream */ l_int32 recv_ctr_mesg(l_int32 ctr_strm, char *ctr_buff) { l_int32 ctr_code; gettimeofday(&first_time,0); if (read(ctr_strm, ctr_buff, sizeof(l_int32)) != sizeof(l_int32)) return(-1); gettimeofday(&second_time,0); memcpy(&ctr_code, ctr_buff, sizeof(l_int32)); return(ntohl(ctr_code)); } /* Compute the time difference in microseconds between two timeval measurements */ double time_to_us_delta(struct timeval tv1, struct timeval tv2) { double time_us; time_us = (double) ((tv2.tv_sec-tv1.tv_sec)*1000000+(tv2.tv_usec-tv1.tv_usec)); return time_us; } /* Compute the average of the set of measurements . */ double get_avg(double data[], l_int32 num_values) { l_int32 i; double sum_; sum_ = 0; for (i=0; i= MIN_PARTITIONED_STREAM_LEN ) { for ( i = start ; i < end - 1 ; i++ ) { if ( array[i] < array[i+1] ) improvement += 1 ; } total = ( end - start ) ; return ( (double)improvement/total ) ; } else return -1 ; } /* PDT test to detect increasing trend in stream */ double pairwise_diff_test(double array[] ,l_int32 start , l_int32 end) { double y = 0 , y_abs = 0 ; l_int32 i ; if ( ( end - start ) >= MIN_PARTITIONED_STREAM_LEN ) { for ( i = start+1 ; i < end ; i++ ) { y += array[i] - array[i-1] ; y_abs += fabs(array[i] - array[i-1]) ; } return y/y_abs ; } else return 2. ; } double grey_bw_resolution() { if ( adr ) return (.05*adr<12?.05*adr:12) ; else return min_rate ; } /* test if Rmax and Rmin range is smaller than user specified bw resolution or if Gmin and Gmax range is smaller than grey bw resolution. */ l_int32 converged() { int ret_val=0; if ( (converged_gmx_rmx_tm && converged_gmn_rmn_tm) || converged_rmn_rmx_tm ) ret_val=1; else if ( tr_max != 0 && tr_max != tr_min ) { if ( tr_max - tr_min <= bw_resol ) { converged_rmn_rmx=1; ret_val=1; } else if( tr_max - grey_max <= grey_bw_resolution() && grey_min - tr_min <= grey_bw_resolution() ) { converged_gmn_rmn = 1; converged_gmx_rmx = 1; ret_val=1; } } return ret_val ; } /* Calculate next fleet rate when fleet showed INCREASING trend. */ void radj_increasing() { if ( grey_max != 0 && grey_max >= tr_min ) { if ( tr_max - grey_max <= grey_bw_resolution() ) { converged_gmx_rmx = 1; exp_flag=0; if ( grey_min || tr_min ) radj_notrend() ; else { if ( grey_min < grey_max ) tr = grey_min/2. ; else tr = grey_max/2. ; } } else tr = ( tr_max + grey_max)/2. ; } else tr = (tr_max + tr_min)/2.max_rate?max_rate:2*tr ; else { if ( grey_min != 0 && grey_min <= tr_max ) { if ( grey_min - tr_min <= grey_bw_resolution() ) { converged_gmn_rmn = 1; radj_increasing() ; } else tr = (tr_min+grey_min)/2.= tr ) grey_max = grey_min = 0 ; tr_max = tr ; if (!converged_gmx_rmx_tm ) { if ( !converged() ) radj_increasing() ; else ret_val=-1 ; //return -1; } else { exp_flag = 0 ; if ( !converged() ) radj_notrend() ; } } else if ( flag == NOTREND ) { if ( grey_min < tr ) grey_min = 0 ; if ( grey_max < tr ) grey_max = grey_min = 0 ; if ( tr > tr_min ) tr_min = tr ; if ( !converged_gmn_rmn_tm && !converged() ) radj_notrend() ; else ret_val=-1 ; //return -1 ; } else if ( flag == GREY ) { if ( grey_max == 0 && grey_min == 0 ) grey_max = grey_min = tr ; if (tr==grey_max || tr>grey_max ) { grey_max = tr ; if ( !converged_gmx_rmx_tm ) { if ( !converged() ) radj_greymax() ; else ret_val=-1 ; //return -1 ; } else { exp_flag = 0 ; if ( !converged() ) radj_notrend() ; else ret_val=-1; } } else if ( tr < grey_min || grey_min == 0 ) { grey_min = tr ; if ( !converged() ) radj_greymin() ; else ret_val=-1 ; //return -1 ; } } if (Verbose) { printf(" Rmin-Rmax :: %.2f-%.2fMbps\n",tr_min,tr_max); printf(" Gmin-Gmax :: %.2f-%.2fMbps\n",grey_min,grey_max); } fprintf(pathload_fp," Rmin-Rmax :: %.2f-%.2fMbps\n",tr_min,tr_max); fprintf(pathload_fp," Gmin-Gmax :: %.2f-%.2fMbps\n",grey_min,grey_max); if ( ret_val == -1 ) return -1 ; if ( tr >= max_rate ) max_rate_flag++ ; if ( max_rate_flag > 1 ) return -1 ; if ( min_rate_flag > 1 ) return -1 ; transmission_rate = (l_int32) rint(1000000 * tr ) ; return 0 ; } /* calculates fleet param L,T . calc_param returns -1, if we have reached to upper/lower limits of the stream parameters like L,T . otherwise returns 0 . */ l_int32 calc_param() { double tmp_tr ; l_int32 tmp ; l_int32 tmp_time_interval; if (tr < 150 ) { time_interval = 80>min_time_interval?80:min_time_interval ; cur_pkt_sz=rint(tr*time_interval/8.) - 28; if ( cur_pkt_sz < MIN_PKT_SZ ) { cur_pkt_sz = MIN_PKT_SZ ; time_interval =rint ((cur_pkt_sz + 28)*8./tr) ; tr = ( cur_pkt_sz + 28 )*8. /time_interval ; } else if ( cur_pkt_sz > max_pkt_sz ) { cur_pkt_sz = max_pkt_sz; time_interval = min_time_interval ; tmp_tr = ( cur_pkt_sz + 28 )*8. /time_interval ; if ( equal(tr,tmp_tr)) tr = tmp_tr; else return -1 ; } } else if ( tr < 600 ) { tmp_tr = tr ; tmp_time_interval = rint(( max_pkt_sz + 28 )* 8 / tr) ; if ( cur_pkt_sz == max_pkt_sz && tmp_time_interval == time_interval ) return -1 ; time_interval = tmp_time_interval ; tmp=rint(tr*time_interval/8.)-28; cur_pkt_sz=tmpgrey_max && (equal(tr,grey_max) || tr max_pkt_sz); tr = ( cur_pkt_sz + 28 ) *8./time_interval ; } } else { cur_pkt_sz = max_pkt_sz ; time_interval = rint(( cur_pkt_sz + 28 )* 8 / tr) ; tr = ( cur_pkt_sz + 28 ) *8./time_interval ; if ((tr_min && (equal(tr,tr_min) || trtime_interval+1000?2*time_interval:time_interval+1000; for ( k = 0 ; k < stream_len-1 ; k++ ) { if ( sndr_time_stamp[k] == 0 || sndr_time_stamp[k+1] == 0 ) continue; else if ((sndr_time_stamp[k+1]-sndr_time_stamp[k]) > cs_threshold) split_owd[j++] = k; } return j ; } /* discards owd of packets received when receiver was NOT running. */ l_int32 eliminate_rcvr_side_CS ( double rcvr_time_stamp[] , double owd[],double owdfortd[], l_int32 low,l_int32 high,l_int32 *num_rcvr_cs,l_int32 *tmp_b2b ) { l_int32 b2b_pkt[MAX_STREAM_LEN] ; l_int32 i,k=0 ; l_int32 len=0; l_int32 min_gap; min_gap = MIN_TIME_INTERVAL > 1.5*rcv_latency ? MIN_TIME_INTERVAL :2.5*rcv_latency ; for ( i = low ; i <= high ; i++ ) { if ( rcvr_time_stamp[i] == 0 || rcvr_time_stamp[i+1] == 0 ) continue ; else if ((rcvr_time_stamp[i+1]- rcvr_time_stamp[i])> min_gap) owdfortd[len++] = owd[i]; else b2b_pkt[k++] = i ; } /* go through discarded list and count b2b discards as 1 CS instance */ for (i=1;i 3*rcv_latency ? MIN_TIME_INTERVAL :3*rcv_latency ; for ( i = low ; i <= high ; i++ ) { if ( rcvr_time_stamp[i] == 0 || rcvr_time_stamp[i+1] == 0 ) continue ; //fprintf(stderr,"i %d owd %.2f dispersion %.2f",i, owd[i],rcvr_time_stamp[i+1]- rcvr_time_stamp[i]); if ((rcvr_time_stamp[i+1]- rcvr_time_stamp[i])< min_gap) { b2b_pkt[k++] = i ; tmp++; //fprintf(stderr," b\n"); } else { if ( tmp >= 3 ) { //fprintf(stderr," j\n"); tmp=0; owdfortd[len++] = owd[i]; } } } return len ; } /* Adjust offset to zero again */ void adjust_offset_to_zero(double owd[], l_int32 len) { l_int32 owd_min = 0; l_int32 i ; for (i=0; i< len; i++) { if ( owd_min == 0 && owd[i] != 0 ) owd_min=owd[i]; else if (owd_min != 0 && owd[i] != 0 && owd[i] 1.1 * PCT_THRESHOLD ) { if (Verbose) printf("I"); fprintf(pathload_fp,"I"); pct_trend[i] = INCR ; } else if ( pct_metric[i] < .9 * PCT_THRESHOLD ) { if (Verbose) printf("N"); fprintf(pathload_fp,"N"); pct_trend[i] = NOTR ; } else if(pct_metric[i] <= PCT_THRESHOLD*1.1 && pct_metric[i] >= PCT_THRESHOLD*.9 ) { if (Verbose) printf("U"); fprintf(pathload_fp,"U"); pct_trend[i] = UNCL ; } } if (Verbose) printf("\n"); fprintf(pathload_fp,"\n"); } void get_pdt_trend(double pdt_metric[], l_int32 pdt_trend[], l_int32 pdt_result_cnt ) { l_int32 i ; for (i=0; i < pdt_result_cnt;i++ ) { if ( pdt_metric[i] == 2 ) { if (Verbose) printf("d"); fprintf(pathload_fp,"d"); pdt_trend[i] = DISCARD ; } else if ( pdt_metric[i] > 1.1 * PDT_THRESHOLD ) { if (Verbose) printf("I"); fprintf(pathload_fp,"I"); pdt_trend[i] = INCR ; } else if ( pdt_metric[i] < .9 * PDT_THRESHOLD ) { if (Verbose) printf("N"); fprintf(pathload_fp,"N"); pdt_trend[i] = NOTR ; } else if ( pdt_metric[i] <= PDT_THRESHOLD*1.1 && pdt_metric[i] >= PDT_THRESHOLD*.9 ) { if (Verbose) printf("U"); fprintf(pathload_fp,"U"); pdt_trend[i] = UNCL ; } } if (Verbose) printf("\n"); fprintf(pathload_fp,"\n"); } /* returns : trend in fleet or -1 if more than 50% of stream were discarded */ l_int32 aggregate_trend_result() { l_int32 total=0 ,i_cnt = 0, n_cnt = 0; l_int32 num_dscrd_strm=0; l_int32 i=0; l_int32 pct_trend[TREND_ARRAY_LEN] , pdt_trend[TREND_ARRAY_LEN] ; if (Verbose) printf(" PCT metric/stream[%2d] :: ",trend_idx); fprintf(pathload_fp," PCT metric/stream[%2d] :: ",trend_idx); for (i=0; i < trend_idx;i++ ) { if (Verbose) printf("%3.2f:",pct_metric[i]); fprintf(pathload_fp,"%3.2f:",pct_metric[i]); } if (Verbose) printf("\n"); fprintf(pathload_fp,"\n"); if (Verbose) printf(" PDT metric/stream[%2d] :: ",trend_idx); fprintf(pathload_fp," PDT metric/stream[%2d] :: ",trend_idx); for (i=0; i < trend_idx;i++ ) { if (Verbose) printf("%3.2f:",pdt_metric[i]); fprintf(pathload_fp,"%3.2f:",pdt_metric[i]); } if (Verbose) printf("\n"); fprintf(pathload_fp,"\n"); if (Verbose) printf(" PCT Trend/stream [%2d] :: ",trend_idx); fprintf(pathload_fp," PCT Trend/stream [%2d] :: ",trend_idx); get_pct_trend(pct_metric,pct_trend,trend_idx); if (Verbose) printf(" PDT Trend/stream [%2d] :: ",trend_idx); fprintf(pathload_fp," PDT Trend/stream [%2d] :: ",trend_idx); get_pdt_trend(pdt_metric,pdt_trend,trend_idx); if (Verbose) printf(" Trend per stream [%2d] :: ",trend_idx); fprintf(pathload_fp," Trend per stream [%2d] :: ",trend_idx); for (i=0; i < trend_idx;i++ ) { if ( pct_trend[i] == DISCARD || pdt_trend[i] == DISCARD ) { if (Verbose) printf("d"); fprintf(pathload_fp,"d"); num_dscrd_strm++ ; } else if ( pct_trend[i] == INCR && pdt_trend[i] == INCR ) { if (Verbose) printf("I"); fprintf(pathload_fp,"I"); i_cnt++; } else if ( pct_trend[i] == NOTR && pdt_trend[i] == NOTR ) { if (Verbose) printf("N"); fprintf(pathload_fp,"N"); n_cnt++; } else if ( pct_trend[i] == INCR && pdt_trend[i] == UNCL ) { if (Verbose) printf("I"); fprintf(pathload_fp,"I"); i_cnt++; } else if ( pct_trend[i] == NOTR && pdt_trend[i] == UNCL ) { if (Verbose) printf("N"); fprintf(pathload_fp,"N"); n_cnt++; } else if ( pdt_trend[i] == INCR && pct_trend[i] == UNCL ) { if (Verbose) printf("I"); fprintf(pathload_fp,"I"); i_cnt++; } else if ( pdt_trend[i] == NOTR && pct_trend[i] == UNCL ) { if (Verbose) printf("N"); fprintf(pathload_fp,"N"); n_cnt++ ; } else { if (Verbose) printf("U"); fprintf(pathload_fp,"U"); } total++ ; } if (Verbose) printf("\n"); fprintf(pathload_fp,"\n"); /* check whether number of usable streams is atleast 50% of requested number of streams */ total-=num_dscrd_strm ; if ( total < num_stream/2 && !slow && !interrupt_coalescence) { bad_fleet_cs = 1 ; retry_fleet_cnt_cs++ ; return -1 ; } else { bad_fleet_cs = 0 ; retry_fleet_cnt_cs=0; } if( (double)i_cnt/(total) >= AGGREGATE_THRESHOLD ) { if (Verbose) printf(" Aggregate trend :: INCREASING\n"); fprintf(pathload_fp," Aggregate trend :: INCREASING\n"); return INCREASING ; } else if( (double)n_cnt/(total) >= AGGREGATE_THRESHOLD ) { if (Verbose) printf(" Aggregate trend :: NO TREND\n"); fprintf(pathload_fp," Aggregate trend :: NO TREND\n"); return NOTREND ; } else { if (Verbose) printf(" Aggregate trend :: GREY\n"); fprintf(pathload_fp," Aggregate trend :: GREY\n"); return GREY ; } } l_int32 get_sndr_time_interval(double snd_time[],double *sum) { l_int32 k,j=0,new_j=0; double ordered[MAX_STREAM_LEN] ; double ltime_interval[MAX_STREAM_LEN] ; for ( k = 0; k < stream_len-1; k++ ) { if ( snd_time[k] == 0 || snd_time[k+1] == 0 ) continue; else ltime_interval[j++] = snd_time[k+1] - snd_time[k] ; } order_dbl(ltime_interval, ordered , 0, j ) ; /* discard the top 15% as outliers */ new_j = j - rint(j*.15) ; for ( k = 0 ; k < new_j ; k++ ) *sum += ordered[k] ; return new_j ; } void get_sending_rate() { time_interval = snd_time_interval/num; cur_req_rate = tr ; cur_actual_rate = (28 + cur_pkt_sz) * 8. / time_interval ; if( !equal(cur_req_rate, cur_actual_rate) ) { if( !grey_max && !grey_min ) { if( tr_min && tr_max && (less_than(cur_actual_rate,tr_min)||equal(cur_actual_rate,tr_min))) converged_rmn_rmx_tm = 1; if( tr_min && tr_max && (less_than(tr_max,cur_actual_rate)||equal(tr_max,cur_actual_rate))) converged_rmn_rmx_tm = 1; } else if ( cur_req_rate < tr_max && cur_req_rate > grey_max ) { if( !(less_than(cur_actual_rate,tr_max)&&grtr_than(cur_actual_rate,grey_max)) ) converged_gmx_rmx_tm = 1; } else if ( cur_req_rate < grey_min && cur_req_rate > tr_min ) { if( !(less_than(cur_actual_rate,grey_min) && grtr_than(cur_actual_rate,tr_min)) ) converged_gmn_rmn_tm = 1; } } tr = cur_actual_rate ; transmission_rate = (l_int32) rint(1000000 * tr) ; if(Verbose) printf(" Fleet Parameter(act) :: R=%.2fMbps, L=%ldB, K=%ldpackets, T=%ldusec\n",cur_actual_rate, cur_pkt_sz , stream_len,time_interval); fprintf(pathload_fp," Fleet Parameter(act) :: R=%.2fMbps, L=%ldB, K=%ldpackets, T=%ldusec\n",cur_actual_rate,cur_pkt_sz,stream_len,time_interval); snd_time_interval=0; num=0; } void terminate_gracefully(struct timeval exp_start_time) { l_int32 ctr_code; char ctr_buff[8],buff[26]; struct timeval exp_end_time; double min=0,max=0 ; ctr_code = TERMINATE | CTR_CODE; send_ctr_mesg(ctr_buff, ctr_code); gettimeofday(&exp_end_time, NULL); strncpy(buff, ctime(&(exp_end_time.tv_sec)), 24); buff[24] = '\0'; if (verbose || Verbose) printf("\n\t***** RESULT *****\n"); fprintf(pathload_fp,"\n\t***** RESULT *****\n"); if (netlog) netlogger() ; if ( min_rate_flag ) { if (verbose || Verbose) { printf("Avail-bw < minimum sending rate.\n"); printf("Increase MAX_TIME_INTERVAL in pathload_rcv.h from 200000 usec to a higher value.\n"); } fprintf(pathload_fp,"Avail-bw < minimum sending rate.\n"); fprintf(pathload_fp,"Increase MAX_TIME_INTERVAL in pathload_rcv.h from 200000 usec to a higher value.\n"); } else if ( max_rate_flag && !interrupt_coalescence ) { if (verbose || Verbose) { printf("Avail-bw > maximum sending rate.\n"); if ( tr_min) printf("Avail-bw > %.2f (Mbps)\n", tr_min); } fprintf(pathload_fp,"Avail-bw > maximum sending rate.\n"); if ( tr_min) fprintf(pathload_fp,"Avail-bw > %.2f (Mbps)\n", tr_min); } else if (bad_fleet_cs && !interrupt_coalescence) { if (verbose || Verbose) printf("Measurement terminated due to frequent CS @ sender/receiver.\n"); fprintf(pathload_fp,"Measurement terminated due to frequent CS @ sender/receiver.\n"); if ((tr_min&& tr_max) || (grey_min&&grey_max)) { if ( grey_min&& grey_max) { min = grey_min ; max = grey_max ; } else { min = tr_min ;max = tr_max ; } if (verbose || Verbose) { printf("Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max); printf("Measurements finished at %s \n", buff); printf("Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000); } fprintf(pathload_fp,"Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max); fprintf(pathload_fp,"Measurements finished at %s \n", buff); fprintf(pathload_fp,"Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000); } } else { if ( !interrupt_coalescence && ((converged_gmx_rmx_tm && converged_gmn_rmn_tm) || converged_rmn_rmx_tm )) { if (Verbose) printf("Actual probing rate != desired probing rate.\n"); fprintf(pathload_fp,"Actual probing rate != desired probing rate.\n"); if ( converged_rmn_rmx_tm ) { min = tr_min ; max = tr_max; } else { min = grey_min ; max = grey_max ; } } else if ( !interrupt_coalescence && converged_rmn_rmx ) { if (Verbose) printf("User specified bandwidth resolution achieved\n"); fprintf(pathload_fp,"User specified bandwidth resolution achieved\n"); min = tr_min ; max = tr_max ; } else if ( !interrupt_coalescence && converged_gmn_rmn && converged_gmx_rmx ) { if (Verbose) printf("Exiting due to grey bw resolution\n"); fprintf(pathload_fp,"Exiting due to grey bw resolution\n"); min = grey_min ; max = grey_max; } else { min = tr_min ; max = tr_max; } if (verbose||Verbose) { if ( lower_bound) { printf("Receiver NIC has interrupt coalescence enabled\n"); printf("Available bandwidth is greater than %.2f (Mbps)\n", min); } else printf("Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max); printf("Measurements finished at %s \n", buff); printf("Measurement latency is %.2f sec \n",time_to_us_delta(exp_start_time,exp_end_time)/1000000); } if ( lower_bound) { fprintf(pathload_fp,"Receiver NIC has interrupt coalescence enabled\n"); fprintf(pathload_fp,"Available bandwidth is greater than %.2f (Mbps)\n", min); } else fprintf(pathload_fp,"Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max); fprintf(pathload_fp,"Measurements finished at %s \n", buff); fprintf(pathload_fp,"Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000); } if (netlog) fclose(netlog_fp); fclose(pathload_fp); close(sock_tcp); exit(0); } void netlogger() { struct tm *tm; struct hostent *rcv_host, *snd_host; char rcv_name[256]; struct timeval curr_time; gettimeofday(&curr_time,NULL); tm = gmtime(&curr_time.tv_sec); fprintf(netlog_fp,"DATE=%4d",tm->tm_year+1900); print_time(netlog_fp,tm->tm_mon+1); print_time(netlog_fp,tm->tm_mday); print_time(netlog_fp,tm->tm_hour); print_time(netlog_fp,tm->tm_min); if (tm->tm_sec <10) { fprintf(netlog_fp,"0"); fprintf(netlog_fp,"%1.6f",tm->tm_sec+curr_time.tv_usec/1000000.0); } else{ fprintf(netlog_fp,"%1.6f",tm->tm_sec+curr_time.tv_usec/1000000.0); } gethostname(rcv_name, 255); rcv_host = gethostbyname(rcv_name); if(strcmp(rcv_name, "\0")!=0) fprintf(netlog_fp," HOST=%s",rcv_host->h_name); else fprintf(netlog_fp," HOST=NO_NAME"); fprintf(netlog_fp," PROG=pathload"); fprintf(netlog_fp," LVL=Usage"); if ((snd_host = gethostbyname(hostname)) == 0) { snd_host = gethostbyaddr(hostname,256,AF_INET); } fprintf(netlog_fp," PATHLOAD.SNDR=%s",snd_host->h_name); fprintf(netlog_fp," PATHLOAD.ABWL=%.1fMbps",tr_min); fprintf(netlog_fp," PATHLOAD.ABWH=%.1fMbps\n",tr_max); fclose(netlog_fp); } /* prl_int32 time */ void print_time(FILE *fp, l_int32 time) { if( time<10){ fprintf(fp,"0"); fprintf(fp,"%1d",time); } else{ fprintf(fp,"%2d",time); } } l_int32 less_than(double a, double b) { if ( !equal(a,b) && a < b) return 1; else return 0; } l_int32 grtr_than(double a, double b) { if ( !equal(a,b) && a > b) return 1; else return 0; } /* if a approx-equal b, return 1 else 0 */ l_int32 equal(double a , double b) { l_int32 maxdiff ; if ( a] [-N ]\ [-w ] [-h|-H] -s \n"); fprintf (stderr,"-s : hostname/ipaddress of sender\n"); fprintf (stderr,"-q : quite mode\n"); fprintf (stderr,"-v : verbose mode\n"); fprintf (stderr,"-w : user specified bw resolution\n"); fprintf (stderr,"-o : write log in user specified file [default is pathload.log]\n"); fprintf (stderr,"-O : append log in user specified file [default is pathload.log]\n"); fprintf (stderr,"-N : print output in netlogger format to \n"); fprintf (stderr,"-h|H : print this help and exit\n"); exit(0); }