/*
 This file is part of pathrate.

 pathrate is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation; either version 2 of the License, or
 (at your option) any later version.

 pathrate is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with pathrate; if not, write to the Free Software
 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

/*-------------------------------------------------
   pathrate : an end-to-end capcity estimation tool
   Author   : Constantinos Dovrolis (dovrolis@cc.gatech.edu )
              Ravi S Prasad            ( ravi@cc.gatech.edu )
              
   Release  : Ver 2.4.1
   Support  : This work was supported by the SciDAC
              program of the US department 
--------------------------------------------------*/


#define LOCAL
#include "pathrate.h"
#include "pathrate_snd.h"

int main(int argc, char* argv[])
{
  
  struct sockaddr_in snd_udp_addr, snd_tcp_addr, rcv_udp_addr, rcv_tcp_addr;
  
  struct hostent *host_rcv;
  
  int opt_len, sock_udp, sock_tcp, ctr_strm, send_buff_sz, sleep_secs=1,
          rcv_tcp_adrlen, 
          i,
          round_id=1, round_id_n,
          train_id, train_id_n,
          pack_id, pack_id_n,
          ctr_code,
          ctr_code_cmnd,
          ctr_code_data,
          pack_sz=1500, 
          max_pack_sz,
          train_len=3,
          no_trains=1,
          train_spacing,
          train_no,
          trains_ackd, 
          trains_lost,
          file=0,
          errflg=0;
  
  char ctr_buff[8], pack_buf[MAX_PACK_SZ], random_data[MAX_PACK_SZ];
  char c, filename[256];

  struct timeval sleep_time, current_time, prior_sleep;
  
  short reset_flag, done, sleep_usecs;
  
  time_t localtm;
  
  FILE *pathrate_fp=NULL; 
  
  
  /* 
    Check command line arguments 
  */
  verbose = 1;
  iterative = 0;
  while ((c = getopt(argc, argv, "ivhHqo:")) != EOF)
  switch (c) {
    case 'i':
      iterative = 1;
      break;
    case 'q':
      Verbose=0;
      verbose=0;
      break;
    case 'v':
      Verbose=1;
      break;
    case 'o':
      file=1;
      strcpy(filename,optarg);
      break;
    case 'h':
      help() ;
      errflg++;
      break ;
    case 'H':
      help() ;
      errflg++;
      break ;
    case '?':
      errflg++; 
  }
  if (errflg) {
    (void)fprintf(stderr, "usage: pathrate_snd [-i] [-H|-h] [-q|-v] [-o <filename>] \n");
    exit (-1);
  } 
  if (file){
    pathrate_fp = fopen(filename,"w");
    fprintf(pathrate_fp, "\n\n");
  }

  /* Control stream: TCP connection */
  if ((sock_tcp=socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    perror("socket(AF_INET,SOCK_STREAM,0):");
    exit(-1);
  }
  opt_len=1;
  if (setsockopt(sock_tcp, SOL_SOCKET, SO_REUSEADDR, (char*)&opt_len, sizeof(opt_len)) < 0) {
    perror("setsockopt(SOL_SOCKET,SO_REUSEADDR):");
    exit(-1);
  }
  bzero((char*)&snd_tcp_addr, sizeof(snd_tcp_addr));
  snd_tcp_addr.sin_family         = AF_INET;
  snd_tcp_addr.sin_addr.s_addr    = htonl(INADDR_ANY);
  snd_tcp_addr.sin_port           = htons(TCPSND_PORT);
  if (bind(sock_tcp, (struct sockaddr*)&snd_tcp_addr, sizeof(snd_tcp_addr)) < 0) {
    perror("bind(sock_tcp):");
    exit(-1);
  }
  if (listen(sock_tcp,1) < 0) {
    perror("listen(sock_tcp,1):");
    exit(-1);
  }
  
  
  /* 
    Data stream: UDP socket 
  */
  if ((sock_udp=socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
    perror("socket(AF_INET,SOCK_DGRAM,0):");
    exit(-1);
  }
  bzero((char*)&snd_udp_addr, sizeof(snd_udp_addr));
  snd_udp_addr.sin_family         = AF_INET;
  snd_udp_addr.sin_addr.s_addr    = htonl(INADDR_ANY);
  snd_udp_addr.sin_port           = htons(0);
  if (bind(sock_udp, (struct sockaddr*)&snd_udp_addr, sizeof(snd_udp_addr)) < 0) {
    perror("bind(sock_udp):");
    exit(-1);
  }
  send_buff_sz = UDP_BUFFER_SZ;
  if (setsockopt(sock_udp, SOL_SOCKET, SO_SNDBUF, (char*)&send_buff_sz, sizeof(send_buff_sz)) < 0) {
    perror("setsockopt(SOL_SOCKET,SO_SNDBUF):");
    exit(-1);
  }
  
  
  /*
          Check if select can give (roughly) sub-second time intervals
  */
  sleep_time.tv_sec = 0; 
  sleep_time.tv_usec = MIN_TRAIN_SPACING;
  gettimeofday(&prior_sleep,(struct timezone*)0);
  for (i=1;i<=10;i++){
    select(1,NULL,NULL,NULL,&sleep_time);
    sleep_time.tv_sec = 0;
    sleep_time.tv_usec=MIN_TRAIN_SPACING;
  }
  gettimeofday(&current_time,(struct timezone*)0);
  if ((time_to_us_delta(prior_sleep,current_time)<15*MIN_TRAIN_SPACING) &&
      (time_to_us_delta(prior_sleep,current_time)>5*MIN_TRAIN_SPACING))
    {sleep_usecs=1; train_spacing = MIN_TRAIN_SPACING;} 
  else 
    {sleep_usecs=0; train_spacing = TRAIN_SPACING_SEC;}
  
  
  do {
    if (file) fprintf(pathrate_fp,"\n\nWaiting for receiver to establish control stream => ");
    if (verbose) fprintf(stdout,"\n\nWaiting for receiver to establish control stream => ");
    fflush(stdout);
    
    /* 
      Wait until receiver attempts to connect, starting new measurement cycle
    */
    rcv_tcp_adrlen = sizeof(rcv_tcp_addr);
    ctr_strm = accept(sock_tcp, (struct sockaddr*)&rcv_tcp_addr, &rcv_tcp_adrlen);
    if (ctr_strm < 0) {
       perror("accept(sock_tcp):");
       exit(-1);
    }
    if (verbose) printf("OK\n");
    if (file) fprintf(pathrate_fp,"OK\n");
    
    localtm = time(NULL); gethostname(pack_buf, 256);
    host_rcv=gethostbyaddr((char*)&(rcv_tcp_addr.sin_addr), 
    sizeof(rcv_tcp_addr.sin_addr), AF_INET); 
    if (host_rcv!=NULL){
      if (file) fprintf(pathrate_fp,"Receiver %s starts measurements on %s", 
          host_rcv->h_name, ctime(&localtm));
      if (verbose) printf("Receiver %s starts measurements on %s", 
          host_rcv->h_name, ctime(&localtm));
    }
    else{
      if (file) fprintf(pathrate_fp,"Unknown receiver starts measurements on %s", 
          ctime(&localtm));
      if (verbose) printf("Unknown receiver starts measurements on %s", 
          ctime(&localtm));
    }
    
    
    /*
      Form receiving UDP address
    */
    bzero((char*)&rcv_udp_addr, sizeof(rcv_udp_addr));
    rcv_udp_addr.sin_family         = AF_INET;
    rcv_udp_addr.sin_addr.s_addr    = rcv_tcp_addr.sin_addr.s_addr; 
    rcv_udp_addr.sin_port           = htons(UDPRCV_PORT);
    
    
    
    /* 
      Bounce a number of empty messages back to the receiver, in order to measure RTT 
    */
    for (i=0; i<10; i++) {
      ctr_code=recv_ctr_msg(ctr_strm, ctr_buff);
      send_ctr_msg(ctr_strm, ctr_code);
    }
    
    /* 
      Create random packet payload to deal with links that do payload compression
    */
    srandom(getpid());
    for (i=0; i<MAX_PACK_SZ-1; i++) 
      random_data[i]=(char)(random()&0x000000ff);
    bzero((char*)&pack_buf, MAX_PACK_SZ);
    memcpy(pack_buf+2*sizeof(long), random_data, (MAX_PACK_SZ-1)-2*sizeof(sizeof(long)));
    
    
    if (file) fprintf(pathrate_fp, "Measurements are in progress. Please wait..\n");
    if (verbose) printf("Measurements are in progress. Please wait..\n");
    fflush(stdout);
    
    
    
    
    /*
      loop: 
        1) Get control messages for next phase (until SEND command)
        2) Send packets for that phase (until complete, or CONTINUE command)
    */
    
    reset_flag=0; 
    train_id=0;
    trains_lost=0;
    done=0;
    
    while(!done) {
      do {
        /* Wait until a control message arrives (unless if reset_flag=1) */
        /* If reset_flag=1, a control message is already here */
        if (!reset_flag) {
          ctr_code=recv_ctr_msg(ctr_strm, ctr_buff);
        }
        reset_flag=0;
    
        /* Get the command and the data fields from the control message */
        ctr_code_cmnd = ctr_code & 0x000000ff;
        ctr_code_data = ctr_code >> 8;
    
        switch(ctr_code_cmnd) { 
          /* Get maximum packet size from receiver */
          case MAX_PCK_LEN:
            max_pack_sz=ctr_code_data;
            if(Verbose){
              printf("--> Maximum packet size: %d bytes \n", max_pack_sz+28);
              if (file)
              fprintf(pathrate_fp,"--> Maximum packet size: %d bytes \n", max_pack_sz+28);
            }
            break;
      
          /* Get packet size from receiver */
          case PCK_LEN:
            pack_sz=ctr_code_data;
            if (Verbose){
              printf("--> Packet size: %d bytes \n", pack_sz+28);
              if (file)
                fprintf(pathrate_fp,"--> Packet size: %d bytes \n", pack_sz+28);
            }
            break;
      
          /* Get train length from receiver */
          case TRAIN_LEN:
            train_len=ctr_code_data;
            if (Verbose){
              printf("--> New train length: %d\n", train_len);
              if (file) 
                 fprintf(pathrate_fp,"--> New train length: %d\n", train_len);
            }
            break;
      
          /* Get number of trains from receiver */
          case NO_TRAINS:
            //no_trains=ctr_code_data;
            no_trains=1;
            if (Verbose){
              printf("--> New number of trains: %d\n", no_trains);
              if (file) 
                fprintf(pathrate_fp,"--> New number of trains: %d\n", no_trains);
            }
            break;
      
          /* End of measurements */
          case GAME_OVER:
            localtm = time(NULL); 
            if (file) fprintf(pathrate_fp,"Receiver terminates measurements on %s", ctime(&localtm));
            if (verbose) printf("Receiver terminates measurements on %s", ctime(&localtm));
            close(ctr_strm);
            sleep(1);
            done=1;
            break;
      
          /* Skip sending trains and go to next input phase of control messages */ 
          case CONTINUE:
            if (Verbose) {
              printf("--> Continue with next round of measurements \n");
              if (file)
                fprintf(pathrate_fp,"--> Continue with next round of measurements \n");
            }
            break;
      
          /* An ACK for a packet train; ignore at this point  */
          case NEG_ACK_TRAIN:
            if (Verbose) {
              printf("--> Redundant NEG_ACK \n");
              if (file) 
                fprintf(pathrate_fp,"--> Redundant NEG_ACK \n");
            }
            break;
      
          case ACK_TRAIN:
            if (Verbose) {
              printf("--> Redundant ACK \n");
              if (file)
                 fprintf(pathrate_fp,"--> Redundant ACK \n");
            }
            break;
      
          /* Train spacing between successive packet pairs/trains */
          case TRAIN_SPACING:
            train_spacing=ctr_code_data; /* msec */
            if (sleep_usecs) {
              sleep_time.tv_sec = train_spacing/1000;
              sleep_time.tv_usec = train_spacing*1000 - sleep_time.tv_sec*1000000;
              if (Verbose) {
                printf("Time period between packet pairs/trains: %d msec\n",train_spacing); 
                if (file) 
                  fprintf(pathrate_fp,"Time period between packet pairs/trains: %d msec\n",train_spacing); 
              }
            }
            else {
              sleep_secs = (int)floor(train_spacing/1000.)+1; 
              if (Verbose) {
                printf("Time period between packet pairs/trains: %d msec\n",sleep_secs*1000);
                if (file)
                   fprintf(pathrate_fp,"Time period between packet pairs/trains: %d msec\n",sleep_secs*1000);
              }
            }
            break;
      
          /* Start sending the packet trains with the specified
            packet size, train length, and number of trains */ 
          case SEND:
            round_id=ctr_code_data;
            if (Verbose) {
              printf("--> New round number: %d\n", round_id);
              if (file) 
                 fprintf(pathrate_fp,"--> New round number: %d\n", round_id);
            }
            break;
      
          default:
            printf("Unknown control message.. aborting\n");
            done=1;
        }
      } while(ctr_code_cmnd!=SEND && !done);
    
      if (ctr_code_cmnd==SEND) {
         /* Send <no_trains> of length <train_len> with packets of size <pack_sz>.
         * NOTE: We always send one more packet in the train. The first packet
         * (and the corresponding spacing) is ignored, because the processing 
         * of that packet takes longer (due to cache misses). 
         * That first packet has pack_id=0.  */

         train_no=0; trains_ackd=0; reset_flag=0;
         do {
           /* Send train of <train_len> packets. 
            * Each packet carries a packet id (unique in each train),
            * a train id (unique in each round), and a round id (unique
            * in the entire execution).  */
           train_id++; 
           train_id_n = htonl(train_id);
           memcpy(pack_buf+sizeof(long),&train_id_n,sizeof(long));
           round_id_n = htonl(round_id);
           memcpy(pack_buf+2*sizeof(long),&round_id_n,sizeof(long));
           for (pack_id=0; pack_id <= train_len; pack_id++) {
             pack_id_n = htonl(pack_id);
             memcpy(pack_buf, &pack_id_n, sizeof(long));
             sendto(sock_udp, pack_buf, pack_sz, 0, (struct sockaddr*)&rcv_udp_addr,
                sizeof(rcv_udp_addr));
           }
           train_no++;
           if (Verbose) {
              printf("\tTrain-%d:", train_no);
              if (file)
                 fprintf(pathrate_fp,"\tTrain-%d:", train_no);
           }
           
           /* Introduce some delay between consecutive packet trains */
           if (sleep_usecs){
              select(1, NULL, NULL, NULL, &sleep_time);
              sleep_time.tv_sec = train_spacing/1000;
              sleep_time.tv_usec = train_spacing*1000 - sleep_time.tv_sec*1000000;
           }
           else 
             sleep(sleep_secs);
    
           /* Send SENT_TRAIN ctr msgs and wait for ACK/NEG_ACK message */
           ctr_code = SENT_TRAIN;
           send_ctr_msg(ctr_strm,ctr_code);

           ctr_code=recv_ctr_msg(ctr_strm,ctr_buff);
           if ((ctr_code & 0x000000ff)==ACK_TRAIN) {   /* the train has been ACKed */
             trains_ackd++;
             trains_lost=0;
             if (Verbose) {
               printf("  => ACKed\n");
               if (file)
                  fprintf(pathrate_fp,"  => ACKed\n");
             }
           }
           else if ((ctr_code & 0x000000ff)==NEG_ACK_TRAIN) {
             /* Received negative ack, Retransmit */
             if (Verbose){
               printf("  => Retransmit\n");
               if (file)
                  fprintf(pathrate_fp,"  => Retransmit\n");
             }
             /* Primitive congestion avoidance */
             if (++trains_lost > MAX_CONSEC_LOSSES) {
               if (file){
                 fprintf(pathrate_fp,"Too many losses.."); 
                 fprintf(pathrate_fp,"Aborting measurements to avoid congestion \n\n");
               }
               if (verbose){
                 printf("Too many losses.."); 
                 printf("Aborting measurements to avoid congestion \n\n");
               }
               /* Send GAME_OVER signal to rcvr on TCP, then  quit*/
               ctr_code = GAME_OVER;
               send_ctr_msg(ctr_strm,ctr_code);
               sleep(1);
               close(ctr_strm);
               done=1;
             }
           }
           else {   
             /* The receiver has already started a new phase, 
              * and sends different control messages. 
              * Abort this phase, and start the next. */
             printf("DEBUG:: Enter here with code %x\n", (ctr_code & 0x000000FF));
             reset_flag=1;
             trains_lost=0;
           }
         } while (trains_ackd < no_trains && !reset_flag && !done);
      } /* if (ctr_command==SEND) */
    } /* while (!done)  */
  } while(iterative);  /* Repeat forever if in iterative mode */
  
  
  exit(0); 
}


syntax highlighted by Code2HTML, v. 0.9.1