/* RCS identification string, kept in the object file for `ident`. */
static char rcsid[] = "$Id: pvmdmimd.c,v 1.10 2002/02/21 23:19:28 pvmsrc Exp $";

/*
 *         PVM version 3.4:  Parallel Virtual Machine System
 *               University of Tennessee, Knoxville TN.
 *           Oak Ridge National Laboratory, Oak Ridge TN.
 *                   Emory University, Atlanta GA.
 *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
 *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
 *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
 *                   (C) 1997 All Rights Reserved
 *
 *                              NOTICE
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby granted
 * provided that the above copyright notice appear in all copies and
 * that both the copyright notice and this permission notice appear in
 * supporting documentation.
 *
 * Neither the Institutions (Emory University, Oak Ridge National
 * Laboratory, and University of Tennessee) nor the Authors make any
 * representations about the suitability of this software for any
 * purpose.  This software is provided ``as is'' without express or
 * implied warranty.
 *
 * PVM version 3 was funded in part by the U.S. Department of Energy,
 * the National Science Foundation and the State of Tennessee.
 */

/*
 *	pvmdmimd.c
 *
 *  MPP interface.
 *
 *		void mpp_init(int *argc, char **argv):
 *			Initialization. Create a table to keep track of active nodes.
 *			argc, argv: passed from main (argc by pointer).
 *
 *		int mpp_load( struct waitc_spawn *wxp ) 
 *
 *			Load executable onto nodes; create new entries in task table,
 *			encode node number and process type into task IDs, etc.
 *
 *				Construction of Task ID:
 *
 *			 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0
 *			+-+-+-----------------------+-+-----+--------------------------+
 *			|s|g|		host index	    |n| prt |    	node # (16384)	   |
 *			+-+-+-----------------------+-+-----+--------------------------+
 *
 *			The "n" bit is set for node task but clear for host task.
 *
 *			flags:	exec options;
 *			name:	executable to be loaded;
 *			argv:	command line argument for executable
 *			count:	number of tasks to be created;
 *			tids:	array to store new task IDs;
 *			ptid:	parent task ID.
 *
 *			mpp_new(int count, int ptid):		
 *				Allocate a set of nodes. (called by mpp_load())
 *				count: number of nodes;  ptid: parent task ID.
 *
 *		int mpp_output(struct task *tp, struct pkt *pp):
 *			Queue packet pp for task tp (when both are given), then send
 *			pending packets to nodes via native send. Node number and
 *			process type are extracted from task ID.
 *
 *		int mpp_mcast(int src, struct pkt pp, int tids[], int ntask):	
 *			Global send.
 *			src:	source task ID;
 *			pp:		packet;
 *			tids:	list of destination task IDs;
 *			ntask:	how many.
 *
 *		int mpp_probe():
 *			Check for pending packets from nodes (non-blocking). Returns
 *			1 if any node tasks are active, otherwise 0; the actual receive
 *			is done in one shot by mpp_input().
 *
 *		int mpp_input():
 *			Receive pending packets (from nodes) via native recv.
 *			Returns the number of packets received.
 *
 *		struct task *mpp_find(int pid):
 *			Find a task in task table by its Unix pid.
 *
 *		void mpp_free(struct task *tp):
 *			Remove node/process-type from active list.
 *			tp: task pointer.
 *
 */

/*
 *
 * $Log: pvmdmimd.c,v $
 * Revision 1.10  2002/02/21 23:19:28  pvmsrc
 * Added new (not to be documented!) PVM_MAX_TASKS env var support.
 * 	- for Mahir Lokvancic <mahir@math.ufl.edu>.
 * 	- forcefully limits the number of tasks that can attach to a
 * 		pvmd, required on Solaris in rare circumstances when hard
 * 		FD_SETSIZE limit is reached, and all hell breaks loose...
 * 	- check return for task_new() call, can now produce NULL ptr,
 * 		indicating PvmOutOfRes...
 * (Spanker=kohl)
 *
 * Revision 1.9  2000/02/17 21:10:31  pvmsrc
 * Cleaned up comments...  mpp_load() args...
 * (Spanker=kohl)
 *
 * Revision 1.8  1999/07/08 19:00:27  kohl
 * Fixed "Log" keyword placement.
 * 	- indent with " * " for new CVS.
 *
 * Revision 1.7  1997/07/09  13:54:32  pvmsrc
 * Fixed Author Header.
 *
 * Revision 1.6  1997/06/02  13:49:06  pvmsrc
 * Moved #include waitc.h below #include host.h.
 *
 * Revision 1.5  1997/05/05  20:07:48  pvmsrc
 * 	Fix so that output is collected properly. One bug remains:
 * 	tasks can exit before all of the output is collected. Hmm.
 *
 * Revision 1.4  1997/04/29  20:30:27  pvmsrc
 * 	Use new call sequences defined in mppmsg.h
 * 	Use the asynchronous send list management functions.
 *
 * Revision 1.3  1997/03/06  21:19:42  pvmsrc
 * 		Wholesale changes to this code. Tried for simplification.
 * 		still needs work.
 *
 * Revision 1.2  1997/01/28  19:30:31  pvmsrc
 * New Copyright Notice & Authors.
 *
 * Revision 1.1  1996/09/23  23:13:45  pvmsrc
 * Initial revision
 *
 * Revision 1.12  1995/11/02  15:59:01  manchek
 * fixed so spawned tasks inherit pvmd environment plus parent task env
 *
 * Revision 1.11  1995/07/28  20:30:38  manchek
 * pvmtxt should have been etext
 *
 * Revision 1.10  1995/07/25  17:40:26  manchek
 * mpp_output returns int
 *
 * Revision 1.9  1995/07/24  20:03:15  manchek
 * message header no longer part of packet data, goes in pkt struct.
 * drivers must strip and reconstitute headers
 *
 * Revision 1.8  1995/06/16  16:16:06  manchek
 * mpp_load passes trace and output sink to task
 *
 * Revision 1.7  1995/06/12  16:03:20  manchek
 * added PGON partition size to pvminfo array
 *
 * Revision 1.6  1995/05/30  17:21:41  manchek
 * Handle absolute path name properly in mpp_load() (changes from forkexec).
 * mpp_free() takes struct task instead of tid.
 * Declare partsize static.
 * Fixed bug in node allocation in mpp_new().
 * mpp_new() opens pipe to collect stdout
 *
 * Revision 1.5  1995/02/01  20:51:05  manchek
 * added nenv and envp args to mpp_load
 *
 * Sat Dec  3 14:54:20 EST 1994
 *		copy new code that handles absolute filename to mpp_load() 
 *		from forkexec()
 *
 * Revision 1.4  1994/11/07  21:30:45  manchek
 * Modify mpp_output() and mpp_mcast() to send a null packet to alert precv().
 * Modify mpp_input() to handle DataInPlace properly.
 * mpp_new() should return PvmOutOfRes when it runs out of nodes.
 * Fix a bug in the way ptype is computed in mpp_mcast()
 *
 * Revision 1.3  1994/06/03  20:54:52  manchek
 * version 3.3.0
 *
 * Revision 1.2  1993/12/20  15:39:49  manchek
 * patch 6 from wcj
 *
 * Revision 1.1  1993/08/30  23:36:09  manchek
 * Initial revision
 *
 * Jul 12 23:57:07 EDT 1993
 *      deleted loclinput(), and merged loclinpkt() into pvmd.c
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/stat.h>
#include <errno.h>
#include <stdio.h>
#ifdef  SYSVSTR
#include <string.h>
#define CINDEX(s,c) strchr(s,c)
#else
#include <strings.h>
#define CINDEX(s,c) index(s,c)
#endif

#if defined(IMA_PGON)
#include <nx.h>
#endif

#if defined(IMA_PGONPUMA)
#include <hostlib.h>
#endif
#include <pvm3.h>
#include <pvmproto.h>
#include "global.h"
#include "pvmalloc.h"
#include "host.h"
#include "waitc.h"
#include "pkt.h"
#include "task.h"
#include "listmac.h"
#include "pvmdmp.h"
#include "pvmmimd.h"
#include "bfunc.h"
#include "lmsg.h"
#include "mppchunk.h"
#include "mppmsg.h"

#ifndef min
#define min(a,b)	((a)<(b)?(a):(b))
#endif


#if defined(IMA_PGONPUMA)
#include <hostlib.h>
#endif
/* Global */

extern char **pvmcopyenv();
extern int pvmenvinsert();
extern int pvmfreeenv();

extern char **environ;

extern int pvmdebmask;			/* from pvmd.c */
extern char **epaths;			/* from pvmd.c */
extern int myhostpart;			/* from pvmd.c */
extern struct htab *hosts;		/* from pvmd.c */
extern int tidhmask;			/* from pvmd.c */
extern int ourudpmtu;			/* from pvmd.c */

/* Not static, so visible to other files: mask for the ptype field of tids. */
int tidtmask = TIDPTYPE;		/* mask for ptype field of tids */
/* Not static, so visible to other files: mask for the node field of tids. */
int tidnmask = TIDNODE;			/* mask for node field of tids */

#if defined(IMA_PGONPUMA)
/* Physical -> logical node map read by extract_lnid(); mpp_init() sets
   every entry to -1 (unmapped). */
static short phystolmap[2048];		/* XXX HACK. match physical to logical node*/
#endif
/* private */

/* Sentinel head of the circular, doubly linked list of active node sets.
   Created in mpp_init(); mpp_new() links sets in, mpp_free() unlinks them. */
static struct nodeset *busynodes;	/* active nodes; ordered by proc type */
/* Shared sprintf() buffer for log messages; messages must stay under
   512 bytes. */
static char etext[512];			/* scratch for error log */
/* tidtmask shifted down to bit 0; computed in mpp_init(). */
static int ptypemask;			/* mask; we use these bits of ptype in tids */
/* NOTE(review): isendmid and outpkt are not referenced by the code shown in
   this file; possibly leftovers -- confirm before removing. */
static long isendmid = -1;		/* msg ID returned by isend() */
static struct pkt *outpkt = 0;	/* packet being sent */
/* Number of nodes: from nx_initve() on IMA_PGON, set to the requested count
   by mpp_new() on IMA_PGONPUMA, and reset to -1 by the PUMA teardown in
   mpp_free(). */
static int partsize;			/* size of partition */
/* Read end of the pipe created by the first mpp_new() call. */
static int taskstdout;			/* stdout/stderr of pvmd and node tasks */
/* Sent to each task as pvminfo[4] by mpp_load(). */
static int myndf = 0;			/* obsolete.  Fix this from pvmd.c */


/* information to handle multiple recv bufs */
/* Array from new_vdirectstruct() in mpp_load(), indexed by node number.
   It holds, among other things, the send-tag rotation (sseq/nbufs) used by
   mpp_output() and the per-node ordering structure. */
static MPP_DIRECTI_PTR mpdirect = (MPP_DIRECTI_PTR) NULL; 

/* the receive buffers, themselves */
/* NRBUFS buffers from init_recv_list() in mpp_load().  NULL until the first
   load; mpp_input() checks for this. */
static MSG_INFO_PTR	mprecvbufs =  (MSG_INFO_PTR) NULL; 

/* need to keep track of packets that need to be written to tasks */
/* Sentinel head of the queue that mpp_output() appends to and drains. */
static struct pkt *mppopq = (struct pkt *) NULL; 

/* array of mids of outstanding messages to be sent to backend */
/* mppsendmids[i] and mppoutpkts[i] pair a pending send with its packet.
   Slots are handed out by pvm_mpp_find_midx(). */
#define NMPPSBUFMIDS 32
/* NOTE(review): MPPMIDFREE and MPPMIDALLOCED are not used in this file as
   seen; presumably slot-state markers for mppsendmids[] -- confirm. */
#define MPPMIDFREE -1
#define MPPMIDALLOCED -2
static msgmid_t mppsendmids[NMPPSBUFMIDS];
static struct pkt *mppoutpkts[NMPPSBUFMIDS]; 
/* Passed by pointer to pvm_mpp_find_midx(); presumably where the next
   free-slot search starts. */
static int lastidx = 0;

/* Application id passed to every mfunc send/done call.  -1 means "none":
   mpp_new() on PUMA refuses to load unless it is -1, and mpp_free() resets
   it.  NOTE(review): nothing in this file sets any other value. */
static int appid = -1;
/* When nonzero, mpp_free() skips kill_app(); mpp_free() resets it to 0. */
static int killing_appid = 0;

void
mpp_init(argc, argv)
	int *argc;
	char **argv;
{
	int i;
	int cc;

#if defined(IMA_PGON) 
	if ((partsize = nx_initve((char *)0, 0, "", argc, argv)) < 0) 
	{
		pvmlogperror("mpp_init() nx_initve\n");
		pvmbailout();
	}
	sprintf(etext, "using %d nodes\n", partsize);
	pvmlogerror(etext);
#endif

#if defined(IMA_PGONPUMA)
   	if ( (cc = host_lib_init()) < 0)
	{
		pvmlogperror("mpp_init(): host_lib_int\n"); 	
		pvmbailout();
	}
	for (i=0; i < (sizeof(phystolmap)/sizeof(short)); i++)
		phystolmap[i] = -1;
#endif

	busynodes = TALLOC(1, struct nodeset, "nsets");
	BZERO((char*)busynodes, sizeof(struct nodeset));
	busynodes->n_link = busynodes;
	busynodes->n_rlink = busynodes;

	ptypemask = tidtmask >> (ffs(tidtmask) - 1);

	/* need a list of packets that have been sent */
	mppopq = TALLOC(1, struct pkt , "mppopq");
	mppopq->pk_link = mppopq;
	mppopq->pk_rlink = mppopq;;	

	pvm_init_asynch_list( mppsendmids, (CHUNK_PTR *) mppoutpkts, NMPPSBUFMIDS);
}

/*
 *	mpp_new - allocate a set of nodes for count new tasks.
 *
 *	count:	number of nodes requested;
 *	ptid:	parent task ID.
 *
 *	ptype is always 0, so all tasks can exchange messages directly,
 *	bypassing the pvmd.
 *
 *	Node sets are allocated in blocks: when count processes are needed
 *	there must be a contiguous range of count free nodes.  busynodes is
 *	kept sorted by (ptype, n_first), and the gap search below depends on
 *	that order.  The new set is therefore linked in at the position of
 *	the gap it fills.
 *
 *	The first call also redirects the pvmd's stdout and stderr into a
 *	pipe whose read end (taskstdout) collects task output.
 *
 *	Returns the new nodeset, or 0 in either of these cases:
 *	  - the partition has no room;
 *	  - on IMA_PGONPUMA, an application is already loaded.
 */
struct nodeset *
mpp_new(count, ptid)
	int count;		/* number of nodes requested */
	int ptid;		/* parent's tid */
{
	static int first = 1;
	int last = -1;			/* last node in use so far in this ptype layer */
	int pfd[2];			/* pipe to read stdout */
	long ptype = 0;
	struct nodeset *sp;
	struct nodeset *before;		/* newp gets linked in just before this set */
	struct nodeset *newp;

#if defined(IMA_PGONPUMA)
	/* XXX Only run a single PUMA partition for now */
	if (appid != -1)
	{
		pvmlogerror("mpp_new() only one partition allowed, sorry\n");
		return (struct nodeset *) 0;
	}
	partsize = count;
#endif
	if (first)	/* take care of task input/output */
	{
		if (pipe(pfd) != -1)	/* pfd[0] is a read, pfd[1] is a write */
		{
			dup2(pfd[1], 1);	/* send standard output write end of pipe */
			dup2(1, 2);		/* and stderr to write end of pipe */
			taskstdout = pfd[0];
			wrk_fds_add(taskstdout, 1);
		}
		else
		{
			pvmlogperror("mpp_new() pipe");
			/* no output pipe: don't leave tasks writing to fd 0 */
			taskstdout = -1;
		}
		first = 0;
	}

	if (!(newp = TALLOC(1, struct nodeset, "nsets")))
	{
		pvmlogerror("mpp_new() can't get memory\n");
		pvmbailout(0);
	}
	BZERO((char*)newp, sizeof(struct nodeset));
	newp->n_size = count;

	/* this loop is bypassed on the first mpp_new */
	for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link)
	{
		if (sp->n_ptype > ptype)
		{
			/* end of the current layer: is there room above its last set? */
			if (partsize - last > count)
			{
				before = sp;
				goto done;
			}
			ptype = sp->n_ptype;	/* go to the next layer ... */
			last = -1;		/* ... and examine sp as part of it */
		}
		if (sp->n_ptype == ptype)
		{
			/* nodes last+1 .. sp->n_first-1 are free */
			if (sp->n_first - last > count)
			{
				before = sp;
				goto done;
			}
			last = sp->n_first + sp->n_size - 1;
			/* very last set: is there room above it? */
			if (sp->n_link == busynodes && partsize - last > count)
			{
				before = busynodes;
				goto done;
			}
		}
	}

	if (partsize - last <= count)
	{
		pvmlogerror("mpp_new() not enough nodes in partition\n");
		PVM_FREE(newp);
		return (struct nodeset *) 0;
	}
	before = busynodes;	/* only reached with an empty list: append */

done:
	if (pvmdebmask & PDMNODE) {
		sprintf(etext, "mpp_new() %d nodes %d ... ptype=%ld ptid=%x\n",
			count, last+1, ptype, ptid);
		pvmlogerror(etext);
	}
	newp->n_first = last + 1;
	newp->n_ptype = ptype;
	newp->n_alive = count;
	newp->n_ptid = ptid;

	/* insert at the gap's position so busynodes stays sorted */
	LISTPUTBEFORE(before, newp, n_link, n_rlink);

	return newp;
}

/*
 * mpp_free - remove a node task from the active node sets.
 *
 *	tp:	task being released.  Tasks that are not node tasks are ignored.
 *
 *	Sets tp->t_out to -1 and decrements the alive count of the node set
 *	containing tp's node.  When that count reaches zero, the set is unlinked
 *	and freed.  On IMA_PGONPUMA this also tears down the receive buffers,
 *	the per-node direct structures and the application.  A task whose
 *	node is not found in any set is logged.
 */
void
mpp_free(tp)
	struct task *tp;
{
	struct nodeset *sp;
	int node;
	int ptype;
	int tid = tp->t_tid;

	if (!TIDISNODE(tid))
		return;

	node = tid & tidnmask;
	ptype = TIDTOTYPE(tid);
	tp->t_out = -1;

	for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link)
	{
		if ((sp->n_ptype & ptypemask) == ptype && node >= sp->n_first
				&& node - sp->n_first < sp->n_size)
		{
			if (pvmdebmask & PDMNODE) {
				sprintf(etext, "mpp_free() t%x type=%ld alive=%d\n",
					tid, sp->n_ptype, sp->n_alive);
				pvmlogerror(etext);
			}
			if (--sp->n_alive == 0)
			{
				LISTDELETE(sp, n_link, n_rlink);
				PVM_FREE(sp);

#if defined(IMA_PGONPUMA)
				{
					int i;

					if (mprecvbufs)
					{
						if (pvmdebmask & PDMNODE) {
							sprintf(etext, "mpp_free() freeing %d recv bufs \n",
									NRBUFS);
							pvmlogerror(etext);
						}
						/* NOTE(review): only the packets are released; the
						   array from init_recv_list() itself is not freed
						   (its allocator isn't visible here) -- confirm. */
						for (i = 0; i < NRBUFS; i++)
							pk_free(mprecvbufs[i].rpkt);
						mprecvbufs = (MSG_INFO_PTR) NULL;
					}

					if (pvmdebmask & PDMNODE) {
						sprintf(etext, "mpp_free() mprecvbufs is %p \n",
								(void *) mprecvbufs);
						pvmlogerror(etext);
					}

					/* mpdirect is one array indexed by node number.  Release
					   each entry's ordering structure, then free the array
					   itself exactly once.  Individual entries are not
					   separately allocated. */
					if (mpdirect)
					{
						for (i = 0; i < partsize; i++)
							if (mpdirect[i].ordering)
								PVM_FREE(mpdirect[i].ordering);
						PVM_FREE(mpdirect);
						mpdirect = (MPP_DIRECTI_PTR) NULL;
					}

					partsize = -1;

					if (!killing_appid)
						kill_app(appid);

					disc_app(appid);

					appid = -1;

					killing_appid = 0;
				}
#endif
			}
			return;
		}
	}
	sprintf(etext, "mpp_free() t%x not active\n", tid);
	pvmlogerror(etext);
}

/* load executable onto the given set of nodes */
int
mpp_load( wxp )
struct waitc_spawn *wxp;
{
int flags;              /* exec options */
char *name;             /* executable */
char **argv;            /* arg list (argv[-1] must be there) */
int count;				/* how many */
int *tids;				/* array to store new tids */
int ptid;				/* parent task ID */
int nenv;				/* length of environment */
char **envp;			/* environment strings */
int outtid;				/* output tid */
int outcod;				/* output code */
int trctid;				/* tid for trace messages */
int trccod;				/* code to use on trace messages */

    char **ep, **eplist;
	static char *nullep[] = { "", 0 };
    char path[MAXPATHLEN];
	char **cenv;

	int i, cc;
	int err = 0;
	static int first = 1;
	int j;
	int cnode;
	int cpid;
	int pvminfo[SIZEHINFO];	/* proto, myset, parent tid, frag size, NDF */
	int ptypepart;

	int *pids = 0;			/* array of OSF/1 process IDs */
	int *nodes = 0;
	int nnodes;			/* number of nodes to needed */

	msgmid_t mid;

	info_t minfo[MPPINFOSIZE];

	struct nodeset *sp;

    struct stat sb;

	struct task *tp;

	MPP_DIRECTI_PTR tdirect;

	MSGFUNC_PTR mfunc;

	/* -- initialize some variables from the struct waitc_spawn  struct -- */

	name = wxp->w_argv[0];
	argv = wxp->w_argv;
	count = wxp->w_veclen;
	tids = wxp->w_vec;
	ptid = wxp->w_ptid;
	nenv = wxp->w_nenv;
	envp = wxp->w_env; 
	

	mfunc = pvm_hostmsgfunc();

    eplist = CINDEX(name, '/') ? nullep : epaths;

	/* -- look for the executable name in the executable path (ep) -- */
    for (ep = eplist; *ep; ep++) 
	{
		strcpy(path, *ep);	 /* search for file */

		if (path[0])
			(void)strcat(path, "/");

		strncat(path, name, sizeof(path) - strlen(path) - 1);

        if (stat(path, &sb) == -1
                || ((sb.st_mode & S_IFMT) != S_IFREG)
                || !(sb.st_mode & S_IEXEC)) 
		{
            if (pvmdebmask & PDMTASK) 
			{
                sprintf(etext, "mpp_load() stat failed <%s>\n", path);
                pvmlogerror(etext);
            }
			continue;	/* try next path element */
        }

		/* Here we have found an executable -- try to start it */

		if ( !(pids = TALLOC(count, int, "pids")) 
				||  !(nodes = TALLOC(count, int, "nodes"))) 
		{
			err = PvmNoMem;
			goto done;
		}

		if (!(sp = mpp_new(count, ptid)))  
		{
			err = PvmOutOfRes;
			goto done;
		}

		for (j = 0; j < count; j++)
			nodes[j] = sp->n_first + j;

		/* copy the pvmd's environment, augment with what is passed to us */
		cenv = pvmcopyenv(environ);
		while (nenv > 0)
			pvmenvinsert(&cenv, envp[--nenv]);


		/* Do the load, return how many were actually started */

		err = nx_loadve((long *) nodes, (long) count, (long) sp->n_ptype, (long *)pids, path, argv, cenv);

		pvmfreeenv(cenv); /* free the copied environment */

		if (err < count)  /* loadve yacked ?? */
		{
			sprintf(etext, "mpp_load() loaded only %d <%s>\n", err, path);
			pvmlogerror(etext);
			err = PvmDSysErr;
			goto done;
		}

		if (first)		/* first time through, do some initialization */ 
		{
			if ( _setptype(PVMDPTYPE) < 0)
        		pvmlogperror("mpp_load() setptype to PVMDPTYPE");
			first = 0;

			/* Set up the prealloced buffers      */
			mpdirect = new_vdirectstruct( partsize, NRBUFS, NSBUFS );	

			mprecvbufs = init_recv_list(NRBUFS, PMTDBASE, 
					MAXFRAGSIZE, 0, MPPANY, mfunc);

		}


		/* configure the tasks
		/* alright, send some important information to the tasks that we
			just started */

		ptypepart = ((sp->n_ptype & ptypemask) << (ffs(tidtmask) - 1)) 
					| TIDONNODE;
		pvminfo[0] = TDPROTOCOL;
		pvminfo[1] = myhostpart + ptypepart;
		pvminfo[2] = ptid;
		pvminfo[3] = MAXFRAGSIZE;
		pvminfo[4] = myndf;
		sprintf(etext, "partsize is %d\n",partsize); 
		pvmlogerror(etext);
		pvminfo[5] = partsize;
		pvminfo[6] = wxp->w_outtid;
		pvminfo[7] = wxp->w_outtag;
		pvminfo[8] = wxp->w_outctx;
		pvminfo[9] = wxp->w_trctid;
		pvminfo[10] = wxp->w_trctag;
		pvminfo[11] = wxp->w_trcctx;

		if (pvmdebmask & PDMTASK) 
		{
			sprintf(etext, "mpp_load() %d type=%ld ptid=%x pid%ld... t%x...\n",
					count, sp->n_ptype, ptid, pids[0], 
					myhostpart + ptypepart + sp->n_first);

			pvmlogerror(etext);
		}

		pvmlogerror("Starting configuration Message send Loop\n");
		for (i = 0; i < count; i ++ )
		{
			cnode =  nodes[i];
			if ( (*mfunc->imsgsend)(appid, PMTCONF,  (char *) pvminfo, 
					sizeof(pvminfo), cnode, PVMDPTYPE, &mid) < 0) 
			{
				pvmlogperror("mpp_load() configuration message");
				err = PvmDSysErr;
				goto done;
			}
			while (! ((*mfunc->msgdone)(appid, &mid, minfo)));
		}
		pvmlogerror("Finished Message send Loop\n");


		/* create new task structs */

		for (j = 0; j < count; j++) 
		{
			if (pids[j] > 0) 
			{
				if ((tp = task_new(myhostpart + ptypepart
						+ sp->n_first + j)) == NULL) {
					err = PvmOutOfRes;
					goto done;
				}
				task_setpid(tp, pids[j]);
				tp->t_a_out = STRALLOC(name);
				tp->t_ptid = ptid;
				tp->t_flag |= TF_CONN;		/* no need for the authorization */
				tp->t_out = taskstdout;
				tp->t_outtid = wxp->w_outtid;
				pvmlogprintf("setting output context of %x to %x\n",
								tp->t_tid, tp->t_outtid);
				tp->t_outctx = wxp->w_outctx;
				tp->t_outtag = wxp->w_outtag;
				tp->t_trctid = wxp->w_trctid;
				tp->t_trcctx = wxp->w_trcctx;
				tp->t_trctag = wxp->w_trctag;
				tids[j] = tp->t_tid;

			/* initialize the sequence numbers for this task */
				fill_directstruct (mpdirect + nodes[j], NSBUFS, tp->t_tid, 
						pids[j], PMTDBASE, 0, appid);   
				init_chunkostruct( (mpdirect + nodes[j])->ordering, NRBUFS);

			} else
				tids[j] = PvmDSysErr;
		}

		if (pids)
			PVM_FREE(pids);

		if (nodes)
			PVM_FREE(nodes);

		return 0;
	}


	/* here, only if there is an error in the way things were spawned */
	if (pvmdebmask & PDMTASK) 
	{
		sprintf(etext, "mpp_load() didn't find <%s>\n", name);
		pvmlogerror(etext);
	}

	err = PvmNoFile;

done:
	for (j = 0; j < count; j++)
		tids[j] = err;

	if (pids)
		PVM_FREE(pids);

	return err;
}


/* input from node tasks */
int
mpp_input()
{
	static int cbuf = 0;
	struct pkt *pp = 0;

	int buf; 				/* buffer we are working on */
	int mxbufs = NRBUFS;	/* maximum number of buffers we will proc on read */
	int npkts = 0;
	int tsrc, src;
	int tag;
	int pid = 0;
	int mid;

	MPP_DIRECTI_PTR tcon;

	static CHUNK_PTR readyList = (CHUNK_PTR) NULL;

	struct pkt *hdr; 

	if ( mprecvbufs == (MSG_INFO_PTR) NULL )  
	{
		/* we haven't alloc'ed any buffers. No tasks loaded */
		return npkts;
	}

	for (buf = 0; buf < mxbufs; buf++) /* limit #of packets to read */
	{
		if (pp = (struct pkt *) pvm_chunkReady(mprecvbufs, mxbufs,
				pvm_hostmsgfunc(), mpdirect, partsize, &cbuf,
				&readyList) )
		{
#if defined(IMA_PGONPUMA)
			src = extract_lnid(src);
#endif
			npkts ++;

			/* Step 2) extract header information from the packet */

			pp->pk_dst = pvmget32(pp->pk_dat);
			pp->pk_src = pvmget32(pp->pk_dat + 4);
			pp->pk_flag = pvmget8(pp->pk_dat + 12);
			pp->pk_len -= TDFRAGHDR;
			pp->pk_dat += TDFRAGHDR;
			if (pp->pk_flag & FFSOM) 
			{
				if (pp->pk_len < MSGHDRLEN) 
				{
					sprintf(etext,
							"mpp_input() SOM pkt src t%x dst t%x short\n",
							pp->pk_src, pp->pk_dst);
					pvmlogerror(etext);
					pk_free(pp);
					return;
				}
				pp->pk_enc = pvmget32(pp->pk_dat);
				pp->pk_tag = pvmget32(pp->pk_dat + 4);
				pp->pk_ctx = pvmget32(pp->pk_dat + 8);
				pp->pk_wid = pvmget32(pp->pk_dat + 16);
				pp->pk_crc = pvmget32(pp->pk_dat + 20);
				pp->pk_len -= MSGHDRLEN;
				pp->pk_dat += MSGHDRLEN;
			}
		
			/* Step 3) Deliver packet to the PVM processing input stream */

			loclinpkt((struct task *)0, pp);

		}
		else
			break;	/* no packets to read */

	}

	return npkts;
}

/*
 * mpp_output - queue a packet for a node task and push queued packets out.
 *
 *	tp:	destination task, or 0;
 *	pp:	packet to send, or 0.
 *
 *	Plays the role of pkt_to_task() in the pvmd.  It works in three steps:
 *
 *	1. If both tp and pp are given, the message header (first fragment
 *	   only) and the fragment header are written into the headroom in
 *	   front of pp's data, and pp is appended to mppopq.
 *	2. Queued packets are started with imsgsend() for as long as free
 *	   asynchronous send slots remain.  When no slot is free, the function
 *	   returns at once and the rest stay queued.  When a send fails, every
 *	   queued packet for that destination is dropped and the destination
 *	   task is released.
 *	3. If tp is marked TF_CLOSE, it is released.
 *
 *	Always returns 0.
 */
int
mpp_output(tp, pp)
	struct task *tp;
	struct pkt *pp;
{
	char *cp;
	int len;
	long node;				/* node number */
	long ptype;				/* process type */
#if defined(IMA_PGON)
	struct nodeset *sp;
#endif
	int cc;					/* mid from imsgsend(); < 0 on failure */
	int idx1;				/* free slot in mppsendmids[] */
	int tag;
	struct pkt *delpkt;
	struct pkt *npkt;
	struct pkt *tstpkt;
	struct task *tp2;
	MPP_DIRECTI_PTR tdirect;
	MSGFUNC_PTR mfunc;

	mfunc = pvm_hostmsgfunc();

	/* check if we have any packets to queue up and send */
	if (tp && pp)
	{
		ptype = (tp->t_tid & tidtmask) >> (ffs(tidtmask) - 1);

#if defined(IMA_PGON)
		/* XXX ptype is not used for PUMA host-node communication */
		/* map the ptype bits in the tid back to the full ptype of its set */
		for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link)
		{
			if ((sp->n_ptype & ptypemask) == ptype)
			{
				ptype = sp->n_ptype;
				break;
			}
		}
		if (sp == busynodes)
		{
			sprintf(etext, "mpp_output() pkt to t%x scrapped (no ptype %ld)\n",
				tp->t_tid, ptype);
			pvmlogerror(etext);
			pk_free(pp);
			goto done;
		}
#endif
		cp = pp->pk_dat;
		len = pp->pk_len;

		if (pp->pk_flag & FFSOM)
		{
			/* first fragment: prepend the message header */
			cp -= MSGHDRLEN;
			len += MSGHDRLEN;

			if (cp < pp->pk_buf) {
				pvmlogerror("mpp_output() no headroom for message header\n");
				pk_free(pp);
				goto done;
			}
			/* If we are not an MCA'ed pkt OR FFMCAWH (MCA write hdr)
			   is set, then write the header */
			if (!(pp->pk_flag & FFMCA) || (pp->pk_flag & FFMCAWH))
			{
				pvmput32(cp, pp->pk_enc);
				pvmput32(cp + 4, pp->pk_tag);
				pvmput32(cp + 8, pp->pk_ctx);
				pvmput32(cp + 16, pp->pk_wid);
				pvmput32(cp + 20, pp->pk_crc);
				if (pvmdebmask & PDMMCA)
				{
					sprintf(etext, "mpp_output wrt TDMSGHDR for %p\n",
							(void *) pp->pk_buf);
					pvmlogerror(etext);
				}
			}
		}
		if (pvmdebmask & PDMPACKET) {
			sprintf(etext,
				"mpp_output() src t%x dst t%x ff %x len %d ptype %ld\n",
				pp->pk_src, pp->pk_dst, pp->pk_flag & (FFSOM|FFEOM),
				len, ptype);
			pvmlogerror(etext);
		}

		/* prepend the fragment header */
		cp -= TDFRAGHDR;
		if (cp < pp->pk_buf) {
			pvmlogerror("mpp_output() no headroom for packet header\n");
			pk_free(pp);
			goto done;
		}
		/* If we are not an MCA'ed pkt OR FFMCAWH (MCA write hdr)
		   is set, then write the header */
		if (!(pp->pk_flag & FFMCA) || (pp->pk_flag & FFMCAWH))
		{
			pvmput32(cp, pp->pk_dst);
			pvmput32(cp + 4, pp->pk_src);
			pvmput32(cp + 8, len);
			pvmput32(cp + 12, 0);
			pvmput8(cp + 12, pp->pk_flag & (FFSOM|FFEOM|FFMCA));
			if (pvmdebmask & PDMMCA)
			{
				sprintf(etext, "mpp_output wrt TDFRAGHDR for %p\n",
						(void *) pp->pk_buf);
				pvmlogerror(etext);
			}
		}
		len += TDFRAGHDR;

		pp->pk_dat = cp;	/* store where the data starts */
		pp->pk_len = len;	/* store the length of the buffer */

		/* the packet is complete; put it on the output queue */
		LISTPUTBEFORE(mppopq, pp, pk_link, pk_rlink);
	}

	/* Start sends while free send slots remain.  Every packet handled is
	   unlinked, so tstpkt is always the first packet in mppopq. */
	tstpkt = mppopq->pk_link;
	while (tstpkt != mppopq)
	{
		idx1 = pvm_mpp_find_midx(mppsendmids, (CHUNK_PTR *) mppoutpkts,
				&lastidx, NMPPSBUFMIDS, mfunc);
		if (idx1 < 0)
		{
			if (pvmdebmask & PDMPACKET) {
				sprintf(etext,
					"mpp_output() no free send message ids.\n");
				pvmlogerror(etext);
			}
			pvmlogerror("(*)\n");
			return 0;	/* the rest stays queued for a later call */
		}

		node = tstpkt->pk_dst & tidnmask;
		tdirect = mpdirect + node;
		ptype = (tstpkt->pk_dst & tidtmask) >> (ffs(tidtmask) - 1);

		/* per-destination tag rotating through PMTDBASE .. PMTDBASE+nbufs-1 */
		tag = PMTDBASE + tdirect->sseq;
		if (++(tdirect->sseq) >= tdirect->nbufs)
			tdirect->sseq = 0;

		/* send the actual packet of information */
		/* Hmm, should check here for long packets  ... ?? */
		/* XXX appid needs to be checked here for PUMA */
#if defined(IMA_PGONPUMA)
		ptype = extract_pid(appid, node);
		node = extract_nid(appid, node);
#endif
		len = tstpkt->pk_len;

		if (len < 16)
		{
			/* not an errno condition, so no perror-style logging */
			sprintf(etext,"mpp_output() bogus len %d? (dst %x, ff %d)\n",
				len, tstpkt->pk_dst, tstpkt->pk_flag);
			pvmlogerror(etext);
		}
		(*mfunc->imsgsend)(appid, tag, tstpkt->pk_dat, len,
				node, ptype, &cc);
		pvm_assign_mid(mppsendmids, cc, idx1);

		if (cc < 0)
		{
			sprintf(etext,"mpp_output() can't send to t%x", tstpkt->pk_dst);
			pvmlogperror(etext);

			/* Drop every other queued packet for the same destination.
			   Stop at the list head so the sentinel is never examined
			   or freed. */
			npkt = tstpkt->pk_link;
			while (npkt != mppopq)
			{
				delpkt = npkt;
				npkt = npkt->pk_link;
				if (delpkt->pk_dst == tstpkt->pk_dst)
				{
					LISTDELETE(delpkt, pk_link, pk_rlink);
					pk_free(delpkt);
				}
			}

			/* ... then the failed packet itself */
			delpkt = tstpkt;
			tstpkt = tstpkt->pk_link;
			LISTDELETE(delpkt, pk_link, pk_rlink);

			tp2 = task_find(delpkt->pk_dst);

			pk_free(delpkt);

			/* release the unreachable task (not necessarily the caller's tp) */
			if (tp2)
			{
				if (tp2 == tp)
					tp = (struct task *) 0;	/* freed here; skip TF_CLOSE below */
				mpp_free(tp2);
				tp2 = tp2->t_rlink;
				task_cleanup(tp2->t_link);
				task_free(tp2->t_link);
			}
		}
		else  /* isend was ok, remember the pkt, go onto the next one */
		{
			pvm_assign_chunk((CHUNK_PTR *) mppoutpkts, (CHUNK_PTR) tstpkt,
							 idx1);

			if (pvmdebmask & (PDMPACKET | PDMNODE))
			{
				sprintf(etext,
					"mpp_output() from mppopq src t%x dst t%x len %d ptype %ld flags %s\n",
					tstpkt->pk_src, tstpkt->pk_dst, len, ptype,
					pkt_flags(tstpkt->pk_flag));
				pvmlogerror(etext);
			}

			/* remove the packet from the mppopq */
			delpkt = tstpkt;
			tstpkt = tstpkt->pk_link;

			LISTDELETE(delpkt, pk_link, pk_rlink);
		}
	}
done:
	if (tp && (tp->t_flag & TF_CLOSE)) {
		mpp_free(tp);
		tp = tp->t_rlink;
		/* XXX tm_exit() also calls task_cleanup(); should only be done once */
		task_cleanup(tp->t_link);
		task_free(tp->t_link);
	}
	return 0;
}


/*
 * mpp_probe - tell the pvmd whether node tasks may have input for it.
 *
 *	No actual probe is done here; receiving happens in one shot in
 *	mpp_input().  Returns 1 while at least one node set is active,
 *	and 0 when busynodes is empty.
 */
int
mpp_probe()
{
	int idle = (busynodes->n_link == busynodes);

	return idle ? 0 : 1;
}

#if defined(IMA_PGONPUMA)
/*
 * extract_lnid - map a physical node id to its logical node id.
 *
 *	Returns phystolmap[pnid], or -1 when pnid is outside the table.
 */
int
extract_lnid(pnid)
int pnid;
{
	int nentries = (int)(sizeof(phystolmap)/sizeof(short));

	if (pnid < 0 || pnid >= nentries)
		return -1;
	return phystolmap[pnid];
}

/*
 * mpp_kill - deliver a signal to a task.
 *
 *	A task that is not a node task gets kill(2).  For a node task, only
 *	SIGKILL and SIGTERM are honoured: the task is released from its node
 *	set and cleaned up.  Any other signal sent to a node task is logged
 *	and ignored.
 */
void
mpp_kill(tp, signum)
	struct task *tp;
	int signum;
{
	if (!TIDISNODE(tp->t_tid)) {
		(void)kill(tp->t_pid, signum);
		return;
	}

	if (signum != SIGKILL && signum != SIGTERM) {
		sprintf(etext,"mpp_kill() signal %d to node t%x ignored\n",
			signum, tp->t_tid);
		pvmlogerror(etext);
		return;
	}

	mpp_free(tp);
/* XXX tm_exit() also calls task_cleanup(); should only be done once */
	tp = tp->t_rlink;
	task_cleanup(tp->t_link);
	task_free(tp->t_link);
}

#endif

/* ----------- find_direct ---------- */
/* this is a hack to find the correct ordering structure for
	a node. 
*/
MPP_DIRECTI_PTR 
pvm_find_direct (dlist, nstruct, node)
MPP_DIRECTI_PTR dlist;
int nstruct;
int node;
{
	node = node & TIDNODE;  /* make sure this is a node */

	if ( dlist && dlist == mpdirect )
	{
			return dlist + node;
	}	
}

/*
 * pvm_mpp_get_precvids - this driver keeps no list of pending receive
 * message ids, so the result is always a null pointer.
 */
struct msgid *
pvm_mpp_get_precvids()
{
	struct msgid *none = (struct msgid *) 0;

	return none;
}
		 


/* syntax highlighted by Code2HTML, v. 0.9.1 */