/*

Copyright (C) 2002 Hayato Fujiwara

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.

*/

#include <octave/oct.h>

#include "defun-dld.h"
#include "dirfns.h"
#include "error.h"
#include "help.h"
#include "oct-map.h"
#include "systime.h"
#include "ov.h"
#include "oct-obj.h"
#include "utils.h"
#include "oct-env.h"

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netdb.h>
#include <unistd.h>
#include <netinet/in.h>

#define BUFF_SIZE SSIZE_MAX

// COMM

DEFUN_DLD (connect, args, ,
  "connect (hosts)\n\
\n\
Connect hosts and return sockets.")
{

  int sock=0,col=0,row=0,i,j,len;
  double *sock_v=0;
  if (args.length () == 1)
    {
      int pid=0,nl;
      struct sockaddr_in *addr;
      struct hostent *he;
      octave_value hosts=args(0);
      charMatrix cm=hosts.char_matrix_value();
      char *host,*pt,myname[16];
      
      errno=0;
      gethostname(myname,15);
      row= cm.rows();
      col= cm.columns();
      cm= cm.transpose(); 
      
      sock_v=(double *)calloc(3,row*sizeof(double));
       

      for(i=1;i<row;i++){
	host=(char *)calloc(1,col+1);
	strncpy(host,&cm.data()[col*i],col);
	pt=strchr(host,' ');
	if(pt==NULL)
	  host[col]='\0';
	else
	  *pt='\0';
	
	sock=socket(PF_INET,SOCK_STREAM,0);
	if(sock==-1){
	  error("socket error ");
	}
	
	addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in));
	
	addr->sin_family=AF_INET;
	addr->sin_port=htons(12502);
	he=gethostbyname(host);
	if(he == NULL){
	  error("Unknown host %s",host);
	}
	memcpy(&addr->sin_addr,he->h_addr_list[0],he->h_length);
	
	for(j=0;j<10;j++){
	  if(connect(sock,(struct sockaddr *)addr,sizeof(*addr))==0){
	    break;
	  }else if(errno!=ECONNREFUSED){
	    error("connect error ");
	  }else {
	    usleep(5000);
	  }
	}
	if(!sock)
	  error("Unable to connect to %s: Connection refused",host);
	
	sock_v[i+row]=sock;
	
	free(addr);
	free(host);

	int num_nodes=row-1;

	pid=getpid();
	nl=htonl(num_nodes);
	write(sock,&nl,sizeof(int));
	nl=htonl(i);
	write(sock,&nl,sizeof(int));
	nl=htonl(pid);
	write(sock,&nl,sizeof(int));

	host=(char *)calloc(128,sizeof(char));
	for(j=0;j<row;j++){
	  strncpy(host,&cm.data()[col*j],col);
	  pt=strchr(host,' ');
	  if(pt==NULL)
	    host[col]='\0';
	  else
	    *pt='\0';
	  len=strlen(host)+1;
	  nl=htonl(len);
	  write(sock,&nl,sizeof(int));
	  write(sock,host,len);
	}
	free(host);
	int comm_len;
       	std::string directory = octave_env::getcwd ();
	comm_len=directory.length();
	nl=htonl(comm_len);
	write(sock,&nl,sizeof(int));
	write(sock,directory.c_str(),comm_len);
      }      
      usleep(100);

      for(i=1;i<row;i++){
	
	host=(char *)calloc(1,col+1);
	
	strncpy(host,&cm.data()[col*i],col);
	pt=strchr(host,' ');
	if(pt==NULL)
	  host[col]='\0';
	else
	  *pt='\0';
	
	sock=socket(PF_INET,SOCK_STREAM,0);
	if(sock==-1){
	  perror("socket : ");
	  exit(-1);
	}
	
	addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in));
	
	addr->sin_family=AF_INET;
	addr->sin_port=htons(12501);
	he=gethostbyname(host);
	if(he == NULL){
	  error("Unknown host %s",host);
	}
	memcpy(&addr->sin_addr,he->h_addr_list[0],he->h_length);
	while(1){
	  for(j=0;j<10;j++){
	    if(connect(sock,(struct sockaddr *)addr,sizeof(*addr))==0){
	      break;
	    }else if(errno!=ECONNREFUSED){
	      perror("connect : ");
	      exit(-1);
	    }else {
	      usleep(5000);
	    }
	  }
	  if(!sock)
	    error("Unable to connect to %s: Connection refused",host);
	  
	  int bufsize=262144;
	  socklen_t ol;
	  ol=sizeof(bufsize);
	  setsockopt(sock,SOL_SOCKET,SO_SNDBUF,&bufsize,ol);
	  setsockopt(sock,SOL_SOCKET,SO_RCVBUF,&bufsize,ol);
	  bufsize=1;
	  ol=sizeof(bufsize);
	  setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&bufsize,ol);
	  

	  int len=0,result=0;;
	  //send pppid
	  nl=htonl(pid);
	  write(sock,&nl,sizeof(int));
	  //send name size
	  strncpy(myname,cm.data(),col);
	  pt=strchr(myname,' ');
	  if(pt==NULL)
	    myname[col]='\0';
	  else
	    *pt='\0';
	  len=strlen(myname);
	  nl=htonl(len);
	  write(sock,&nl,sizeof(int));
	  //send name
	  write(sock,myname,len+1);
	  //recv result code
	  read(sock,&nl,sizeof(int));
	  result=ntohl(nl);
	  if(result==0){
	    sock_v[i]=sock;
	    //recv endian
	    read(sock,&nl,sizeof(int));
	    sock_v[i+2*row]=ntohl(nl);
	    //send endian
	    nl=htonl(__BYTE_ORDER);
	    write(sock,&nl,sizeof(int));
	    break;
	  }else{
	    close(sock);
	  }
	}
	
	free(addr);
	free(host);
      }

      char lf='\n';
      for(i=1;i<row;i++){
	write((int)sock_v[i+row],&lf,sizeof(char));
	//	cout << i+row <<endl;
      }
    }
  else
    {
      print_usage ("connect");
      octave_value retval;
      return retval;
    }
      

  Matrix mx(row,3);
  double *tmp =mx.fortran_vec();
  for (i=0;i<3*row;i++)
    tmp[i]=sock_v[i];
  octave_value retval(mx);

  return retval;
}


/*
;;; Local Variables: ***
;;; mode: C++ ***
;;; End: ***
*/


syntax highlighted by Code2HTML, v. 0.9.1