/*
 *         PVM version 3.4:  Parallel Virtual Machine System
 *               University of Tennessee, Knoxville TN.
 *           Oak Ridge National Laboratory, Oak Ridge TN.
 *                   Emory University, Atlanta GA.
 *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
 *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
 *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
 *                   (C) 1997 All Rights Reserved
 *
 *                              NOTICE
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby granted
 * provided that the above copyright notice appear in all copies and
 * that both the copyright notice and this permission notice appear in
 * supporting documentation.
 *
 * Neither the Institutions (Emory University, Oak Ridge National
 * Laboratory, and University of Tennessee) nor the Authors make any
 * representations about the suitability of this software for any
 * purpose.  This software is provided ``as is'' without express or
 * implied warranty.
 *
 * PVM version 3 was funded in part by the U.S. Department of Energy,
 * the National Science Foundation and the State of Tennessee.
 */

/*
 *	pvmdmimd.c
 *
 *  MPP interface.
 *
 *		void mpp_init(int *argc, char **argv):
 *			Initialization. Create a table to keep track of active nodes.
 *			argc, argv: passed from main.
 *
 *		int mpp_load( struct waitc_spawn *wxp ) 
 *
 *			Load executable onto nodes; create new entries in task table,
 *			encode node number and process type into task IDs, etc.
 *
 *				Construction of Task ID:
 *
 *			 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0
 *			+-+-+-----------------------+-+-----+--------------------------+
 *			|s|g|		host index	    |n| prt |    	node # (16384)	   |
 *			+-+-+-----------------------+-+-----+--------------------------+
 *
 *			The "n" bit is set for node task but clear for host task.
 *
 *			flags:	exec options;
 *			name:	executable to be loaded;
 *			argv:	command line argument for executable
 *			count:	number of tasks to be created;
 *			tids:	array to store new task IDs;
 *			ptid:	parent task ID.
 *
 *			mpp_new(int count, int ptid):		
 *				Allocate a set of nodes. (called by mpp_load())
 *				count: number of nodes;  ptid: parent task ID.
 *
 *		int mpp_output():
 *			For every active node set whose pvmhost is still alive, add the
 *			pvmhost's socket to the write set when packets are queued for
 *			it.  Node sets whose pvmhost has died are cleaned up and freed.
 *
 *		struct task *mpp_find(struct task *tp):
 *			Find the node task whose node address matches the socket
 *			address of tp.
 *
 *		void mpp_free(struct task *tp):
 *			Remove node/process-type from active list.
 *			tp: task pointer.
 *
$Log: pvmdmimd.c,v $
Revision 1.2  2002/02/21 23:19:15  pvmsrc
Added new (not to be documented!) PVM_MAX_TASKS env var support.
	- for Mahir Lokvancic <mahir@math.ufl.edu>.
	- forcefully limits the number of tasks that can attach to a
		pvmd, required on Solaris in rare circumstances when hard
		FD_SETSIZE limit is reached, and all hell breaks loose...
	- check return for task_new() call, can now produce NULL ptr,
		indicating PvmOutOfRes...
(Spanker=kohl)

Revision 1.1  2000/02/17 21:02:24  pvmsrc
Architecture-specific Makefile and Pvmd code.
	- submitted by Paul Springer <pls@smokeymt.jpl.nasa.gov>.
	- hacked a little by Jeembo for convention compliance...  :-)
(Spanker=kohl)

 *
 */

#include <stdlib.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <netdb.h>
#ifdef  SYSVSTR
#include <string.h>
#define CINDEX(s,c) strchr(s,c)
#else
#include <strings.h>
#define CINDEX(s,c) index(s,c)
#endif

#include <pvm3.h>
#include <pvmproto.h>
#include "global.h"
#include "host.h"
#include "waitc.h"
#include "pvmalloc.h"
#include "pkt.h"
#include "task.h"
#include "listmac.h"
#if 0
#include "tvdefs.h"
#endif
#include "pvmdmp.h"
#include "pvmmimd.h"
#include "bfunc.h"

/* program mpp_load() execs (as av[0]) to start a task on a node */
#define BLCOMM		"/usr/bin/rsh"
/* argv slots mpp_load() adds in front of the user's arguments */
#define BLARGC		3

char *getenv();

/* Global */

extern int pvmdebmask;			/* from pvmd.c */
extern char **epaths;			/* from pvmd.c */
extern int myhostpart;			/* from pvmd.c */
extern int myndf;			/* from pvmd.c */
extern struct htab *hosts;		/* from pvmd.c */
extern struct task *locltasks;	/* from task.c */
extern int pvmmydsig;			/* from pvmd.c */
extern int pvmudpmtu;			/* from pvmd.c */


int tidtmask = TIDPTYPE;		/* mask for ptype field of tids */
int tidnmask = TIDNODE;			/* mask for node field of tids */

/* private */

static char rcsid[] = "$Id: pvmdmimd.c,v 1.2 2002/02/21 23:19:15 pvmsrc Exp $";
static struct nodeset *busynodes;	/* head of circular list of active node sets */
static char pvmtxt[512];		/* scratch for error log */
static int ptypemask;			/* mask; we use these bits of ptype in tids */
static u_long *nodeaddr = 0;	/* s_addr of each PROC_LIST node; set by mpp_init() */
static char **nodelist = 0;		/* host names parsed from PROC_LIST by mpp_init() */
static int partsize = 0;		/* number of nodes listed in PROC_LIST */

/* filled in by mpp_load() for each pvmhost; not read in the code visible here */
static int mpppvminfo[SIZEHINFO];

#ifndef INADDR_NONE
#define INADDR_NONE	0xffffffff	/* for systems whose <netinet/in.h> lacks it */
#endif

/*
 * mpp_init()
 *
 *	Build the node table from the PROC_LIST environment variable and
 *	create the empty, circular list of busy node sets.
 *
 *	PROC_LIST is a ':'-separated list of host names; empty fields are
 *	skipped.  Each name is saved in nodelist[] and its first address,
 *	as returned by gethostbyname(), in nodeaddr[]; partsize is
 *	incremented once per name.  A name that cannot be resolved keeps
 *	its slot (node numbers are indices into these arrays) but is given
 *	the address INADDR_NONE instead of an uninitialized value.
 *
 *	argc, argv: passed from main; not used here.
 */
void
mpp_init(argc, argv)
	int *argc;
	char **argv;
{
	struct hostent *hostaddr;
	int 	i;
	int		n;			/* length of the current PROC_LIST field */
	int		len;		/* part of that field that fits in nname */
	struct in_addr	node_sin_addr;
	char 	nname[128];	/* node name */
	char 	*p, *q;
	char 	*plist;		/* processor list */

	if ((plist = getenv("PROC_LIST"))) {
		/* the environment value may be longer than pvmtxt: bound it */
		sprintf(pvmtxt, "PROC_LIST=%.*s\n",
			(int)(sizeof(pvmtxt) - sizeof("PROC_LIST=\n")), plist);
		pvmlogerror(pvmtxt);

		/* first pass: count the names so the tables can be sized */
		p = plist;
		while (1) {
			while (*p == ':')
				p++;
			if (!*p)
				break;
			n = (q = CINDEX(p, ':')) ? q - p : strlen(p);
			partsize++;
			p += n;
		}

		nodeaddr = TALLOC(partsize, u_long, "nodeaddr");
		nodelist = TALLOC(partsize, char*, "nname");

		/* second pass: record each name and its address */
		p = plist;
		i = 0;
		while (1) {
			while (*p == ':')
				p++;
			if (!*p)
				break;
			n = (q = CINDEX(p, ':')) ? q - p : strlen(p);

			/* never copy more than nname can hold */
			len = (n < (int)sizeof(nname)) ? n : (int)sizeof(nname) - 1;
			strncpy(nname, p, len);
			nname[len] = 0;

			/* default used when the lookup below fails */
			node_sin_addr.s_addr = INADDR_NONE;
			if (!(hostaddr = gethostbyname( nname ))) {
				sprintf( pvmtxt, "mpp_init() can't gethostbyname() for %s\n", 
						nname );
				pvmlogerror( pvmtxt );
			} else {
				/* got addr, now save it */
				BCOPY( hostaddr->h_addr_list[0], (char*)&node_sin_addr,
					sizeof(struct in_addr));
			}
			nodeaddr[i] = node_sin_addr.s_addr;
			nodelist[i++] = STRALLOC(nname);
			p += n;
		}
	} else
		pvmlogerror("mpp_init() PROC_LIST must be set for parallelism.\n");

	sprintf(pvmtxt, "%d nodes in list.\n", partsize);
	pvmlogerror(pvmtxt);

	/* busynodes is the sentinel of a circular, doubly linked list */
	busynodes = TALLOC(1, struct nodeset, "nsets");
	BZERO((char*)busynodes, sizeof(struct nodeset));
	busynodes->n_link = busynodes;
	busynodes->n_rlink = busynodes;

	/* shift the ptype field mask down so its lowest set bit is bit 0 */
	ptypemask = tidtmask >> (ffs(tidtmask) - 1);
}


/*
 * mpp_new()
 *
 *	Find a run of `count' free nodes, pick a process type for it and
 *	link a new node set describing it into the busynodes list.
 *	Process types are handed out sequentially; only tasks spawned
 *	together get the same ptype.
 *
 *	The scan looks at the gap in front of each existing set, so it
 *	relies on the list being in ascending n_first order; inserting the
 *	new set in front of the set that follows the gap keeps it that way.
 *
 *	Returns the new node set, or 0 if the partition has no gap large
 *	enough or every ptype value is already in use.  Nothing is left
 *	allocated on failure.
 */
struct nodeset *
mpp_new(count, ptid)
	int count;		/* number of nodes requested */
	int ptid;		/* parent's tid */
{
	struct nodeset *sp, *newp, *sp2;
	int last = -1;		/* last node in use below the candidate gap */
	int ptype = 0;

	if (!(newp = TALLOC(1, struct nodeset, "nsets"))) {
		pvmlogerror("mpp_new() can't get memory\n");
		pvmbailout(0);
	}
	BZERO((char*)newp, sizeof(struct nodeset));

	newp->n_size = count;

	/* stop at the first set that has at least `count' free nodes before it */
	for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link) {
		if (sp->n_first - last > count)
			break;
		last = sp->n_first + sp->n_size - 1;
		/* candidate ptype: one more than the largest seen so far */
		if (ptype <= sp->n_ptype)
			ptype = sp->n_ptype + 1;
	}

	/* ran off the end of the list: the room after the last set must do */
	if (sp == busynodes && partsize - last <= count) {
		pvmlogerror("mpp_new() not enough nodes in partition\n");
		PVM_FREE(newp);
		return (struct nodeset *)0;
	}

	/* is the candidate ptype (as it will appear in a tid) already taken? */
	for (sp2 = busynodes->n_link; sp2 != busynodes; sp2 = sp2->n_link)
		if ((sp2->n_ptype & ptypemask) == (ptype & ptypemask))
			break;

	if (sp2 != busynodes || ptype == NPARTITIONS) {
		/* yes: fall back to the lowest ptype no active set is using */
		for (ptype = 0; ptype < NPARTITIONS; ptype++) {
			for (sp2 = busynodes->n_link; sp2 != busynodes; sp2 = sp2->n_link)
				if ((sp2->n_ptype & ptypemask) == (ptype & ptypemask))
					break;
			if (sp2 == busynodes)
				break;
		}
		if (ptype == NPARTITIONS) {
			pvmlogerror("mpp_new() out of ptypes: too many spawns\n");
			PVM_FREE(newp);		/* was leaked on this path */
			return (struct nodeset *)0;
		}
	}

	if (pvmdebmask & PDMNODE) {
		sprintf(pvmtxt, "mpp_new() %d nodes %d ... ptype=%d ptid=%x\n",
			count, last+1, ptype, ptid);
		pvmlogerror(pvmtxt);
	}
	newp->n_first = last + 1;
	newp->n_ptype = ptype;
	newp->n_ptid = ptid;
	newp->n_alive = count;

	/* pointers must be printed with %p; %x is undefined for them */
	sprintf( pvmtxt, "mpp_new() sp=%p sp->n_link=%p\n",
		(void *)sp, (void *)sp->n_link );
	pvmlogerror( pvmtxt );
	LISTPUTBEFORE(sp, newp, n_link, n_rlink);
	sprintf( pvmtxt, "mpp_new() sp=%p sp->n_link=%p\n",
		(void *)sp, (void *)sp->n_link );
	pvmlogerror( pvmtxt );

	return newp;
}

/*
 * mpp_free()
 *
 *	Remove a node task from its active node set.
 *
 *	tp: task being freed.  Tasks whose tid is not a node tid are
 *	ignored.
 *
 *	The task's t_txq is cleared (it is the pvmhost's queue and must not
 *	be freed with the node task), the matching node set's n_alive count
 *	is decremented, and the task found by task_findpid() for the same
 *	pid is flagged TF_CLOSE, has the write side of its socket shut down
 *	and inherits the stdout descriptor.  The node set itself is not
 *	destroyed here because pvmhost may not exit immediately; to avoid a
 *	race, mpp_output() does that cleanup.
 */
void
mpp_free(tp)
	struct task *tp;
{
	struct nodeset *sp;
	int ptype;
	struct task *tp2;
	int tid = tp->t_tid;

	if (!TIDISNODE(tid))
		return;

	ptype = TIDTOTYPE(tid);
	tp->t_txq = 0;				/* don't free pvmhost's txq */

	sp = busynodes->n_link;
	/* pointers must be printed with %p; %x is undefined for them */
	sprintf( pvmtxt, "mpp_free() sp=%p sp->n_link=%p\n",
		(void *)sp, (void *)sp->n_link );
	pvmlogerror( pvmtxt );

	for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link) {
		/* cast so the conversion matches whatever integer type n_ptype is */
		sprintf(pvmtxt, "mpp_free() n_ptype = %ld, ptype = %d\n",
			(long)sp->n_ptype, ptype );
		pvmlogerror(pvmtxt);

		if ((sp->n_ptype & ptypemask) != ptype)
			continue;

		if (pvmdebmask & PDMNODE) {
			sprintf(pvmtxt, "mpp_free() t%x type=%ld alive=%d\n",
				tid, (long)sp->n_ptype, sp->n_alive);
			pvmlogerror(pvmtxt);
		}
		if (sp->n_alive == 0) {
			sprintf( pvmtxt, "mpp_free called for dead task t%x\n", tid );
			pvmlogerror( pvmtxt );
			return;
		}

		if ((tp2 = task_findpid(tp->t_pid))) {
			/* find corresponding pvmhost and shut it down */
			tp2->t_flag |= TF_CLOSE;
			/* only stop sending; the socket itself stays open */
			if (tp2->t_sock != -1)
				shutdown(tp2->t_sock, 1);
			/* close stdout after pvmhost dies */
			tp2->t_out = tp->t_out;
		}

		/* the set stays on the list even at zero; see mpp_output() */
		--sp->n_alive;

		tp->t_out = -1;		/* don't free shared stdout if alive > 0 */
		return;
	}

	sprintf(pvmtxt, "mpp_free() t%x not active\n", tid);
	pvmlogerror(pvmtxt);
	return;
}


/*
 * mpp_load()
 *
 *	Load an executable onto a freshly allocated set of nodes.
 *
 *	wxp: spawn request.  w_argv[0] is the executable, w_veclen the
 *	     number of tasks wanted, w_vec receives the new tids and
 *	     w_ptid is the parent's tid.
 *
 *	The executable is looked up along epaths (or used as given when
 *	its name contains a '/').  For the first regular, executable file
 *	found, a node set is allocated with mpp_new() and, for every node
 *	in it, BLCOMM is started through forkexec() as
 *
 *		BLCOMM <node name> <path> <argv[1]> ...
 *
 *	The task forkexec() creates is the "pvmhost"; a second task struct
 *	is then created for the node task itself, which shares the
 *	pvmhost's transmit queue, stdout descriptor and pid.
 *
 *	Returns 0 on success.  On failure every entry of w_vec is set to
 *	the (negative) PVM error code, which is also returned.
 *
 *	NOTE(review): when a failure happens after mpp_new() has succeeded,
 *	the node set and any tasks already created are left in place, as
 *	before -- confirm whether they should be released here.
 */
int
mpp_load( wxp )
struct waitc_spawn *wxp;
{
	int flags = 0;              /* exec options */
	char *name;             /* executable */
	char **argv;            /* arg list (argv[-1] must be there) */
	int count;				/* how many */
	int *tids;				/* array to store new tids */
	int ptid;				/* parent task ID */
	int ptypepart;			/* type field */
	int j;
	struct task *tp;
	struct pkt *hosttxq;	/* out-going queue of pvmhost */
	int err = 0;
	struct nodeset *sp;
	char c[128];			/* buffer to store name.host */
	int nargs;
	char **ep, **eplist;
	char path[MAXPATHLEN];
	struct stat sb;
	char **av;				/* argument vector handed to forkexec() */
	char **avbase = 0;		/* av as allocated, so it can be freed */
	int hostout;			/* stdout of pvmhost */
	int hostpid;			/* Unix pid of pvmhost */

	static char *nullep[] = { "", 0 };

	/* -- initialize some variables from the waitc_spawn struct -- */

	name = wxp->w_argv[0];
	argv = wxp->w_argv;
	count = wxp->w_veclen;
	tids = wxp->w_vec;
	ptid = wxp->w_ptid;

	eplist = CINDEX(name, '/') ? nullep : epaths;

	for (ep = eplist; *ep; ep++) {
		/* search for file; skip entries that cannot fit in path[] */
		if (strlen(*ep) + strlen(name) + 2 > sizeof(path)) {
			if (pvmdebmask & PDMTASK)
				pvmlogerror("mpp_load() path too long, skipped\n");
			continue;
		}
		(void)strcpy(path, *ep);
		if (path[0])
			(void)strcat(path, "/");
		(void)strcat(path, name);

		if (stat(path, &sb) == -1
		|| ((sb.st_mode & S_IFMT) != S_IFREG)
		|| !(sb.st_mode & S_IEXEC)) {
			if (pvmdebmask & PDMTASK) {
				/* path may be longer than pvmtxt: bound it */
				sprintf(pvmtxt, "mpp_load() stat failed <%.*s>\n",
					(int)(sizeof(pvmtxt)
						- sizeof("mpp_load() stat failed <>\n")),
					path);
				pvmlogerror(pvmtxt);
			}
			continue;
		}

		if (!(sp = mpp_new(count, ptid))) {
			err = PvmOutOfRes;
			goto done;
		}
		ptypepart = (0 << (ffs(tidtmask) - 1)) | TIDONNODE;

		if (argv)
			for (nargs = 0; argv[nargs]; nargs++);
		else
			nargs = 0;

		/* ar[-1], rsh, host, command */
		nargs += BLARGC + 1;
		if (!(avbase = TALLOC(nargs + 1, char*, "argv"))) {
			err = PvmOutOfRes;
			goto done;
		}
		av = avbase + 1;		/* reserve room for debugger */
		BZERO((char*)av, nargs * sizeof(char*));
		av[0] = BLCOMM;
		av[2] = path;
		av[--nargs] = 0;
		/* user arguments (argv[0] is replaced by path above) */
		for (j = 3; j < nargs; j++)
			av[j] = argv[j - 2];

		/* flags is always 0 in this function, so this never fires */
		if (flags & PvmTaskDebug)
			av++;

		for (j = 0; j < count; j++) {
			av[1] = nodelist[sp->n_first + j];
			sprintf(pvmtxt, "mppload(): Forking to %s\n", av[1]);
			pvmlogerror(pvmtxt);
			if ((err = forkexec(flags, av[0], av, 0, (char **)0, 0,
				0, 0, &tp)))
				goto done;

			/* tp is the pvmhost task that forkexec() created */
			tp->t_ptid = ptid;
			PVM_FREE(tp->t_a_out);
			/* name may be longer than c[]: bound it */
			sprintf(c, "%.*s.host",
				(int)(sizeof(c) - sizeof(".host")), name);
			tp->t_a_out = STRALLOC(c);
			if (pvmdebmask & PDMNODE) 
				pvmlogprintf( "mpp_load() setting n_ptid to %x\n", tp->t_tid );
			sp->n_ptid = tp->t_tid;		/* pvmhost's tid */
			hosttxq = tp->t_txq;
			hostout = tp->t_out;
			hostpid = tp->t_pid;
			tp->t_out = -1;
			mpppvminfo[0] = TDPROTOCOL;
			mpppvminfo[1] = myhostpart + ptypepart;
			mpppvminfo[2] = ptid;
			mpppvminfo[3] = wxp->w_outtid;
			mpppvminfo[4] = wxp->w_outtag;
			mpppvminfo[5] = wxp->w_outctx;
			mpppvminfo[6] = wxp->w_trctid;
			mpppvminfo[7] = wxp->w_trctag;
			mpppvminfo[8] = wxp->w_trcctx;
			mpppvminfo[9] = pvmudpmtu;
			mpppvminfo[10] = pvmmydsig;

			if (pvmdebmask & PDMTASK) {
				sprintf(pvmtxt, "mpp_load() %d type=%d ptid=%x t%x...\n",
					count, 0, ptid, myhostpart + ptypepart + sp->n_first + j);
				pvmlogerror(pvmtxt);
			}

			/* create new task structs */

			if ((tp = task_new(myhostpart + ptypepart
					+ sp->n_first + j)) == NULL) {
				err = PvmOutOfRes;
				goto done;
			}
			tp->t_a_out = STRALLOC(name);
			tp->t_ptid = ptid;
			tids[j] = tp->t_tid;
			PVM_FREE(tp->t_txq);
			tp->t_txq = hosttxq;		/* node tasks share pvmhost's txq */
			tp->t_out = hostout;        /* and stdout */
			tp->t_pid = hostpid;		/* pvm_kill should kill pvmhost */
			tp->t_outtid = wxp->w_outtid;  /* catch stdout/stderr  */
			tp->t_outtag = wxp->w_outtag; 
			tp->t_outctx = wxp->w_outctx; 
		}

		/* av[2] points at the local path[], so nothing can still need av */
		PVM_FREE(avbase);
		return 0;
	}
	if (pvmdebmask & PDMTASK) {
		sprintf(pvmtxt, "mpp_load() didn't find <%.*s>\n",
			(int)(sizeof(pvmtxt) - sizeof("mpp_load() didn't find <>\n")),
			name);
		pvmlogerror(pvmtxt);
	}
	err = PvmNoFile;

done:
	if (avbase)
		PVM_FREE(avbase);
	for (j = 0; j < count; j++)
		tids[j] = err;
	return err;
}

/*
 * mpp_find()
 *
 *	Find a node task by socket address.
 *
 *	tp: task whose t_sad address is compared with the node addresses
 *	    recorded in nodeaddr[].
 *
 *	For each node whose address matches, the node tid for that node
 *	number is looked up with task_find(); the first task found is
 *	returned.  Returns 0 (after logging) when there is none.
 */
struct task *mpp_find( tp )
	struct task	*tp;
{
	struct task	*found = 0;
	int			node;
	int			typebits;
	int			tid;

	for (node = 0; node < partsize && !found; node++) {
		if (nodeaddr[node] != tp->t_sad.sin_addr.s_addr)
			continue;

		/* node tid = host part + "on node" type bits + node number */
		typebits = (0 << (ffs(tidtmask) - 1)) | TIDONNODE;
		tid = myhostpart + typebits + node;

		sprintf(pvmtxt, "mpp_find looking for t%x\n", tid);
		pvmlogerror(pvmtxt);
		found = task_find( tid );
	}

	if (!found)
		pvmlogerror( "mpp_find:  Task not found\n" );
	return found;
}


/*
 * mpp_output()
 *
 *	Add pvmhost's socket to wfds if there are packets waiting to
 *	be sent to a related node task. Node tasks have no sockets;
 *	they share pvmhost's packet queue (txq). Pvmhost simply
 *	forwards any packets it receives to the appropriate node.
 *
 *	A node set whose pvmhost (n_ptid) can no longer be found is
 *	destroyed here.  If it still had live tasks, every local node task
 *	with that set's ptype is cleaned up and freed first.
 *
 *	dummy1, dummy2: unused.
 *	Always returns 0.
 */

int
mpp_output(dummy1, dummy2)
	struct task *dummy1;
	struct pkt *dummy2;
{
	struct nodeset *sp, *sp2;
	struct task *tp;
	int ptype;

	for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link) {
		if ((tp = task_find(sp->n_ptid))) {
			/* t_txq is set to 0 on tasks in this file, so test it first */
			if (tp->t_txq && tp->t_txq->pk_link->pk_buf
			&& tp->t_sock != -1)
				wrk_fds_add(tp->t_sock, 2);
			continue;
		}

		if (sp->n_alive) {
			/* cast so the conversion matches whatever integer type n_ptype is */
			sprintf(pvmtxt, "mpp_output() pvmhost %ld died!\n",
				(long)sp->n_ptype);
			pvmlogerror(pvmtxt);
			/* clean up tasks it serves */
			ptype = sp->n_ptype & ptypemask;
			for (tp = locltasks->t_link; tp != locltasks; tp = tp->t_link)
				if (TIDISNODE(tp->t_tid) && TIDTOTYPE(tp->t_tid) == ptype) {
					tp->t_txq = 0;
					/* step back first so the loop survives the free */
					tp = tp->t_rlink;
					task_cleanup(tp->t_link);
					task_free(tp->t_link);
				}
		}

		/* pvmhost has died, destroy the node set */
		sp2 = sp;
		sp = sp->n_rlink;		/* step back so the loop survives the free */
		LISTDELETE(sp2, n_link, n_rlink);
		PVM_FREE(sp2);
	}
	return 0;
}



/* syntax highlighted by Code2HTML, v. 0.9.1 */