static char rcsid[] =
	"$Id: pvmdshmem.c,v 1.21 2004/01/14 18:51:02 pvmsrc Exp $";

/*
 *         PVM version 3.4:  Parallel Virtual Machine System
 *               University of Tennessee, Knoxville TN.
 *           Oak Ridge National Laboratory, Oak Ridge TN.
 *                   Emory University, Atlanta GA.
 *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
 *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
 *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
 *                   (C) 1997 All Rights Reserved
 *
 *                              NOTICE
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby granted
 * provided that the above copyright notice appear in all copies and
 * that both the copyright notice and this permission notice appear in
 * supporting documentation.
 *
 * Neither the Institutions (Emory University, Oak Ridge National
 * Laboratory, and University of Tennessee) nor the Authors make any
 * representations about the suitability of this software for any
 * purpose.  This software is provided ``as is'' without express or
 * implied warranty.
 *
 * PVM version 3 was funded in part by the U.S. Department of Energy,
 * the National Science Foundation and the State of Tennessee.
 */

/*
 * pvmdshmem.c
 *
 * Shared-memory MPP interface.
 *
 * $Log: pvmdshmem.c,v $
 * Revision 1.21  2004/01/14 18:51:02  pvmsrc
 * Added new AIX5* arches.
 * (Spanker=kohl)
 *
 * Revision 1.20  2002/02/21 23:18:41  pvmsrc
 * Added new (not to be documented!) PVM_MAX_TASKS env var support.
 * 	- for Mahir Lokvancic <mahir@math.ufl.edu>.
 * 	- forcefully limits the number of tasks that can attach to a
 * 		pvmd, required on Solaris in rare circumstances when hard
 * 		FD_SETSIZE limit is reached, and all hell breaks loose...
 * 	- check return for task_new() call, can now produce NULL ptr,
 * 		indicating PvmOutOfRes...
 * (Spanker=kohl)
 *
 * Revision 1.19  2001/09/25 21:21:04  pvmsrc
 * Yanked "char *pvmgettmp();" decl - now in pvm3.h...
 * (Spanker=kohl)
 *
 * Revision 1.18  2000/02/10 20:45:33  pvmsrc
 * Replaced hard-coded /tmp usage.
 * 	- use pvmgettmp() routine now to determine PVM temp dir.
 * (Spanker=kohl)
 *
 * Revision 1.17  1999/07/08 19:00:11  kohl
 * Fixed "Log" keyword placement.
 * 	- indent with " * " for new CVS.
 *
 * Revision 1.16  1998/08/13  18:31:15  pvmsrc
 * Altered SUNMP to use test and set operations with semaphores
 * 		for page locking instead of MUTEX and cond vars.
 * 	Changes are mainly in pvmshmem.h, with lots of #ifdefs changes.
 * 	Makefile altered to use the PLOCKFILE to indicate the Page Locking
 * 		INLINE code used (from SUNMP.conf).
 * 	Some changes effect AIX MP versions which still use conditional
 * 		variables and may change to semaphores soon.
 * (Spanker=fagg)
 *
 * Revision 1.15  1997/11/04  23:21:43  pvmsrc
 * Added SYSVSTR stuff.
 * (Spanker=kohl)
 *
 * Revision 1.14  1997/07/02  20:27:32  pvmsrc
 * 	Fixed startup race on shmem to that a shmem task can get fully
 * 	configured before getting any messages.
 * 	This involved adding two states
 * 	TF_PRESHMCONN and TF_SHM.  TF_PRESHMCONN indicates that messages
 * 	with MM_PRIO set can be sent to a task, but regular messages are
 * 	queued. This allows shmem tasks to be completely configured
 * 	before any messages flow.  When the daemon changes the state from
 * 	TF_PRESHMCONN to TF_SHMCONN it calls shm_wrt_pkts to write any
 * 	packets that were queued before task state changed to TF_SHMCONN.
 *
 * Revision 1.13  1997/06/27  20:54:41  pvmsrc
 * Allow forked process to be caught by daemon
 * 	when its not ignoring SIGCLD.
 *
 * Revision 1.12  1997/06/25  22:09:34  pvmsrc
 * Markus adds his frigging name to the author list of
 * 	every file he ever looked at...
 *
 * Revision 1.11  1997/06/24  20:39:19  pvmsrc
 * Eliminated unnecessary global externs.
 *
 * Revision 1.10  1997/06/16  13:42:10  pvmsrc
 * Upated forkexec arg list.
 *
 * Revision 1.9  1997/06/02  13:50:03  pvmsrc
 * Added missing #include host.h for waitc.h.
 *
 * Revision 1.8  1997/06/02  13:26:43  pvmsrc
 * Changed mpp_input so that spawned processes can be claimed after
 * 	exiting.  Previously they were claimed as their shadow t0 died
 * 	and set a flag in work() that would get caught leading to
 * 	mpp_dredge() finding them sometime later...
 *
 * Revision 1.7  1997/05/21  16:01:53  pvmsrc
 * Updated ifdefs to include AIX4MP arch type.
 *
 * Revision 1.6  1997/05/19  15:03:01  pvmsrc
 * pvmfrgsiz is now set according to memory page size and msg header
 * 	lengths instead of just the UDP host-host packet size!
 * Mesg headers are set correctly in mpp_input / mpp_output to include
 * 	contexts etc
 *
 * Revision 1.5  1997/04/25  19:17:29  pvmsrc
 * Mashed to make it work within 3.4 new messaging system. Added:
 * 	-	correct includes
 * 	-	changed ph_cod's to ph_tag's and pk_cod -> pk_tag
 * 	-	add duplicate of pvmdunix ppi_kill()
 *
 * Revision 1.4  1997/01/28  19:27:23  pvmsrc
 * New Copyright Notice & Authors.
 *
 * Revision 1.3  1996/10/25  13:58:01  pvmsrc
 * Replaced old #includes for protocol headers:
 * 	- <pvmsdpro.h>, "ddpro.h", "tdpro.h"
 * With #include of new combined header:
 * 	- <pvmproto.h>
 *
 * Revision 1.2  1996/10/24  21:04:50  pvmsrc
 * Moved #include of "global.h" down below other headers:
 * 	- need to have all of the structures / types declared before
 * 		the globals can be declared...
 *
 * Revision 1.1  1996/09/23  23:44:33  pvmsrc
 * Initial revision
 *
 * Revision 1.20  1995/11/02  16:31:23  manchek
 * skip over stale packets from dead tasks in mpp_input
 *
 * Revision 1.19  1995/09/05  19:24:05  manchek
 * mpp_input copies all pages for now (in case sender task exits)
 *
 * Revision 1.18  1995/07/25  17:35:32  manchek
 * mpp_probe cancels retry to a task if not ready.
 * mpp_output returns success code
 *
 * Revision 1.17  1995/07/24  21:48:25  manchek
 * mpp_output puts pkts in order (at the end) of ovfpkts.
 * mpp_probe consumes ovfpkts in order to retry
 *
 * Revision 1.16  1995/07/24  19:22:33  manchek
 * message, frag headers passed in inbox shmpkhdr instead of databuf
 * changes in mpp_input, mpp_output.
 * removed mpp_mcast - wasn't called
 *
 * Revision 1.15  1995/07/12  01:12:14  manchek
 * do nothing in mpp_free if tid is zero.
 * peer_detach now frees peer struct.
 * mpp_dredge can reclaim pidtid slot immediately if task not registered
 *
 * Revision 1.14  1995/07/05  16:21:40  manchek
 * added ST_FINISH to pidtid_dump
 *
 * Revision 1.13  1995/07/05  16:16:29  manchek
 * added mpp_dredge function to skim pidtid table for zombies and call
 * task_cleanup and task_free.
 * mpp_free (called from task_free) now reclaims pidtid table entry
 *
 * Revision 1.12  1995/07/03  19:55:47  manchek
 * hellish cleanup of comments and formatting.
 * removed POWER4 ifdefs.
 * removed removeshm().
 * added mpp_setstatus() and pidtid_dump()
 *
 * Revision 1.11  1995/06/28  15:50:57  manchek
 * added arg to peer_conn calls
 *
 * Revision 1.10  1995/06/19  17:45:01  manchek
 * inc refcount before signalling in mpp_output
 *
 * Revision 1.9  1995/06/02  16:21:10  manchek
 * fixed references to detached memory segments
 *
 * Revision 1.8  1995/05/22  19:51:37  manchek
 * added ifdefs for RS6KMP
 *
 * Revision 1.7  1995/05/18  17:22:25  manchek
 * need to export pvminbox and myshmbufid
 *
 * Revision 1.6  1995/05/17  16:41:29  manchek
 * changed global mytid to pvmmytid.
 * changed inbox to pvminbox and mybufid to myshmbufid.
 * added support for CSPP shared memory.
 * unset TF_FORKD flag if task doesn't reconnect with expected pid
 *
 * Revision 1.5  1995/02/06  22:42:01  manchek
 * new function mpp_setmtu, called before slave_config
 *
 * Revision 1.4  1995/02/01  21:35:06  manchek
 * added nenv and envp args to mpp_load, which passes them to forkexec
 *
 * Revision 1.3  1994/11/08  15:35:07  manchek
 * shared memory damage control
 *
 */


#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <fcntl.h>
#ifdef IMA_SYMM
#include <sys/file.h>		/* XXX for open(); change to fcntl.h in ptx? */
#include <parallel/parallel.h>
#endif
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <unistd.h>
#include <stdlib.h>
#ifdef  SYSVSTR
#include <string.h>
#else
#include <strings.h>
#endif

#include <pvm3.h>
#include <pvmproto.h>
#include "pvmalloc.h"
#include "pvmdabuf.h"
#include "pkt.h"
#include "task.h"
#include "listmac.h"
#include "pvmshmem.h"
#include "bfunc.h"
#include "host.h"
#include "waitc.h"
#include "global.h"

#ifndef max
#define max(a,b)    ((a)>(b)?(a):(b))
#endif

#ifndef min
#define min(a,b)    ((a)<(b)?(a):(b))
#endif

char *getenv();


extern int pvmdebmask;			/* from pvmd.c */
extern int pvm_useruid;			/* from pvmd.c */
extern int pvmmytid;			/* from pvmd.c */
extern int pvmmydsig;			/* from pvmd.c */
extern int pvmudpmtu;			/* from pvmd.c */
extern int pvmmyupid;			/* from pvmd.c */
extern struct peer *peers;		/* from pvmshmem.c */


/***************
 **  Globals  **
 **           **
 ***************/

char *outmsgbuf = 0;			/* outgoing message buffer; ppi_init() places it one page after infopage */
int outbufsz = 0;				/* size of outgoing msg buffer, counted in pvmpgsz-sized pages (not bytes) */
int nbufsowned = 0;				/* num shared frags held by us; compared against outbufsz in mpp_output() */
int pgsz = 0;					/* system page size, from sysconf(_SC_PAGESIZE) in ppi_config() */
int pvmpgsz = 0;				/* PVM virtual page size, FRAGPAGE*pgsz bytes */
char *infopage = 0;				/* page after the inbox: page header, pidtidhdr (proto, dsig, ...), pid-tid table */
struct pidtid *pidtids = 0;		/* pid -> tid table, lives in infopage right after the pidtidhdr */
int maxpidtid = 0;				/* number of entries in pid-tid table */
int shmbufsiz = 0;				/* shared-memory buffer size in bytes (PVMBUFSIZE env var or SHMBUFSIZE) */
int myshmbufid = -1;			/* shared-memory msg buffer ID from shmget(), -1 when none */
char *pvminbox = 0;				/* incoming message header buffer; also the base address of our segment */
extern int pvmfrgsiz;			/* From pvmd.c frag size */
								/* included here as we change it in ppi_config*/

/***************
 **  Private  **
 **           **
 ***************/

static char pvmtxt[512];		/* scratch for error log */
static int inboxsz = 0;			/* number of struct shmpkhdr slots in an inbox (not bytes) */
static struct pkt *ovfpkts = 0;	/* circular list head: packets waiting to be delivered */
static int globid = -1;			/* ID of global shared segment (not referenced by the code in this file) */


/*
 *			Buffer layout
 *		 ____________________
 *		|       inbox        | 1
 *		|____________________|
 *		|   pid->tid table   | 2
 *		|____________________|
 *		|                    | 3
 *		|      outmsgbuf     |
 *      |          .         | .
 *		|          .         | .
 *		|          .         | .
 *
 *
 * Note: tasks don't keep a pid-tid table, so their outgoing message
 * buffer starts at the second page.
 *
 * To send messages to one another, the sender puts the message header,
 * which contains its own task ID and the location of the message body
 * (expressed as an offset from the head of the segment), into the inbox
 * of the intended receiver. When the addressee is ready to accept the
 * message, it reads the message header and locates the message fragments
 * in the sender's outgoing message buffer. Access to buffers is guarded
 * by locks. Tasks can read the same data simultaneously, but one must obtain
 * exclusive access to the page before it can modify the data.
 *
 * Messages to tasks on other hosts are routed by pvmd.
 */


/*	ppi_config()
*
*	Derive the fragment sizes from the system page size.
*	The arguments are accepted but not looked at.
*
*	Always returns 0.
*/

int
ppi_config(argc, argv)
	int argc;
	char **argv;
{
	pgsz = sysconf(_SC_PAGESIZE);

	/*
	 * One fragment is what remains of a page once the PVM page header
	 * is taken off.  The same number is installed both as the host-host
	 * UDP MTU (XXX yuck, host to host limited) and as the host-task
	 * fragment size, although the two limits are not really the same.
	 */
	pvmfrgsiz = pvmudpmtu = pgsz - PVMPAGEHDR;

	return 0;
}


/*	ppi_init()
*
*	Create and attach the pvmd's shared-memory segment and carve it up:
*	inbox page(s) first, then one info page (page header, pidtidhdr and
*	the pid->tid table), then the outgoing fragment buffer.
*	Afterwards calls peer_init() and creates the empty overflow packet
*	list (ovfpkts).
*
*	The segment size is taken from the PVMBUFSIZE environment variable
*	when it parses to a nonzero value, otherwise SHMBUFSIZE is used.
*
*	If the segment cannot be created or attached, or is too small to
*	hold at least one outgoing fragment page, the error is logged, any
*	segment already created is removed, pvmbailout(0) is called and the
*	function returns.
*
*	XXX shouldn't just return if something goes wrong
*/

void
ppi_init()
{
	struct pidtidhdr *pvminfo;
	char *pvmtmp = pvmgettmp();
	char *p;
	int key;
	int i;
#ifdef LOG
	char fname[1024];
	FILE *logfp;
#ifdef IMA_CSPP
	int scid = get_scid();
#endif

	/*
	 * Create/truncate the pvmt.<uid>[.<scid>] file in the PVM temp dir.
	 * 32 bytes of slack cover "/pvmt.", two decimal ints, a '.' and the
	 * terminating NUL, so the sprintf() below cannot overrun fname.
	 */
	if (strlen(pvmtmp) + 32 < sizeof(fname)) {
#ifdef IMA_CSPP
		if (scid > 1)
			sprintf(fname, "%s/pvmt.%d.%d", pvmtmp, pvm_useruid, scid);
		else
#endif
			sprintf(fname, "%s/pvmt.%d", pvmtmp, pvm_useruid);

		/* never hand a null stream to fclose() */
		if ((logfp = fopen(fname, "w")) != 0)
			fclose(logfp);
		else
			pvmlogperror("ppi_init() can't create log file");
	} else
		pvmlogerror("ppi_init() temp dir name too long for log file\n");
#endif	/*LOG*/

	pvmpgsz = FRAGPAGE*pgsz;
	/* inbox capacity in message-header slots, after the box header */
	inboxsz =
		(INBOXPAGE*pgsz - sizeof(struct msgboxhdr))/sizeof(struct shmpkhdr);
	if (!(p = getenv("PVMBUFSIZE")) || !(shmbufsiz = strtol(p, (char**)0, 0)))
		shmbufsiz = SHMBUFSIZE;

	key = pvmshmkey(0);
	if ((myshmbufid = shmget((key_t)key, shmbufsiz, IPC_CREAT|PERMS)) == -1) {
		pvmlogperror("ppi_init() can't create msg buffer");
		pvmbailout(0);
		return;
	}
#ifdef IMA_CSPP
	if ((pvminbox = (char *)shm_search(myshmbufid)) == (char *)-1L)
#else
	if ((pvminbox = (char *)shmat(myshmbufid, 0, 0)) == (char *)-1L)
#endif
	{
		pvmlogperror("ppi_init() can't attach msg buffer");
		shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0);
		myshmbufid = -1;
		pvmbailout(0);
		return;
	}

	infopage = pvminbox + INBOXPAGE*pgsz;
	outmsgbuf = infopage + pgsz;

	/*
	 * Number of whole fragment pages left after the inbox and the info
	 * page.  A segment smaller than inbox + info page makes the
	 * numerator negative, so reject anything that is not positive
	 * rather than only an exact zero.
	 */
	outbufsz = (shmbufsiz - INBOXPAGE*pgsz - pgsz)/pvmpgsz;
	if (outbufsz <= 0) {
		pvmlogerror("ppi_init() SHMBUFSIZE too small!");
		shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0);
		myshmbufid = -1;
		pvmbailout(0);
		return;
	}
	nbufsowned = 0;

	msgbufinit(pvminbox);
#ifndef IMA_KSR1
	PAGEINITLOCK(&((struct shmpghdr *)infopage)->pg_lock);
#endif
	((struct shmpghdr *)infopage)->pg_ref = 0;

	/* the pidtidhdr sits right behind the page header of the info page */
	pvminfo = (struct pidtidhdr *)(infopage + PVMPAGEHDR);
	pvminfo->i_proto = TDPROTOCOL;
	pvminfo->i_dsig = pvmmydsig;
	pvminfo->i_next = 0;
	pvminfo->i_bufsiz = shmbufsiz;
	pvminfo->i_dpid = pvmmyupid;

	/* the pid->tid table fills whatever is left of the info page */
	pidtids = (struct pidtid *)(pvminfo + 1);
	maxpidtid = (pgsz - sizeof(struct pidtidhdr) - PVMPAGEHDR)/sizeof(struct pidtid);
	BZERO((char *)pidtids, sizeof(struct pidtid)*maxpidtid);

	/* ST_EXIT is what mpp_conn() looks for when it needs a free slot */
	for (i = 0; i < maxpidtid; i++)
		pidtids[i].pt_stat = ST_EXIT;

	peer_init();

	/* empty circular list: the head points at itself both ways */
	ovfpkts = pk_new(0);
	ovfpkts -> pk_link = ovfpkts->pk_rlink = ovfpkts;
}


/*	mpp_free()
*
*	Give back the shared resources that belong to task tid:
*	detach the matching entry of the peers list (if there is one) and
*	mark its slot in the pid->tid table as ST_EXIT.
*
*	A tid of zero is logged (when debugging) and otherwise ignored.
*/

void
mpp_free(tid)
	int tid;
{
	struct peer *pe;
	int slot;

	if (pvmdebmask & (PDMTASK|PDMNODE)) {
		sprintf(pvmtxt, "mpp_free() t%x\n", tid);
		pvmlogerror(pvmtxt);
	}

	if (tid == 0)
		return;

	/* walk the circular peers list until we hit tid or wrap around */
	pe = peers->p_link;
	while (pe != peers && pe->p_tid != tid)
		pe = pe->p_link;
	if (pe != peers)
		peer_detach(pe);

	/* first table entry carrying this tid goes back to the free state */
	PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
	for (slot = 0; slot < maxpidtid; slot++) {
		if (pidtids[slot].pt_tid != tid)
			continue;
		pidtids[slot].pt_stat = ST_EXIT;
		break;
	}
	PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
}


/*	mpp_conn()
*
*	Claim a free pid->tid table slot for task tp and fill it in
*	(tid, parent tid, pid; state ST_NOTREADY; key and cond cleared).
*	This isn't really "connection" (that is done by peer_conn).
*
*	The search starts at pvminfo->i_next and wraps around the table;
*	i_next is then moved to the slot after the one handed out.
*
*	Returns 0 on success, PvmOutOfRes when every slot is in use.
*/

int
mpp_conn(tp)
	struct task *tp;
{
	struct pidtidhdr *pvminfo = (struct pidtidhdr *)(infopage + PVMPAGEHDR);
	int start, slot, after;

	PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);

	start = slot = pvminfo->i_next;
	for (;;) {
		if (pidtids[slot].pt_stat == ST_EXIT)
			break;				/* found a free one */
		slot++;
		if (slot == maxpidtid)
			slot = 0;
		if (slot == start) {
			/* went all the way round without finding anything */
			PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
			pvmlogerror("mpp_conn() pidtid table full\n");
			return PvmOutOfRes;
		}
	}

	after = slot + 1;
	if (after == maxpidtid)
		after = 0;
	pvminfo->i_next = after;

	/* XXX so wtf good is an almost-refcount? */
	if (((struct shmpghdr *)infopage)->pg_ref < maxpidtid)
		((struct shmpghdr *)infopage)->pg_ref++;

	pidtids[slot].pt_tid = tp->t_tid;
	pidtids[slot].pt_ptid = tp->t_ptid;
	pidtids[slot].pt_stat = ST_NOTREADY;
	pidtids[slot].pt_pid = tp->t_pid;
	pidtids[slot].pt_key = 0;
	pidtids[slot].pt_cond = 0;

	PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);

	if (pvmdebmask & (PDMTASK|PDMMEM)) {
		sprintf(pvmtxt, "mpp_conn() assigning pidtid[%d] to t%x\n",
				slot, tp->t_tid);
		pvmlogerror(pvmtxt);
	}
	if (pvmdebmask & PDMTASK) {
		sprintf(pvmtxt, "mpp_conn() new task t%x\n", tp->t_tid);
		pvmlogerror(pvmtxt);
	}
	return 0;
}


/*	mpp_input()
*
*	Drain the pvmd's shared-memory inbox.
*
*	The inbox is a ring of struct shmpkhdr slots behind a struct
*	msgboxhdr; mb_read is the last slot consumed and mb_last the last
*	slot written.  Each slot is either a "new task" announcement
*	(ph_dat < 0) or the header of a packet whose data lives in the
*	sending peer's segment.  Packet data is copied into a private pkt
*	and handed to loclinpkt().
*
*	NOTE(review): the loop is a do/while, so one slot is consumed before
*	the ring is checked for emptiness; presumably callers only get here
*	after mpp_probe() reported pending input - confirm.
*/
void
mpp_input()
{
	int next;
	struct pkt *pp;
	struct peer *pe;
	struct task *tp;
	int tid;
	int sdr;
	int src;
	int dst;
	int len;
	char *cp, *buf;
	struct msgboxhdr *inbp;
	struct shmpkhdr *inmsgs;

	inbp = (struct msgboxhdr *)pvminbox;
	inmsgs = (struct shmpkhdr *)(inbp + 1);	/* slots follow the box header */
	do {
		/*
		* every "continue" below jumps to the while() condition at the
		* bottom, which is where mb_read is advanced to next
		*/
		next = (inbp->mb_read + 1) % inboxsz;
		sdr = inmsgs[next].ph_sdr;
		src = inmsgs[next].ph_src;

		if (inmsgs[next].ph_dat < 0) {		/* new task */
			int ipid;

			/* look the task up by ph_dst if it is set, else by ph_src */
			if (!(ipid = inmsgs[next].ph_dst))
				ipid = src;
			if (!(tp = task_findpid(ipid))) {
				/* not spawned by us */
				if ((tid = tid_new()) < 0) {
					pvmlogerror("mpp_input() out of tids?\n");
					continue;
				}
				if ((tp = task_new(tid)) == NULL) {
					pvmlogerror("mpp_input() too many tasks?\n");
					continue;
				}
				task_setpid(tp, src);
				/* NOTE(review): return value (PvmOutOfRes) is not checked */
				mpp_conn(tp);

			} else if (tp->t_pid != src) {
				/* known task came back under a different pid */
				task_setpid(tp, src);
				tp->t_flag &= ~TF_FORKD;
			}
			tp->t_flag |= TF_CONN | TF_SHM;  
			continue;
		}

		/* peer_conn() failure is either a null pointer or -1 */
		if (!(pe = peer_conn(sdr, (int *)0)) || pe == (struct peer *)-1L) {
			sprintf(pvmtxt, "mpp_input() can't connect to sender t%x\n", sdr);
			pvmlogerror(pvmtxt);
			continue;
		}
		/* ph_dat is the data offset past the sender's inbox page(s) */
		cp = pe->p_buf + INBOXPAGE*pgsz + inmsgs[next].ph_dat;
		/* round down to the start of that page, then skip its page header */
		buf = cp - (inmsgs[next].ph_dat & (pgsz-1)) + PVMPAGEHDR;
		dst = inmsgs[next].ph_dst;
		len = inmsgs[next].ph_len;

		/*
		* must copy all packets (for now).
		* sender may exit, and we don't check if we have page references
		* into a segment before we detach.
		*/
/*
		if (TIDISHERE(dst, pvmmytid) && TIDISTASK(dst)) {
*/
			/* leave MAXHDR bytes of headroom in front of the data */
			pp = pk_new(len + MAXHDR);
			pp->pk_dat += MAXHDR;
			BCOPY(cp, pp->pk_dat, len);
			/* data is copied, so the sender's buffer can be released */
			da_unref(buf);
			cp = pp->pk_dat;

/*
		} else {
			pp = pk_new(0);
			pp->pk_dat = cp;
			pp->pk_buf = buf;
			pp->pk_max = pvmudpmtu;
		}
*/
		/* carry the slot's header fields over into the packet */
		pp->pk_len = len;
		pp->pk_src = src;
		pp->pk_dst = dst;
		pp->pk_flag = inmsgs[next].ph_flag;
		pp->pk_tag = inmsgs[next].ph_tag;
		pp->pk_ctx = inmsgs[next].ph_ctx;
		pp->pk_enc = inmsgs[next].ph_enc;
		pp->pk_wid = inmsgs[next].ph_wid;
		pp->pk_crc = inmsgs[next].ph_crc;

		if (tp = task_find(src)) {
			loclinpkt(tp, pp);

			/* CL tasks caught here. forked ones caught by SIGCHLD/SIGCLD */
			if ((tp->t_flag & TF_CLOSE) && !(tp->t_flag & TF_FORKD)) { 
				task_cleanup(tp);
				task_free(tp);
			}

		} else {
			/* source task is not in the task table: drop the packet */
			sprintf(pvmtxt, "mpp_input() from unknown task t%x\n", src);
			pvmlogerror(pvmtxt);
			pk_free(pp);
		}

	/* mark this slot consumed; stop once we have caught up with the writer */
	} while ((inbp->mb_read = next) != inbp->mb_last);
}


/*	mpp_output()
*
*	Send packet to a task if it's connected, otherwise queue it in
*	a list to be retried later.
*
*	Returns 0 if packet sent, else 1.
*/

int
mpp_output(tp, pp)
	struct task *tp;
	struct pkt *pp;
{
	struct peer *pe;
	int dst;
	struct shmpkhdr *dmsgs = 0;
	struct pkt *pp1, *pp2;
	struct msgboxhdr *dboxp;
	char *cp;
	int loc;
	int next;

	dst = pp->pk_dst;

	/*
	* if page is private, copy and replace it with one in shared buf
	*/

	if ((loc = pp->pk_dat - outmsgbuf) > outbufsz * pvmpgsz || loc < 0) {
		if (nbufsowned == outbufsz) {
			static int once = 1;

			if (once) {
				pvmlogerror("mpp_output() Message(s) too long for shared buffer, deadlocked.\n");
				once = 0;
			}
		}

		cp = 0;
		do {
			if (cp)
				da_unref(cp);
			cp = da_new(MAXHDR + pp->pk_len);
		} while ((loc = cp - outmsgbuf) > outbufsz*pvmpgsz || loc < 0);

		BCOPY(pp->pk_dat, cp + MAXHDR, pp->pk_len);
		pp->pk_dat = cp + MAXHDR;
		da_unref(pp->pk_buf);
		pp->pk_buf = cp;
	}

	if ((pe = peer_conn(dst, (int *)0)) && pe != (struct peer *)-1L) {
		dboxp = (struct msgboxhdr *)pe->p_buf;
		dmsgs = (struct shmpkhdr *)(dboxp + 1);
		PAGELOCK(&dboxp->mb_lock);
		if ((next = (dboxp->mb_last + 1) % inboxsz) != dboxp->mb_read) {
			if (pvmdebmask & PDMPACKET) {
				sprintf(pvmtxt,
					"mpp_output() src t%x dst t%x ff %x len %d\n",
					pp->pk_src, pp->pk_dst, pp->pk_flag & (FFSOM|FFEOM),
					pp->pk_len);
				pvmlogerror(pvmtxt);
			}
			dmsgs[next].ph_src = pp->pk_src;
			dmsgs[next].ph_dst = dst;
			dmsgs[next].ph_sdr = pvmmytid;
			dmsgs[next].ph_dat = loc;
			dmsgs[next].ph_len = pp->pk_len;
			dmsgs[next].ph_flag = pp->pk_flag & (FFSOM|FFEOM);
			dmsgs[next].ph_ctx = pp->pk_ctx;
			dmsgs[next].ph_tag = pp->pk_tag;
			dmsgs[next].ph_enc = pp->pk_enc;
			dmsgs[next].ph_wid = pp->pk_wid;
			dmsgs[next].ph_crc = pp->pk_crc;
			da_ref(pp->pk_buf);
			dboxp->mb_last = next;

			if (dboxp->mb_sleep) {

/* #if defined(IMA_SUNMP) || defined(IMA_RS6KMP) || defined(IMA_AIX4MP) || defined(IMA_AIX5MP) */
#ifdef PVMUSEMUTEX
#ifdef	IMA_SUNMP
				cond_signal(&dboxp->mb_cond);
#endif
#if defined(IMA_RS6KMP) || defined(IMA_AIX4MP) || defined(IMA_AIX5MP)
				pthread_cond_signal(&dboxp->mb_cond);
#endif
#else
				peer_wake(pe);

#endif	/* PVMUSEMUTEX */
				dboxp->mb_sleep = 0;
			}

			PAGEUNLOCK(&dboxp->mb_lock);
			pk_free(pp);
			return 0;

		} else
			LISTPUTBEFORE(ovfpkts, pp, pk_link, pk_rlink);
		PAGEUNLOCK(&dboxp->mb_lock);

	} else
		LISTPUTBEFORE(ovfpkts, pp, pk_link, pk_rlink);
	return 1;
}


/*	mpp_probe()
*
*	Try to send buffered packets that couldn't be delivered before.
*	The current overflow list is detached and replaced by a fresh empty
*	one; mpp_output() puts anything that still cannot be delivered onto
*	the new list, so order per destination is preserved.
*
*	Update state of task from NOTREADY to SOCKET if it has socket connection.
*	XXX shouldn't be done here, why not in loclconn.
*
*	Returns 1 if packets ready for receipt, else 0.
*/

int
mpp_probe()
{
	struct pkt *pp, *pp2, *tosend;
	int dst;
	struct task *tp;
	int hasmsg;
	struct msgboxhdr *inbp = (struct msgboxhdr *)pvminbox;
	struct pidtidhdr *pvminfo = (struct pidtidhdr *)(infopage + PVMPAGEHDR);	/* not used below */
	int ntids, i;

	/* work on the old list; failures are re-queued on a new, empty one */
	tosend = ovfpkts;

	ovfpkts = pk_new(0);
	ovfpkts -> pk_link = ovfpkts -> pk_rlink = ovfpkts;

	while ((pp = tosend->pk_link) != tosend) {
		LISTDELETE(pp, pk_link, pk_rlink);
		dst = pp->pk_dst;
		if (tp = task_find(dst)) {
			if (tp->t_sock < 0) {
				/* no socket: deliver through shared memory */
				if (mpp_output(tp, pp)) {
					/*
					* still not deliverable (mpp_output re-queued pp).
					* move every remaining packet for the same
					* destination across too, so none of them is
					* retried, and possibly delivered, ahead of pp
					*/
					for (pp = tosend->pk_link; pp != tosend; pp = pp2) {
						pp2 = pp->pk_link;	/* saved: pp is unlinked below */
						if (pp->pk_dst == dst) {
							LISTDELETE(pp, pk_link, pk_rlink);
							LISTPUTBEFORE(ovfpkts, pp, pk_link, pk_rlink);
						}
					}
				}

			} else
				pkt_to_task(tp, pp);

		} else {
			/* destination task no longer exists */
			/* NOTE(review): the other non-syscall messages in this file use
			   pvmlogerror() with a trailing newline - confirm pvmlogperror()
			   is intended here */
			sprintf(pvmtxt, "mpp_probe() pkt from t%x to t%x scrapped",
					pp->pk_src, pp->pk_dst);
			pvmlogperror(pvmtxt);
			pk_free(pp);
		}
	}

	/* old list head is empty now */
	pk_free(tosend);

	/*
	* only the first pg_ref table entries (capped at the table size) are
	* examined.  NOTE(review): this scan does not take the info page lock,
	* unlike mpp_setstatus()/mpp_dredge() - confirm that is intended
	*/
	ntids = min(maxpidtid, ((struct shmpghdr *)infopage)->pg_ref);
	for (i = 0; i < ntids; i++)
		if (pidtids[i].pt_stat == ST_NOTREADY
		&& (tp = task_find(pidtids[i].pt_tid)) && tp->t_sock >= 0)
			pidtids[i].pt_stat = ST_SOCKET;

	/* our own inbox has unread slots when the two ring indices differ */
	hasmsg = (inbp->mb_read != inbp->mb_last);
	return hasmsg;
}


/*	mpp_cleanup()
*
*	We're bailing out.  Mark our own segment for removal, then go
*	through the peers list and, for every peer, detach its buffer and
*	remove its segment (and, with USERECVSEMAPHORE, its semaphore),
*	looking the IDs up by key where they are not known yet.
*
*	Our own inbox is not detached here.
*	Failures are logged and otherwise ignored.
*/

void
mpp_cleanup()
{
	struct peer *pe;

	if (myshmbufid != -1) {
		if (shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0) == -1)
			pvmlogperror("mpp_cleanup() shmctl IPC_RMID mybuf");
	}

	if (!peers)
		return;

	for (pe = peers->p_link; pe != peers; pe = pe->p_link) {
		if (pe->p_buf) {
			shmdt(pe->p_buf);
			pe->p_buf = 0;
		}

		/* never attached to this one: find the segment by its key */
		if (pe->p_shmid == -1 && pe->p_key)
			pe->p_shmid = shmget((key_t)pe->p_key, shmbufsiz, 0);
		if (pe->p_shmid != -1) {
			if (shmctl(pe->p_shmid, IPC_RMID, (struct shmid_ds *)0) == -1) {
				sprintf(pvmtxt, "shmctl id=0x%x", pe->p_shmid);
				pvmlogperror(pvmtxt);
			}
		}

#ifdef	USERECVSEMAPHORE
		if (pe->p_semid == -1 && pe->p_key)
			pe->p_semid = semget((key_t)pe->p_key, 1, PERMS);
		if (pe->p_semid != -1) {
			if (semctl(pe->p_semid, 0, IPC_RMID) == -1) {
				sprintf(pvmtxt, "semctl id=0x%x", pe->p_semid);
				pvmlogperror(pvmtxt);
			}
		}
#endif
		pe->p_key = 0;
	}
}


/*	pidtid_dump()
*
*	Write the pid->tid table to the error log, one line per entry.
*	Entries in state ST_EXIT (free slots) are left out.
*
*	Always returns 0.
*/

int
pidtid_dump()
{
	int i;
	char *s;

	pvmlogerror("pidtid_dump()\n");
	for (i = 0; i < maxpidtid; i++) {
		/* s stays null for slots that should not be printed */
		switch (pidtids[i].pt_stat) {
		case ST_EXIT:
			s = 0;
			break;

		case ST_NOTREADY:
			s = "NOTREADY";
			break;

		case ST_SHMEM:
			s = "SHMEM";
			break;

		case ST_SOCKET:
			s = "SOCKET";
			break;

		case ST_FINISH:
			s = "FINISH";
			break;

		default:
			s = "UNKNOWN";
			break;
		}

		if (s) {
			sprintf(pvmtxt, "%4d pid %d tid %x ptid %x stat %s key 0x%x",
					i,
					pidtids[i].pt_pid,
					pidtids[i].pt_tid,
					pidtids[i].pt_ptid,
					s,
					pidtids[i].pt_key);
#ifdef IMA_CSPP
			sprintf(pvmtxt + strlen(pvmtxt), " node %d\n", pidtids[i].pt_node);
#else
			strcat(pvmtxt, "\n");
#endif
			pvmlogerror(pvmtxt);
		}
	}
	return 0;
}


/*	mpp_setstatus()
*
*	Take snapshot of task conditions and set flags in task records:
*	for every table entry that is not ST_EXIT and whose tid is a known
*	task, TF_DEADSND in the task's flags is made to mirror whether the
*	entry's pt_cond is nonzero.
*
*	The tid argument is not used right now.  Always returns 0.
*/

int
mpp_setstatus(tid)
	int tid;			/* not used right now */
{
	struct task *tp;
	int slot;

	PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
	for (slot = 0; slot < maxpidtid; slot++) {
		if (pidtids[slot].pt_stat == ST_EXIT)
			continue;
		tp = task_find(pidtids[slot].pt_tid);
		if (!tp)
			continue;
		if (pidtids[slot].pt_cond)
			tp->t_flag |= TF_DEADSND;
		else
			tp->t_flag &= ~TF_DEADSND;
	}
	PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
	return 0;
}


/*	mpp_dredge()
*
*	Dredge pidtid table for zombies, i.e. entries with
*	pt_stat == ST_FINISH.  If the task still exists it is run through
*	task_cleanup() and task_free(); if not, the entry is recycled on
*	the spot by setting it to ST_EXIT.
*
*	Always returns 0.
*/

int
mpp_dredge()
{
	struct task *zombie;
	int slot;

	PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
	for (slot = 0; slot < maxpidtid; slot++) {
		if (pidtids[slot].pt_stat != ST_FINISH)
			continue;

		zombie = task_find(pidtids[slot].pt_tid);
		if (!zombie) {
			/* nobody left to clean up, just hand the slot back */
			pidtids[slot].pt_stat = ST_EXIT;
			continue;
		}

		/*
		* the table lock is let go while the task is torn down
		* (presumably because task_free ends up in mpp_free(), which
		* takes the same lock) and picked up again afterwards
		*/
		PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
		task_cleanup(zombie);
		task_free(zombie);
		PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
	}
	PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
	return 0;
}


/*	ppi_load()
*
*	Fork and exec new tasks.  Give them pidtid table entries.
*
*	One task is started per element of wxp->w_vec.  A started task gets
*	the parent/output/trace/scheduler settings from wxp and its tid is
*	stored in w_vec[i].  After the first forkexec() failure no further
*	tasks are started; the failing element and all later ones receive
*	that error code instead of a tid.
*
*	Always returns 0; per-task results are reported through w_vec.
*/

int
ppi_load(wxp)
	struct waitc_spawn *wxp;
{
	struct task *tp;
	int err = 0;
	int n;

	for (n = 0; n < wxp->w_veclen; n++) {
		/* only keep launching while nothing has gone wrong so far */
		if (!err)
			err = forkexec(wxp->w_flags, wxp->w_argv[0], wxp->w_argv,
					wxp->w_nenv, wxp->w_env,
					(wxp->w_instance+n), wxp->w_hosttotal, wxp->w_outof, &tp);

		if (err) {
			wxp->w_vec[n] = err;
			continue;
		}

		tp->t_ptid = wxp->w_ptid;
		tp->t_outtid = wxp->w_outtid;
		tp->t_outctx = wxp->w_outctx;
		tp->t_outtag = wxp->w_outtag;
		tp->t_trctid = wxp->w_trctid;
		tp->t_trcctx = wxp->w_trcctx;
		tp->t_trctag = wxp->w_trctag;
		tp->t_sched = wxp->w_sched;
		mpp_conn(tp);	/* XXX this can fail, dunno how to clean up */
		wxp->w_vec[n] = tp->t_tid;
	}
	return 0;
}


/*	ppi_kill()
*
*	Deliver signal signum to the process behind task tp.
*	A task whose pid is zero cannot be signalled; that is logged and
*	the signal is dropped.  The result of kill() is ignored.
*
*	Always returns 0.
*
*	XXX this is just a basic copy of the pvmdunix code so we can at
*	least compile
*/
int
ppi_kill(tp, signum)
    struct task *tp;
    int signum;
{
    if (!tp->t_pid) {
        pvmlogprintf("ppi_kill() signal for t%x scrapped (pid = 0)\n",
                tp->t_tid);
        return 0;
    }

    (void)kill(tp->t_pid, signum);
    return 0;
}


/*	shm_wrt_pkts()
*
*	Write any packets waiting on tp->t_txq, the task's send queue,
*	through mpp_output().  These may have been stacked up before the
*	task state changed to TF_SHMCONN.  This is currently called by
*	tm_getopt when the state change is marked from TF_PRESHMCONN to
*	TF_SHMCONN.
*
*	Nothing is done when the task has no send queue or is not in state
*	TF_SHMCONN; in the latter case the queued packets are left where
*	they are so that they are not lost.
*
*	Always returns 0.
*/
int
shm_wrt_pkts(tp)
	struct task *tp;
{
	struct pkt tmplist;			/* head of a temporary list, lives on our stack */
	struct pkt *pending = &tmplist;
	struct pkt *pp;

	if (!tp->t_txq)
		return 0;

	/*
	 * Not connected through shared memory: leave the queue alone.
	 * Pulling the packets off first and then giving up would strand
	 * them on a list whose head disappears when we return.
	 */
	if (!(tp->t_flag & TF_SHMCONN))
		return 0;

	pending->pk_link = pending->pk_rlink = pending;

	/* Take packets off of the t_txq, keeping their order */
	while ((pp = tp->t_txq->pk_link) != tp->t_txq) {
		LISTDELETE(pp, pk_link, pk_rlink);
		LISTPUTBEFORE(pending, pp, pk_link, pk_rlink);
	}

	while ((pp = pending->pk_link) != pending) {
		LISTDELETE(pp, pk_link, pk_rlink);
		if (tp->t_flag & TF_SHMCONN) {
			if (pvmdebmask & PDMPACKET)
				pvmlogprintf("shm_wrt_pkts for %x \n", tp->t_tid);
			/* mpp_output() either delivers pp or parks it for a retry */
			mpp_output(tp, pp);
		} else {
			/* state changed under us: hand the packet back to the queue */
			LISTPUTBEFORE(tp->t_txq, pp, pk_link, pk_rlink);
		}
	}

	return 0;
}
	


/* syntax highlighted by Code2HTML, v. 0.9.1 */