/*===========================================================================*
 * parallel.c							             *
 *									     *
 *	Procedures to make encoder run in parallel			     *
 *									     *
 * EXPORTED PROCEDURES:							     *
 *	StartIOServer							     *
 *	StartCombineServer						     *
 *	StartDecodeServer						     *
 *	SendRemoteFrame							     *
 *	GetRemoteFrame							     *
 *	StartMasterServer						     *
 *	NotifyMasterDone						     *
 *									     *
 *===========================================================================*/

/*
 * Copyright (c) 1995 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without written agreement is
 * hereby granted, provided that the above copyright notice and the following
 * two paragraphs appear in all copies of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 */

/*  
 *  $Header: /share/cvs/AFNI/src/mpeg_encodedir/parallel.c,v 1.4 2004/04/02 15:12:40 rwcox Exp $
 *  $Log: parallel.c,v $
 *  Revision 1.4  2004/04/02 15:12:40  rwcox
 *  Cput
 *
 *  Revision 1.3  2003/12/23 13:50:08  rwcox
 *  Cput
 *
 *  Revision 1.2  2003/12/03 14:46:14  rwcox
 *  Cput
 *
 *  Revision 1.1  2001/12/17 16:11:55  rwcox
 *  Cadd
 *
 *  Revision 1.9  1995/08/16 18:22:08  smoot
 *  indents
 *
 *  Revision 1.8  1995/08/14 22:30:20  smoot
 *  added safe_fork to allow us to kill kids when we die.
 *
 *  Revision 1.7  1995/08/07 21:46:14  smoot
 *  spawns the same encoder as it is for combine, etc.
 *  uses new pattern tables to determine frame types
 *
 *  Revision 1.6  1995/06/21 18:32:14  smoot
 *  Defined SOMAXCONN when not (LINUX)
 *  added binary r/w (DOsS!)
 *  ANSIified bcopy call
 *
 * Revision 1.5  1995/01/19  23:09:00  eyhung
 * Changed copyrights
 *
 * Revision 1.4  1994/03/15  00:27:11  keving
 * nothing
 *
 * Revision 1.3  1993/12/22  19:19:01  keving
 * nothing
 *
 * Revision 1.2  1993/07/22  22:23:43  keving
 * nothing
 *
 * Revision 1.1  1993/06/30  20:06:09  keving
 * nothing
 *
 */


/*==============*
 * HEADER FILES *
 *==============*/

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/times.h>
#include <time.h>
#include <netinet/in.h>
#include <unistd.h>
#include <netdb.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include "all.h"
#include "param.h"
#include "mpeg.h"
#include "prototypes.h"
#include "parallel.h"
#include "readframe.h"
#include "fsize.h"
#include "combine.h"
#include "frames.h"


/* capacity of the ioPortNum[] table kept by StartMasterServer */
#define MAX_IO_SERVERS	10
/* fallback for systems whose headers do not define SOMAXCONN;
 * StartMasterServer starts a new I/O server after this many remote machines
 */
#ifndef SOMAXCONN
#define SOMAXCONN 5
#endif

/*==================*
 * CONSTANTS        *
 *==================*/

#define	TERMINATE_PID_SIGNAL	SIGTERM	 /* signal used to terminate forked children */
#ifndef MAXARGS
#define	MAXARGS		1024   /* max number of arguments in a safe_fork command */
#endif

/*==================*
 * STATIC VARIABLES *
 *==================*/

/* wall-clock seconds of the whole run (timeEnd-timeStart), set by
 * StartMasterServer and printed as the "ACTUAL" time
 */
static int32   diffTime;
/* remote shell program, filled in by SetRemoteShell; first word of every
 * child command built by StartMasterServer
 */
static char	rsh[256];
/* host entry looked up by StartMasterServer and handed (by address) to
 * ConnectToSocket -- presumably a lookup cache; confirm in ConnectToSocket
 */
static struct hostent *hostEntry = NULL;
/* one flag per input frame, allocated by StartCombineServer and set by
 * WaitForOutputFile
 */
static boolean	*frameDone;
/* listening socket of the combine server (StartCombineServer /
 * WaitForOutputFile)
 */
static int	outputServerSocket;
/* listening socket of the decode server (StartDecodeServer) */
static int	decodeServerSocket;
/* set by SetParallelPerfect; selects the even split in StartMasterServer */
static boolean	parallelPerfect = FALSE;
/* NOTE(review): not used in this part of the file; presumably the number
 * of entries of ClientPid[] filled by safe_fork -- confirm
 */
static	int	current_max_forked_pid=0;


/*==================*
 * GLOBAL VARIABLES *
 *==================*/

/* dimensions of the raw YUV planes exchanged with the I/O server */
extern int yuvHeight, yuvWidth;
/* start/end of the whole run; both are set by StartMasterServer */
extern	time_t  timeStart, timeEnd;
extern char	statFileName[256];
/* optional statistics file; receives the parallel summary and is closed
 * by StartMasterServer when non-NULL
 */
extern FILE *statFile;
/* enable per-machine / per-socket progress messages on stdout */
extern boolean  debugMachines;
extern boolean debugSockets;
/* frames given to each machine in the initial timing run (used when
 * neither parallelPerfect nor forceIalign is set)
 */
int parallelTestFrames = 10;
/* seconds of work handed out per assignment; -1 selects the tapering
 * schedule in StartMasterServer
 */
int parallelTimeChunks = 60;
/* host and port the slave-side calls (SendRemoteFrame, GetRemoteFrame,
 * NoteFrameDone) connect to
 */
char *IOhostName;
int ioPortNumber;
int combinePortNumber;
/* NOTE(review): not referenced in this part of the file */
int decodePortNumber;
/* TRUE: StartMasterServer inserts "nice" into the child command */
boolean	niceProcesses = FALSE;
/* TRUE: StartMasterServer extends every assignment to end on an 'i' frame */
boolean	forceIalign = FALSE;
/* used by GetRemoteFrame to name its temporary file */
int	    machineNumber = -1;
/* NOTE(review): not referenced in this part of the file */
boolean	remoteIO = FALSE;
/* set by SetIOConvert; selects the separate-conversion transfer path */
boolean	separateConversion;
/* seconds accumulated inside the socket exchanges of this file */
time_t	IOtime = 0;
/* program name used by StartMasterServer to start the helper servers */
extern char encoder_name[];
/* NOTE(review): presumably the pids recorded by safe_fork -- confirm */
int     ClientPid[MAX_MACHINES+4];

/*===============================*
 * INTERNAL PROCEDURE prototypes *
 *===============================*/

static void	TransmitPortNum _ANSI_ARGS_((char *hostName, int portNum,
					       int ioPortNum));
static void	EndIOServer _ANSI_ARGS_((void));
static void SafeRead _ANSI_ARGS_((int fd, char *buf, int nbyte));
static void SafeWrite _ANSI_ARGS_((int fd, char *buf, int nbyte));
static int  CreateListeningSocket _ANSI_ARGS_((int *portNumber));
static int  ConnectToSocket _ANSI_ARGS_((char *machineName, int portNum,
					 struct hostent **hostEnt));
static int safe_fork _ANSI_ARGS_((char *command));
void cleanup_fork _ANSI_ARGS_ ((int dummy));


/*=====================*
 * EXPORTED PROCEDURES *
 *=====================*/

			/*=================*
			 * IO SERVER STUFF *
			 *=================*/


/*===========================================================================*
 *
 * SetIOConvert
 *
 *	records whether the I/O conversion is done separately.  The flag
 *	is read by StartIOServer (which then streams the output of
 *	ReadIOConvert instead of raw YUV rows) and by GetRemoteFrame
 *	(which then runs the slave conversion on the received data)
 *
 * PARAMETERS:	separate    new value of the flag
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    overwrites the global separateConversion
 *
 *===========================================================================*/
void
SetIOConvert(separate)
    boolean separate;
{
    separateConversion = separate;
}


/*===========================================================================*
 *
 * SetParallelPerfect
 *
 *	selects the "perfect" schedule: when the flag is set,
 *	StartMasterServer divides all frames evenly (modulo rounding)
 *	between the machines in the first round
 *
 * PARAMETERS:	val	new value of the flag
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    overwrites the file-static parallelPerfect
 *
 *===========================================================================*/
void
SetParallelPerfect(val)
boolean val;
{
    parallelPerfect = val;
}


/*===========================================================================*
 *
 * SetRemoteShell
 *
 *	sets the remote shell program (usually rsh, but different on some
 *	machines).  StartMasterServer uses it as the first word of every
 *	child command.
 *
 * PARAMETERS:	shell	NUL-terminated name of the program; names that do
 *			not fit into the internal buffer are truncated
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    overwrites the file-static rsh
 *
 *===========================================================================*/
void
SetRemoteShell(shell)
    char *shell;
{
    /* rsh is a fixed-size buffer: copy at most sizeof(rsh)-1 characters
     * and terminate explicitly, because strncpy does not terminate a
     * truncated copy
     */
    strncpy(rsh, shell, sizeof(rsh)-1);
    rsh[sizeof(rsh)-1] = '\0';
}


/*===========================================================================*
 *
 * StartIOServer
 *
 *	start-up the IOServer with this process
 *	handles slave requests for frames, and exits when master tells it to
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    none
 *
 *===========================================================================*/
void
  StartIOServer(numInputFiles, parallelHostName, portNum)
int numInputFiles;
char *parallelHostName;
int portNum;
{
  int	    ioPortNum;
  int	    serverSocket;
  int	    otherSock, otherSize;
  struct sockaddr otherSocket;
  int32   buffer[8];
  boolean	done = FALSE;
  int	    frameNumber;
  MpegFrame *frame;
  register int y;
  int	    numBytes;
  unsigned char   *bigBuffer;
  unsigned char   smallBuffer[1000];
  int	    bigBufferSize;
  FILE    *filePtr;
  uint32  data;
  char    inputFileName[1024];
  char    fileName[1024];

  bigBufferSize = 0;
  bigBuffer = NULL;

  /* once we get IO port num, should transmit it to parallel server */

  serverSocket = CreateListeningSocket(&ioPortNum);

  if ( debugSockets ) {
    fprintf(stdout, "====I/O USING PORT %d\n", ioPortNum);
  }

  TransmitPortNum(parallelHostName, portNum, ioPortNum);

  otherSize = sizeof(otherSocket);

  if ( separateConversion ) {
    SetFileType(ioConversion);	/* for reading */
  } else {
    SetFileType(inputConversion);
  }

  /* now, wait until get done signal */
  while ( ! done ) {
    otherSock = accept(serverSocket, &otherSocket, &otherSize);
    if ( otherSock == -1 ) {
      fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
      exit(1);
    }

    SafeRead(otherSock, (char *)buffer, 4);
    frameNumber = ntohl(buffer[0]);

    if ( frameNumber == -1 ) {
      done = TRUE;
    } else if ( frameNumber == -2 ) {
      /* decoded frame to be output to disk */
      SafeRead(otherSock, (char *)buffer, 4);
      frameNumber = ntohl(buffer[0]);	    

      if ( debugSockets ) {
	fprintf(stdout, "INPUT SERVER:  GETTING DECODED FRAME %d\n", frameNumber);
	fflush(stdout);
      }

      /* should read frame from socket, then write to disk */
      frame = Frame_New(frameNumber, 'i');

      Frame_AllocDecoded(frame, TRUE);

      for ( y = 0; y < Fsize_y; y++ ) {
	SafeRead(otherSock, (char *)frame->decoded_y[y], Fsize_x);
      }

      for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
	SafeRead(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
      }

      for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
	SafeRead(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
      }

      /* now output to disk */
      WriteDecodedFrame(frame);

      Frame_Free(frame);
    } else if ( frameNumber == -3 ) {
      /* request for decoded frame from disk */
      SafeRead(otherSock, (char *)buffer, 4);
      frameNumber = ntohl(buffer[0]);	    

      if ( debugSockets ) {
	fprintf(stdout, "INPUT SERVER:  READING DECODED FRAME %d from DISK\n", frameNumber);
	fflush(stdout);
      }

      /* should read frame from disk, then write to socket */
      frame = Frame_New(frameNumber, 'i');

      Frame_AllocDecoded(frame, TRUE);

      ReadDecodedRefFrame(frame, frameNumber);

      /* now write to socket */
      for ( y = 0; y < Fsize_y; y++ ) {
	SafeWrite(otherSock, (char *)frame->decoded_y[y], Fsize_x);
      }

      for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
	SafeWrite(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
      }

      for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
	SafeWrite(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
      }

      Frame_Free(frame);
    } else if ( frameNumber == -4 ) {
      /* routing output frame from socket to disk */
      SafeRead(otherSock, (char *)buffer, 8);
      frameNumber = buffer[0];
      frameNumber = ntohl(frameNumber);

      /* read in number of bytes */
      numBytes = buffer[1];
      numBytes = ntohl(numBytes);

      /* make sure buffer is big enough for data */
      if ( numBytes > bigBufferSize ) {
	bigBufferSize = numBytes;
	if ( bigBuffer != NULL ) {
	  free(bigBuffer);
	}

	bigBuffer = (unsigned char *) malloc(bigBufferSize*
					     sizeof(unsigned char));
      }

      /* now read in the bytes */
      SafeRead(otherSock, (char *) bigBuffer, numBytes);

      /* open file to output this stuff to */
      sprintf(fileName, "%s.frame.%d", outputFileName, frameNumber);
      if ( (filePtr = fopen(fileName, "wb")) == NULL ) {
	fprintf(stderr, "ERROR:  Could not open output file(3):  %s\n",
		fileName);
	exit(1);
      }

      /* now write the bytes here */
      fwrite(bigBuffer, sizeof(char), numBytes, filePtr);

      fclose(filePtr);

      if ( debugSockets ) {
	fprintf(stdout, "====I/O SERVER:  WROTE FRAME %d to disk\n",
		frameNumber);
	fflush(stdout);
      }
    } else {
      if ( debugSockets ) {
	fprintf(stdout, "I/O SERVER GETTING FRAME %d\n", frameNumber);
	fflush(stdout);
      }

      /* should read in frame, then write to socket */
      frame = Frame_New(frameNumber, 'i');

      if ( separateConversion ) {
	GetNthInputFileName(inputFileName, frameNumber);

	/* do conversion and send right to the socket */
	filePtr = ReadIOConvert(inputFileName);
	do {
	  numBytes = fread(smallBuffer, 1, 1000, filePtr);

	  if ( numBytes > 0 ) {
	    data = numBytes;
	    data = htonl(data);
	    SafeWrite(otherSock, (char *)&data, 4);
	    SafeWrite(otherSock, (char *)smallBuffer, numBytes);
	  }
	}
	while ( numBytes == 1000 );

	if ( strcmp(ioConversion, "*") == 0 ) {
	  fclose(filePtr);
	} else {
	  pclose(filePtr);
	}
      } else {
	GetNthInputFileName(inputFileName, frameNumber);
	ReadFrame(frame, inputFileName, inputConversion, TRUE);

	/* should now transmit yuv values */
	for (y = 0; y < yuvHeight; y++) { /* Y */
	  SafeWrite(otherSock, (char *)frame->orig_y[y], yuvWidth);
	}

	for (y = 0; y < (yuvHeight >> 1); y++) { /* U */
	  SafeWrite(otherSock, (char *)frame->orig_cb[y], yuvWidth >> 1);
	}

	for (y = 0; y < (yuvHeight >> 1); y++) { /* V */
	  SafeWrite(otherSock, (char *)frame->orig_cr[y], yuvWidth >> 1);
	}

	/* now, make sure we don't leave until other processor read everything */

	SafeRead(otherSock, (char *)buffer, 4);
	/* should = 0 */
      }

      if ( debugSockets ) {
	fprintf(stdout, "====I/O SERVER:  READ FRAME %d\n",
		frameNumber);
      }

      Frame_Free(frame);
    }

    close(otherSock);
  }

  close(serverSocket);

  if ( debugSockets ) {
    fprintf(stdout, "====I/O SERVER:  Shutting Down\n");
  }
}


/*===========================================================================*
 *
 * SendRemoteFrame
 *
 *	called by a slave; sends an encoded frame to the I/O server
 *	(IOhostName:ioPortNumber), which writes it to disk.
 *
 *	The message is -4, the frame number and, unless the frame number
 *	is -1, the byte count followed by the bytes of the bit bucket.
 *	All integers are 4 bytes in network order.
 *
 * PARAMETERS:	frameNumber	number of the encoded frame
 *		bb		bit bucket holding the encoded frame
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    adds the elapsed seconds to IOtime
 *
 *===========================================================================*/
void
SendRemoteFrame(frameNumber, bb)
    int frameNumber;
    BitBucket *bb;
{
    int	clientSocket;
    /* exactly 4 bytes wide: the first 4 bytes of a wider integer are not
     * the value on big-endian machines with 64-bit longs
     */
    uint32_t data;
    time_t  tempTimeStart, tempTimeEnd;

    time(&tempTimeStart);

    clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);

    /* request code: encoded frame to be written to disk */
    data = htonl((uint32_t)-4);
    SafeWrite(clientSocket, (char *)&data, 4);

    data = htonl((uint32_t)frameNumber);
    SafeWrite(clientSocket, (char *)&data, 4);

    if ( frameNumber != -1 ) {
	/* send number of bytes (bits rounded up to whole bytes) */
	data = (uint32_t)((bb->totalbits+7)>>3);
	data = htonl(data);
	SafeWrite(clientSocket, (char *)&data, 4);

	/* now send the bytes themselves */
	Bitio_WriteToSocket(bb, clientSocket);
    }

    close(clientSocket);

    time(&tempTimeEnd);
    IOtime += (tempTimeEnd-tempTimeStart);
}




/*===========================================================================*
 *
 * NoteFrameDone
 *
 *	tells the Combine server (IOhostName:combinePortNumber) that the
 *	frames frameStart..frameEnd are done.  The message is -2,
 *	frameStart, frameEnd, each as a 4-byte network-order integer.
 *	In this file it is called by StartMasterServer.
 *
 * PARAMETERS:	frameStart	first finished frame
 *		frameEnd	last finished frame (inclusive)
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    adds the elapsed seconds to IOtime
 *
 *===========================================================================*/
void
NoteFrameDone(frameStart, frameEnd)
    int frameStart;
    int frameEnd;
{
    int	clientSocket;
    /* exactly 4 bytes wide: the first 4 bytes of a wider integer are not
     * the value on big-endian machines with 64-bit longs
     */
    uint32_t data;
    time_t  tempTimeStart, tempTimeEnd;

    time(&tempTimeStart);

    clientSocket = ConnectToSocket(IOhostName, combinePortNumber, &hostEntry);

    /* request code: a range of frames is finished */
    data = htonl((uint32_t)-2);
    SafeWrite(clientSocket, (char *)&data, 4);

    data = htonl((uint32_t)frameStart);
    SafeWrite(clientSocket, (char *)&data, 4);

    data = htonl((uint32_t)frameEnd);
    SafeWrite(clientSocket, (char *)&data, 4);

    close(clientSocket);

    time(&tempTimeEnd);
    IOtime += (tempTimeEnd-tempTimeStart);
}


/*===========================================================================*
 *
 * GetRemoteFrame
 *
 *	called by a slave; gets a remote frame from the I/O server
 *	(IOhostName:ioPortNumber).
 *
 *	The frame number is sent as a 4-byte network-order integer.  With
 *	separateConversion the reply is a stream of <4-byte length><bytes>
 *	chunks that ends with the first chunk shorter than the chunk
 *	buffer; it is stored in a temporary file and converted locally
 *	with ReadFrame.  Otherwise the reply is the raw Y, U and V rows.
 *	A frame number of -1 transfers no frame data.  In every case a
 *	4-byte zero is written back at the end.
 *
 * PARAMETERS:	frame		frame to fill
 *		frameNumber	number of the wanted input frame
 *
 * RETURNS:	nothing; exits the process on any error
 *
 * SIDE EFFECTS:    calls Fsize_Note; with separateConversion writes the
 *		    file /tmp/foobar<machineNumber>
 *
 *===========================================================================*/
void
  GetRemoteFrame(frame, frameNumber)
MpegFrame *frame;
int frameNumber;
{
  FILE    *filePtr;
  int	clientSocket;
  unsigned char   smallBuffer[1000];
  register int y;
  int	    numBytes;
  /* exactly 4 bytes wide: the first 4 bytes of a wider integer are not
   * the value on big-endian machines with 64-bit longs
   */
  uint32_t data;
  char    fileName[256];

  Fsize_Note(frameNumber, yuvWidth, yuvHeight);

  if ( debugSockets ) {
    fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",
            getenv("HOST"), frameNumber);
    fflush(stdout);
  }

  clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);

  data = htonl((uint32_t)frameNumber);
  SafeWrite(clientSocket, (char *)&data, 4);

  if ( frameNumber != -1 ) {
    if ( separateConversion ) {
      /* NOTE(review): predictable name in a shared directory; kept
       * because other code may rely on it
       */
      sprintf(fileName, "/tmp/foobar%d", machineNumber);
      filePtr = fopen(fileName, "wb");
      if ( filePtr == NULL ) {
        fprintf(stderr, "ERROR:  Could not open temporary file:  %s\n",
                fileName);
        exit(1);
      }

      /* read in stuff, write to file, perform local conversion */
      do {
        SafeRead(clientSocket, (char *)&data, 4);
        numBytes = (int)ntohl(data);

        /* the length comes from the network and must fit the buffer */
        if ( numBytes < 0 || numBytes > (int)sizeof(smallBuffer) ) {
          fprintf(stderr, "ERROR:  bad chunk size %d for FRAME %d\n",
                  numBytes, frameNumber);
          exit(1);
        }

        if ( numBytes > 0 ) {
          SafeRead(clientSocket, (char *)smallBuffer, numBytes);

          if ( fwrite(smallBuffer, 1, numBytes, filePtr) != (size_t)numBytes ) {
            fprintf(stderr, "ERROR:  Could not write temporary file:  %s\n",
                    fileName);
            exit(1);
          }
        }
      } while ( numBytes == (int)sizeof(smallBuffer) );
      fflush(filePtr);
      fclose(filePtr);

      /* now do slave conversion */
      ReadFrame(frame, fileName, slaveConversion, FALSE);
    } else {
      Frame_AllocYCC(frame);

      if ( debugSockets ) {
        fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",
                getenv("HOST"), frameNumber);
        fflush(stdout);
      }

      /* should now read yuv values */
      for (y = 0; y < yuvHeight; y++) {	/* Y */
        SafeRead(clientSocket, (char *)frame->orig_y[y], yuvWidth);
      }

      for (y = 0; y < (yuvHeight >> 1); y++) { /* U */
        SafeRead(clientSocket, (char *)frame->orig_cb[y], yuvWidth>>1);
      }

      for (y = 0; y < (yuvHeight >> 1); y++) { /* V */
        SafeRead(clientSocket, (char *)frame->orig_cr[y], yuvWidth>>1);
      }
    }
  }

  /* tell the server that everything has been read */
  data = htonl(0);
  SafeWrite(clientSocket, (char *)&data, 4);

  close(clientSocket);

  if ( debugSockets ) {
    fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",
            getenv("HOST"), frameNumber);
    fflush(stdout);
  }
}


/*===========================================================================*
 *
 * StartCombineServer
 *
 *	turns this process into the Combine server.  It creates a
 *	listening socket, reports the port to the master
 *	(parallelHostName:portNum), runs FramesToMPEG on the output file
 *	and finally reports the port a second time, which tells the
 *	master that the server is done.
 *
 * PARAMETERS:	numInputFiles	    number of frames to combine
 *		outputFileName	    name of the output file
 *		parallelHostName    host of the master server
 *		portNum		    port of the master server
 *
 * RETURNS:	nothing; exits the process on any error
 *
 * SIDE EFFECTS:    sets outputServerSocket; allocates frameDone with one
 *		    cleared flag per input file
 *
 *===========================================================================*/
void
  StartCombineServer(numInputFiles, outputFileName, parallelHostName, portNum)
int numInputFiles;
char *outputFileName;
char *parallelHostName;
int portNum;
{
  int	    combinePortNum;
  FILE    *ofp;

  /* once we get Combine port num, should transmit it to parallel server */

  outputServerSocket = CreateListeningSocket(&combinePortNum);

  if ( debugSockets ) {
    fprintf(stdout, "====OUTPUT USING PORT %d\n", combinePortNum);
  }

  TransmitPortNum(parallelHostName, portNum, combinePortNum);

  /* calloc clears the flags; a NULL result is only an error when flags
   * were actually requested
   */
  frameDone = (boolean *) calloc(numInputFiles, sizeof(boolean));
  if ( frameDone == NULL && numInputFiles > 0 ) {
    fprintf(stderr, "ERROR:  Could not allocate frame table for %d frames\n",
            numInputFiles);
    fflush(stderr);
    exit(1);
  }

  if ( (ofp = fopen(outputFileName, "wb")) == NULL ) {
    fprintf(stderr, "ERROR:  Could not open output file!!\n");
    fflush(stderr);
    exit(1);
  }
  FramesToMPEG(numInputFiles, outputFileName, ofp, TRUE);

  if ( debugSockets ) {
    fprintf(stdout, "====COMBINE SERVER:  Shutting Down\n");
    fflush(stdout);
  }

  /* tell Master server we are done */
  TransmitPortNum(parallelHostName, portNum, combinePortNum);

  close(outputServerSocket);
}


/*===========================================================================*
 *
 * WaitForOutputFile
 *
 *	keeps handling connections on the Combine server socket until
 *	the frame with the given number is marked as done.  A message
 *	starting with -2 carries a range frameStart..frameEnd of finished
 *	frames; all other messages are ignored.
 *
 * PARAMETERS:	number	frame to wait for
 *
 * RETURNS:	nothing; exits the process when accept fails
 *
 * SIDE EFFECTS:    sets entries of frameDone
 *
 *===========================================================================*/
void
  WaitForOutputFile(number)
int number;
{
  int	    otherSock;
  int	    otherSize;
  struct sockaddr otherSocket;
  int	    frameNumber;
  int32   buffer[8];
  int frameStart, frameEnd;

  while ( ! frameDone[number] ) {
    /* accept() overwrites the length with the size of the peer address,
     * so it has to be reset before every call
     */
    otherSize = sizeof(otherSocket);
    otherSock = accept(outputServerSocket, &otherSocket, &otherSize);
    if ( otherSock == -1 ) {
      fprintf(stderr, "ERROR:  Combine SERVER accept returned error %d\n", errno);
      exit(1);
    }

    SafeRead(otherSock, (char *)buffer, 4);
    frameNumber = ntohl(buffer[0]);

    if ( frameNumber == -2 ) {
      /* this is notification from non-remote process that a frame is done */

      SafeRead(otherSock, (char *)buffer, 8);
      frameStart = ntohl(buffer[0]);
      frameEnd = ntohl(buffer[1]);

      /* never index in front of the table.
       * NOTE(review): the size of frameDone is not visible here, so
       * frameEnd cannot be checked against it -- confirm that senders
       * never exceed the number of input files
       */
      if ( frameStart < 0 ) {
        frameStart = 0;
      }

      for ( frameNumber = frameStart; frameNumber <= frameEnd;
            frameNumber++ ) {
        frameDone[frameNumber] = TRUE;
      }
    }

    close(otherSock);
  }

  if ( debugSockets ) {
    fprintf(stdout, "WAIT FOR FRAME %d over\n", number);
    fflush(stdout);
  }
}


/*=====================*
 * MASTER SERVER STUFF *
 *=====================*/


/*===========================================================================*
 *
 * MasterEstimateFPS
 *
 *	frames-per-second figure used by StartMasterServer.  A job that
 *	was reported with 0 seconds is treated as having taken half a
 *	second, which avoids a division by zero.
 *
 * RETURNS:	frames/seconds, or frames*2 when seconds is 0
 *
 * SIDE EFFECTS:    none
 *
 *===========================================================================*/
static float
MasterEstimateFPS(frames, seconds)
    int frames;
    int seconds;
{
    if ( seconds == 0 ) {
	return (float)frames*2.0;
    }

    return (float)frames/(float)seconds;
}


/*===========================================================================*
 *
 * StartMasterServer
 *
 *	turns this process into the master server:
 *	  1. starts the Combine server, the Decode server (only when
 *	     referenceFrame == DECODED_FRAME) and one I/O server per
 *	     SOMAXCONN remote machines, and collects their port numbers
 *	  2. starts one child per machine through the remote shell and
 *	     gives it an initial range of frames
 *	  3. answers every "done" report (machine index, seconds) with
 *	     the next range of frames, or with -1 when no frames are left
 *	  4. shuts the I/O servers down, waits for the Combine server and
 *	     prints the parallel summary to stdout and to statFile
 *
 * PARAMETERS:	numInputFiles	number of frames to encode
 *		paramFile	parameter file for local processes
 *		outputFileName	not used
 *
 * RETURNS:	nothing; exits the process on any error
 *
 * SIDE EFFECTS:    sets hostEntry, combinePortNumber, IOhostName,
 *		    ioPortNumber, timeStart, timeEnd and diffTime;
 *		    closes statFile when it is open
 *
 *===========================================================================*/
void
  StartMasterServer(numInputFiles, paramFile, outputFileName)
int numInputFiles;
char *paramFile;
char *outputFileName;
{
  FILE    *filePtr;
  register int ind, ind2;
  char    command[1024];
  char    *hostName;
  char    *childParamFile;
  int	    portNum;
  int	    serverSocket;
  boolean finished[MAX_MACHINES];
  int	    numFinished;
  int	    otherSock, otherSize;
  struct sockaddr otherSocket;
  int	    seconds;
  int32   buffer[8];
  int ioPortNum[MAX_IO_SERVERS];
  int	    currentIoPort = 0;	/* port of the newest I/O server, 0 = none */
  int	    combinePortNum;
  int	    decodePortNum = 0;	/* stays 0 when no decode server is used */
  int	    nextFrame;
  int	    startFrames[MAX_MACHINES];
  int	    numFrames[MAX_MACHINES];
  int	    lastNumFrames[MAX_MACHINES];
  int	    numSeconds[MAX_MACHINES];
  float   fps[MAX_MACHINES];
  int	    numMachinesToEstimate;
  float   framesPerSecond;
  float   totalFPS, localFPS;
  int	    framesDone;
  float   avgFPS;
  char    niceNess[256];
  int32   startFrame, endFrame;
  int numInputPorts = 0;
  int	numRemote = SOMAXCONN;
  int totalRemote = 0;
  time_t  startUpBegin, startUpEnd;
  time_t  shutDownBegin, shutDownEnd;
  float   timeChunk;

  time(&startUpBegin);

  if ( niceProcesses ) {
    sprintf(niceNess, "nice");
  } else {
    niceNess[0] = '\0';
  }

  time(&timeStart);

  PrintStartStats(-1, 0);

  /* create a server socket */
  hostName = getenv("HOST");

  if ( hostName == NULL ) {
    fprintf(stderr, "ERROR:  Set HOST environment variable\n");
    exit(1);
  }

  hostEntry = gethostbyname(hostName);
  if ( hostEntry == NULL ) {
    fprintf(stderr, "ERROR:  Could not find host %s in database\n",
            hostName);
    exit(1);
  }

  hostName = hostEntry->h_name;

  serverSocket = CreateListeningSocket(&portNum);
  if ( debugSockets ) {
    fprintf(stdout, "---USING PORT %d\n", portNum);
  }

  /* START COMBINE SERVER */
  sprintf(command, "%s -max_machines %d -output_server %s %d %d %s",
          encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
  safe_fork(command);

  /* should now listen for connection from Combine server */
  otherSize = sizeof(otherSocket);
  otherSock = accept(serverSocket, &otherSocket, &otherSize);
  if ( otherSock == -1 ) {
    fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
    exit(1);
  }

  SafeRead(otherSock, (char *)(&combinePortNum), 4);
  combinePortNum = ntohl(combinePortNum);
  combinePortNumber = combinePortNum;
  close(otherSock);

  if ( debugSockets ) {
    fprintf(stdout, "---MASTER SERVER:  Combine port number = %d\n",
            combinePortNum);
  }

  /* START DECODE SERVER if necessary */
  if ( referenceFrame == DECODED_FRAME ) {
    sprintf(command, "%s -max_machines %d -decode_server %s %d %d %s",
            encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
    safe_fork(command);

    /* should now listen for connection from Decode server */
    otherSize = sizeof(otherSocket);
    otherSock = accept(serverSocket, &otherSocket, &otherSize);
    if ( otherSock == -1 ) {
      fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
      exit(1);
    }

    SafeRead(otherSock, (char *)(&decodePortNum), 4);
    decodePortNum = ntohl(decodePortNum);
    close(otherSock);

    if ( debugSockets ) {
      fprintf(stdout, "---MASTER SERVER:  Decode port number = %d\n",
              decodePortNum);
    }
  }

  /* we are doing whole thing (if not, see above) */

  numFinished = 0;

  /* count number of remote machines */
  for ( ind = 0; ind < numMachines; ind++ ) {
    fps[ind] = -1.0;		/* illegal value as flag */
    if ( remote[ind] ) {
      totalRemote++;
    }
  }

  /* DO INITIAL TIME TESTS */
  nextFrame = 0;
  for ( ind = 0; ind < numMachines; ind++ ) {
    /* numRemote starts at SOMAXCONN, so the first I/O server is created
     * right away when there is at least one remote machine, and another
     * one after every SOMAXCONN remote machines
     */
    if ( (totalRemote != 0) && (numRemote == SOMAXCONN) ) {
      if ( numInputPorts >= MAX_IO_SERVERS ) {
        fprintf(stderr, "ERROR:  MASTER SERVER cannot start more than %d I/O servers\n",
                MAX_IO_SERVERS);
        exit(1);
      }

      /* Create an I/O server */
      sprintf(command, "%s -max_machines %d -io_server %s %d %s",
              encoder_name, numMachines, hostName, portNum, paramFile);
      safe_fork(command);

      /* should now listen for connection from I/O server */
      otherSize = sizeof(otherSocket);
      otherSock = accept(serverSocket, &otherSocket, &otherSize);
      if ( otherSock == -1 ) {
        fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
        exit(1);
      }

      SafeRead(otherSock, (char *)(&ioPortNum[numInputPorts]), 4);
      ioPortNum[numInputPorts] = ntohl(ioPortNum[numInputPorts]);
      close(otherSock);

      if ( debugSockets ) {
        fprintf(stdout, "---MASTER SERVER:  I/O port number = %d\n",
                ioPortNum[numInputPorts]);
      }

      currentIoPort = ioPortNum[numInputPorts];
      numInputPorts++;
      numRemote = 0;
    }

    finished[ind] = FALSE;
    numSeconds[ind] = 0;

    startFrame = nextFrame;
    if ( parallelPerfect ) {
      /* split what is left evenly between the machines still to start */
      endFrame = startFrame+((numInputFiles-startFrame)/
                             (numMachines-ind))  -1;

      if ( forceIalign ) {
        while (FType_Type(endFrame) != 'i') {endFrame++;}
      }

      /* always give at least 1 frame */
      if ( endFrame < startFrame ) {
        endFrame = startFrame;
      }

      /* make sure not out of bounds */
      if ( endFrame >= numInputFiles ) {
        endFrame = numInputFiles-1;
      }
    } else if ( forceIalign ) {
      endFrame = startFrame+framePatternLen-1;
      while (FType_Type(endFrame) != 'i') {endFrame++;}
    } else {
      endFrame = startFrame+parallelTestFrames-1;
    }

    /* remote and local children get the same command line; only the
     * parameter file differs
     */
    if ( remote[ind] ) {
      childParamFile = remoteParamFile[ind];
      numRemote++;
      totalRemote--;
    } else {
      childParamFile = paramFile;
    }

    sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s",
            rsh,
            machineName[ind], userName[ind], niceNess,
            executable[ind],
            hostName, portNum, currentIoPort,
            combinePortNum, decodePortNum, ind,
            remote[ind],
            startFrame, endFrame,
            childParamFile);

    if ( debugMachines ) {
      fprintf(stdout, "---%s:  frames %d to %d\n",
              machineName[ind],
              startFrame, endFrame);
    }

    safe_fork(command);

    nextFrame = endFrame+1;
    startFrames[ind] = startFrame;
    numFrames[ind] = endFrame-startFrame+1;
    lastNumFrames[ind] = endFrame-startFrame+1;
  }

  framesDone = 0;

  time(&startUpEnd);

  /* now, wait for other processes to finish and boss them around */
  while ( numFinished != numMachines ) {
    otherSize = sizeof(otherSocket);
    otherSock = accept(serverSocket, &otherSocket, &otherSize);
    if ( otherSock == -1 ) {
      fprintf(stderr, "ERROR:  MASTER SERVER 2 accept returned error %d\n", errno);
      exit(1);
    }

    SafeRead(otherSock, (char *)buffer, 8);

    ind = ntohl(buffer[0]);
    seconds = ntohl(buffer[1]);

    /* the index comes from the network and is used on every per-machine
     * table below
     */
    if ( ind < 0 || ind >= numMachines ) {
      fprintf(stderr, "ERROR:  MASTER SERVER got report from unknown machine %d\n",
              ind);
      close(otherSock);
      continue;
    }

    NoteFrameDone(startFrames[ind],
                  startFrames[ind]+lastNumFrames[ind]-1);

    numSeconds[ind] += seconds;
    fps[ind] = MasterEstimateFPS(numFrames[ind], numSeconds[ind]);

    framesPerSecond = MasterEstimateFPS(lastNumFrames[ind], seconds);

    framesDone += lastNumFrames[ind];

    if ( nextFrame >= numInputFiles ) {
      /* nothing left to hand out: tell the machine to stop */
      buffer[0] = htonl(-1);
      buffer[1] = htonl(0);
      SafeWrite(otherSock, (char *)buffer, 8);
      numFinished++;

      if ( debugMachines ) {
        fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  DONE\n",
                machineName[ind], framesPerSecond, numFinished,
                numMachines);
      }
    } else {
      if (numSeconds[ind] != 0) {
        avgFPS = (float)numFrames[ind]/(float)numSeconds[ind];
      } else {
        avgFPS = 0.1;		/* arbitrary small value */
      }

      startFrame = nextFrame;

      if ( parallelTimeChunks == -1 ) {	/* TAPER STUFF */
        /* estimate time left */
        /* frames left = numInputFiles-nextFrame */
        totalFPS = 0.0;
        numMachinesToEstimate = 0;
        for ( ind2 = 0; ind2 < numMachines; ind2++ ) {
          if ( fps[ind2] < 0.0 ) {
            numMachinesToEstimate++;
          } else {
            totalFPS += fps[ind2];
          }
        }

        /* machines that have not reported yet count with the average
         * of those that have (at least the current one has)
         */
        totalFPS = (float)numMachines*
          (totalFPS/(float)(numMachines-numMachinesToEstimate));

        timeChunk = (float)(numInputFiles-nextFrame)/totalFPS;

        fprintf(stdout, "ASSIGNING %s %.2f seconds of work\n",
                machineName[ind], timeChunk);
        fflush(stdout);
        endFrame = nextFrame +
          (int)((float)timeChunk*avgFPS) - 1;
      } else {
        endFrame = nextFrame +
          (int)((float)parallelTimeChunks*avgFPS) - 1;
      }

      if ( forceIalign ) {
        while (FType_Type(endFrame) != 'i') {endFrame++;}
      }

      if ( endFrame < startFrame ) { /* always give at least 1 frame */
        endFrame = startFrame;
      }
      if ( endFrame >= numInputFiles ) {
        endFrame = numInputFiles-1;
      }

      nextFrame = endFrame+1;

      startFrames[ind] = startFrame;
      numFrames[ind] += (endFrame-startFrame+1);
      lastNumFrames[ind] = (endFrame-startFrame+1);

      if ( debugMachines ) {
        fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  next:  %d to %d\n",
                machineName[ind], framesPerSecond, numFinished,
                numMachines, startFrame, endFrame);
      }

      buffer[0] = htonl(startFrame);
      buffer[1] = htonl(endFrame);

      SafeWrite(otherSock, (char *)buffer, 8);
    }

    close(otherSock);

    if ( debugMachines ) {
      fprintf(stdout, "---FRAMES DONE:  %d\tFARMED OUT:  %d\tLEFT:  %d\n",
              framesDone, nextFrame-framesDone, numInputFiles-nextFrame);
    }
  }

  time(&shutDownBegin);

  /* end all input servers */
  IOhostName = hostName;
  for ( ind = 0; ind < numInputPorts; ind++ ) {
    ioPortNumber = ioPortNum[ind];
    EndIOServer();
  }

  /* now wait for CombineServer to tell us they're done */
  otherSize = sizeof(otherSocket);
  otherSock = accept(serverSocket, &otherSocket, &otherSize);
  if ( otherSock == -1 ) {
    fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
    exit(1);
  }

  SafeRead(otherSock, (char *)buffer, 4);
  close(otherSock);

  close(serverSocket);

  time(&timeEnd);
  diffTime = (int32)(timeEnd-timeStart);

  time(&shutDownEnd);

  /* print the summary twice: to stdout and, when open, to statFile */
  for ( ind2 = 0; ind2 < 2; ind2++ ) {
    if ( ind2 == 0 ) {
      filePtr = stdout;
    } else if ( statFile != NULL ) {
      filePtr = statFile;
    } else {
      continue;
    }

    fprintf(filePtr, "\n\n");
    fprintf(filePtr, "PARALLEL SUMMARY\n");
    fprintf(filePtr, "----------------\n");
    fprintf(filePtr, "\n");
    fprintf(filePtr, "START UP TIME:  %d seconds\n",
            (int)startUpEnd-(int)startUpBegin);
    fprintf(filePtr, "SHUT DOWN TIME:  %d seconds\n",
            (int)shutDownEnd-(int)shutDownBegin);

    fprintf(filePtr, "%14s\tFrames\tSeconds\tFrames Per Second\tSelf Time\n",
            "MACHINE");
    fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
    totalFPS = 0.0;
    for ( ind = 0; ind < numMachines; ind++ ) {
      localFPS = MasterEstimateFPS(numFrames[ind], numSeconds[ind]);
      /* "self time": seconds this machine alone would have needed */
      fprintf(filePtr, "%14s\t%d\t%d\t%f\t\t%d\n",
              machineName[ind], numFrames[ind], numSeconds[ind],
              localFPS,
              (localFPS > 0.0) ? (int)((float)numInputFiles/localFPS) : 0);
      totalFPS += localFPS;
    }

    fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");

    fprintf(filePtr, "%14s\t\t%d\t%f\n", "OPTIMAL",
            (totalFPS > 0.0) ? (int)((float)numInputFiles/totalFPS) : 0,
            totalFPS);
    fprintf(filePtr, "%14s\t\t%d\t%f\n", "ACTUAL", diffTime,
            MasterEstimateFPS(numInputFiles, (int)diffTime));

    fprintf(filePtr, "\n\n");
  }

  if ( statFile != NULL ) {
    fclose(statFile);
  }
}


/*===========================================================================*
 *
 * NotifyMasterDone
 *
 *	called by a slave process; reports to the master at
 *	hostName:portNum that the current job is finished and receives
 *	the next one.
 *
 *	The request is the machine number followed by the seconds the job
 *	took; the reply is the first and the last frame of the next job.
 *	All four values are 4-byte network-order integers.
 *
 * PARAMETERS:	hostName	host of the master server
 *		portNum		port of the master server
 *		machineNumber	index of the reporting machine
 *		seconds		seconds spent on the finished job
 *		frameStart	out: first frame of the next job
 *		frameEnd	out: last frame of the next job
 *
 * RETURNS:	non-zero when another job was assigned (*frameStart >= 0),
 *		zero otherwise
 *
 * SIDE EFFECTS:    adds the elapsed seconds to IOtime
 *
 *===========================================================================*/
boolean
  NotifyMasterDone(hostName, portNum, machineNumber, seconds, frameStart,
		   frameEnd)
char *hostName;
int portNum;
int machineNumber;
int seconds;
int *frameStart;
int *frameEnd;
{
  int32   message[8];
  int     masterSocket;
  time_t  began;
  time_t  ended;

  time(&began);

  masterSocket = ConnectToSocket(hostName, portNum, &hostEntry);

  /* request: who we are and how long the last job took */
  message[0] = htonl(machineNumber);
  message[1] = htonl(seconds);
  SafeWrite(masterSocket, (char *)message, 8);

  /* reply: the next range of frames, reusing the same buffer */
  SafeRead(masterSocket, (char *)message, 8);
  *frameStart = ntohl(message[0]);
  *frameEnd = ntohl(message[1]);

  close(masterSocket);

  time(&ended);
  IOtime += (ended-began);

  /* a negative start frame means there is no more work */
  return !((*frameStart) < 0);
}


/*===========================================================================*
 *
 * StartDecodeServer
 *
 *	start-up the DecodeServer with this process
 *	creates a listening socket, reports its port to the master at
 *	parallelHostName/portNum, then services two kinds of messages:
 *
 *	    <frame>	    frame <frame> is decoded; every slave waiting
 *			    for it is notified by a connect/close on the
 *			    port it registered
 *	    -2 <frame>	    is <frame> ready?  the reply is one 32-bit word;
 *			    when it is zero the slave follows up with its
 *			    machine number and the port it will listen on
 *
 *	frame and machine numbers that fall outside the tables are reported
 *	on stderr and the connection is dropped
 *
 *	this is necessary only if referenceFrame == DECODED_FRAME
 *
 *	numInputFiles	    number of frames (sizes the per-frame tables)
 *	decodeFileName	    not used by this procedure
 *	parallelHostName,
 *	portNum		    where the master server is listening
 *
 * RETURNS:	nothing.  NOTE: the service loop has no exit path, so the
 *		shut-down code after it is not reached as written.
 *
 * SIDE EFFECTS:    sets decodeServerSocket and frameDone; exits the process
 *		    if the tables cannot be allocated or accept() fails
 *
 *===========================================================================*/
void
  StartDecodeServer(numInputFiles, decodeFileName, parallelHostName, portNum)
int numInputFiles;
char *decodeFileName;
char *parallelHostName;
int portNum;
{
  int	    otherSock, otherSize;
  struct sockaddr otherSocket;
  int	    decodePortNum;
  int32   buffer[8];
  int	    frameReady;
  int	    *waitMachine;   /* per frame:  1 + first waiting slave, 0 = none */
  int	    *waitPort;	    /* per slave:  port that slave is listening on   */
  int	    *waitList;	    /* per slave:  1 + next waiter, 0 = end of chain */
  int	    slaveNumber;
  int	    slavePort;
  int	    waitPtr;
  int	    nextPtr;
  struct hostent *slaveHost;
  int	    clientSocket;

  /* all tables start out zeroed; allocate them before advertising the port
   * so that an allocation failure is reported before anyone connects
   */
  waitMachine = (int *) calloc(numInputFiles, sizeof(int));
  waitPort = (int *) calloc(numMachines, sizeof(int));
  waitList = (int *) calloc(numMachines, sizeof(int));
  frameDone = (boolean *) calloc(numInputFiles, sizeof(boolean));
  if ( (waitMachine == NULL) || (waitPort == NULL) ||
       (waitList == NULL) || (frameDone == NULL) ) {
    fprintf(stderr, "ERROR:  DECODE SERVER could not allocate its tables\n");
    exit(1);
  }

  /* once we get Decode port num, should transmit it to parallel server */

  decodeServerSocket = CreateListeningSocket(&decodePortNum);

  if ( debugSockets ) {
    fprintf(stdout, "====DECODE USING PORT %d\n", decodePortNum);
  }

  TransmitPortNum(parallelHostName, portNum, decodePortNum);

  /* wait for ready signals and requests */
  while ( TRUE ) {
    otherSize = sizeof(otherSocket);
    otherSock = accept(decodeServerSocket, &otherSocket, &otherSize);
    if ( otherSock == -1 ) {
      fprintf(stderr, "ERROR:  DECODE SERVER accept returned error %d\n", errno);
      exit(1);
    }

    SafeRead(otherSock, (char *)buffer, 4);
    frameReady = buffer[0];
    frameReady = ntohl(frameReady);

    if ( frameReady == -2 ) {
      SafeRead(otherSock, (char *)buffer, 4);
      frameReady = buffer[0];
      frameReady = ntohl(frameReady);

      if ( debugSockets ) {
	fprintf(stdout, "====DECODE SERVER:  REQUEST FOR %d\n", frameReady);
	fflush(stdout);	    
      }

      /* the frame number comes off the network:  never index with it
       * before checking that it is inside the tables
       */
      if ( (frameReady < 0) || (frameReady >= numInputFiles) ) {
	fprintf(stderr, "ERROR:  DECODE SERVER got request for invalid frame %d\n",
		frameReady);
	close(otherSock);
	continue;
      }

      /* now respond if it's ready yet */
      buffer[0] = frameDone[frameReady];
      buffer[0] = htonl(buffer[0]);
      SafeWrite(otherSock, (char *)buffer, 4);

      if ( ! frameDone[frameReady] ) {
	/* read machine number, port number */
	SafeRead(otherSock, (char *)buffer, 8);
	slaveNumber = buffer[0];
	slaveNumber = ntohl(slaveNumber);
	slavePort = buffer[1];
	slavePort = ntohl(slavePort);

	if ( debugSockets ) {
	  fprintf(stdout, "WAITING:  SLAVE %d, PORT %d\n",
		  slaveNumber, slavePort);
	}

	if ( (slaveNumber < 0) || (slaveNumber >= numMachines) ) {
	  fprintf(stderr, "ERROR:  DECODE SERVER got invalid slave number %d\n",
		  slaveNumber);
	  close(otherSock);
	  continue;
	}

	waitPort[slaveNumber] = slavePort;
	if ( waitMachine[frameReady] == 0 ) {
	  waitMachine[frameReady] = slaveNumber+1;
	} else {
	  /* someone already waiting for this frame */
	  /* follow list of waiters to the end */
	  waitPtr = waitMachine[frameReady]-1;
	  while ( waitList[waitPtr] != 0 ) {
	    waitPtr = waitList[waitPtr]-1;
	  }

	  waitList[waitPtr] = slaveNumber+1;
	}

	/* the new waiter is always the tail of its chain; clearing the
	 * link here (for the first waiter too) keeps a link left over
	 * from an earlier frame from being followed
	 */
	waitList[slaveNumber] = 0;
      }
    } else {
      if ( (frameReady < 0) || (frameReady >= numInputFiles) ) {
	fprintf(stderr, "ERROR:  DECODE SERVER told invalid frame %d is ready\n",
		frameReady);
	close(otherSock);
	continue;
      }

      frameDone[frameReady] = TRUE;

      if ( debugSockets ) {
	fprintf(stdout, "====DECODE SERVER:  FRAME %d READY\n", frameReady);
	fflush(stdout);
      }

      if ( waitMachine[frameReady] ) {
	/* need to notify one or more machines it's ready */
	waitPtr = waitMachine[frameReady]-1;
	waitMachine[frameReady] = 0;	/* chain is consumed below */
	while ( waitPtr >= 0 ) {
	  nextPtr = waitList[waitPtr]-1;
	  waitList[waitPtr] = 0;

	  /* ConnectToSocket only looks the name up when the host entry is
	   * NULL, and each waiter may be on a different machine, so force
	   * a fresh lookup every time
	   */
	  slaveHost = NULL;
	  clientSocket = ConnectToSocket(machineName[waitPtr],
					 waitPort[waitPtr],
					 &slaveHost);
	  close(clientSocket);
	  waitPtr = nextPtr;
	}
      }
    }

    close(otherSock);
  }

  /* not reached as written:  the loop above never terminates */
  if ( debugSockets ) {
    fprintf(stdout, "====DECODE SERVER:  Shutting Down\n");
    fflush(stdout);
  }

  /* tell Master server we are done */
  TransmitPortNum(parallelHostName, portNum, decodePortNum);

  close(decodeServerSocket);
}


/*=====================*
 * INTERNAL PROCEDURES *
 *=====================*/


/*===========================================================================*
 *
 * TransmitPortNum
 *
 *	called by the I/O or Combine server; transmits the appropriate
 *	port number to the master
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    none
 *
 *===========================================================================*/
static void
  TransmitPortNum(hostName, portNum, newPortNum)
char *hostName;
int portNum;
int newPortNum;
{
  int	clientSocket;
  u_long  data;

  clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);

  data = htonl(newPortNum);
  SafeWrite(clientSocket, (char *) &data, 4);

  close(clientSocket);
}


/*===========================================================================*
 *
 * SafeRead
 *
 *	safely read from the given socket; the procedure keeps reading until
 *	it gets the number of bytes specified
 *
 *	fd	descriptor to read from
 *	buf	destination; must have room for nbyte bytes
 *	nbyte	number of bytes to read
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    exits the process (status 1) if read() fails or if the
 *		    other end closes the connection before nbyte bytes have
 *		    arrived
 *
 *===========================================================================*/
static void
  SafeRead(fd, buf, nbyte)
int fd;
char *buf;
int nbyte;
{
  int numRead;
  int result;

  numRead = 0;

  while ( numRead != nbyte ) {
    result = read(fd, &buf[numRead], nbyte-numRead);

    if ( result == -1 ) {
      if ( errno == EINTR ) {
	/* interrupted by a signal before any data arrived:  retry */
	continue;
      }

      fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
	      nbyte-numRead, nbyte, errno);
      exit(1);
    }

    if ( result == 0 ) {
      /* end of file:  the peer is gone and no more data will ever come;
       * without this check the loop would spin forever
       */
      fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) hit end of file\n",
	      nbyte-numRead, nbyte);
      exit(1);
    }

    numRead += result;
  }
}


/*===========================================================================*
 *
 * SafeWrite
 *
 *	safely write to the given socket; the procedure keeps writing until
 *	it sends the number of bytes specified
 *
 *	fd	descriptor to write to
 *	buf	data to send
 *	nbyte	number of bytes to send
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    exits the process (status 1) if write() fails
 *
 *===========================================================================*/
static void
  SafeWrite(fd, buf, nbyte)
int fd;
char *buf;
int nbyte;
{
  int numWritten;
  int result;

  numWritten = 0;

  while ( numWritten != nbyte ) {
    result = write(fd, &buf[numWritten], nbyte-numWritten);

    if ( result == -1 ) {
      if ( errno == EINTR ) {
	/* interrupted by a signal before anything was sent:  retry */
	continue;
      }

      /* this used to say "read", which sent people looking in the
       * wrong direction
       */
      fprintf(stderr, "ERROR:  write (of %d bytes (total %d) ) returned error %d\n",
	      nbyte-numWritten, nbyte, errno);
      exit(1);
    }
    numWritten += result;
  }
}


/*===========================================================================*
 *
 * EndIOServer
 *
 *	called by the master process -- asks the I/O server to shut
 *	itself down by requesting frame number -1 through GetRemoteFrame
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    whatever GetRemoteFrame does for that request
 *
 *===========================================================================*/
static void
  EndIOServer()
{
  int	shutDownFrame = -1;	/* -1 as frame number is the signal */

  GetRemoteFrame(NULL, shutDownFrame);
}


/*===========================================================================*
 *
 * NotifyDecodeServerReady
 *
 *	called by a slave to the Decode Server to tell it a decoded frame
 *	is ready and waiting
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    none
 *
 *===========================================================================*/
void
  NotifyDecodeServerReady(id)
int id;
{
  int	clientSocket;
  u_long  data;
  time_t  tempTimeStart, tempTimeEnd;

  time(&tempTimeStart);

  clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);

  data = htonl(id);
  SafeWrite(clientSocket, (char *)&data, 4);

  close(clientSocket);

  time(&tempTimeEnd);
  IOtime += (tempTimeEnd-tempTimeStart);
}


/*===========================================================================*
 *
 * WaitForDecodedFrame
 *
 *	blah blah blah
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    none
 *
 *===========================================================================*/
void
  WaitForDecodedFrame(id)
int id;
{
  int	clientSocket;
  u_long  data;
  int	    negativeTwo = -2;
  int     ready;

  /* wait for a decoded frame */
  if ( debugSockets ) {
    fprintf(stdout, "WAITING FOR DECODED FRAME %d\n", id);
  }

  clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);

  /* first, tell DecodeServer we're waiting for this frame */
  data = negativeTwo;
  data = htonl(negativeTwo);
  SafeWrite(clientSocket, (char *)&data, 4);

  data = htonl(id);
  SafeWrite(clientSocket, (char *)&data, 4);

  SafeRead(clientSocket, (char *)&data, 4);
  ready = data;
  ready = ntohl(ready);

  if ( ! ready ) {
    int	    waitSocket;
    int	    waitPort;
    int	    otherSock, otherSize;
    struct sockaddr otherSocket;

    /* it's not ready; set up a connection and wait for decode server */
    waitSocket = CreateListeningSocket(&waitPort);

    /* tell decode server where we are */
    data = machineNumber;
    data = ntohl(data);
    SafeWrite(clientSocket, (char *)&data, 4);

    data = waitPort;
    data = ntohl(data);
    SafeWrite(clientSocket, (char *)&data, 4);

    close(clientSocket);

    if ( debugSockets ) {
      fprintf(stdout, "SLAVE:  WAITING ON SOCKET %d\n", waitPort);
      fflush(stdout);
    }

    otherSize = sizeof(otherSocket);
    otherSock = accept(waitSocket, &otherSocket, &otherSize);
    if ( otherSock == -1 ) {
      fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
      exit(1);
    }

    /* should we verify this is decode server? */
    /* for now, we won't */

    close(otherSock);

    close(waitSocket);
  } else {
    close(clientSocket);
  }

  if ( debugSockets ) {
    fprintf(stdout, "YE-HA FRAME %d IS NOW READY\n", id);
  }
}


/*===========================================================================*
 *
 * CreateListeningSocket
 *
 *	create a TCP socket bound to the first port above 2048 that
 *	bind() accepts, and put it in the listening state
 *
 *	portNumber	(out) receives the port that was bound
 *
 * RETURNS:	the socket; portNumber is modified appropriately
 *
 * SIDE EFFECTS:    exits the process if socket() or listen() fails, or if
 *		    no port in 2049..65535 can be bound
 *
 *===========================================================================*/
static int
  CreateListeningSocket(portNumber)
int *portNumber;
{
  int	    resultSocket;
  int	    result;
  int	    bindErrno;
  int	    port;
  struct sockaddr_in	nameEntry;

  resultSocket = socket(AF_INET, SOCK_STREAM, 0);
  if ( resultSocket == -1 ) {
    fprintf(stderr, "ERROR:  Call to socket() gave error %d\n", errno);
    exit(1);
  }

  /* zeroing the whole entry also leaves the address as "any interface" */
  memset((char *) &nameEntry, 0, sizeof(nameEntry));
  nameEntry.sin_family = AF_INET;

  /* find a port number that isn't used.  The search stops at 65535, the
   * largest value a 16-bit port can hold; going past it would wrap the
   * port to 0 and report a number that is not the one actually bound, or
   * never terminate at all if bind() keeps failing.
   */
  result = -1;
  bindErrno = 0;
  for ( port = 2049; port <= 65535; port++ ) {
    nameEntry.sin_port = htons((unsigned short) port);
    result = bind(resultSocket, (struct sockaddr *) &nameEntry,
		  sizeof(nameEntry));
    if ( result != -1 ) {
      break;
    }
    bindErrno = errno;
  }

  if ( result == -1 ) {
    fprintf(stderr, "ERROR:  no port could be bound (last bind() error %d)\n",
	    bindErrno);
    exit(1);
  }

  (*portNumber) = port;

  /* would really like to wait for 1+numMachines machines, but this is max
   * allowable, unfortunately
   */
  result = listen(resultSocket, SOMAXCONN);
  if ( result == -1 ) {
    fprintf(stderr, "ERROR:  call to listen() gave error %d\n", errno);
    exit(1);
  }

  return resultSocket;
}


/*===========================================================================*
 *
 * ConnectToSocket
 *
 *	creates a TCP socket and connects it to portNum on a host
 *
 *	machineName	name to look up when *hostEnt is NULL
 *	portNum		port to connect to
 *	hostEnt		(in/out) *hostEnt either is the host entry to use,
 *			or is NULL, in which case it is filled in from
 *			gethostbyname(machineName).  A non-NULL entry is
 *			used as is, whatever machineName says, so callers
 *			that talk to several hosts must reset it to NULL
 *			between calls.
 *
 * RETURNS:	the connected socket
 *
 * SIDE EFFECTS:    may set *hostEnt; exits the process if the lookup,
 *		    socket() or connect() fails, or if the host entry does
 *		    not hold an IPv4 address
 *
 *===========================================================================*/
static int
  ConnectToSocket(machineName, portNum, hostEnt)
char *machineName;
int	portNum;
struct hostent **hostEnt;
{
  int	resultSocket;
  int	    result;
  char    *localHost;
  struct sockaddr_in  nameEntry;

  if ( (*hostEnt) == NULL ) {
    (*hostEnt) = gethostbyname(machineName);
    if ( (*hostEnt) == NULL ) {
      fprintf(stderr, "ERROR:  Couldn't get host by name (%s)\n",
	      machineName);
      exit(1);
    }
  }

  /* the address is copied into a 4-byte field below; refuse anything that
   * is not an IPv4 address of a size that fits
   */
  if ( ((*hostEnt)->h_addrtype != AF_INET) ||
       ((*hostEnt)->h_length <= 0) ||
       ((*hostEnt)->h_length > (int) sizeof(nameEntry.sin_addr.s_addr)) ) {
    fprintf(stderr, "ERROR:  Host entry for %s is not a usable IPv4 address\n",
	    machineName);
    exit(1);
  }

  resultSocket = socket(AF_INET, SOCK_STREAM, 0);
  if ( resultSocket == -1 ) {
    fprintf(stderr, "ERROR:  socket returned error %d\n", errno);
    exit(1);
  }

  /* clear every field, including any the platform adds to the structure */
  memset((void *) &nameEntry, 0, sizeof(nameEntry));
  nameEntry.sin_family = AF_INET;
  memcpy((void *) &(nameEntry.sin_addr.s_addr),
	 (void *) (*hostEnt)->h_addr_list[0],
	 (size_t) (*hostEnt)->h_length);
  nameEntry.sin_port = htons((unsigned short) portNum);

  result = connect(resultSocket, (struct sockaddr *) &nameEntry,
		   sizeof(nameEntry));
  if ( result == -1 ) {
    /* HOST need not be set; never hand a NULL pointer to %s */
    localHost = getenv("HOST");
    if ( localHost == NULL ) {
      localHost = "(unknown)";
    }
    fprintf(stderr, "ERROR:  connect (ConnectToSocket, port %d) from machine %s returned error %d\n",
	    portNum, localHost, errno);
    exit(1);
  }

  return resultSocket;
}


/*===========================================================================*
 *
 * SendDecodedFrame
 *
 *  Ship a decoded frame to the server listening at
 *  IOhostName/ioPortNumber (the I/O server).  The message is the code -2,
 *  the frame id, then Fsize_y rows of Fsize_x luminance bytes followed by
 *  Fsize_y/2 rows of Fsize_x/2 bytes for each of the Cb and Cr planes.
 *  Both leading words are 32 bits in network byte order.
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    none
 *
 *===========================================================================*/
void
  SendDecodedFrame(frame)
MpegFrame *frame;
{
  int	    ioSock;
  int	    storeCode = -2;
  int	    row;
  uint32  word;

  /* send to IOServer */
  ioSock = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);

  /* header:  request code, then frame id */
  word = storeCode;
  word = htonl(word);
  SafeWrite(ioSock, (char *)&word, 4);

  word = frame->id;
  word = htonl(word);
  SafeWrite(ioSock, (char *)&word, 4);

  /* Y */
  row = 0;
  while ( row < Fsize_y ) {
    SafeWrite(ioSock, (char *)frame->decoded_y[row], Fsize_x);
    row++;
  }

  /* U */
  row = 0;
  while ( row < (Fsize_y >> 1) ) {
    SafeWrite(ioSock, (char *)frame->decoded_cb[row], (Fsize_x >> 1));
    row++;
  }

  /* V */
  row = 0;
  while ( row < (Fsize_y >> 1) ) {
    SafeWrite(ioSock, (char *)frame->decoded_cr[row], (Fsize_x >> 1));
    row++;
  }

  close(ioSock);
}


/*===========================================================================*
 *
 * GetRemoteDecodedRefFrame
 *
 *  Fetch a decoded frame from the server listening at
 *  IOhostName/ioPortNumber (the I/O server).  The request is the code -3
 *  followed by frame->id, both 32 bits in network byte order; the reply is
 *  Fsize_y rows of Fsize_x luminance bytes followed by Fsize_y/2 rows of
 *  Fsize_x/2 bytes for each of the Cb and Cr planes, read straight into
 *  the frame's decoded_y/decoded_cb/decoded_cr rows.
 *
 *  NOTE(review): frameNumber is not used; the frame requested is
 *  frame->id.  Confirm against the callers that the two always agree.
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:   overwrites the decoded planes of frame
 *
 *===========================================================================*/
void
  GetRemoteDecodedRefFrame(frame, frameNumber)
MpegFrame *frame;
int frameNumber;
{
  int	    ioSock;
  int	    fetchCode = -3;
  int	    row;
  uint32  word;

  /* send to IOServer */
  ioSock = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);

  /* ask IOServer for decoded frame */
  word = fetchCode;
  word = htonl(word);
  SafeWrite(ioSock, (char *)&word, 4);

  word = frame->id;
  word = htonl(word);
  SafeWrite(ioSock, (char *)&word, 4);

  /* Y */
  row = 0;
  while ( row < Fsize_y ) {
    SafeRead(ioSock, (char *)frame->decoded_y[row], Fsize_x);
    row++;
  }

  /* U */
  row = 0;
  while ( row < (Fsize_y >> 1) ) {
    SafeRead(ioSock, (char *)frame->decoded_cb[row], (Fsize_x >> 1));
    row++;
  }

  /* V */
  row = 0;
  while ( row < (Fsize_y >> 1) ) {
    SafeRead(ioSock, (char *)frame->decoded_cr[row], (Fsize_x >> 1));
    row++;
  }

  close(ioSock);
}


/*********
  routines handling forks, execs, PIDs and signals
  safe, system-style forks
  apian@ise.fhg.de
  *******/


/*===========================================================================*
 *
 * cleanup_fork
 *
 *  Send TERMINATE_PID_SIGNAL to every PID recorded in ClientPid
 *  (entries 0 .. current_max_forked_pid-1), reporting on stderr each
 *  kill() that fails.  safe_fork installs this routine as the handler
 *  for SIGQUIT, SIGTERM and SIGINT; the argument is not used.
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:   kills other processes
 *
 *===========================================================================*/
void cleanup_fork( dummy )			/* try to kill all child processes */
     int dummy;
{
  register int slot = 0;

  while ( slot < current_max_forked_pid ) {

#ifdef DEBUG_FORK
    fprintf(stderr, "cleanup_fork: killing PID %d\n", ClientPid[slot]);
#endif

    /* kill() yields 0 on success, so anything else is a failure */
    if ( kill(ClientPid[slot], TERMINATE_PID_SIGNAL) != 0 ) {
      fprintf(stderr, "cleanup_fork: killed PID=%d failed (errno %d)\n", 
	      ClientPid[slot], errno);
    }

    ++slot;
  }
}

/*===========================================================================*
 *
 * safe_fork
 *
 *  fork a command
 *
 *  command is split in place at blanks and tabs (strtok overwrites the
 *  separators, so the caller's string is modified) and the resulting
 *  words are handed to execvp() in a child process.  At most MAXARGS-1
 *  words are accepted; a longer command is refused.
 *
 * RETURNS:     0 if the child was forked, or if command holds no words;
 *		-1 if fork() failed or command has too many words
 *
 * SIDE EFFECTS:   Fork the command, and save its PID in ClientPid so you
 *		   can kill it later!  The first call installs cleanup_fork
 *		   as the handler for SIGQUIT, SIGTERM and SIGINT.
 *
 *===========================================================================*/
static int safe_fork(command)		/* fork child process and remember its PID */
     char *command;
{
  static int init=0;
  char *argis[MAXARGS];
  char *word;
  register int i=0;
  
  /* tokenize; the index is tested BEFORE each store so that the last slot
   * of argis is always left for the terminating NULL
   */
  word = strtok(command, " \t");
  while (word && i < MAXARGS-1) {
    argis[i++] = word;
    word = strtok(NULL, " \t");
  }
  argis[i] = NULL;

  if (!i) return(0);		/* nothing to run */

  if (word) {			/* running a truncated command would be worse */
    fprintf(stderr, "safe_fork: command has more than %d arguments\n",
	    MAXARGS-1);
    return(-1);
  }
  
#ifdef DEBUG_FORK
  {register int i=0; 
   fprintf(stderr, "Command %s becomes:\n", command);
   while(argis[i]) {fprintf(stderr, "--%s--\n", argis[i]); ++i;} }
#endif
  
  if (!init) {			/* register clean-up routine */
    signal (SIGQUIT, cleanup_fork);
    signal (SIGTERM, cleanup_fork);
    signal (SIGINT , cleanup_fork);
    init=1;
  }
  
  /* NOTE(review): nothing here bounds current_max_forked_pid against the
   * size of ClientPid, which is declared elsewhere -- confirm it is large
   * enough for every call
   */
  if (-1 == (ClientPid[current_max_forked_pid] = fork()) )  {
    perror("safe_fork: fork failed ");
    return(-1);
  }
  if( !ClientPid[current_max_forked_pid]) { /* we are in child process */
    execvp(argis[0], argis );
    perror("safe_fork child: exec failed ");
    /* _exit, not exit:  the child holds copies of the parent's stdio
     * buffers and must not flush them a second time
     */
    _exit(1);
  }
#ifdef DEBUG_FORK
  fprintf(stderr, "parallel: forked PID=%d\n", ClientPid[current_max_forked_pid]);
#endif
  current_max_forked_pid++;
  return(0);
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */