/*
 This file is part of pathload.

 pathload is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation; either version 2 of the License, or
 (at your option) any later version.

 pathload is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with pathload; if not, write to the Free Software
 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

/*-------------------------------------------------
   pathload : an end-to-end available bandwidth 
              estimation tool
   Author   : Manish Jain ( jain@cc.gatech.edu )
              Constantinos Dovrolis (dovrolis@cc.gatech.edu )
   Release  : Ver 1.3.2
   Support  : This work was supported by the SciDAC
              program of the US department 
--------------------------------------------------*/

/*
 * $Header: /net/cvs/bwtest/pathload/pathload_rcv_func.c,v 1.294 2006/05/19 20:21:15 jain Exp $
 */

#include "pathload_gbls.h"
#include "pathload_rcv.h"

l_int32 recvfrom_latency(struct sockaddr_in rcv_udp_addr)
{
  char *random_data;
  float min_OSdelta[50], ord_min_OSdelta[50];
  l_int32 j ;
  struct timeval current_time, first_time ;

  if ( (random_data = malloc(max_pkt_sz*sizeof(char)) ) == NULL )
  {
    printf("ERROR : unable to malloc %ld bytes \n",max_pkt_sz);
    exit(-1);
  }
  srandom(getpid()); /* Create random payload; does it matter? */
  for (j=0; j<max_pkt_sz-1; j++) random_data[j]=(char)(random()&0x000000ff);

  for (j=0; j<50; j++)
  {
    if ( sendto(sock_udp, random_data, max_pkt_sz, 0, 
         (struct sockaddr*)&rcv_udp_addr,sizeof(rcv_udp_addr)) == -1)
        perror("recvfrom_latency");
    gettimeofday(&first_time, NULL);
    recvfrom(sock_udp, random_data, max_pkt_sz, 0, NULL, NULL);
    gettimeofday(&current_time, NULL);
    min_OSdelta[j]= time_to_us_delta(first_time, current_time);
  }
  /* Use median  of measured latencies to avoid outliers */
  order_float(min_OSdelta, ord_min_OSdelta,0,50);
  free(random_data);
  return((l_int32)ord_min_OSdelta[25]);   
}


double get_adr() 
{
  struct timeval select_tv,arrv_tv[MAX_STREAM_LEN] ; 
  double delta ;
  double bw_msr = 0;
  double bad_bw_msr[10] ;
  int num_bad_train=0 ; 
  int first = 1 ;
  double sum =0 ;
  l_int32 exp_train_id ; 
  l_int32 bad_train = 1; 
  l_int32 retry = 0 ;
  l_int32 ctr_code ; 
  l_int32 ctr_msg_rcvd ; 
  l_int32 train_len=0;
  l_int32 last=0,i;
  l_int32 spacecnt=24 ; 
  char ctr_buff[8];
  l_int32 num_burst;

  if (Verbose)
    printf("  ADR [");
  fflush(stdout);
  fprintf(pathload_fp,"  ADR [");
  fflush(pathload_fp);
  ctr_code = SEND_TRAIN | CTR_CODE ;
  send_ctr_mesg(ctr_buff, ctr_code);
  exp_train_id = 0 ;
  for(i=0;i<100;i++)
  {
    arrv_tv[i].tv_sec=0;
    arrv_tv[i].tv_usec=0;
  }
  while ( retry < MAX_TRAIN && bad_train )
  {
    if ( train_len == 5)
      train_len = 3;
    else 
      train_len = TRAIN_LEN - exp_train_id*15;
    if (Verbose)
      printf(".");
    fflush(stdout);
    fprintf(pathload_fp,".");
    spacecnt--;
    ctr_msg_rcvd = 0 ;
    bad_train = recv_train(exp_train_id, arrv_tv, train_len);
    /* Compute dispersion and bandwidth measurement */
    if (!bad_train) 
    {
      num_burst=0;
      interrupt_coalescence=check_intr_coalescence(arrv_tv,train_len,&num_burst);
      last=train_len;
      while(!arrv_tv[last].tv_sec) --last;
      delta = time_to_us_delta(arrv_tv[1], arrv_tv[last]);
      bw_msr = ((28+max_pkt_sz) << 3) * (last-1) / delta;
      /* tell sender that it was agood train.*/
      ctr_code = GOOD_TRAIN | CTR_CODE ;
      send_ctr_mesg(ctr_buff, ctr_code ) ;
    }
    else
    {
      retry++ ;
      /* wait for atleast 10msec before requesting another train */
      last=train_len;
      while(!arrv_tv[last].tv_sec) --last;
      first=1 ; 
      while(!arrv_tv[first].tv_sec) ++first ; 
      delta = time_to_us_delta(arrv_tv[first], arrv_tv[last]);
      bad_bw_msr[num_bad_train++] = ((28+max_pkt_sz) << 3) * (last-first-1) / delta;
      select_tv.tv_sec=0;select_tv.tv_usec=10000;
      select(0,NULL,NULL,NULL,&select_tv);
      ctr_code = BAD_TRAIN | CTR_CODE ;
      send_ctr_mesg(ctr_buff, ctr_code ) ;
      exp_train_id++ ;
    }
  }

  if (Verbose)
  { 
    i = spacecnt;
    putchar(']');
    while(--i>0)putchar(' ');
    printf(":: ");
  }
  fputc(']',pathload_fp);
  while(--spacecnt>0)fputc(' ',pathload_fp);
  fprintf(pathload_fp,":: ");
  if ( !bad_train)
  {
    if(Verbose)
      printf("%.2fMbps\n", bw_msr ) ;
    fprintf(pathload_fp,"%.2fMbps\n", bw_msr ) ;
  }
  else
  {
    for ( i=0;i<num_bad_train;i++)
      if ( finite(bad_bw_msr[i]))
        sum += bad_bw_msr[i] ;
    bw_msr = sum/num_bad_train ; 
    if(Verbose)
      printf("%.2fMbps (I)\n", bw_msr ) ;
    fprintf(pathload_fp,"%.2fMbps (I)\n", bw_msr ) ;
  }
  return bw_msr ;
}

/* Receive a complete packet train from the sender */
l_int32 recv_train( l_int32 exp_train_id, struct timeval *time,l_int32 train_len)
{
  struct sigaction sigstruct ;
  struct timeval current_time;
  struct timeval select_tv;
  fd_set readset;
  l_int32 ret_val ;
  l_int32 pack_id , exp_pack_id ;
  l_int32 bad_train = 0 ; 
  l_int32 train_id  ;
  l_int32 rcvd=0;
  char *pack_buf ;
  char ctr_buff[8];
#ifdef THRLIB
  thr_arg arg ;
  pthread_t tid;
  pthread_attr_t attr ;
#endif 
  exp_pack_id=0;

  if ( ( pack_buf = malloc(max_pkt_sz*sizeof(char))) == NULL ) 
  {
    printf("ERROR : unable to malloc %ld bytes \n",max_pkt_sz);
    exit(-1);
  }
      
  sigstruct.sa_handler = sig_sigusr1 ;
  sigemptyset(&sigstruct.sa_mask);
  sigstruct.sa_flags = 0 ;
  #ifdef SA_INTERRUPT
    sigstruct.sa_flags |= SA_INTERRUPT ;
  #endif
  sigaction(SIGUSR1 , &sigstruct,NULL );

#ifdef THRLIB
  arg.finished_stream=0;
  arg.ptid = pthread_self() ;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
  if (pthread_create(&tid,&attr,ctrl_listen, &arg ) != 0 )
  {
    perror("recv_train::pthread_create");
    fprintf(stdout,"Failed to create thread. exiting...\n");
    fprintf(pathload_fp,"Failed to create thread. exiting...\n");
    exit(-1);
  }
#endif

  do 
  {
#ifndef THRLIB
      FD_ZERO(&readset);
      FD_SET(sock_tcp,&readset);
      FD_SET(sock_udp,&readset);
      select_tv.tv_sec=1000;select_tv.tv_usec=0;
      if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 ) 
      {
        if (FD_ISSET(sock_udp,&readset) )
        {
#endif
    if (recvfrom(sock_udp, pack_buf, max_pkt_sz, 0, NULL, NULL) != -1)
    {
      gettimeofday(&current_time, NULL);
      memcpy(&train_id, pack_buf, sizeof(l_int32));
      train_id = ntohl(train_id) ;
      memcpy(&pack_id, pack_buf+sizeof(l_int32), sizeof(l_int32));
      pack_id=ntohl(pack_id);
      if (train_id == exp_train_id && pack_id==exp_pack_id ) 
      {
        rcvd++;
        time[pack_id] = current_time ;
        exp_pack_id++;
      }
      else bad_train=1;
    }
#ifndef THRLIB
        } // end of FD_ISSET

        if ( FD_ISSET(sock_tcp,&readset) )
        {
          /* check the control connection.*/
          if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 )
          {
            if ( (((ret_val & CTR_CODE) >> 31) == 1) && 
                  ((ret_val & 0x7fffffff) == FINISHED_TRAIN ) )
            {
              break;
            }
          }
        }
      } // end of select
  } while (1);
#else
  } while (!arg.finished_stream);
#endif

  if ( rcvd != train_len+1 ) bad_train=1;
  gettimeofday(&time[pack_id+1], NULL);
  sigstruct.sa_handler = SIG_DFL ;
  sigemptyset(&sigstruct.sa_mask);
  sigstruct.sa_flags = 0 ;
  sigaction(SIGUSR1 , &sigstruct,NULL );
  free(pack_buf);
  return bad_train ; 
}

l_int32 check_intr_coalescence(struct timeval time[],l_int32 len, l_int32 *burst)
{
  double delta[MAX_STREAM_LEN];
  l_int32 b2b=0,tmp=0;
  l_int32 i;
  l_int32 min_gap;

  min_gap = MIN_TIME_INTERVAL > 3*rcv_latency ? MIN_TIME_INTERVAL : 3*rcv_latency ;
  //printf("---%d\n",len);
  for (i=2;i<len;i++)
  {
    delta[i] = time_to_us_delta(time[i-1],time[i]);
    if ( delta[i] <=  min_gap )
    {
      b2b++ ;
      tmp++;
    }
    else
    {
      if ( tmp >=3 )
      {
        (*burst)++;
        tmp=0;
      }
    }
  }

  //fprintf(stderr,"\tNumber of b2b %d, Number of burst %d\n",b2b,*burst);
  if ( b2b > .6*len )
  {
   return 1;
  }
  else return 0;
}

/* 
   Receive N streams .
   After each stream, compute the loss rate.
   Mark a stream "lossy" , if losss rate in 
   that stream is more than a threshold.
*/
l_int32 recv_fleet()
{
  struct sigaction sigstruct ;
  struct timeval snd_tv[MAX_STREAM_LEN], arrv_tv[MAX_STREAM_LEN];
  struct timeval current_time, first_time;
  double pkt_loss_rate ;
  double owd[MAX_STREAM_LEN] ;
  double snd_tm[MAX_STREAM_LEN] ;
  double arrv_tm[MAX_STREAM_LEN];
  l_int32 ctr_code ;
  l_int32 pkt_lost = 0 ;
  l_int32 stream_id_n , stream_id=0 ;
  l_int32 total_pkt_rcvd=0 ,pkt_rcvd = 0 ;
  l_int32 pkt_id = 0 ;
  l_int32 pkt_id_n = 0 ;
  l_int32 exp_pkt_id = 0 ;
  l_int32 stream_cnt = 0 ; /* 0->n*/
  l_int32 fleet_id  , fleet_id_n = 0 ;
  l_int32 lossy_stream = 0 ;
  l_int32 return_val = 0 ;
  l_int32 finished_stream = 0 ;
  l_int32 stream_duration ;
  l_int32 num_sndr_cs[20],num_rcvr_cs[20];
  char ctr_buff[8];
  char *pkt_buf ;
  double owdfortd[MAX_STREAM_LEN];
  l_int32 num_substream,substream[MAX_STREAM_LEN];
  l_int32 low,high,len,j;
  l_int32 b2b_pkt_per_stream[20];
  l_int32 tmp_b2b;
#ifdef THRLIB
  pthread_t tid ; 
  thr_arg arg ;
  pthread_attr_t attr ;
#endif
  l_int32 num_bursts;
  l_int32 abort_fleet=0;
  l_int32 p=0;
  struct timeval select_tv;
  fd_set readset;
  l_int32 ret_val ;

  if ( (pkt_buf = malloc(cur_pkt_sz*sizeof(char)) ) == NULL )
  {
    printf("ERROR : unable to malloc %ld bytes \n",cur_pkt_sz);
    exit(-1);
  }
  trend_idx=0;
  ic_flag = 0;
  if(verbose&&!Verbose)
    printf("Receiving Fleet %ld, Rate %.2fMbps\n",exp_fleet_id,tr);
  if(Verbose)
  {
    printf("\nReceiving Fleet %ld\n",exp_fleet_id);
    printf("  Fleet Parameter(req)  :: R=%.2fMbps, L=%ldB, K=%ldpackets, \
T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval) ;
  }
  fprintf(pathload_fp,"\nReceiving Fleet %ld\n",exp_fleet_id);
  fprintf(pathload_fp,"  Fleet Parameter(req)  :: R=%.2fMbps, L=%ldB, \
K=%ldpackets, T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval);
  
  if(Verbose)
    printf("  Lossrate per stream   :: ");
  fprintf(pathload_fp,"  Lossrate per stream   :: ");

  sigstruct.sa_handler = sig_sigusr1 ;
  sigemptyset(&sigstruct.sa_mask);
  sigstruct.sa_flags = 0 ;
  #ifdef SA_INTERRUPT
    sigstruct.sa_flags |= SA_INTERRUPT ;
  #endif
  sigaction(SIGUSR1 , &sigstruct,NULL );

  while ( stream_cnt < num_stream  )
  {
#ifdef THRLIB
    arg.finished_stream=0;
    arg.ptid = pthread_self() ;
    arg.stream_cnt = stream_cnt ;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
    if (pthread_create(&tid,&attr,ctrl_listen, &arg ) != 0 )
    {
      perror("recv_fleet::pthread_create");
      exit(-1);
    }
#endif
    pkt_lost = 0 ;
    first_time.tv_sec = 0 ;
    for (j=0; j < stream_len; j++ )
    {
      snd_tv[j].tv_sec=0 ; snd_tv[j].tv_usec=0 ;
      arrv_tv[j].tv_sec=0; arrv_tv[j].tv_usec=0;
    }

    /* Receive K packets of ith stream */
#ifdef THRLIB
    while(!arg.finished_stream) 
#else
    while(1)
#endif
    {
#ifndef THRLIB
      FD_ZERO(&readset);
      FD_SET(sock_tcp,&readset);
      FD_SET(sock_udp,&readset);
      select_tv.tv_sec=1000;select_tv.tv_usec=0;
      if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 ) 
      {
        if (FD_ISSET(sock_udp,&readset) )
        {
#endif
          if( recvfrom(sock_udp,pkt_buf,cur_pkt_sz,0,NULL,NULL) > 0 )
          {
            gettimeofday(&current_time,NULL);
            memcpy(&fleet_id_n,pkt_buf , sizeof(l_int32));
            fleet_id = ntohl(fleet_id_n) ;
            memcpy(&stream_id_n,pkt_buf+sizeof(l_int32) , sizeof(l_int32));
        stream_id = ntohl(stream_id_n) ;
        memcpy(&pkt_id_n, pkt_buf+2*sizeof(l_int32), sizeof(l_int32));
        pkt_id = ntohl(pkt_id_n) ;
        if ( fleet_id == exp_fleet_id  && stream_id == stream_cnt && 
             pkt_id >= exp_pkt_id )
        {
          if ( first_time.tv_sec == 0 )
            first_time = current_time;
          arrv_tv[pkt_id] = current_time ;
          memcpy(&(snd_tv[pkt_id].tv_sec) , pkt_buf+3*sizeof(l_int32), sizeof(l_int32));
          memcpy(&(snd_tv[pkt_id].tv_usec), pkt_buf+4*sizeof(l_int32), sizeof(l_int32));
          if ( pkt_id > exp_pkt_id ) /* reordered are considered as lost */
          {
            pkt_lost += ( pkt_id - exp_pkt_id ) ;
            exp_pkt_id  = pkt_id ;
          }
          ++exp_pkt_id ;
          ++pkt_rcvd;
        }
      } // end of recvfrom
#ifndef THRLIB
        } // end of FD_ISSET

        if ( FD_ISSET(sock_tcp,&readset) )
        {
          /* check the control connection.*/
          if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 )
          {
            if ( (((ret_val & CTR_CODE) >> 31) == 1) && 
                  ((ret_val & 0x7fffffff) == FINISHED_STREAM ) )
            {
              while((ret_val = recv_ctr_mesg(sock_tcp,ctr_buff ))== -1) ;
              if ( ret_val == stream_cnt )
              {
                break ; 
              }
            }
          }
        }
      } // end of select
      else
      {
        perror("select");
        exit(0);
      }
#endif
      
    } 

    for (j=0; j < stream_len; j++ )
    {
      snd_tv[j].tv_sec = ntohl(snd_tv[j].tv_sec);
      snd_tv[j].tv_usec = ntohl(snd_tv[j].tv_usec);
      snd_tm[j]= snd_tv[j].tv_sec * 1000000.0 + snd_tv[j].tv_usec ;
      arrv_tm[j] = arrv_tv[j].tv_sec * 1000000.0 + arrv_tv[j].tv_usec ;
      owd[j] =  arrv_tm[j] - snd_tm[j]   ;
    }

    total_pkt_rcvd += pkt_rcvd ;
    finished_stream = 0 ;
    pkt_lost +=  stream_len  - exp_pkt_id ;
    pkt_loss_rate = (double )pkt_lost * 100. / stream_len ;
    if(Verbose)
      printf(":%.1f",pkt_loss_rate ) ;
    fprintf(pathload_fp,":%.1f",pkt_loss_rate ) ;
    exp_pkt_id = 0 ;
    stream_cnt++ ;

    num_bursts=0;
    if ( interrupt_coalescence )
      ic_flag=check_intr_coalescence(arrv_tv,pkt_rcvd,&num_bursts); 
    
    if ( pkt_loss_rate < HIGH_LOSS_RATE && pkt_loss_rate >= MEDIUM_LOSS_RATE )
      lossy_stream++ ;
     
    if ( pkt_loss_rate >= HIGH_LOSS_RATE || ( stream_cnt >= num_stream
         && lossy_stream*100./stream_cnt >= MAX_LOSSY_STREAM_FRACTION ))
    {
      if ( increase_stream_len )
      {
        increase_stream_len=0;
        lower_bound=1;
      }

      if(Verbose)
        printf("\n  Fleet aborted due to high lossrate");
      fprintf(pathload_fp,"\n  Fleet aborted due to high lossrate");
      abort_fleet=1;
      break ;
    }
    else
    {
      /* analyze trend in stream */
      num += get_sndr_time_interval(snd_tm,&snd_time_interval) ;
      adjust_offset_to_zero(owd, stream_len);  
      num_substream = eliminate_sndr_side_CS(snd_tm,substream);
      num_sndr_cs[stream_cnt-1] = num_substream ;
      substream[num_substream++]=stream_len-1;
      low=0;
      num_rcvr_cs[stream_cnt-1]=0;
      tmp_b2b=0;
      for (j=0;j<num_substream;j++)
      {
        high=substream[j]; 
        if ( ic_flag )
        {
          if ( num_bursts < 2 )
          {
            if ( ++repeat_1 == 3)
            {
              repeat_1=0;
              /* Abort fleet and try to find lower bound */
              abort_fleet=1;
              lower_bound=1;
              increase_stream_len=0;
              break ;
            }
          }
          else if ( num_bursts <= 5 )
          {
            if ( ++repeat_2 == 3)
            {
              repeat_2=0;
              /* Abort fleet and retry with longer stream length */
              abort_fleet=1;
              increase_stream_len=1;
              break ;
            }
          }
          else
          {
            increase_stream_len=0;
            len=eliminate_b2b_pkt_ic(arrv_tm,owd,owdfortd,low,high,&num_rcvr_cs[stream_cnt-1],&tmp_b2b);
            /*
            for(p=0;p<len;p++)
              printf("%d %f\n",p,owdfortd[p]);
            */
            pct_metric[trend_idx]=
              pairwise_comparision_test(owdfortd , 0 , len );
            pdt_metric[trend_idx]=
              pairwise_diff_test(owdfortd , 0, len );
            trend_idx+=1;
          }
        }
        else
        {
          len=eliminate_rcvr_side_CS(arrv_tm,owd,owdfortd,low,high,&num_rcvr_cs[stream_cnt-1],&tmp_b2b);
          if ( len > MIN_STREAM_LEN )
          {
            get_trend(owdfortd,len);
          }
        }
        low=high+1;
      }
      if ( abort_fleet )
        break;
      else
      {
        b2b_pkt_per_stream[stream_cnt-1] = tmp_b2b ;
        ctr_code = CONTINUE_STREAM | CTR_CODE;
        send_ctr_mesg(ctr_buff, ctr_code);
      }
    }
    pkt_rcvd = 0 ;

    /* A hack for slow links */
    stream_duration = stream_len * time_interval ;
    if ( stream_duration >= 500000 )
    {
      slow=1;
      break ;
    }
  }  /*end of while (stream_cnt < num_stream ). */

  if ( Verbose ) printf("\n");
  fprintf(pathload_fp,"\n");

  if ( abort_fleet )
  {
    printf("\tAborting fleet. Stream_cnt %d\n",stream_cnt);
     ctr_code = ABORT_FLEET | CTR_CODE;
     send_ctr_mesg(ctr_buff , ctr_code ) ;
     return_val = -1 ;
  }
  else
     print_contextswitch_info(num_sndr_cs,num_rcvr_cs,b2b_pkt_per_stream,stream_cnt);

  exp_fleet_id++ ;
  free(pkt_buf);
  return return_val  ;
}

void print_contextswitch_info(l_int32 num_sndr_cs[], l_int32 num_rcvr_cs[],l_int32 discard[],l_int32 stream_cnt)
{
    l_int32 j;

    if (Verbose)
      printf("  # of CS @ sndr        :: ");
    fprintf(pathload_fp,"  # of CS @ sndr        :: ");
    
    for(j=0;j<stream_cnt-1;j++)
    {
      if (Verbose) printf(":%2d",num_sndr_cs[j]);
      fprintf(pathload_fp,":%2d",num_sndr_cs[j]);
    }
    if ( Verbose ) printf("\n");
    fprintf(pathload_fp,"\n");
    if (Verbose)
      printf("  # of CS @ rcvr        :: ");
    fprintf(pathload_fp,"  # of CS @ rcvr        :: ");
    for(j=0;j<stream_cnt-1;j++)
    {
      if (Verbose) printf(":%2d",num_rcvr_cs[j]);
      fprintf(pathload_fp,":%2d",num_rcvr_cs[j]);
    }
    if ( Verbose ) printf("\n");
    fprintf(pathload_fp,"\n");
 
    if (Verbose)
      printf("  # of DS @ rcvr        :: ");
    fprintf(pathload_fp,"  # of DS @ rcvr        :: ");
    for(j=0;j<stream_cnt-1;j++)
    {
      if (Verbose) printf(":%2d",discard[j]);
      fprintf(pathload_fp,":%2d",discard[j]);
    }
     if ( Verbose ) printf("\n");
    fprintf(pathload_fp,"\n");
}

void sig_sigusr1()
{
  return;
}

void sig_alrm()
{
  terminate_gracefully(exp_start_time);
  exit(0);
}

void *ctrl_listen(void *arg)
{
  struct timeval select_tv;
  fd_set readset;
  l_int32 ret_val ;
  char ctr_buff[8]; 
  
#ifdef THRLIB
  FD_ZERO(&readset);
  FD_SET(sock_tcp,&readset);
  select_tv.tv_sec=100;select_tv.tv_usec=0;
  if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 ) 
  {
    /* check ... mesg received */
    if ( FD_ISSET(sock_tcp,&readset) )
    {
      /* check the control connection.*/
      if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 )
      {
        if ( (((ret_val & CTR_CODE) >> 31) == 1) && 
              ((ret_val & 0x7fffffff) == FINISHED_STREAM ) )
        {
          while((ret_val = recv_ctr_mesg(sock_tcp,ctr_buff ))== -1) ;
          if ( ret_val == ((thr_arg *)arg)->stream_cnt )
          {
            ((thr_arg *)arg)->finished_stream =1 ;
            pthread_kill(((thr_arg *)arg)->ptid,SIGUSR1);
            pthread_exit(NULL);
          }
        }
        else if ( (((ret_val & CTR_CODE) >> 31) == 1) &&
                  ((ret_val & 0x7fffffff) == FINISHED_TRAIN ) )
        {
          select_tv.tv_usec = 2000 ;
          select_tv.tv_sec = 0 ;
          select(1,NULL,NULL,NULL,&select_tv);
          ((thr_arg *)arg)->finished_stream =1 ;
          pthread_kill(((thr_arg *)arg)->ptid,SIGUSR1);
          pthread_exit(NULL);
        }
      }
    }
  }
#endif
  return NULL;
}

void get_trend(double owdfortd[],l_int32 pkt_cnt )
{
  double median_owd[MAX_STREAM_LEN];
  l_int32 median_owd_len=0;
  double ordered[MAX_STREAM_LEN];
  l_int32 j,count,pkt_per_min;
  //pkt_per_min = 5 ; 
  pkt_per_min = (int)floor(sqrt((double)pkt_cnt));
  count = 0 ;
  for ( j = 0 ; j < pkt_cnt  ; j=j+pkt_per_min )
  {
    if ( j+pkt_per_min >= pkt_cnt )
      count = pkt_cnt - j ;
    else
      count = pkt_per_min;
    order_dbl(owdfortd , ordered ,j,count ) ;
    if ( count % 2 == 0 )
       median_owd[median_owd_len++] =  
          ( ordered[(int)(count*.5) -1] + ordered[(int)(count*0.5)] )/2 ;
    else
       median_owd[median_owd_len++] =  ordered[(int)(count*0.5)] ;
  }
  pct_metric[trend_idx]=
      pairwise_comparision_test(median_owd , 0 , median_owd_len );
  pdt_metric[trend_idx]=
      pairwise_diff_test(median_owd , 0, median_owd_len );
  trend_idx+=1;
}

/* 
  Order an array of doubles using bubblesort 
*/
void order_dbl(double unord_arr[], double ord_arr[],l_int32 start, l_int32 num_elems)
{
  l_int32 i,j,k;
  double temp;
  for (i=start,k=0;i<start+num_elems;i++,k++) ord_arr[k]=unord_arr[i];
  
  for (i=1;i<num_elems;i++) 
  {
    for (j=i-1;j>=0;j--)
      if (ord_arr[j+1] < ord_arr[j]) 
      {
        temp=ord_arr[j]; 
        ord_arr[j]=ord_arr[j+1]; 
        ord_arr[j+1]=temp;
      }
      else break;
  }
}

/* 
    Order an array of float using bubblesort 
*/
void order_float(float unord_arr[], float ord_arr[],l_int32 start, l_int32 num_elems)
{
  l_int32 i,j,k;
  double temp;
  for (i=start,k=0;i<start+num_elems;i++,k++) ord_arr[k]=unord_arr[i];
  for (i=1;i<num_elems;i++) 
  {
    for (j=i-1;j>=0;j--)
      if (ord_arr[j+1] < ord_arr[j]) 
      {
        temp=ord_arr[j]; 
        ord_arr[j]=ord_arr[j+1]; 
        ord_arr[j+1]=temp;
      }
      else break;
  }
}

/* 
    Order an array of l_int32 using bubblesort 
*/
void order_int(l_int32 unord_arr[], l_int32 ord_arr[], l_int32 num_elems)
{
    l_int32 i,j;
    l_int32 temp;
    for (i=0;i<num_elems;i++) ord_arr[i]=unord_arr[i];
    for (i=1;i<num_elems;i++) {
        for (j=i-1;j>=0;j--)
            if (ord_arr[j+1] < ord_arr[j]) {
                temp=ord_arr[j]; 
                ord_arr[j]=ord_arr[j+1]; 
                ord_arr[j+1]=temp;
            }
            else break;
    }
}

/*
    Send a message through the control stream
*/
void send_ctr_mesg(char *ctr_buff, l_int32 ctr_code)
{
    l_int32 ctr_code_n = htonl(ctr_code);
    memcpy((void*)ctr_buff, &ctr_code_n, sizeof(l_int32));
    if (write(sock_tcp, ctr_buff, sizeof(l_int32)) != sizeof(l_int32))
    {
        fprintf(stderr, "send control message failed:\n");
        exit(-1);
    }
}


/*
  Receive message from the control stream
*/
l_int32 recv_ctr_mesg(l_int32 ctr_strm, char *ctr_buff)
{
  l_int32 ctr_code;
  gettimeofday(&first_time,0);
  if (read(ctr_strm, ctr_buff, sizeof(l_int32)) != sizeof(l_int32))
    return(-1);
  gettimeofday(&second_time,0);
  memcpy(&ctr_code, ctr_buff, sizeof(l_int32));
  return(ntohl(ctr_code));
}

/*
  Compute the time difference in microseconds
  between two timeval measurements
*/
double time_to_us_delta(struct timeval tv1, struct timeval tv2)
{
  double time_us;
  time_us = (double)
    ((tv2.tv_sec-tv1.tv_sec)*1000000+(tv2.tv_usec-tv1.tv_usec));
  return time_us;
}

/*
  Compute the average of the set of measurements <data>.
*/
double get_avg(double data[], l_int32 num_values)
{
  l_int32 i;
  double sum_;
  sum_ = 0;
  for (i=0; i<num_values; i++) sum_ += data[i];
  return (sum_ / (double)num_values);
}

/*
    PCT test to detect increasing trend in stream
*/
double pairwise_comparision_test (double array[] ,l_int32 start , l_int32 end)
{
  l_int32 improvement = 0 ,i ;
  double total ;

  if ( ( end - start  ) >= MIN_PARTITIONED_STREAM_LEN )
  {
    for ( i = start ; i < end - 1   ; i++ )
    {
      if ( array[i] < array[i+1] )
        improvement += 1 ;
    }
    total = ( end - start ) ;
    return ( (double)improvement/total ) ;
  }
  else
    return -1 ;
}

/*
    PDT test to detect increasing trend in stream
*/
double pairwise_diff_test(double array[] ,l_int32 start , l_int32 end)
{
  double y = 0 , y_abs = 0 ;
  l_int32 i ;
  if ( ( end - start  ) >= MIN_PARTITIONED_STREAM_LEN )
  {
    for ( i = start+1 ; i < end    ; i++ )
    {
      y += array[i] - array[i-1] ;
      y_abs += fabs(array[i] - array[i-1]) ;
    }
    return y/y_abs ;
  }
  else
    return 2. ;
}

double grey_bw_resolution() 
{
  if ( adr )
    return (.05*adr<12?.05*adr:12) ;
  else 
    return min_rate ;
}

/*
    test if Rmax and Rmin range is smaller than
    user specified bw resolution
    or
    if Gmin and Gmax range is smaller than grey
    bw resolution.
*/
l_int32 converged()
{
  int ret_val=0;
  if ( (converged_gmx_rmx_tm && converged_gmn_rmn_tm) || converged_rmn_rmx_tm  )
    ret_val=1;
  else if ( tr_max != 0 && tr_max != tr_min )
  {
    if ( tr_max - tr_min <= bw_resol )
    {
      converged_rmn_rmx=1;
      ret_val=1;
    }
    else if( tr_max - grey_max <= grey_bw_resolution() &&
              grey_min - tr_min <= grey_bw_resolution() )
    {
      converged_gmn_rmn = 1;
      converged_gmx_rmx = 1;
      ret_val=1;
    }
  }
  return ret_val ;
}

/*
    Calculate next fleet rate
    when fleet showed INCREASING trend. 
*/
void radj_increasing() 
{
  if ( grey_max != 0 && grey_max >= tr_min )
  {
    if ( tr_max - grey_max <= grey_bw_resolution() )
    {
      converged_gmx_rmx = 1;
      exp_flag=0;
      if ( grey_min || tr_min )
        radj_notrend() ;
      else
      {
        if ( grey_min < grey_max )
          tr = grey_min/2. ;
        else
          tr = grey_max/2. ;
      }
    }
    else 
     tr = ( tr_max + grey_max)/2. ;
  }
  else
    tr =  (tr_max + tr_min)/2.<min_rate?min_rate:(tr_min+tr_max)/2. ;
}


/*
    Calculate next fleet rate
    when fleet showed NOTREND trend. 
*/
void radj_notrend() 
{
  if ( exp_flag )
     tr =  2*tr>max_rate?max_rate:2*tr ;
  else
  {
    if ( grey_min != 0 && grey_min <= tr_max )
    {
      if ( grey_min - tr_min <= grey_bw_resolution() )
      {
        converged_gmn_rmn = 1;
        radj_increasing() ;
      }
      else 
        tr =  (tr_min+grey_min)/2.<min_rate?min_rate:(tr_min+grey_min)/2. ;
    }
    else
      tr =  (tr_max + tr_min)/2.<min_rate?min_rate:(tr_min+tr_max)/2. ;
  }
}


/*
    Calculate next fleet rate
    when fleet showed GREY trend. 
*/
void radj_greymax()
{
  if ( tr_max == 0 )
    tr = (tr+.5*tr)<max_rate?(tr+.5*tr):max_rate ;
  else if ( tr_max - grey_max <= grey_bw_resolution() )
  {
    converged_gmx_rmx = 1;
    radj_greymin() ;
  }
  else 
    tr = ( tr_max + grey_max)/2. ;
}

/*
    Calculate next fleet rate
    when fleet showed GREY trend. 
*/
void radj_greymin()
{
 if ( grey_min - tr_min <= grey_bw_resolution() )
 {
   converged_gmn_rmn = 1;
   radj_greymax() ;
 }
 else 
   tr = (tr_min+grey_min)/2.<min_rate?min_rate:(tr_min+grey_min)/2. ;
}


/*
    dpending upon trend in fleet :-
    - update the state variables.
    - decide the next fleet rate
    return -1 when converged
*/
l_int32 rate_adjustment(l_int32 flag)
{
  l_int32 ret_val = 0 ;
  if( flag == INCREASING )
  {
    if ( max_rate_flag)
      max_rate_flag=0;
    if ( grey_max >= tr )
      grey_max = grey_min = 0 ;
    tr_max = tr ;
    if (!converged_gmx_rmx_tm ) 
    {
      if ( !converged() ) 
        radj_increasing() ;
      else
        ret_val=-1 ; //return -1; 
    }
    else
    {
      exp_flag = 0 ;
      if ( !converged() )
        radj_notrend() ;
    }
  }
  else if ( flag == NOTREND )
  {
    if ( grey_min < tr )
      grey_min = 0 ;
    if ( grey_max < tr )
      grey_max = grey_min = 0 ;
    if ( tr > tr_min )
      tr_min =  tr ;
    if ( !converged_gmn_rmn_tm && !converged() ) 
      radj_notrend() ;
    else
      ret_val=-1 ; //return -1 ;
  }
  else if ( flag == GREY )
  {
    if ( grey_max == 0 && grey_min == 0 )
      grey_max =  grey_min = tr ;
    if (tr==grey_max || tr>grey_max )
    {
      grey_max = tr ;
      if ( !converged_gmx_rmx_tm )
      {
        if ( !converged() )
          radj_greymax() ;
        else
          ret_val=-1 ; //return -1 ;
      }
      else
      {
        exp_flag = 0 ;
        if ( !converged() )
          radj_notrend() ;
        else
          ret_val=-1;
      }
    }
    else if ( tr < grey_min || grey_min == 0  ) 
    {
      grey_min = tr ;
      if ( !converged() )
        radj_greymin() ;
      else
        ret_val=-1 ; //return -1 ;
    }
  }

  if (Verbose)
  {
    printf("  Rmin-Rmax             :: %.2f-%.2fMbps\n",tr_min,tr_max);
    printf("  Gmin-Gmax             :: %.2f-%.2fMbps\n",grey_min,grey_max);
  }
  fprintf(pathload_fp,"  Rmin-Rmax             :: %.2f-%.2fMbps\n",tr_min,tr_max);
  fprintf(pathload_fp,"  Gmin-Gmax             :: %.2f-%.2fMbps\n",grey_min,grey_max);

  if ( ret_val == -1 )
    return -1 ;
  if ( tr >= max_rate )
    max_rate_flag++ ;

  if ( max_rate_flag > 1 )
     return -1 ;
  if ( min_rate_flag > 1 ) 
     return -1 ;
  transmission_rate = (l_int32) rint(1000000 * tr ) ;
  return 0 ;
}

/*
  calculates fleet param L,T .
  calc_param returns -1, if we have 
  reached to upper/lower limits of the 
  stream parameters like L,T .
  otherwise returns 0 .
*/
l_int32 calc_param()
{
  double tmp_tr ;
  l_int32 tmp ;
  l_int32 tmp_time_interval;
  if (tr < 150 )
  {
    time_interval  = 80>min_time_interval?80:min_time_interval ;
    cur_pkt_sz=rint(tr*time_interval/8.) - 28;
    if ( cur_pkt_sz < MIN_PKT_SZ )
    {
      cur_pkt_sz = MIN_PKT_SZ ;
      time_interval =rint ((cur_pkt_sz + 28)*8./tr) ;
      tr = ( cur_pkt_sz + 28 )*8. /time_interval ;
    }
    else if ( cur_pkt_sz > max_pkt_sz )
    {
      cur_pkt_sz = max_pkt_sz;
      time_interval = min_time_interval ;
      tmp_tr = ( cur_pkt_sz + 28 )*8. /time_interval ;
      if ( equal(tr,tmp_tr))
          tr = tmp_tr;
      else
        return -1 ;
    }
  }
  else if ( tr < 600 )
  {
    tmp_tr = tr ;
    tmp_time_interval = rint(( max_pkt_sz + 28 )* 8 / tr) ; 
    if ( cur_pkt_sz == max_pkt_sz && tmp_time_interval == time_interval )
      return -1 ;
    time_interval = tmp_time_interval ; 
    tmp=rint(tr*time_interval/8.)-28;
    cur_pkt_sz=tmp<max_pkt_sz?tmp:max_pkt_sz;
    tr = ( cur_pkt_sz + 28 ) *8./time_interval ;
    if ((tr_min && (equal(tr,tr_min) || tr<tr_min)) 
        || (grey_max && tmp_tr>grey_max && (equal(tr,grey_max) || tr<grey_max))) 
    {
      do
      {
        --time_interval;
        cur_pkt_sz=rint(tr*time_interval/8.)-28;
      }while (cur_pkt_sz > max_pkt_sz);
      tr = ( cur_pkt_sz + 28 ) *8./time_interval ;
    }
  }
  else
  {
    cur_pkt_sz = max_pkt_sz ;
    time_interval = rint(( cur_pkt_sz + 28 )* 8 / tr) ; 
    tr = ( cur_pkt_sz + 28 ) *8./time_interval ;
    if ((tr_min && (equal(tr,tr_min) || tr<tr_min)) )
    {
      return -1 ;
    } 
    if( equal(tr,tr_max) )
    {
      tr_max = tr ;
      if ( grey_max )
      {
        converged_gmx_rmx_tm=1;
        if ( !converged_gmn_rmn && !converged_gmn_rmn_tm )
          radj_notrend();
        else return -1 ;
      }
      else 
      {
        converged_rmn_rmx=1;
        return -1 ;
      }
    }
  }
  return 0 ;
}

/*
  splits stream iff sender sent packets more than
  time_interval+1000 usec apart.
*/
l_int32 eliminate_sndr_side_CS (double sndr_time_stamp[], l_int32 split_owd[])
{
  l_int32 j = 0,k=0;
  l_int32 cs_threshold;

  cs_threshold = 2*time_interval>time_interval+1000?2*time_interval:time_interval+1000;
  for ( k = 0 ; k < stream_len-1 ; k++ )
  {
    if ( sndr_time_stamp[k] == 0 || sndr_time_stamp[k+1] == 0 )
       continue;
    else if ((sndr_time_stamp[k+1]-sndr_time_stamp[k]) > cs_threshold)
      split_owd[j++] = k;
  }
  return j ;
}

/*
  discards owd of packets received when
  receiver was NOT running.
*/
l_int32 eliminate_rcvr_side_CS ( double rcvr_time_stamp[] , double owd[],double owdfortd[], l_int32 low,l_int32 high,l_int32 *num_rcvr_cs,l_int32 *tmp_b2b )
{
  l_int32 b2b_pkt[MAX_STREAM_LEN] ;
  l_int32 i,k=0 ;
  l_int32 len=0;
  l_int32 min_gap;

  min_gap = MIN_TIME_INTERVAL > 1.5*rcv_latency ? MIN_TIME_INTERVAL :2.5*rcv_latency ;
  for ( i = low ; i <= high  ; i++ )
  {
    if ( rcvr_time_stamp[i] == 0 || rcvr_time_stamp[i+1] == 0 )
      continue ;
    else if ((rcvr_time_stamp[i+1]- rcvr_time_stamp[i])> min_gap)
      owdfortd[len++] = owd[i];
    else 
      b2b_pkt[k++] = i ;
  }

  /* go through discarded list and count b2b discards as 1 CS instance */
  for (i=1;i<k;i++)
    if ( b2b_pkt[i]-b2b_pkt[i-1] != 1)
      (*num_rcvr_cs)++;
  *tmp_b2b += k;
  return len ;
}

/* eliminates packets received b2b due to IC */
l_int32 eliminate_b2b_pkt_ic ( double rcvr_time_stamp[] , double owd[],double owdfortd[], l_int32 low,l_int32 high,l_int32 *num_rcvr_cs,l_int32 *tmp_b2b )
{
  l_int32 b2b_pkt[MAX_STREAM_LEN] ;
  l_int32 i,k=0 ;
  l_int32 len=0;
  l_int32 min_gap;
  l_int32 tmp=0;

  min_gap = MIN_TIME_INTERVAL > 3*rcv_latency ? MIN_TIME_INTERVAL :3*rcv_latency ;
  for ( i = low ; i <= high  ; i++ )
  {
    if ( rcvr_time_stamp[i] == 0 || rcvr_time_stamp[i+1] == 0 )
      continue ;
    
    //fprintf(stderr,"i %d  owd %.2f dispersion %.2f",i, owd[i],rcvr_time_stamp[i+1]- rcvr_time_stamp[i]);
    if ((rcvr_time_stamp[i+1]- rcvr_time_stamp[i])< min_gap)
    {
      b2b_pkt[k++] = i ;
      tmp++;
      //fprintf(stderr," b\n");
    }
    else 
    {
      if ( tmp >= 3 )
      {
        //fprintf(stderr," j\n");
        tmp=0;
        owdfortd[len++] = owd[i];
      }
    }
  }
  return len ;
}

/* Adjust offset to zero again  */
void adjust_offset_to_zero(double owd[], l_int32 len)
{
    l_int32 owd_min = 0;
    l_int32 i ; 
    for (i=0; i< len; i++) {
        if ( owd_min == 0 && owd[i] != 0 ) owd_min=owd[i];
        else if (owd_min != 0 && owd[i] != 0 && owd[i]<owd_min) owd_min=owd[i];
    }

    for (i=0; i< len; i++) {
        if ( owd[i] != 0 )
            owd[i] -= owd_min;
    }
}

#define INCR    1
#define NOTR    2
#define DISCARD 3
#define UNCL    4
void get_pct_trend(double pct_metric[], l_int32 pct_trend[], l_int32 pct_result_cnt )
{
 l_int32 i ;
 for (i=0; i < pct_result_cnt;i++ )
 {
   pct_trend[i] = UNCL ;
   if ( pct_metric[i] == -1  )
   {
     if (Verbose)
       printf("d");
     fprintf(pathload_fp,"d");
     pct_trend[i] = DISCARD ;
   }
   else if ( pct_metric[i] > 1.1 * PCT_THRESHOLD )
   {
     if (Verbose)
       printf("I");
     fprintf(pathload_fp,"I");
     pct_trend[i] = INCR ;
   }
   else if ( pct_metric[i] < .9 * PCT_THRESHOLD )
   {
     if (Verbose)
       printf("N");
     fprintf(pathload_fp,"N");
     pct_trend[i] = NOTR ;
   }
   else if(pct_metric[i] <= PCT_THRESHOLD*1.1 && pct_metric[i] >= PCT_THRESHOLD*.9 )
   {
     if (Verbose)
       printf("U");
     fprintf(pathload_fp,"U");
     pct_trend[i] = UNCL ;
   }
 }
 if (Verbose)
   printf("\n");
 fprintf(pathload_fp,"\n");
}

void get_pdt_trend(double pdt_metric[], l_int32 pdt_trend[], l_int32 pdt_result_cnt )
{
 l_int32 i ;
 for (i=0; i < pdt_result_cnt;i++ )
 {
    if ( pdt_metric[i] == 2  )
    {
 if (Verbose)
        printf("d");
        fprintf(pathload_fp,"d");
        pdt_trend[i] = DISCARD ;
    }
    else if ( pdt_metric[i] > 1.1 * PDT_THRESHOLD )
    {
 if (Verbose)
        printf("I");
        fprintf(pathload_fp,"I");
        pdt_trend[i] = INCR ;
    }
    else if ( pdt_metric[i] < .9 * PDT_THRESHOLD )
    {
 if (Verbose)
        printf("N");
        fprintf(pathload_fp,"N");
        pdt_trend[i] = NOTR ;
    }
    else if ( pdt_metric[i] <= PDT_THRESHOLD*1.1 && pdt_metric[i] >= PDT_THRESHOLD*.9 )
    {
 if (Verbose)
        printf("U");
        fprintf(pathload_fp,"U");
        pdt_trend[i] = UNCL ;
    }
 }
 if (Verbose)
 printf("\n");
 fprintf(pathload_fp,"\n");
}

/*
  returns : trend in fleet or -1 if more than 50% of stream were discarded 
*/
l_int32 aggregate_trend_result()
{
  l_int32 total=0 ,i_cnt = 0, n_cnt = 0;
  l_int32 num_dscrd_strm=0;
  l_int32 i=0;
  l_int32 pct_trend[TREND_ARRAY_LEN] , pdt_trend[TREND_ARRAY_LEN] ; 

  if (Verbose)
    printf("  PCT metric/stream[%2d] :: ",trend_idx); 
  fprintf(pathload_fp,"  PCT metric/stream[%2d] :: ",trend_idx); 
  for (i=0; i < trend_idx;i++ )
  {
    if (Verbose)
      printf("%3.2f:",pct_metric[i]);
    fprintf(pathload_fp,"%3.2f:",pct_metric[i]);
  }
  if (Verbose)
    printf("\n"); 
  fprintf(pathload_fp,"\n"); 
  if (Verbose)
    printf("  PDT metric/stream[%2d] :: ",trend_idx); 
  fprintf(pathload_fp,"  PDT metric/stream[%2d] :: ",trend_idx); 
  for (i=0; i < trend_idx;i++ )
  {
    if (Verbose)
      printf("%3.2f:",pdt_metric[i]);
    fprintf(pathload_fp,"%3.2f:",pdt_metric[i]);
  }
  if (Verbose)
    printf("\n"); 
  fprintf(pathload_fp,"\n"); 
  if (Verbose)
    printf("  PCT Trend/stream [%2d] :: ",trend_idx); 
  fprintf(pathload_fp,"  PCT Trend/stream [%2d] :: ",trend_idx); 
  get_pct_trend(pct_metric,pct_trend,trend_idx);
  if (Verbose)
    printf("  PDT Trend/stream [%2d] :: ",trend_idx); 
  fprintf(pathload_fp,"  PDT Trend/stream [%2d] :: ",trend_idx); 
  get_pdt_trend(pdt_metric,pdt_trend,trend_idx);
  
  if (Verbose)
    printf("  Trend per stream [%2d] :: ",trend_idx); 
  fprintf(pathload_fp,"  Trend per stream [%2d] :: ",trend_idx); 
  for (i=0; i < trend_idx;i++ )
  {
    if ( pct_trend[i] == DISCARD || pdt_trend[i] == DISCARD )
    {
       if (Verbose)
         printf("d");
       fprintf(pathload_fp,"d");
       num_dscrd_strm++ ;
    }
    else if ( pct_trend[i] == INCR &&  pdt_trend[i] == INCR )
    {
       if (Verbose)
         printf("I");
       fprintf(pathload_fp,"I");
       i_cnt++;
    }
    else if ( pct_trend[i] == NOTR && pdt_trend[i] == NOTR )
    {
       if (Verbose)
         printf("N");
       fprintf(pathload_fp,"N");
       n_cnt++;
    }
    else if ( pct_trend[i] == INCR && pdt_trend[i] == UNCL )
    {
       if (Verbose)
         printf("I");
       fprintf(pathload_fp,"I");
       i_cnt++;
    }
    else if ( pct_trend[i] == NOTR && pdt_trend[i] == UNCL )
    {
       if (Verbose)
         printf("N");
       fprintf(pathload_fp,"N");
       n_cnt++;
    }
    else if ( pdt_trend[i] == INCR && pct_trend[i] == UNCL )
    {
       if (Verbose)
         printf("I");
       fprintf(pathload_fp,"I");
       i_cnt++;
    }
    else if ( pdt_trend[i] == NOTR && pct_trend[i] == UNCL )
    {
       if (Verbose)
         printf("N");
       fprintf(pathload_fp,"N");
       n_cnt++ ;
    }
    else
    {
       if (Verbose)
         printf("U");
       fprintf(pathload_fp,"U");
    }
    total++ ;
  }
  if (Verbose) printf("\n"); 
  fprintf(pathload_fp,"\n"); 

  /* check whether number of usable streams is 
     atleast 50% of requested number of streams */
  total-=num_dscrd_strm ;
  if ( total < num_stream/2 && !slow && !interrupt_coalescence)
  {
    bad_fleet_cs = 1 ;
    retry_fleet_cnt_cs++  ;
    return -1 ;
  }
  else
  {
    bad_fleet_cs = 0 ;
    retry_fleet_cnt_cs=0;
  }

  if( (double)i_cnt/(total) >= AGGREGATE_THRESHOLD )
  {
    if (Verbose)
      printf("  Aggregate trend       :: INCREASING\n");
    fprintf(pathload_fp,"  Aggregate trend       :: INCREASING\n");
    return INCREASING ;
  }
  else if( (double)n_cnt/(total) >= AGGREGATE_THRESHOLD )
  {
    if (Verbose)
      printf("  Aggregate trend       :: NO TREND\n");
    fprintf(pathload_fp,"  Aggregate trend       :: NO TREND\n");
    return NOTREND ;
  }
  else 
  {
    if (Verbose)
      printf("  Aggregate trend       :: GREY\n");
    fprintf(pathload_fp,"  Aggregate trend       :: GREY\n");
    return GREY ;
  }
}

l_int32 get_sndr_time_interval(double snd_time[],double *sum)
{
  l_int32 k,j=0,new_j=0;
  double ordered[MAX_STREAM_LEN] ;
  double ltime_interval[MAX_STREAM_LEN] ;
  for ( k = 0; k < stream_len-1; k++ )
  {
    if ( snd_time[k] == 0 || snd_time[k+1] == 0 )
      continue;
    else 
      ltime_interval[j++] = snd_time[k+1] - snd_time[k] ;
  }
  order_dbl(ltime_interval, ordered , 0, j ) ;
  /* discard the top 15% as outliers  */
  new_j = j - rint(j*.15) ;
  for ( k = 0 ; k < new_j ; k++ )
    *sum += ordered[k] ;
  return new_j ;
}

void get_sending_rate() 
{
 time_interval = snd_time_interval/num;
 cur_req_rate = tr ;
 cur_actual_rate = (28 + cur_pkt_sz) * 8. / time_interval ;

 if( !equal(cur_req_rate, cur_actual_rate)  ) 
 {
   if( !grey_max && !grey_min )
   {
     if( tr_min && tr_max && (less_than(cur_actual_rate,tr_min)||equal(cur_actual_rate,tr_min)))
       converged_rmn_rmx_tm = 1;
     if( tr_min && tr_max && (less_than(tr_max,cur_actual_rate)||equal(tr_max,cur_actual_rate)))
       converged_rmn_rmx_tm = 1;
     
   }
   else if ( cur_req_rate < tr_max && cur_req_rate > grey_max )
   {
     if( !(less_than(cur_actual_rate,tr_max)&&grtr_than(cur_actual_rate,grey_max)) )
       converged_gmx_rmx_tm = 1;
   }
   else if ( cur_req_rate < grey_min && cur_req_rate > tr_min )
   {
     if( !(less_than(cur_actual_rate,grey_min) && grtr_than(cur_actual_rate,tr_min)) )
        converged_gmn_rmn_tm = 1;
   }
  }

  tr = cur_actual_rate ;
  transmission_rate = (l_int32) rint(1000000 * tr) ;
  if(Verbose)
    printf("  Fleet Parameter(act)  :: R=%.2fMbps, L=%ldB, K=%ldpackets, T=%ldusec\n",cur_actual_rate, cur_pkt_sz , stream_len,time_interval);
  fprintf(pathload_fp,"  Fleet Parameter(act)  :: R=%.2fMbps, L=%ldB, K=%ldpackets, T=%ldusec\n",cur_actual_rate,cur_pkt_sz,stream_len,time_interval);
  snd_time_interval=0;
  num=0;
}

void terminate_gracefully(struct timeval exp_start_time)
{
  l_int32 ctr_code;
  char ctr_buff[8],buff[26];
  struct timeval exp_end_time;
  double min=0,max=0 ;

  ctr_code = TERMINATE | CTR_CODE;
  send_ctr_mesg(ctr_buff, ctr_code);

  gettimeofday(&exp_end_time, NULL);
  strncpy(buff, ctime(&(exp_end_time.tv_sec)), 24);
  buff[24] = '\0';
  if (verbose || Verbose)
    printf("\n\t*****  RESULT *****\n");
  fprintf(pathload_fp,"\n\t*****  RESULT *****\n");
  
  if (netlog)
    netlogger() ;

  if ( min_rate_flag )
  {
    if (verbose || Verbose)
    {
      printf("Avail-bw < minimum sending rate.\n");
      printf("Increase MAX_TIME_INTERVAL in pathload_rcv.h from 200000 usec to a higher value.\n");
    }
    fprintf(pathload_fp,"Avail-bw < minimum sending rate.\n");
    fprintf(pathload_fp,"Increase MAX_TIME_INTERVAL in pathload_rcv.h from 200000 usec to a higher value.\n");
  }
  else if ( max_rate_flag && !interrupt_coalescence )
  {
    if (verbose || Verbose)
    {
      printf("Avail-bw > maximum sending rate.\n");
      if ( tr_min)
        printf("Avail-bw > %.2f (Mbps)\n", tr_min);
    }
    fprintf(pathload_fp,"Avail-bw > maximum sending rate.\n");
    if ( tr_min)
      fprintf(pathload_fp,"Avail-bw > %.2f (Mbps)\n", tr_min);
  }
  else if (bad_fleet_cs && !interrupt_coalescence)
  {
    if (verbose || Verbose)
      printf("Measurement terminated due to frequent CS @ sender/receiver.\n");
    fprintf(pathload_fp,"Measurement terminated due to frequent CS @ sender/receiver.\n");
    if ((tr_min&& tr_max) || (grey_min&&grey_max))
    {
      if ( grey_min&& grey_max)
      {
        min = grey_min ; max = grey_max ;
      }
      else
      {
        min = tr_min ;max = tr_max ;
      }
      if (verbose || Verbose)
      {
        printf("Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
        printf("Measurements finished at %s \n",  buff);
        printf("Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000);
      }
      fprintf(pathload_fp,"Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);
      fprintf(pathload_fp,"Measurements finished at %s \n",  buff);
      fprintf(pathload_fp,"Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000);
    }
  }
  else 
  {
    if ( !interrupt_coalescence && ((converged_gmx_rmx_tm && converged_gmn_rmn_tm) || converged_rmn_rmx_tm ))
    {
      if (Verbose)
        printf("Actual probing rate != desired probing rate.\n");
      fprintf(pathload_fp,"Actual probing rate != desired probing rate.\n");
      if ( converged_rmn_rmx_tm )
      {
        min = tr_min ; max = tr_max;
      }
      else
      {
        min = grey_min ; max = grey_max ;
      }
    }
    else if ( !interrupt_coalescence && converged_rmn_rmx )
    {
      if (Verbose)
        printf("User specified bandwidth resolution achieved\n");
      fprintf(pathload_fp,"User specified bandwidth resolution achieved\n");
      min = tr_min ; max = tr_max ;
    }
    else if ( !interrupt_coalescence && converged_gmn_rmn && converged_gmx_rmx )
    {
      if (Verbose)
        printf("Exiting due to grey bw resolution\n");
      fprintf(pathload_fp,"Exiting due to grey bw resolution\n");
      min = grey_min ; max = grey_max;
    }
    else
    {
      min = tr_min ; max = tr_max;
    }

    if (verbose||Verbose)
    {
      if ( lower_bound)
      {
        printf("Receiver NIC has interrupt coalescence enabled\n");
        printf("Available bandwidth is greater than %.2f (Mbps)\n", min);
      }
      else
        printf("Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);

      printf("Measurements finished at %s \n",  buff);
      printf("Measurement latency is %.2f sec \n",time_to_us_delta(exp_start_time,exp_end_time)/1000000);
    }
    
    if ( lower_bound)
    {
      fprintf(pathload_fp,"Receiver NIC has interrupt coalescence enabled\n");
      fprintf(pathload_fp,"Available bandwidth is greater than %.2f (Mbps)\n", min);
    }
    else
      fprintf(pathload_fp,"Available bandwidth range : %.2f - %.2f (Mbps)\n", min, max);

    fprintf(pathload_fp,"Measurements finished at %s \n",  buff);
    fprintf(pathload_fp,"Measurement latency is %.2f sec \n", time_to_us_delta(exp_start_time, exp_end_time) / 1000000);
  }

  if (netlog)
    fclose(netlog_fp);
  fclose(pathload_fp); 
  close(sock_tcp);
  exit(0);
}

void netlogger()
{
    struct tm *tm;
    struct hostent *rcv_host, *snd_host;
    char rcv_name[256];
    struct timeval curr_time;

    gettimeofday(&curr_time,NULL);
    tm = gmtime(&curr_time.tv_sec);
    fprintf(netlog_fp,"DATE=%4d",tm->tm_year+1900);
    print_time(netlog_fp,tm->tm_mon+1);
    print_time(netlog_fp,tm->tm_mday);
    print_time(netlog_fp,tm->tm_hour);
    print_time(netlog_fp,tm->tm_min);
    if (tm->tm_sec <10) {
      fprintf(netlog_fp,"0");
      fprintf(netlog_fp,"%1.6f",tm->tm_sec+curr_time.tv_usec/1000000.0);
    }
    else{
      fprintf(netlog_fp,"%1.6f",tm->tm_sec+curr_time.tv_usec/1000000.0);
    }
    gethostname(rcv_name, 255);
    rcv_host = gethostbyname(rcv_name);
    if(strcmp(rcv_name, "\0")!=0) fprintf(netlog_fp," HOST=%s",rcv_host->h_name);
    else fprintf(netlog_fp," HOST=NO_NAME");
    fprintf(netlog_fp," PROG=pathload");
    fprintf(netlog_fp," LVL=Usage");
    if ((snd_host = gethostbyname(hostname)) == 0) {
        snd_host = gethostbyaddr(hostname,256,AF_INET);
    }
    fprintf(netlog_fp," PATHLOAD.SNDR=%s",snd_host->h_name);
    fprintf(netlog_fp," PATHLOAD.ABWL=%.1fMbps",tr_min);
    fprintf(netlog_fp," PATHLOAD.ABWH=%.1fMbps\n",tr_max);
    fclose(netlog_fp);
}

/* prl_int32 time */
void print_time(FILE *fp, l_int32 time)
{
  if( time<10){
    fprintf(fp,"0");
    fprintf(fp,"%1d",time);
  }
  else{
    fprintf(fp,"%2d",time);
  }
}

l_int32 less_than(double a, double b)
{
  if ( !equal(a,b) && a < b)
    return 1;
  else 
    return 0;
}

l_int32 grtr_than(double a, double b)
{
  if ( !equal(a,b) && a > b)
    return 1;
  else 
    return 0;
}

/*
   if a approx-equal b, return 1
   else 0
*/
l_int32 equal(double a , double b)
{
  l_int32 maxdiff ;
  if ( a<b?a:b < 500 ) maxdiff = 2.5 ;
  else maxdiff = 5 ;
  if ( abs( a - b ) / b <= .02  && abs(a-b) < maxdiff )
    return 1 ;
  else
    return 0;
}


/*
 *  Help
 *  */
void help()
{
  fprintf(stderr, "usage: pathload_rcv [-q|-v] [-o|-O <filename>] [-N <filename>]\
[-w <bw_resol>] [-h|-H] -s <sender>\n");
  fprintf (stderr,"-s        : hostname/ipaddress of sender\n");
  fprintf (stderr,"-q        : quite mode\n");
  fprintf (stderr,"-v        : verbose mode\n");
  fprintf (stderr,"-w        : user specified bw resolution\n");
  fprintf (stderr,"-o <file> : write log in user specified file [default is pathload.log]\n");
  fprintf (stderr,"-O <file> : append log in user specified file [default is pathload.log]\n");
  fprintf (stderr,"-N <file> : print output in netlogger format to <file>\n");
  fprintf (stderr,"-h|H      : print this help and exit\n");
  exit(0);
}


syntax highlighted by Code2HTML, v. 0.9.1