static char rcsid[] =
	"$Id: lpvmshmem.c,v 1.47 2004/01/14 18:50:57 pvmsrc Exp $";

/*
 *         PVM version 3.4:  Parallel Virtual Machine System
 *               University of Tennessee, Knoxville TN.
 *           Oak Ridge National Laboratory, Oak Ridge TN.
 *                   Emory University, Atlanta GA.
 *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
 *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
 *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
 *                   (C) 1997 All Rights Reserved
 *
 *                              NOTICE
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby granted
 * provided that the above copyright notice appear in all copies and
 * that both the copyright notice and this permission notice appear in
 * supporting documentation.
 *
 * Neither the Institutions (Emory University, Oak Ridge National
 * Laboratory, and University of Tennessee) nor the Authors make any
 * representations about the suitability of this software for any
 * purpose.  This software is provided ``as is'' without express or
 * implied warranty.
 *
 * PVM version 3 was funded in part by the U.S. Department of Energy,
 * the National Science Foundation and the State of Tennessee.
 */

/*
 *	lpvmshmem.c
 *
 *	Libpvm core for MPP environment.
 *
 * $Log: lpvmshmem.c,v $
 * Revision 1.47  2004/01/14 18:50:57  pvmsrc
 * Added new AIX5* arches.
 * (Spanker=kohl)
 *
 * Revision 1.46  2002/02/18 19:18:35  pvmsrc
 * Added "|| defined(IMA_SUN4SOL2)" to "const void *i, *j" #if
 * 	for int_compare() / qsort()...
 * 	(needed for sure for 64 bit) Solaris)
 * (Spanker=kohl)
 *
 * Revision 1.45  2001/09/25 21:21:00  pvmsrc
 * Yanked "char *pvmgettmp();" decl - now in pvm3.h...
 * (Spanker=kohl)
 *
 * Revision 1.44  2001/05/11 17:32:29  pvmsrc
 * Eliminated references to sys_errlist & sys_nerr.
 * 	- unnecessary, and we're whacking that crap anyway.
 * (Spanker=kohl)
 *
 * Revision 1.43  2000/02/10 20:44:41  pvmsrc
 * Replaced hard-coded /tmp for PVMSHMFILE.
 * 	- use pvmgettmp() routine now to determine PVM temp dir.
 * (Spanker=kohl)
 *
 * Revision 1.42  1999/07/08 19:00:02  kohl
 * Fixed "Log" keyword placement.
 * 	- indent with " * " for new CVS.
 *
 * Revision 1.41  1999/03/15  19:06:03  pvmsrc
 * Replaced sleep() calls with pvmsleep().
 * 	- Unix / Win32, C / Fortran compat...
 * (Spanker=kohl)
 *
 * Revision 1.40  1999/02/20  14:17:57  pvmsrc
 * Another try at the SGI compiler warning...  :-]
 * (Spanker=kohl)
 *
 * Revision 1.39  1999/02/19  14:10:56  pvmsrc
 * SGI compiler warning...
 * (Spanker=kohl)
 *
 * Revision 1.38  1998/08/13  18:31:13  pvmsrc
 * Altered SUNMP to use test and set operations with semaphores
 * 		for page locking instead of MUTEX and cond vars.
 * 	Changes are mainly in pvmshmem.h, with lots of #ifdefs changes.
 * 	Makefile altered to use the PLOCKFILE to indicate the Page Locking
 * 		INLINE code used (from SUNMP.conf).
 * 	Some changes effect AIX MP versions which still use conditional
 * 		variables and may change to semaphores soon.
 * (Spanker=fagg)
 *
 * Revision 1.37  1998/07/24  17:11:46  pvmsrc
 * Cleaned up use of SOCKLENISUINT / oslen.
 * 	- use oslen for every socket-related call:
 * 		* bind(), recvfrom(), getsockname() and accept().
 * (Spanker=kohl)
 *
 * Revision 1.36  1998/02/24  15:36:25  pvmsrc
 * Oops...  leftover typo from umbuf -> pmsg renaming way back when.
 * 	- UB_DIFFNODE -> MM_DIFFNODE for IMA_CSPP.
 * (Spanker=kohl)
 *
 * Revision 1.35  1998/02/23  22:51:39  pvmsrc
 * Added AIX4SP2 stuff.
 * (Spanker=kohl)
 *
 * Revision 1.34  1997/12/31  20:50:09  pvmsrc
 * Cleaned Up System Message Handlers.
 * 	- current send / recv buffers now saved before invocation of
 * 		message handler functs.
 * 	- removed manual rbf = setrbuf(mid) saving & resetting in
 * 		handlers...
 * (Spanker=kohl)
 *
 * Revision 1.33  1997/12/01  19:20:44  pvmsrc
 * Replaced #ifdef IMA_OS2 fd_set declarations:
 * 	- new #ifdef FDSETNOTSTRUCT.
 * 	- choose between "fd_set foo" and "struct fd_set foo"...
 * (Spanker=kohl)
 *
 * Revision 1.32  1997/11/04  23:19:21  pvmsrc
 * Cleaned up fd_set stuff (hopefully).
 * (Spanker=kohl)
 *
 * Revision 1.31  1997/10/24  15:18:03  pvmsrc
 * Added TEV_DID_RCX to trace events for Receive Message Context.
 * 	- in pvm_recv(), pvm_trecv(), pvm_nrecv(), and pvm_precv().
 * (Spanker=kohl)
 *
 * Revision 1.30  1997/10/24  14:29:33  pvmsrc
 * Added TEV_DID_MCX / pvmmyctx trace event info to:
 * 	- pvm_send(), pvm_mcast(), pvm_recv(), pvm_trecv(), pvm_nrecv().
 * 	- pvm_psend(), pvm_precv().
 * (Spanker=kohl)
 *
 * Revision 1.29  1997/08/06  22:43:20  pvmsrc
 * Added new SGI6 and SGIMP6 arches.
 *
 * Revision 1.28  1997/07/08  21:25:50  pvmsrc
 * peer_send() now directs control TC messages via the daemon.
 * node_mcast() is no-longer called until its pmsg buffer error is found,
 * i.e. mcast messages are sent via the daemon, using a MCA instead of directly.
 * pvmmcast() sets the context to the current default.
 * msendrecv() now takes a context arg! It sets it as well (and the waitid).
 * beatask() uses SYS contexts on handshakes.
 *
 * Revision 1.27  1997/07/03  17:41:50  pvmsrc
 * Added new sleep_dammit() routine.
 * 	- usleep() on SUNMP, RS6KMP, AIX4MP.
 * 	- select() w/ timeout everywhere else.
 *
 * Revision 1.26  1997/07/02  20:34:58  pvmsrc
 * Commented out usleep() call.
 * 	- not implemented on every arch...
 *
 * Revision 1.25  1997/07/02  14:33:45  pvmsrc
 * beatask() connection attempt tries harder by kicking the daemon
 * 	out of select().
 *
 * Revision 1.24  1997/06/27  20:48:08  pvmsrc
 * Missed AIX arches on write test.
 *
 * Revision 1.23  1997/06/25  22:09:10  pvmsrc
 * Markus adds his frigging name to the author list of
 * 	every file he ever looked at...
 *
 * Revision 1.22  1997/06/16  13:42:44  pvmsrc
 * Oops...  My mistake.  D-Oh.
 * 	- can't use mid as struct pmsg * damn damn damn.
 * 	- wouldn't have been a problem if stupid shmem code didn't
 * 		call mroute() down inside system...  :-/~
 *
 * Revision 1.21  1997/06/12  20:10:46  pvmsrc
 * Made sure all communications for TC_* task control messages
 * 	use the SYSCTX_TC system context.
 * 	- some messages being sent in default context...  D-Oh...
 *
 * Revision 1.20  1997/06/02  13:50:02  pvmsrc
 * Added missing #include host.h for waitc.h.
 *
 * Revision 1.19  1997/06/02  13:24:13  pvmsrc
 * Added test to shmemuptod()
 * 	- so that it creates daemon socket if it doesn't already exist.
 *
 * Revision 1.18  1997/05/29  15:14:05  pvmsrc
 * Removed static decls:
 * 	- pvmtrcsbf, pvmrouteopt now in lpvmglob.c / lpvm.h.
 * 	- pvmtrcmid doesn't exist.
 * Fixed arg casting for SGI's:
 * 	- read_int() takes (int *), not (struct pidtidhdr *)...
 *
 * Revision 1.17  1997/05/27  15:05:30  pvmsrc
 * Added SHMCONN handshake code into pvmbeatask():
 * 	via new call send_my_pid().
 * 	- this removes strange floating tid 0 processes...
 * 	- send_my_pid() calls new function shmemuptod() which is a
 * 		simplified one-way mxfer().
 * 	- shmem node tasks still do not read from pvmd via sockets.
 * Text formatting (JAK).
 *
 * Revision 1.16  1997/05/21  16:01:50  pvmsrc
 * Updated ifdefs to include AIX4MP arch type.
 *
 * Revision 1.15  1997/05/21  15:45:41  pvmsrc
 * pvm_tc_taskexit handled correctly.
 * 	Reports error only when no peer entry exists.
 *
 * Revision 1.14  1997/05/19  15:00:56  pvmsrc
 * Number of small fixes:
 * 	midtobuf, umbuf_new are prototyped correctly now.
 * 	msg contexts set/read correctly in peer_send & peer_recv.
 * 	pvmbeatask():
 * 		shmem key id allocated/located without bizarre attachment error.
 * 		waitc struct is now set using wait_init() before first use... ;)
 * 	pvmmcast():
 * 		mroute is called with a valid mid
 * 		TT direct routes are not checked until ttpcb is defined
 *
 * Revision 1.13  1997/04/30  21:26:07  pvmsrc
 * SGI Compiler Warning Cleanup.
 *
 * Revision 1.12  1997/04/25  19:19:40  pvmsrc
 * altered to allow integration into 3.4 messaging.
 * Changes:
 * 	-	includes
 * 	- 	added mhp struct def (same as pmsg)
 * 	- 	updated pvmmidh, pvmmidsiz and m_umbs
 * 	- 	added ttpcd lists
 * 	- 	own copy of pvmmcast() (same as lpvm.c)
 * 	- 	own copy of ttpcb_find() minus mpp_ttpcb_find routine XXX
 * 	- 	own copy of post_routedelete() (from lpvmgen.c)
 *
 * Revision 1.11  1997/04/07  21:09:14  pvmsrc
 * pvm_addmhf() - new parameter interface
 *
 * Revision 1.10  1997/04/01  21:28:18  pvmsrc
 * Damn Damn Damn.
 * 	- pvm_recvinfo() returns a bufid, not an index.  Damn.
 *
 * Revision 1.9  1997/04/01  20:48:21  pvmsrc
 * Fixed tracer mbox usage:
 * 	- pvm_getinfo() -> pvm_recvinfo(), new semantics handled (recvinfo
 * 		sets rbuf implicitly, a la pvm_recv, need to save rbuf).
 *
 * Revision 1.8  1997/03/27  19:55:31  pvmsrc
 * Fixed up pvmbeatask() to go get tracer info if spawned from shell:
 * 	- env var info including trace mask, trace buffer size, trace opts.
 * 	- use PVMTRACERCLASS mbox entry to fill in values, if matches
 * 		on trctid, trcctx, and trctag.
 *
 * Revision 1.7  1997/03/06  21:50:21  pvmsrc
 * Yanked out #includes for <netinet/in.h> and <netinet/tcp.h>.
 * 	- dups with lpvm.h #includes...
 *
 * Revision 1.6  1997/01/28  19:26:52  pvmsrc
 * New Copyright Notice & Authors.
 *
 * Revision 1.5  1996/12/19  19:57:58  pvmsrc
 * Eradicated remainder of old control message interface.
 * 	- replaced pvmmctl() routine with individual messages handlers:
 * 		* pvm_tc_shmat() (lpvmshmem.c only).
 * 		* pvm_tc_conreq().
 * 		* pvm_tc_conack().
 * 		* pvm_tc_taskexit().
 * 	- added appropriate calls to pvm_addmhf() in pvmbeatask().
 * 	- removed calls to pvmmctl() in mroute() (peer_recv() in
 * 		lpvmshmem.c), replaced with new mesg_input() call,
 * 		use new pmsg_setenc() routine to set message encoding.
 *
 * Revision 1.4  1996/12/18  22:27:50  pvmsrc
 * Extracted duplicate versions of routines from lpvm/mimd/shmem.c,
 * 	inserted into shared lpvmgen.c:
 * 	- pvmbailout().
 * 	- pvmlogerror().
 * 	- vpvmlogprintf(), pvmlogprintf().  (hope these work on MPP & shmem)
 * 	- pvmlogperror().
 *
 * Revision 1.3  1996/10/25  13:57:31  pvmsrc
 * Replaced old #includes for protocol headers:
 * 	- <pvmsdpro.h>, "ddpro.h", "tdpro.h"
 * With #include of new combined header:
 * 	- <pvmproto.h>
 *
 * Revision 1.2  1996/10/24  22:44:35  pvmsrc
 * Modified for New Tracing Facility:
 * 	- moved #include "global.h" below other #include's for typing.
 * 	- removed extra #include <pvm3.h> in lpvm.c...
 * 	- added #include of new "lpvm.h" to replace explicit externs.
 * 	- removed common control message handlers from lpvm.c:
 * 		* extracted to lpvmgen.c for general usage.
 * 		* pvm_tc_noop(), pvm_tc_settmask(), pvm_tc_siblings().
 * 		-> lpvmmimd.c & lpvmshmem.c still need remainder of pvmmctl()
 * 			replaced with control message handlers.
 * 	- arg typing hassles with int_compare() / qsort() exacerbated...
 * 	- modified pvmbeatask():
 * 		* handle new tracing info, unpack tracing and output collection
 * 			parameters into temp storage, and then check for local task
 * 			override before applying.
 * 		* read in new tracing env vars PVMTRCBUF & PVMTRCOPT.
 * 		* install new common message handlers.
 * 		* call new tev_init() routine to set up tracing stuff.
 * 		* use new Pvmtracer structures (pvmtrc & pvmctrc) to store info.
 * 	- removed pvm_getopt() & pvm_setopt() -> moved to common lpvmgen.c.
 * 	- removed old tev_begin(), tev_fin() & tev_do_trace() routines.
 * 	- updated trace event generation for pvm_getfds(), pvm_start_pvmd(),
 * 		pvm_precv(), pvm_psend().
 *
 * Revision 1.1  1996/09/23  23:44:20  pvmsrc
 * Initial revision
 *
 * Revision 1.39  1995/11/10  21:37:46  manchek
 * check for EINTR from semop in peer_wait
 *
 * Revision 1.38  1995/11/02  16:17:20  manchek
 * free replies to control messages in mxfer
 *
 * Revision 1.37  1995/09/08  17:26:02  manchek
 * aargh forgot semicolon
 *
 * Revision 1.36  1995/09/08  16:56:17  manchek
 * experimental changes to pvm_psend to improve performance (postpone freebuf)
 *
 * Revision 1.35  1995/09/06  17:37:24  manchek
 * aargh, forgot pvm_precv
 *
 * Revision 1.34  1995/09/06  17:31:41  manchek
 * pvm_psend returns not implemented instead of bad param for string type
 *
 * Revision 1.33  1995/09/05  19:20:30  manchek
 * changes from bigapple to make busywait work better
 *
 * Revision 1.32  1995/07/28  16:40:59  manchek
 * wrap HASERRORVARS around errno declarations
 *
 * Revision 1.31  1995/07/28  15:35:19  manchek
 * address of copy databuf wasn't set right in peer_send
 *
 * Revision 1.30  1995/07/28  15:10:37  manchek
 * only send message header on first fragment
 *
 * Revision 1.29  1995/07/28  15:07:30  manchek
 * peer_send wasn't generating message checksum
 *
 * Revision 1.28  1995/07/24  18:45:51  manchek
 * message headers passed in inbox shmpkhdr instead of databuf
 * requires changes to peer_recv, peer_send, msendrecv.
 * use pvmnametag function to print message tags symbolically
 *
 * Revision 1.27  1995/07/19  16:37:51  manchek
 * peer_send returns NotImpl if DataInPlace used
 *
 * Revision 1.26  1995/07/18  17:00:13  manchek
 * added code to generate and check crc on each message (MCHECKSUM).
 * get and put wait-id in message header
 *
 * Revision 1.25  1995/07/12  01:20:40  manchek
 * initialize lots of globals, reset in pvmendtask.
 * pvmendtask checks that globals are set before using them.
 * catch_kill reinstalls old sighldr, doesn't call if it's SIG_IGN,
 * sets pidtid state to ST_FINISH.
 * peer_detach now frees the peer struct
 *
 * Revision 1.24  1995/07/11  18:58:30  manchek
 * peer_wait returns int, -1 if error reading semaphore.
 * mroute and dynbuf check peer_wait return for error
 *
 * Revision 1.23  1995/07/05  16:18:10  manchek
 * exiting task sets pidtid entry state to ST_FINISH instead of ST_EXIT
 *
 * Revision 1.22  1995/07/05  15:36:40  manchek
 * free messages before sending TASKEXIT in pvmendtask
 *
 * Revision 1.21  1995/07/03  20:51:15  manchek
 * made convex poll-type code in peer_wait() the default case
 *
 * Revision 1.20  1995/07/03  20:18:26  manchek
 * hellish cleanup of comments and formatting.
 * removed POWER4 ifdefs.
 * added deadlock detection where task owns all pages in shared
 * segment and needs still another
 *
 * Revision 1.19  1995/06/30  16:24:45  manchek
 * aargh
 *
 * Revision 1.18  1995/06/28  18:17:56  manchek
 * moved check_for_exit from pvmumbuf.c
 *
 * Revision 1.17  1995/06/28  17:59:11  manchek
 * typo
 *
 * Revision 1.16  1995/06/28  15:47:18  manchek
 * added TC_SHMAT connect handshake
 *
 * Revision 1.15  1995/06/19  17:47:58  manchek
 * was packing random string in TC_CONACK message in pvmmctl
 *
 * Revision 1.14  1995/06/02  17:21:34  manchek
 * pvm_start_pvmd ignores INT, QUIT, TSTP signals
 *
 * Revision 1.13  1995/05/30  14:48:37  manchek
 * in pvmendtask() must call peer_cleanup() before erasing our tid
 *
 * Revision 1.12  1995/05/24  19:07:37  manchek
 * small fix in getdsock().
 * changed HPPA shared memory name to HPPAMP
 *
 * Revision 1.11  1995/05/22  19:45:45  manchek
 * added ifdefs for RS6KMP
 *
 * Revision 1.10  1995/05/22  19:09:02  manchek
 * took out ifdefs around read_int().
 * with new startup, can read pvmd sockaddr file only once
 *
 * Revision 1.9  1995/05/18  17:22:08  manchek
 * need to export pvminbox and myshmbufid
 *
 * Revision 1.8  1995/05/17  16:21:17  manchek
 * added support for CSPP shared memory.
 * lots of bug fixes from SGI and Convex.
 * added PVMTASKDEBUG envar.
 * pvm_start_pvmd reads sockaddr from pvmd instead of sleeping on addr file.
 * CSPP port (only) uses new TM_GETOPT message to get trace, output dest.
 *
 * Revision 1.7  1995/02/06  21:43:59  manchek
 * pvmmctl now replies to TC_CONREQ message.
 * better cleanup of message buffers in pvmendtask
 *
 * Revision 1.6  1995/02/01  21:24:47  manchek
 * error 4 is now PvmOverflow
 *
 * Revision 1.5  1994/12/20  16:38:44  manchek
 * added pvmshowtaskid variable
 *
 * Revision 1.4  1994/11/07  22:42:39  manchek
 * general damage control and cleanup:
 * initialize variables
 * send null packets to wake up pvmd instead of reconnecting
 * clean up on catching SIGTERM
 *
 * Revision 1.3  1994/06/30  21:35:40  manchek
 * typo in peer_recv()
 *
 * Revision 1.2  1994/06/04  21:44:31  manchek
 * updated header.
 * changed TM_SET to TM_SETOPT
 *
 * Revision 1.1  1994/06/03  20:38:18  manchek
 * Initial revision
 *
 */

#include <sys/param.h>
#include <stdio.h>
#include <rpc/types.h>
#include <rpc/xdr.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <fcntl.h>
#ifdef	SYSVSTR
#include <string.h>
#else
#include <strings.h>
#endif
#include <signal.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/mman.h>
#include <unistd.h>
#ifdef IMA_SYMM
#include <parallel/parallel.h>
#endif
#ifdef IMA_KSR1
#include <pthread.h>
#endif
#include <sys/time.h>
#ifdef NEEDSSELECTH
#include <sys/select.h>
#endif

#include <pvm3.h>
#include <pvmproto.h>
#include "pvmalloc.h"
#include "pvmdabuf.h"
#include "pvmfrag.h"
#include "pmsg.h"
#include "listmac.h"
#include "pvmshmem.h"
#include "bfunc.h"
#include "lpvm.h"
#include "host.h"
#include "waitc.h" /* needed by post_routedelete */
#include <pvmtev.h>
#include "tevmac.h"
#include "global.h"

/*
 * Fallback max()/min() macros, defined only when no earlier header has
 * already supplied them.  The selected argument is evaluated twice, so do
 * not pass expressions with side effects.
 */
#ifndef max
#define max(a,b)	((a)>(b)?(a):(b))
#endif

#ifndef min
#define min(a,b)	((a)<(b)?(a):(b))
#endif

char *getenv();

void hex_inadport __ProtoGlarp__ (( char *, struct sockaddr_in * ));

/*
 * Declare the pointer-returning helpers used below.  Without a visible
 * declaration a pre-ANSI compiler assumes they return int, which
 * mangles the returned pointer.
 */
struct pmsg *midtobuf(); /* used by mroute() and peer_send() */
struct pmsg *umbuf_new(); /* used by peer_recv() */


extern struct encvec *enctovec();
char *pvmgetpvmd();
char *pvmgethome();
char *pvmdsockfile();
char *pvmnametag();

#ifdef IMA_CSPP
/* peer_recv() compares this against a peer's p_node to flag MM_DIFFNODE */
int current_node();
#endif


/***************
 **  Globals  **
 **           **
 ***************/

#ifndef HASERRORVARS
extern int errno;					/* from libc */
#endif

/*
 * Message heap element.
 *
 * XXX this is a private copy of the structure; it should come from a
 * shared header instead of being duplicated here.  It is needed to give
 * a type to the extern pvmmidh pointer declared further down.
 */

struct mhp {
	int m_free;                 /* free list (next or 0) */
	struct pmsg *m_umb;         /* message or null if on free list */
};


extern int bufpageused;				/* from pvmshmem.c */
extern struct mhp *pvmmidh;			/* from lpvmpack.c */
extern int pvmmidhsiz;				/* from lpvmpack.c */

int shmbufsiz = 0;					/* shared-memory buffer size */

int pvmtidhmask = TIDHOST;			/* mask - host field of tids */
int pvmtidlmask = TIDLOCAL;			/* mask - local field of tids */

int pgsz = 0;						/* system page size */
int pvmpgsz = 0;					/* PVM virtual page size */
char *outmsgbuf = 0;				/* my outgoing message buffer */
int outbufsz = 0;					/* # of frags in outgoing msg buf */
int nbufsowned = 0;					/* num shared frags held by us */
struct pidtid *pidtids = 0;			/* pid -> tid table */
char *infopage = 0;					/* proto, NDF, pid-tid table */
int maxpidtid = 0;					/* size of pid-tid table */
int myshmbufid = -1;				/* ID of shared-memory buffer */
char *pvminbox = 0;					/* incoming message buffer (starts with a struct msgboxhdr) */

/*
 * NOTE(review): this file-scope pointer carries the same name and comment
 * as the m_umb member of struct mhp and looks like a stray copy of that
 * member line.  It has external linkage, so confirm that nothing else
 * links against it before removing it.
 */
struct pmsg *m_umb;            /* message or null if on free list */



/***************
 **  Private  **
 **           **
 ***************/

static char pvmtxt[512];				/* scratch for error log */
static struct pmsg *rxfrag = 0;			/* list of not yet fully assembled incoming msgs */
static struct frag *rxbuf;				/* buffer for incoming packets */

#ifdef IMA_SYMM
static int cpuonline;					/* the number of CPUs available */
static struct shpage *pvmfraginfo;		/* frag locks and ref counts */
#endif
#ifdef USERECVSEMAPHORE
static int mysemid = -1;				/* ID of semaphore to sleep on */
#endif
static char *pvmdinbox = 0;				/* pvmd's incoming message buffer */
static char *pvmdoutbuf = 0;			/* pvmd's outgoing message buffer */
static int pvminboxsz = 0;				/* size of incoming message buffer (modulus for inbox slot index) */
static int mypidtid = -1;				/* my position in pid-tid table */
static int pvmdpid = 0;					/* pvmd's Unix proc ID */
static struct sockaddr_in pvmdsad;		/* address of pvmd socket */
static int pvmdsock = -1;				/* pvmd socket descriptor */

static void (*pvmoldtermhdlr)() = 0;	/* presumably the previously installed SIGTERM handler - TODO confirm */
static int pvmsettermhdlr = 1;			/* presumably: install our own SIGTERM handler? - TODO confirm */
static int outta_here = 0;				/* when nonzero, pvm_tc_taskexit() skips its peer lookup */

/* TT lists used for mcasting [GEF] */
static struct ttpcb *ttlist = 0;        /* dll of connected tasks */
static struct ttpcb *topvmd = 0;        /* default route (to pvmd) */


/**************************
 **  Internal Functions  **
 **                      **
 **************************/

/*	getdsock()
*
*	Get address of pvmd socket, set up pvmdsad.
*
*	The address string comes from the PVMSOCK environment variable when
*	it is set; otherwise it is read from the file named by
*	pvmdsockfile().  The string is passed to hex_inadport() to fill in
*	pvmdsad, whose family is then forced to AF_INET.
*
*	On any failure an error is logged and pvmdsad is left untouched.
*/

void
getdsock()
{
	char buf[128];
	int d;
	int n;
	char *p;

	if (!(p = getenv("PVMSOCK"))) {
		if (!(p = pvmdsockfile())) {
			pvmlogerror("getdsock() pvmdsockfile() failed\n");
			return;
		}
		if ((d = open(p, O_RDONLY, 0)) == -1) {
			pvmlogperror(p);
			return;
		}

		/*
		* Read at most sizeof(buf)-1 bytes so that the terminating NUL
		* stored below always lands inside buf, even for an oversized
		* address file.
		*/
		n = read(d, buf, sizeof(buf) - 1);
		if (n == -1) {
			/*
			* Log before closing so that close() cannot disturb errno
			* (pvmlogperror() presumably reports it).
			*/
			pvmlogperror("getdsock() read addr file");
			(void)close(d);
			return;
		}
		(void)close(d);
		if (n == 0) {
			pvmlogerror("getdsock() read addr file: wrong length read\n");
			return;
		}
		buf[n] = 0;
		p = buf;
	}

	hex_inadport(p, &pvmdsad);
	pvmdsad.sin_family = AF_INET;
}


/*	prodpvmd()
*
*	Wake up the pvmd, which is sleeping on sockets and not on shared
*	memory.
*
*	The first call opens a TCP connection to the address in pvmdsad and
*	sends nothing.  Every later call writes one all-zero block of
*	TDFRAGHDR bytes on that connection.  Errors from socket() and
*	connect() are logged; the result of write() is not examined.
*	XXX crude, but the pvmd has no other way to notice shared-memory
*	traffic.
*/

void
prodpvmd()
{
	static char dummy[TDFRAGHDR];

	if (pvmdsock != -1) {
		/* already connected: poke the daemon with an empty header */
		write(pvmdsock, dummy, TDFRAGHDR);
		return;
	}

	pvmdsock = socket(AF_INET, SOCK_STREAM, 0);
	if (pvmdsock == -1) {
		pvmlogperror("prodpvmd() socket");
		return;
	}

	if (connect(pvmdsock, (struct sockaddr*)&pvmdsad, sizeof(pvmdsad)) == -1) {
		pvmlogperror("prodpvmd() connect");
		close(pvmdsock);
		pvmdsock = -1;		/* try again from scratch next time */
		return;
	}

	/* fresh connection: make sure the block sent by later calls is zeroed */
	BZERO(dummy, sizeof(dummy));
}


/*	peer_wait()
*
*	Check inbox for messages, block until one is there.
*
*	The inbox (pvminbox) is considered empty while its header fields
*	mb_read and mb_last are equal.  Two build variants exist:
*
*	- without USERECVSEMAPHORE the wait is done while holding the inbox
*	  page lock, using a condition variable on the architectures
*	  selected by the #if tests below;
*	- with USERECVSEMAPHORE the task performs a semop() decrement on
*	  mysemid and retries when the call is interrupted (EINTR).
*
*	Returns 0 once the inbox is no longer empty, or -1 if semop() fails
*	with an error other than EINTR (the error is logged).
*/

int
peer_wait()
{
	struct msgboxhdr *inbp = (struct msgboxhdr *)pvminbox;

#ifndef USERECVSEMAPHORE

	/*
	* NOTE(review): the while statement below takes its body from
	* whichever #if branch is compiled in.  When neither architecture
	* test is true, the PAGEUNLOCK following the last #endif becomes the
	* loop body instead of running once after the loop - confirm that
	* this polling fallback is what is intended on those builds.
	*/
	PAGELOCK(&inbp->mb_lock);
	while (inbp->mb_read == inbp->mb_last)
#if defined(IMA_SUNMP) && defined(PVMUSEMUTEX)
		cond_wait(&inbp->mb_cond, &inbp->mb_lock);
#endif
#if defined(IMA_RS6KMP) || defined(IMA_AIX4MP) || defined(IMA_AIX5MP)
		pthread_cond_wait(&inbp->mb_cond, &inbp->mb_lock);
#endif
	PAGEUNLOCK(&inbp->mb_lock);

#else	/*USERECVSEMAPHORE*/

	struct sembuf sops;

	/* one decrement of semaphore 0, with no flags set */
	sops.sem_num = 0;
	sops.sem_op = -1;
	sops.sem_flg = 0;
	while (inbp->mb_read == inbp->mb_last) {
		if (pvmdebmask & PDMSEM) {
			sprintf(pvmtxt, "peer_wait(): Waiting on semop id = %d\n", mysemid);
			pvmlogerror(pvmtxt);
		}
		if (semop(mysemid, &sops, 1) == -1) {
			/* EINTR just means a signal arrived: re-test the inbox and retry */
			if (errno != EINTR) {
				sprintf(pvmtxt, "peer_wait(): Error waiting for semop id = %d",
						mysemid);
				pvmlogperror(pvmtxt);
				return -1;
			}
		}
		else if (pvmdebmask & PDMSEM) {
			sprintf(pvmtxt, "peer_wait(): Processing Event on semop id = %d\n",
					mysemid);
			pvmlogerror(pvmtxt);
		}
	}

#endif	/*USERECVSEMAPHORE*/
	return 0;
}


/*	pvm_tc_shmat()
*
*	Handler for the shared-memory attach handshake.
*
*	TC_SHMAT()
*
*	Connects to the sender's shared segment with peer_conn() and, on
*	success, stores our tid in the mb_attached field of the msgboxhdr at
*	the start of that segment.  peer_send() on the requesting side
*	polls mb_attached of its inbox until it becomes nonzero.
*
*	If the segment cannot be attached there is no place to store an
*	answer, so the failure is only logged.  (Earlier code dereferenced
*	the failed peer pointer here in an attempt to store -1.)
*
*	mid is the received message; it is always freed.  Returns 0.
*/

static int
pvm_tc_shmat(mid)
	int mid;
{
	int src;					/* sender of request */
	struct peer *pp;
	struct msgboxhdr *dboxp;

	pvm_bufinfo(mid, (int *)0, (int *)0, &src);

	pp = peer_conn(src, (int *)0);
	if (!pp || pp == (struct peer *)-1L) {
		/* pp is not a usable pointer here: do not touch pp->p_buf */
		pvmlogprintf("pvm_tc_shmat() can't connect to src t%x\n", src);
	} else {
		dboxp = (struct msgboxhdr *)pp->p_buf;
		dboxp->mb_attached = pvmmytid;		/* acknowledge with our tid */
	}

	pvm_freebuf(mid);
	return 0;
}


/*	pvm_tc_conreq()
*
*	Another task requests a connection with us.
*	Reply with a TC_CONACK message.
*
*	TC_CONREQ() {
*		int tdprotocol		// t-d protocol revision number
*		string sockaddr		// address of other socket
*	}
*/

static int
pvm_tc_conreq(mid)
	int mid;
{
	int src;					/* sender of request */
	int sbf = 0;				/* reply message mid */
	int ttpro;					/* protocol revision */
	int ackd;					/* allow connection (0) */
	int i;
	int ictx;

	pvm_bufinfo(mid, (int *)0, (int *)0, &src);

	pvmlogprintf("pvm_tc_conreq() TCP conn request from t%x!\n", src);

	sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
	ttpro = TDPROTOCOL;
	ackd = 1;
	pvm_pkint(&ttpro, 1, 1);
	pvm_pkint(&ackd, 1, 1);
	pvm_pkstr("");
i = pvmrescode;
pvmrescode = 1;
	ictx = pvm_setcontext(SYSCTX_TC);
	pvm_send(src, TC_CONACK);
	pvm_setcontext(ictx);
pvmrescode = i;
	pvm_freebuf(pvm_setsbuf(sbf));

	pvm_freebuf(mid);
	return 0;
}


/*	pvm_tc_conack()
*
*	Another task replies to a connection request.  This file never
*	acts on such a reply, so the message is only reported as
*	unexpected and then discarded.
*
*	TC_CONACK() {
*		int tdprotocol		// t-d protocol revision number
*		int ack				// 0 ok, 1 denied
*		string sockaddr		// address of other socket
*	}
*
*	mid is the received message; it is freed.  Returns 0.
*/

static int
pvm_tc_conack(mid)
	int mid;
{
	int sender;					/* tid the reply came from */

	pvm_bufinfo(mid, (int *)0, (int *)0, &sender);
	pvmlogprintf("pvm_tc_conack() unexpected TC msg from t%x!\n", sender);
	pvm_freebuf(mid);

	return 0;
}


/*	pvm_tc_taskexit()
*
*	We are notified that another task (to which we have a direct route)
*	has exited.
*
*	Unless outta_here is set, the exited tid is unpacked from the
*	message and the peer list is searched for it; the first matching
*	entry is flagged with p_exited.  When no entry was flagged (which
*	includes the outta_here case) the message is logged as unexpected.
*
*	mid is the received message; it is freed.  Returns 0.
*/

static int
pvm_tc_taskexit(mid)
	int mid;
{
	extern struct peer *peers;

	int sender;					/* tid that sent the notification */
	int dead_tid;				/* tid reported as exited */
	struct peer *cur;
	int matched = 0;			/* set once a peer entry was flagged */

	pvm_bufinfo(mid, (int *)0, (int *)0, &sender);

	if (!outta_here) {
		pvm_upkint(&dead_tid, 1, 1);

		/* walk the circular list until it wraps or the tid turns up */
		cur = peers->p_link;
		while (cur != peers && cur->p_tid != dead_tid)
			cur = cur->p_link;

		if (cur != peers) {
			cur->p_exited = 1;		/* mark as deleted */
			matched = 1;
		}
	}

	if (!matched) {
		/* nobody we know about, so the notification was unexpected */
		pvmlogprintf("pvm_tc_taskexit() unexpected TC msg from t%x!\n", sender);
	}

	pvm_freebuf(mid);
	return 0;
}


/*	mroute()
*
*	Route a message to a destination.
*
*	mid		message to send; nothing is sent when midtobuf(mid)
*			yields no message
*	dtid	destination tid
*	code	message tag
*	tmout	null: block until at least one message has arrived;
*			{0,0}: do not block;
*			otherwise: block for at most this long
*
*	Returns when
*		outgoing message (if any) fully sent and
*		(timed out (tmout) or
*			at least one message fully received)
*	Returns >=0 the number of complete messages downloaded, or
*	negative on error.
*
*	Each pass of the loop (1) checks the deadline, (2) may go to sleep
*	in peer_wait() when polling has found nothing for more than
*	pvmpolltime passes, (3) takes one fragment from the inbox and
*	(4) pushes out one fragment of the current outgoing message.
*	Reply messages handed back by peer_recv() are sent after the
*	current message; they are parked on a small stack meanwhile.
*/

int
mroute(mid, dtid, code, tmout)
	int mid;				/* message */
	int dtid;				/* dest */
	int code;				/* type tag */
	struct timeval *tmout;	/* get at least one message */
{
	struct pmsg *txup;			/* tx message or null */
	struct frag *txfp = 0;		/* cur tx frag or null */
	int gotem = 0;				/* count complete msgs downloaded */
	int block;					/* get at least one message */
	int loopcount = 0;
	struct msgboxhdr *inbp;		/* incoming box */
	struct timeval tnow, tstop;
	int sbf;					/* reply to control message */
	int cc;
	int freethis = 0;			/* (control) message came from stack */
	int tstkp = 0;
	int tstk[100];				/* XXX shouldn't be a stack */

	/*
	* XXX multicasts (TM_MCA to the pvmd, TIDGID) used to be diverted to
	* node_mcast() here; that shortcut is disabled, so they take the
	* normal path below.
	*/

	if (tmout) {
		if (tmout->tv_sec || tmout->tv_usec) {
			pvmgetclock(&tnow);
			tstop.tv_sec = tnow.tv_sec + tmout->tv_sec;
			tstop.tv_usec = tnow.tv_usec + tmout->tv_usec;
			/*
			* Carry excess microseconds into the seconds field.  The
			* deadline test below compares field by field and would
			* otherwise fire at the wrong moment.
			*/
			if (tstop.tv_usec >= 1000000) {
				tstop.tv_sec += tstop.tv_usec / 1000000;
				tstop.tv_usec %= 1000000;
			}
			block = 1;
		} else {
			block = 0;
			/*
			* Never used as a deadline, but the sleep test below reads
			* tv_sec; give it a defined value that is not the -1
			* "wait forever" marker.
			*/
			tstop.tv_sec = 0;
			tstop.tv_usec = 0;
		}
	} else {
		block = 1;
		tstop.tv_sec = -1;		/* marker: no deadline */
		tstop.tv_usec = -1;
	}

	if ((txup = midtobuf(mid)) != 0) {
		txfp = txup->m_frag->fr_link;
		txfp = txfp->fr_buf ? txfp : 0;		/* list head has no buffer */
	}

	inbp = (struct msgboxhdr *)pvminbox;

	do {
		/* give up once a finite deadline has passed */
		if (block && tstop.tv_sec != -1) {
			pvmgetclock(&tnow);
			if (tnow.tv_sec > tstop.tv_sec
			|| (tnow.tv_sec == tstop.tv_sec && tnow.tv_usec >= tstop.tv_usec))
				break;
		}

		/*
		* Sleep instead of spinning, but only when nothing is being
		* sent and the caller asked to wait without a deadline.
		*/
		if (pvmpolltype == PvmPollSleep
		&& loopcount++ > pvmpolltime && !txfp && tstop.tv_sec == -1) {
			PAGELOCK(&inbp->mb_lock);
			if (inbp->mb_read == inbp->mb_last) {
				inbp->mb_sleep = 1;
				PAGEUNLOCK(&inbp->mb_lock);
				if (peer_wait() == -1)
					return PvmSysErr;
				loopcount = 0;
			} else
				PAGEUNLOCK(&inbp->mb_lock);
		}

		/* take one fragment from the inbox, if there is one */
		if (inbp->mb_read != inbp->mb_last) {
			/*
			* peer_recv() reports failure with a negative value
			* (PvmSysErr), not specifically -1, so test the sign.
			*/
			if ((sbf = peer_recv(&gotem)) < 0)
				return PvmSysErr;
			if (sbf > 0) {
				if (txfp) {
					/* busy sending: park the reply until we are done */
					if (tstkp >= (int)(sizeof(tstk) / sizeof(tstk[0]))) {
						pvmlogerror("mroute() too many pending replies\n");
						pvm_freebuf(sbf);
						return PvmSysErr;
					}
					tstk[tstkp++] = sbf;
				} else {
					txup = midtobuf(sbf);
					dtid = txup->m_dst;
					code = txup->m_tag;
					txfp = txup->m_frag->fr_link;
					txfp = txfp->fr_buf ? txfp : 0;
					freethis = 1;
				}
			}
		}

		/* push out one fragment of the current outgoing message */
		if (txfp) {
			if ((cc = peer_send(txup, txfp, dtid, code)) < 0)
				return cc;
			if (cc) {
				txfp = txfp->fr_link;
				if (!txfp->fr_buf) {
					/* back at the list head: message fully sent */
					if (freethis)
						umbuf_free(txup);
					if (tstkp > 0) {
						txup = midtobuf(tstk[--tstkp]);
						dtid = txup->m_dst;
						code = txup->m_tag;
						txfp = txup->m_frag->fr_link;
						txfp = txfp->fr_buf ? txfp : 0;
						freethis = 1;
					} else
						txfp = 0;
				}
			}
		}

	} while (txfp || (block && !gotem));

	return gotem;
}



/*	peer_recv()
*
*	Receive a message fragment from another task.
*	Reassemble frags into messages and place on rxlist.
*	On completing a control message, call mesg_input(),
*		which consumes it.
*	Returns:
*		0 normally,
*		negative if error,
*		or message handle of a reply control message to be sent.
*/

int
peer_recv(gotem)
	int *gotem;
{
	struct pmsg *rxup;			/* rx message */
	struct pmsg *up;
	struct frag *fp;
	int sdr;
	int src;
	int ff;
	struct peer *pp = 0;
	int next;					/* frag being received */
	struct shmpkhdr *inmsgs;	/* incoming messages */
	struct msgboxhdr *inbp;		/* incoming box */
	int sbf = 0;				/* reply control message */

	inbp = (struct msgboxhdr *)pvminbox;
	inmsgs = (struct shmpkhdr *)(inbp + 1);

	next = (inbp->mb_read + 1) % pvminboxsz;
	sdr = inmsgs[next].ph_sdr;
	fp = fr_new(0);
	if ((sdr & ~pvmtidhmask) != TIDPVMD) {
		if (!(pp = peer_conn(sdr, (int *)0)) || pp == (struct peer *)-1L) {
			sprintf(pvmtxt, "peer_recv() can't connect to sender t%x\n", sdr);
			pvmlogerror(pvmtxt);
			return PvmSysErr;
		}
		fp->fr_dat = pp->p_buf + INBOXPAGE*pgsz + inmsgs[next].ph_dat;
	} else
		fp->fr_dat = pvmdoutbuf + inmsgs[next].ph_dat;
	fp->fr_buf = fp->fr_dat - (inmsgs[next].ph_dat & (pgsz-1))
				+ PVMPAGEHDR;
	fp->fr_max = pvmfrgsiz;
	ff = inmsgs[next].ph_flag;

	src = inmsgs[next].ph_src;
	fp->fr_len = inmsgs[next].ph_len;
	fp->fr_max = pvmfrgsiz;

	if (pvmdebmask & PDMPACKET) {
		sprintf(pvmtxt, "peer_recv() sdr t%x src t%x len %d dst t%x flag %d\n",
			sdr, src, fp->fr_len, inmsgs[next].ph_dst, ff);
		pvmlogerror(pvmtxt);
	}

	/*
	* if start of message, make new umbuf, add to frag pile
	*/
	if (ff & FFSOM) {
		rxup = umbuf_new();
		rxup->m_tag = inmsgs[next].ph_tag;
		rxup->m_ctx = inmsgs[next].ph_ctx;
		rxup->m_enc = inmsgs[next].ph_enc;
		rxup->m_wid = inmsgs[next].ph_wid;
		rxup->m_crc = inmsgs[next].ph_crc;
		rxup->m_src = src;
#ifdef IMA_CSPP
		if (pp && pp->p_node != current_node())
			rxup->m_flag |= MM_DIFFNODE;
#endif
		LISTPUTBEFORE(rxfrag, rxup, m_link, m_rlink);
	}

	/* locate frag's message */

	for (rxup = rxfrag->m_link; rxup != rxfrag; rxup = rxup->m_link)
		if (rxup->m_src == src)
			break;

	if (rxup == rxfrag) {	/* uh oh, no message for it */
		pvmlogerror("peer_recv() frag with no message\n");
		fr_unref(fp);

	} else {
		LISTPUTBEFORE(rxup->m_frag, fp, fr_link, fr_rlink);
		rxup->m_len += fp->fr_len;
	/*
	* if end of message, move to rxlist and count it
	*/
		if (ff & FFEOM) {
			LISTDELETE(rxup, m_link, m_rlink);
#ifdef	MCHECKSUM
			if (rxup->m_crc != umbuf_crc(rxup)) {
				sprintf(pvmtxt,
				"peer_recv() message src t%x tag %d bad checksum\n",
						rxup->m_src, rxup->m_tag);
				pvmlogerror(pvmtxt);
				umbuf_free(rxup);

			} else {
#endif
				pmsg_setenc(rxup, rxup->m_enc);
				mesg_input(rxup);
				(*gotem)++;
#ifdef	MCHECKSUM
			}
#endif
		}
	}
	inbp->mb_read = next;
	return sbf;
}


/*	peer_send()
*
*	Send fragment to another process.
*	Establish connection if necessary, exchange handshake message.
*
*	txup	message the fragment belongs to; its frag list head is used
*			to derive the start/end-of-message flags
*	txfp	fragment to send (dereferenced unconditionally)
*	dtid	destination task id
*	code	message tag; tags in TC_FIRST..TC_LAST always go via the pvmd
*
*	Only a packet header (struct shmpkhdr) is written into the
*	destination's inbox ring; it carries the offset of the data relative
*	to our outmsgbuf (ph_dat) rather than the data itself.
*
*	Returns 1 if sent, 0 otherwise (peer_conn() returned null or the
*	destination inbox ring is full), negative on error (PvmNotImpl when
*	the fragment has no fr_u.dab).
*/

int
peer_send(txup, txfp, dtid, code)
	struct pmsg *txup;		/* tx message or null */
	struct frag *txfp;		/* cur tx frag or null */
	int dtid;				/* dest */
	int code;				/* type code */
{
	char *cp = 0;				/* points to copy-databuf (if necessary) */
	int ff;
	int loc;					/* location of data in shared segment */
	int next;					/* frag being received */
	struct peer *pp;
	struct shmpkhdr *dmsgs;
	struct msgboxhdr *dboxp;	/* receiving box of peer */

	/* NOTE(review): txfp and txup are documented as "or null" but are */
	/* dereferenced without a check here and below - confirm callers. */
	if (!txfp->fr_u.dab) {
		pvmlogerror("peer_send() PvmDataInPlace not implemented\n");
		return PvmNotImpl;
	}

	/* If it's a control message always pass it via the daemon. */
	/* This avoids the "I can't send to you because you haven't handled */
	/* my tc_shmat handshake *message* yet" chicken-and-egg problem. */
	/* Why the daemon? Because it is already connected, as beatask() */
	/* has worked (maybe). */

	if (((dtid & pvmtidhmask) == (pvmmytid & pvmtidhmask)
	&& (dtid & ~pvmtidhmask) != TIDPVMD) /* to local task */
	&& !(code>=TC_FIRST && code<=TC_LAST) )	/* and not a CTRL TC message */
	{
		int new_connection;

		/* null from peer_conn() is reported to the caller as "not sent" */
		if (!(pp = peer_conn(dtid, &new_connection)))
			return 0;
		/* -1 from peer_conn() falls back to the pvmd's inbox (else below) */
		if (pp != (struct peer *)-1L) {
			dboxp = (struct msgboxhdr *)pp->p_buf;
			if (new_connection) {
				struct pmsg *mp;
				int sbf, l;
				struct msgboxhdr *inbp = (struct msgboxhdr *)pvminbox;
				static struct timeval ztv = { 0, 0 };

				/*
				* Handshake: build a message holding TDPROTOCOL, send it
				* as TC_SHMAT, then poll until our own inbox header shows
				* mb_attached set.
				*/
				sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
				l = TDPROTOCOL;
				pvm_pkint(&l, 1, 1);
				/* restore the caller's send buffer; sbf is now the handshake */
				sbf = pvm_setsbuf(sbf);
				inbp->mb_attached = 0;
	/* XXX glggg, could we just return to mroute here instead of calling it? */
				mp = midtobuf(sbf);
				mp->m_ctx = SYSCTX_TC;
				mroute(sbf, dtid, TC_SHMAT, &ztv);
				pvm_freebuf(sbf);
 				while (!inbp->mb_attached)  
					mroute(0, 0, 0, &ztv);		/* avoid deadlock */
				/* a mismatch is only logged; the send still proceeds */
				if (inbp->mb_attached != dtid) {
					sprintf(pvmtxt,
						"peer_send: mb_attached: expected %x, got %x\n",
						dtid, inbp->mb_attached);
					pvmlogerror(pvmtxt);
				}
			}

		} else
			dboxp = (struct msgboxhdr *)pvmdinbox;
	} else
		dboxp = (struct msgboxhdr *)pvmdinbox;

	/* unlocked early-out; the ring is re-checked under the lock below */
	if ((dboxp->mb_last + 1) % pvminboxsz == dboxp->mb_read)
		return 0;		/* full */

	/*
	* if page is private, copy and replace it with one in shared buf
	* (private == data pointer lies outside outmsgbuf's outbufsz pages)
	*/

	if ((loc = txfp->fr_dat - outmsgbuf) > outbufsz * pvmpgsz || loc < 0) {
		if (nbufsowned == outbufsz) {
			int i;

			/* flag our pidtid entry (pt_cond = 1) under the info page lock */
			pvmlogerror("peer_send() Message(s) too long for shared buffer, deadlocked.\n");
			PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
			for (i = 0; i < maxpidtid; i++)
				if (pidtids[i].pt_tid == pvmmytid) {
					pidtids[i].pt_cond = 1;
					break;
				}
			PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
		}

		/* keep allocating until da_new() hands back a buffer inside outmsgbuf */
		cp = 0;
		do {
			if (cp)
				da_unref(cp);
			cp = da_new(MAXHDR + txfp->fr_len);
		} while ((loc = cp - outmsgbuf + MAXHDR) > outbufsz*pvmpgsz || loc < 0);

		/* move the payload and swap the frag over to the new buffer */
		BCOPY(txfp->fr_dat, cp + MAXHDR, txfp->fr_len);
		txfp->fr_dat = cp + MAXHDR;
		da_unref(txfp->fr_buf);
		txfp->fr_buf = cp;
	}

	/* first frag in the message's list => FFSOM, last => FFEOM */
	if (txfp->fr_rlink == txup->m_frag)
		ff = FFSOM;
	else
		ff = 0;
	if (txfp->fr_link == txup->m_frag)
		ff |= FFEOM;

	if (pvmdebmask & PDMPACKET) {
		sprintf(pvmtxt, "peer_send() dst t%x len %d page %d flag %d\n",
			dtid, txfp->fr_len, loc/pgsz + 1, ff);
		pvmlogerror(pvmtxt);
	}

	/* packet headers follow the msgboxhdr directly in the inbox */
	dmsgs = (struct shmpkhdr *)(dboxp + 1);
	PAGELOCK(&dboxp->mb_lock);
	next = (dboxp->mb_last + 1) % pvminboxsz;
	/* if receive buffer full, must fail here and try again - sgi:jpb */
	if ( next == dboxp->mb_read ) {		/* full */
		PAGEUNLOCK(&dboxp->mb_lock);
		return 0;
	}
	dmsgs[next].ph_src = pvmmytid;
	dmsgs[next].ph_dst = dtid;
	dmsgs[next].ph_sdr = pvmmytid;
	dmsgs[next].ph_dat = loc;
	dmsgs[next].ph_len = txfp->fr_len;
	dmsgs[next].ph_flag = ff;
	/* message-level fields are only meaningful on the first fragment */
	if (ff & FFSOM) {
		dmsgs[next].ph_tag = code;
		dmsgs[next].ph_ctx = txup->m_ctx;
		dmsgs[next].ph_enc = txup->m_enc;
		dmsgs[next].ph_wid = txup->m_wid;
#ifdef	MCHECKSUM
		dmsgs[next].ph_crc = umbuf_crc(txup);
#else
		dmsgs[next].ph_crc = 0;
#endif
		if (pvmdebmask & PDMMESSAGE) {
			sprintf(pvmtxt, "peer_send() dst t%x tag %s ctx %d\n",
				dtid, pvmnametag(dmsgs[next].ph_tag,(int*)0), 
				dmsgs[next].ph_ctx);
			pvmlogerror(pvmtxt);
		} /* logging */

	} else {
		dmsgs[next].ph_tag = 0;
		dmsgs[next].ph_ctx = 0;
		dmsgs[next].ph_enc = 0;
		dmsgs[next].ph_wid = 0;
		dmsgs[next].ph_crc = 0;
	}
	/* take a reference on the data buffer before publishing the slot */
	da_ref(txfp->fr_buf);
	dboxp->mb_last = next;

	/* wake a sleeping peer (pp is valid whenever dboxp is a peer's inbox) */
	if (dboxp != (struct msgboxhdr *)pvmdinbox && dboxp->mb_sleep) {

/* #if	defined(IMA_SUNMP) || defined(IMA_RS6KMP) || defined(IMA_AIX4MP) || defined(IMA_AIX5MP) */
#ifdef  PVMUSEMUTEX
#ifdef	IMA_SUNMP
		cond_signal(&dboxp->mb_cond);
#endif
#if defined(IMA_RS6KMP) || defined(IMA_AIX4MP) || defined(IMA_AIX5MP)
		pthread_cond_signal(&dboxp->mb_cond);
#endif
#else
		peer_wake(pp);
#endif /* PVMUSEMUTEX */
		dboxp->mb_sleep = 0;
	}

	/* wake up pvmd */
	/* (only when the slot just before the new mb_last is mb_read) */

	if (dboxp == (struct msgboxhdr *)pvmdinbox
	&& (dboxp->mb_last + pvminboxsz - 1) % pvminboxsz == dboxp->mb_read) {
		PAGEUNLOCK(&dboxp->mb_lock);
		(void)prodpvmd();
	} else
		PAGEUNLOCK(&dboxp->mb_lock);

	return 1;
}


/*	node_mcast()
*
*	Two-phase multicast helper.
*
*	When dtid is TIDPVMD, message mid is not sent: it is unpacked as an
*	address list (a count followed by that many tids) which is remembered
*	in static storage.  A fresh buffer holding TIDGID is then installed
*	as the receive buffer and mid is put back as the send buffer.
*	Returns 0.
*
*	For any other dtid, mid is routed with tag code to every remembered
*	tid except our own, and the remembered count is cleared.  Returns
*	the result of the last mroute() call (0 if there was none).
*/

int
node_mcast(mid, dtid, code)
	int mid;	/* message id */
	int dtid;	/* destination */
	int code;	/* type */
{
	static int *mc_tids = 0;	/* intended recipients of multicast message */
	static int mc_cap = -1;		/* entries allocated in mc_tids */
	static int mc_cnt;			/* number of tids */
	static struct timeval ztv = { 0, 0 };

	int k;
	int rc = 0;
	int marker;
	int replybuf;

	if (dtid != TIDPVMD) {
		/* second phase: fan the message out to the saved list */
		k = 0;
		while (k < mc_cnt) {
			if (mc_tids[k] != pvmmytid)
				rc = mroute(mid, mc_tids[k], code, &ztv);
			k++;
		}

		mc_cnt = 0;
		return rc;
	}

	/* first phase: intercept multicast info */

	pvm_setrbuf(mid);
	pvm_upkint(&mc_cnt, 1, 1);
	if (mc_cnt > mc_cap) {
		/* grow the saved list; old contents are not needed */
		if (mc_tids)
			PVM_FREE(mc_tids);
		mc_tids = TALLOC(mc_cnt, int, "tids");
		mc_cap = mc_cnt;
	}
	pvm_upkint(mc_tids, mc_cnt, 1);

	/* build a buffer carrying TIDGID in the send slot ... */
	replybuf = pvm_mkbuf(PvmDataFoo);
	pvm_setsbuf(replybuf);
	marker = TIDGID;
	pvm_pkint(&marker, 1, 1);

	/* ... then swap mid back in and hand the old send buffer to rbuf */
	pvm_setrbuf(pvm_setsbuf(mid));
	return 0;
}


/*	msendrecv()
*
*	Send the current send buffer as a system message (usually to our
*	pvmd) and block until a message with the same source and tag shows
*	up on the receive list.  The match becomes the current receive
*	buffer; any previous receive buffer is freed.
*
*	Returns the reply's message handle, PvmNoBuf if there is no send
*	buffer, or the (negative) error from mroute().
*/

int
msendrecv(other, code, context)
	int other;				/* dst, src tid */
	int code;				/* message tag */
	int context;            /* context to use for the message */
{
	static int nextwaitid = 1;	/* I hope */

	int cc;
	struct pmsg *cur;

	if (!pvmsbuf)
		return PvmNoBuf;

	/* send code to other */
	if (pvmdebmask & PDMMESSAGE) {
		sprintf(pvmtxt, "msendrecv() to t%x tag %s\n", other,
				pvmnametag(code, (int *)0));
		pvmlogerror(pvmtxt);
	}

	/* stamp the outgoing message with a fresh wait id and the context */
	pvmsbuf->m_wid = nextwaitid++;
	pvmsbuf->m_ctx = context;

	cc = mroute(pvmsbuf->m_mid, other, code, (struct timeval *)0);
	if (cc < 0)
		return cc;

	/* recv code from other */
	cur = pvmrxlist->m_link;
	for (;;) {
		if (cur == pvmrxlist) {
			/*
			* Ran off the end of the list: step back to the last entry,
			* let mroute() pull in more, then continue with whatever
			* follows it.
			*/
			cur = cur->m_rlink;
			cc = mroute(0, 0, 0, (struct timeval *)0);
			if (cc < 0)
				return cc;
			cur = cur->m_link;
		}

		if (pvmdebmask & PDMMESSAGE) {
			sprintf(pvmtxt, "msendrecv() from t%x tag %s\n",
					cur->m_src, pvmnametag(cur->m_tag, (int *)0));
			pvmlogerror(pvmtxt);
		}

		if (cur->m_src == other && cur->m_tag == code)
			break;

		cur = cur->m_link;
	}

	/* unhook the reply and make it the current receive buffer */
	LISTDELETE(cur, m_link, m_rlink);
	if (pvmrbuf)
		umbuf_free(pvmrbuf);
	pvmrbuf = 0;

	cc = pvm_setrbuf(cur->m_mid);
	if (cc != 0)
		return cc;
	return cur->m_mid;
}


/*	catch_kill()
*
*	SIGTERM handler installed by pvmbeatask().  Puts the previous
*	handler back, chains to it if it is a real function, marks our
*	pid-tid entry ST_FINISH (unless it is already ST_EXIT), tears the
*	task down with pvmendtask() and exits with the signal number.
*/

static void
catch_kill(sig)
	int sig;
{
	signal(SIGTERM, pvmoldtermhdlr);

	/*
	* Only call through a genuine handler address.  SIG_DFL, SIG_IGN and
	* SIG_ERR are sentinel values, not callable functions, and SIG_DFL is
	* not guaranteed to compare equal to a null pointer.
	*/
	if (pvmoldtermhdlr
	&& pvmoldtermhdlr != SIG_DFL
	&& pvmoldtermhdlr != SIG_IGN
	&& pvmoldtermhdlr != SIG_ERR)
		pvmoldtermhdlr(sig);

	/* XXX yes, i know the table isn't locked.  we're in a sighandler */
	/* mypidtid is -1 once pvmendtask() has run; don't index with it */
	if (pidtids && mypidtid >= 0
	&& pidtids[mypidtid].pt_stat != ST_EXIT)
		pidtids[mypidtid].pt_stat = ST_FINISH;

	pvmendtask();
	exit(sig);
}


/*	read_int()
*
*	Return the int that val points to.
*
*	pvmbeatask() uses this to poll a word in the pvmd's shared segment
*	that another process writes.  The access is made through a volatile
*	lvalue so the load is really performed on every call, even if this
*	static helper gets inlined into the polling loop.
*/

static int
read_int(val)
	int *val;
{
	return *(volatile int *)val;
}


/*	pvmbeatask()
*
*	Initialize libpvm, config process as a task.
*	This is called as the first step of each libpvm function so no
*	explicit initialization is required.
*
*	Returns 0 if okay, else error code.
*
*	XXX needs work.  lots of inconsistent state is left on error return.
*	I'm working on it
*/

int
pvmbeatask()
{
	struct shmpkhdr *inmsgs;
	struct pidtidhdr *pvminfo;
	struct msgboxhdr *dboxp;		/* receiving box of pvmd */
	int next;
	int altpid;						/* pid of ancestor forked by pvmd */
	int msgcnt;
	char *msgbuf;					/* my message buffer */
	int bufid;
	int i;
	int pid;
	int bid;
	int rc;
	char *p;
	struct pvmminfo minfo;
	int cc = 0;
	int sbf = 0, rbf = 0;			/* saved rx and tx message handles */
	int outtid, outctx, outtag;
	int trctid, trcctx, trctag;
	int need_trcinfo = 0;
	int new_tracer = 0;
	char tmask[ 2 * TEV_MASK_LENGTH ];
	int tbuf, topt;
	int mid;
/*
	union semun {
		int val;
		struct semid_ds *buf;
		ushort *array;
	} sunion;
*/
#ifdef LOG
	char fname[32];
#endif
	int key, firstkeytried;
	int mytid;
	char *pvmtmp;
	TEV_DECLS

	if (pvmmytid != -1)
		return 0;

	pvmmydsig = pvmgetdsig();

	TEV_EXCLUSIVE;

	if ((pvm_useruid = getuid()) == -1) {
		pvmlogerror("pvmbeatask() can't getuid()\n");
		return PvmSysErr;
	}

	if (p = getenv("PVMTASKDEBUG")) {
		pvmdebmask = pvmxtoi(p);
		if (pvmdebmask) {
			sprintf(pvmtxt,"task debug mask is 0x%x\n", pvmdebmask);
			pvmlogerror(pvmtxt);
		}
	}

	pvmmyupid = getpid();

#ifdef LOG
#ifdef IMA_CSPP
	int scid = get_scid();

	pvmtmp = pvmgettmp();

	if (scid > 1)
		sprintf(fname, "%s/pvmt.%d.%d", pvmtmp, pvm_useruid, scid);
	else
#endif
		sprintf(fname, "%s/pvmt.%d", pvmtmp, pvm_useruid);
		if ((logfp = fopen(fname, "a")) == NULL)
			pvmlogerror("pvmbeatask() can't open log file\n");
#endif

	/*
	* get expected pid from environment in case we were started by
	* the pvmd and someone forked again
	*/

	if (p = getenv("PVMEPID"))
		altpid = atoi(p);
	else
		altpid = 0;

	pgsz = sysconf(_SC_PAGESIZE);
	pvmpgsz = FRAGPAGE*pgsz;
	pvmfrgsiz = pvmpgsz - PVMPAGEHDR;
	pvminboxsz =
		(INBOXPAGE*pgsz - sizeof(struct msgboxhdr))/sizeof(struct shmpkhdr);

	/*
	*	initialize received-message list and fragment reassembly list
	*/

	rxfrag = pmsg_new(1);
	BZERO((char*)rxfrag, sizeof(struct pmsg));
	rxfrag->m_link = rxfrag->m_rlink = rxfrag;

	pvmrxlist = pmsg_new(1);
	BZERO((char*)pvmrxlist, sizeof(struct pmsg));
	pvmrxlist->m_link = pvmrxlist->m_rlink = pvmrxlist;

	peer_init();

	/*
	* SIGTERM handler to clean up our shared memory
	*/

	if (pvmsettermhdlr) {
		pvmoldtermhdlr = signal(SIGTERM, catch_kill);
		pvmsettermhdlr = 0;
	}

	/*
	* get pvmd's message buffer, check protocol revision
	*/

	key = pvmshmkey(0);
	if ((bufid = shmget((key_t)key, 0, PERMS)) == -1) {
		pvmlogperror("pvmbeatask() shmget: can't connect to pvmd");
		return PvmSysErr;
	}
	if ((pvmdinbox = (char *)shmat(bufid, 0, 0)) == (char *)-1L) {
		pvmlogperror("pvmbeatask() shmat pvmd");
		return PvmSysErr;
	}
	infopage = pvmdinbox + INBOXPAGE*pgsz;
	pvmdoutbuf = infopage + pgsz;

	pvminfo = (struct pidtidhdr *)(infopage + PVMPAGEHDR);

	while ( read_int( (int *) &(pvminfo[0]) ) == 0 ) {
		pvmsleep(1);
		if (pvmdebmask & PDMMEM)
			pvmlogerror("Waiting for pvmd to set protocol\n");
	}
	if (pvminfo->i_proto != TDPROTOCOL) {
		sprintf(pvmtxt, "beatask() t-d protocol mismatch (%d/%d)\n",
			TDPROTOCOL, *((int *) &(pvminfo[0])));
		pvmlogerror(pvmtxt);
		return PvmSysErr;
	}

	/*
	* send it a request for connection/task assignment
	*/

	dboxp = (struct msgboxhdr *)pvmdinbox;
	inmsgs = (struct shmpkhdr *)(dboxp + 1);
	PAGELOCK(&dboxp->mb_lock);
	while ((next = (dboxp->mb_last + 1) % pvminboxsz) == dboxp->mb_read) ;
	/* XXX yuck, overloading these fields */
	inmsgs[next].ph_src = pvmmyupid;
	inmsgs[next].ph_dst = altpid;
	inmsgs[next].ph_dat = -1;
	inmsgs[next].ph_sdr = 0;
	inmsgs[next].ph_len = 0;
	inmsgs[next].ph_flag = 0;
	inmsgs[next].ph_tag = 0;
	inmsgs[next].ph_enc = 0;
	inmsgs[next].ph_wid = 0;
	inmsgs[next].ph_crc = 0;
	dboxp->mb_last = next;
	PAGEUNLOCK(&dboxp->mb_lock);

	pvmdpid = pvminfo->i_dpid;
	(void)getdsock();

	PAGELOCK(&dboxp->mb_lock);
	if ((next - 1) % pvminboxsz == dboxp->mb_read) {
		PAGEUNLOCK(&dboxp->mb_lock);
		(void)prodpvmd();
	} else
		PAGEUNLOCK(&dboxp->mb_lock);


	/*
	* get global parameters from pvmd buffer
	*/

/*
	pvmmydsig = pvminfo->i_dsig;
*/
	shmbufsiz = pvminfo->i_bufsiz;
	outbufsz = (shmbufsiz - INBOXPAGE*pgsz)/pvmpgsz;
	nbufsowned = 0;
	pidtids = (struct pidtid *)(pvminfo + 1);
	maxpidtid = (pgsz - sizeof(struct pidtidhdr) - PVMPAGEHDR)/sizeof(struct pidtid);

	/*
	* wait for pvmd to write us an entry in pidtid table
	*/

	pid = altpid ? altpid : pvmmyupid;
	mytid = -1;
	while (mytid == -1) {
		int ntids;			/* number of entries in pid-tid table */

		PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
		ntids = min(maxpidtid, ((struct shmpghdr *)infopage)->pg_ref);
		for (i = 0; i < ntids; i++)
			if (pidtids[i].pt_pid == pid) {
				mytid = pidtids[i].pt_tid;
				pvmmyptid = pidtids[i].pt_ptid;
				/* pidtids[i].pt_pid = pvmmyupid; */
				mypidtid = i;
				break;
			}
		PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
		if (mytid==-1) { /* not entered in pvmd info page yet */
			prodpvmd();	/* kick daemon out of select() in work() */
			sleep_dammit( 10000 );	/* prevent peer page lock race */
									/* on info page */
		}
	}


	/*
	* create shared memory segment (and semaphore)
	* if we can't get the first key, keep trying others
	* XXX this could be moved back to after the protocol num check.
	*/

	firstkeytried = key = pvmshmkey(getpid());
	while (1) {

/* moved here from down there */
myshmbufid = -1;
errno = ENOSPC;
/* static int once = 0; */
/* if (once > 3) */
		myshmbufid = shmget((key_t)key, shmbufsiz, IPC_CREAT|IPC_EXCL|PERMS);
/* else { */
/* myshmbufid = -1; */
/* errno = ENOSPC; */
/* once++; */
/* } */
		if (myshmbufid == -1) {
			if (errno != EACCES && errno != EEXIST) {
				pvmlogperror("pvmbeatask() shmget");
				return PvmSysErr;
			}

		} else {
#ifdef	USERECVSEMAPHORE
			mysemid = semget((key_t)key, 1, IPC_CREAT|IPC_EXCL|PERMS);
			if (mysemid == -1) {
				if (errno != EACCES && errno != EEXIST) {
					pvmlogperror("pvmbeatask() semget");
					shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0);
					return PvmSysErr;

				} else {
					shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0);
				}

			} else {
/*
				sunion.val = 0;
				if (semctl(mysemid, 0, SETVAL, sunion) == -1) {
					pvmlogperror("pvmbeatask() semctl SETVAL");
					return PvmSysErr;
				}
*/
				break;
			}
#else
			break;
#endif
		}

		key = nextpvmshmkey(key);
		if (key == firstkeytried) {
			pvmlogerror("pvmbeatask() can't find a free key!\n");
			return PvmSysErr;
		}
	}

#ifdef IMA_CSPP
	if ((pvminbox = (char *)shm_search(myshmbufid)) == (char *)-1L)
#else
	if ((pvminbox = (char *)shmat(myshmbufid, 0, 0)) == (char *)-1L)
#endif
	{
		pvmlogperror("pvmbeatask() shmat");
		shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0);
		return PvmSysErr;
	}

	outmsgbuf = pvminbox + INBOXPAGE*pgsz;
	msgbufinit(pvminbox);
	/* XXX PAGELOCK(pvminfo); */
	pidtids[mypidtid].pt_key = key;
	pidtids[mypidtid].pt_stat = ST_SHMEM;
#ifdef IMA_CSPP
	pidtids[mypidtid].pt_node = current_node();
#endif
	/* XXX PAGEUNLOCK(pvminfo); */

	pvmmytid = mytid;

	/* OK..
	 * Daemon has been prodded... so lets give it our Unix pid via
	 * the socket so it can match the socket to the process.
	 * That way, when the socket goes EOF pvmd knows which process
	 * it was....
	 * Note we call it this late so that pmsg buffers are available.
	 * Although the message is constructed in shmem its sent via the
	 * socket!
	 */

	send_my_pid();


	/*
	*	Request task trace/output paramters from daemon
	*/

	sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
	rbf = pvm_setrbuf(0);
	if ((cc = msendrecv(TIDPVMD, TM_GETOPT, SYSCTX_TM)) > 0) {
		pvm_upkint(&rc, 1, 1);	/* throw out tid, ptid */
		pvm_upkint(&rc, 1, 1);

		pvm_upkint(&outtid, 1, 1);
		pvm_upkint(&outctx, 1, 1);
		pvm_upkint(&outtag, 1, 1);
		if (!pvmtrc.outtid) {
			pvmtrc.outtid = outtid;
			pvmtrc.outctx = outctx;
			pvmtrc.outtag = outtag;
			pvmctrc.outtid = pvmtrc.outtid;
			pvmctrc.outctx = pvmtrc.outctx;
			pvmctrc.outtag = pvmtrc.outtag;
		}

		pvm_upkint(&trctid, 1, 1);
		pvm_upkint(&trcctx, 1, 1);
		pvm_upkint(&trctag, 1, 1);
		if (!pvmtrc.trctid) {
			pvmtrc.trctid = trctid;
			pvmtrc.trcctx = trcctx;
			pvmtrc.trctag = trctag;
			pvmctrc.trctid = pvmtrc.trctid;
			pvmctrc.trcctx = pvmtrc.trcctx;
			pvmctrc.trctag = pvmtrc.trctag;
			new_tracer++;
		}

		cc = 0;
	}
	pvm_freebuf(pvm_setrbuf(rbf));
	pvm_freebuf(pvm_setsbuf(sbf));

	if (p = getenv("PVMCTX"))
		pvmmyctx = pvmstrtoi(p);

	/* get trace mask from envar or zero it */

	if ( (p = getenv("PVMTMASK")) ) {
		if ( strlen(p) + 1 == TEV_MASK_LENGTH )
			BCOPY(p, pvmtrc.tmask, TEV_MASK_LENGTH);
		else
			TEV_MASK_INIT(pvmtrc.tmask);
	} else {
		TEV_MASK_INIT(pvmtrc.tmask);
		if ( new_tracer ) need_trcinfo++;
	}

	BCOPY(pvmtrc.tmask, pvmctrc.tmask, TEV_MASK_LENGTH);

	/* get trace buffering from envar */

	if ((p = getenv("PVMTRCBUF")))
		pvmtrc.trcbuf = atoi( p );
	else {
		pvmtrc.trcbuf = 0;
		if ( new_tracer ) need_trcinfo++;
	}

	pvmctrc.trcbuf = pvmtrc.trcbuf;

	/* get trace options from envar */

	if ((p = getenv("PVMTRCOPT")))
		pvmtrc.trcopt = atoi( p );
	else {
		pvmtrc.trcopt = 0;
		if ( new_tracer ) need_trcinfo++;
	}

	pvmctrc.trcopt = pvmtrc.trcopt;

	/* Setup waitc structures */
	wait_init(pvmmytid, TIDLOCAL);


	BZERO(&minfo, sizeof(minfo));
	minfo.src = -1;
	minfo.ctx = SYSCTX_TC;
	minfo.tag = TC_SHMAT;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_shmat);
	minfo.tag = TC_CONREQ;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_conreq);
	minfo.tag = TC_CONACK;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_conack);
	minfo.tag = TC_TASKEXIT;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_taskexit);
	minfo.tag = TC_NOOP;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_noop);
	minfo.tag = TC_SETTRACE;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrace);
	minfo.tag = TC_SETTRCBUF;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrcbuf);
	minfo.tag = TC_SETTRCOPT;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrcopt);
	minfo.tag = TC_SETTMASK;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settmask);
	minfo.tag = TC_SIBLINGS;
	pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_siblings);

	if ( need_trcinfo )
	{
		rbf = pvm_setrbuf( 0 );

		if ( pvm_recvinfo( PVMTRACERCLASS, 0, PvmMboxDefault ) > 0 )
		{
			pvm_upkint(&trctid, 1, 1);

			pvm_upkint(&trcctx, 1, 1);
			pvm_upkint(&trctag, 1, 1);

			pvm_upkint(&outctx, 1, 1);  /* unused here */
			pvm_upkint(&outtag, 1, 1);  /* unused here */

			pvm_upkstr(tmask);

			pvm_upkint(&tbuf, 1, 1);
			pvm_upkint(&topt, 1, 1);

			if ( pvmtrc.trctid == trctid && pvmtrc.trcctx == trcctx
					&& pvmtrc.trctag == trctag )
			{
				if ( strlen(tmask) + 1 == TEV_MASK_LENGTH ) {
					BCOPY(tmask, pvmtrc.tmask, TEV_MASK_LENGTH);
					BCOPY(pvmtrc.tmask, pvmctrc.tmask, TEV_MASK_LENGTH);
				}

				pvmtrc.trcbuf = tbuf;
				pvmctrc.trcbuf = pvmtrc.trcbuf;

				pvmtrc.trcopt = topt;
				pvmctrc.trcopt = pvmtrc.trcopt;
			}

			pvm_freebuf(pvm_setrbuf(rbf));
		}

		else
			pvm_setrbuf(rbf);
	}

	tev_init();

	if (TEV_AMEXCL) {
		TEV_ENDEXCL;
	}

	return cc;
}


/*	pvmendtask()
*
*	Undo pvmbeatask(): free all message buffers, send TC_TASKEXIT to
*	every connected peer, mark our pid-tid entry ST_EXIT, detach and
*	remove our shared segment (and semaphore), detach from the pvmd's
*	segment, clean up peer state and close the pvmd socket.
*
*	Always returns 0.
*
*	XXX shouldn't clean up unless pvmmytid set?  or at least should check.
*/

int
pvmendtask()
{
	int i;
	struct pmsg *up;
	int sbf;
	static struct timeval ztv = { 0, 0 };
	struct peer *pp;
	extern struct peer *peers;
	struct pmsg *mp;

	/*
	* free any left-over messages.
	*/

	pvmsbuf = 0;
	pvmrbuf = 0;
	for (i = 1; i < pvmmidhsiz; i++)
		if (up = pvmmidh[i].m_umb)
			umbuf_free(up);

	/*
	* notify all connected tasks that we are exiting
	*/

	outta_here = 1;

	if (peers) {
		/* one message carrying our tid, routed to each peer in turn */
		sbf = pvm_initsend(PvmDataFoo);
		mp = midtobuf(sbf);
		mp->m_ctx = SYSCTX_TC;
		pvm_pkint(&pvmmytid, 1, 1);

		for (pp = peers->p_link; pp != peers; pp = pp->p_link)
			if (pp->p_tid && pp->p_tid != pvmmytid)
				mroute(sbf, pp->p_tid, TC_TASKEXIT, &ztv);

		pvm_freebuf(sbf);
	}

	/* mypidtid is reset to -1 below; never index the table with that */
	if (pidtids && mypidtid >= 0) {
		PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
		pidtids[mypidtid].pt_stat = ST_EXIT;
		PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
	}

	/* our own segment: detach, then remove */
	shmdt(pvminbox);
	if (shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0) == -1)
		pvmlogperror("pvmendtask() shmctl RMID");
#ifdef USERECVSEMAPHORE
	if (semctl(mysemid, 0, IPC_RMID) == -1)
		pvmlogperror("pvmendtask() semctl RMID");
	mysemid = -1;
#endif
	pvminbox = 0;

	/* the pvmd's segment: detach only, and forget pointers into it */
	shmdt(pvmdinbox);
	pvmdinbox = 0;
	infopage = 0;
	pvmdoutbuf = 0;
	pidtids = 0;
	mypidtid = -1;

	peer_cleanup();

	pvmmytid = -1;
	if (pvmdsock != -1) {
		(void)close(pvmdsock);
		pvmdsock = -1;
	}

	/* XXX free rxfrag and rxlist */
#ifdef LOG
	/*
	* pvmbeatask() carries on when the log file can't be opened, so
	* logfp may be null here; fclose() on a null stream is undefined.
	* Clearing it also keeps a second call from closing it twice.
	*/
	if (logfp) {
		fclose(logfp);
		logfp = 0;
	}
#endif

	return 0;
}


/*	check_for_exit()
*
*	Look up the peer entry for task src.  If that peer is flagged as
*	exited and no message buffer in the mid table still has src as its
*	source, detach from the peer.  Otherwise do nothing.
*/

void
check_for_exit(src)
	int src;
{
	extern struct peer *peers;
	struct peer *cur;
	struct pmsg *held;
	int slot;

	/* walk the circular peer list up to the first entry for src */
	cur = peers->p_link;
	while (cur != peers && cur->p_tid != src)
		cur = cur->p_link;

	/* unknown peer, or one that is still alive: nothing to do */
	if (cur == peers || !cur->p_exited)
		return;

	/* a message from src is still around, so keep the attachment */
	for (slot = 1; slot < pvmmidhsiz; slot++) {
		held = pvmmidh[slot].m_umb;
		if (held && held->m_src == src)
			return;
	}

	peer_detach(cur);
}


/************************
 **  Libpvm Functions  **
 **                    **
 ************************/


/*
*	pvm_getfds()
*
*	Not implemented in the shared-memory library: the status is always
*	PvmNotImpl, reported through lpvmerr() when negative.  fds is never
*	written.
*/
int
pvm_getfds(fds)		/* XXX this function kinda sucks */
	int **fds;			/* fd list return */
{
	int status = PvmNotImpl;

	if (status < 0)
		return lpvmerr("pvm_getfds", status);
	return status;
}


/*	pvm_start_pvmd()
*
*	Start a pvmd from this process and then enroll in it.
*
*	The pvmd is exec'd in a grandchild with its stdout connected to a
*	pipe (socketpair on IMA_TITN).  The first line it prints is read
*	back, prefixed with "PVMSOCK=" and placed in the environment before
*	BEATASK is run.
*
*	argc, argv	extra arguments for the pvmd; argv[0..argc-1] are placed
*				after the pvmd path.  A negative argc or null argv means
*				no extra arguments.
*	block		if true, keep retrying pvm_addhosts() on our own host
*				while it returns PvmAlready before returning.
*
*	Returns 0 on success, else the result of lpvmerr() for a negative
*	code: PvmSysErr (getuid/sockfile/pipe/fdopen failure), PvmDupHost
*	(the pvmd socket file already exists), PvmCantStart (nothing usable
*	read from the pvmd), or whatever BEATASK / pvm_addhosts() gave.
*/

int
pvm_start_pvmd(argc, argv, block)
	int argc;		/* number of args to pass to pvmd (>= 0) */
	char **argv;	/* args for pvmd or null */
	int block;		/* if true, don't return until add hosts are started */
{
	char *sfn;
	struct stat sb;
	int cc;
	char *fn;			/* file to exec */
	char **av;
	int pfd[2];
	int n;
	FILE *ff;
	char buf[128];
	int x;
	TEV_DECLS

	if (TEV_EXCLUSIVE) {
		if (pvmmytid != -1
				&& TEV_DO_TRACE(TEV_START_PVMD,TEV_EVENT_ENTRY)) {
			TEV_PACK_INT( TEV_DID_BF, TEV_DATA_SCALAR, &block, 1, 1 );
			TEV_PACK_STRING( TEV_DID_AS, TEV_DATA_ARRAY,
				argv, argc, 1 );
			TEV_FIN;
		}
	}

	/* treat a bad argument list as "no arguments" */
	if (argc < 0 || !argv)
		argc = 0;

	if ((pvm_useruid = getuid()) == -1) {
		pvmlogerror("can't getuid()\n");
		cc = PvmSysErr;
		goto bail;
	}

	if (!(sfn = pvmdsockfile())) {
		pvmlogerror("pvm_start_pvmd() pvmdsockfile() failed\n");
		cc = PvmSysErr;
		goto bail;
	}

	/* an existing socket file is taken to mean a pvmd is already there */
	if (stat(sfn, &sb) != -1) {
		cc = PvmDupHost;
		goto bail;
	}

#ifdef	IMA_TITN
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, pfd) == -1)
#else
	if (pipe(pfd) == -1)
#endif
	{
		cc = PvmSysErr;
		goto bail;
	}

	fn = pvmgetpvmd();

	/* argument vector: pvmd path, caller's args, null terminator */
	av = TALLOC(argc + 2, char *, "argv");
	if (argc > 0)
		BCOPY((char *)&argv[0], (char *)&av[1], argc * sizeof(char*));
	av[0] = fn;
	av[argc + 1] = 0;

	/* NOTE(review): a fork() failure (-1) is not distinguished from the */
	/* parent case; it surfaces later as PvmCantStart when the read fails. */
	if (!fork()) {
		(void)close(pfd[0]);
	/* fork again so the pvmd is not the child - won't have to wait() for it */
		if (!fork()) {
			/* grandchild: pipe becomes stdout, everything else is closed */
			if (pfd[1] != 1)
				dup2(pfd[1], 1);
			for (n = getdtablesize(); n-- > 0; )
				if (n != 1)
					(void)close(n);
			(void)open("/dev/null", O_RDONLY, 0);	/* should be 0 */
			(void)open("/dev/null", O_WRONLY, 0);	/* should be 2 */
			(void)signal(SIGINT, SIG_IGN);
			(void)signal(SIGQUIT, SIG_IGN);
			(void)signal(SIGTSTP, SIG_IGN);
			execvp(av[0], av);
			/* exec failed: fall through to _exit() below */
		}
		_exit(0);
	}
	/* parent: keep only the read end and reap the intermediate child */
	(void)close(pfd[1]);
	(void)wait(0);
	PVM_FREE(av);

	if (!(ff = fdopen(pfd[0], "r"))) {
		cc = PvmSysErr;
		(void)close(pfd[0]);
		goto bail;
	}

	/* read the pvmd's first output line in after the "PVMSOCK=" prefix */
	strcpy(buf, "PVMSOCK=");
	n = strlen(buf);
	if (!fgets(buf + n, sizeof(buf) - n - 1, ff)) {
		cc = PvmCantStart;
		fclose(ff);
		goto bail;
	}
	fclose(ff);
	/* fewer than two characters read is not accepted as an address */
	if (strlen(buf + n) < 2) {
		cc = PvmCantStart;
		goto bail;
	}
	/* strip the trailing newline before exporting */
	n = strlen(buf);
	if (buf[n - 1] == '\n')
		buf[n - 1] = 0;
	pvmputenv(STRALLOC(buf));
/*
	fprintf(stderr, "pvm_start_pvmd: %s\n", buf);
*/

	if (cc = BEATASK)
		goto bail;

	if (block) {
		struct pvmhostinfo *hip;
		int t = 1;

		/*
		* Re-add our own host (first pvm_config() entry) for as long as
		* the answer is PvmAlready, sleeping 1, 2, 4, 8, 8, ... seconds
		* between tries.  PvmDupHost is the expected final answer; any
		* other result is returned as the error.
		*/
		pvm_config((int*)0, (int*)0, &hip);
		while ((cc = pvm_addhosts(&hip[0].hi_name, 1, (int*)0)) == PvmAlready) {
			pvmsleep(t);
			if (t < 8)
				t *= 2;
		}
		if (cc != PvmDupHost)
			goto bail;
		cc = BEATASK;
	}

bail:

	if (TEV_AMEXCL) {
		if (TEV_DO_TRACE(TEV_START_PVMD,TEV_EVENT_EXIT)) {
			TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
			TEV_FIN;
		}
		TEV_ENDEXCL;
	}

	/* positive codes are folded to 0 */
	return (cc < 0 ? lpvmerr("pvm_start_pvmd", cc) : 0);
}


int
pvm_precv(tid, tag, cp, len, dt, rtid, rtag, rlen)
	int tid;
	int tag;
	void *cp;
	int len;
	int dt;
	int *rtid;
	int *rtag;
	int *rlen;
{
	static int last_rbf = 0;

	int nb, mc, src;
	int rbf;
	int cc = 0;
	long ad;
	TEV_DECLS

	if (TEV_EXCLUSIVE) {
		if (TEV_DO_TRACE(TEV_PRECV,TEV_EVENT_ENTRY)) {
			TEV_PACK_INT( TEV_DID_RST, TEV_DATA_SCALAR, &tid, 1, 1 );
			TEV_PACK_INT( TEV_DID_RMC, TEV_DATA_SCALAR, &tag, 1, 1 );
			TEV_PACK_INT( TEV_DID_RCX, TEV_DATA_SCALAR,
					&pvmmyctx, 1, 1 );
			ad = (long)cp;
			TEV_PACK_LONG( TEV_DID_PDA, TEV_DATA_SCALAR, &ad, 1, 1 );
			TEV_PACK_INT( TEV_DID_PC, TEV_DATA_SCALAR, &len, 1, 1 );
			TEV_PACK_INT( TEV_DID_PDT, TEV_DATA_SCALAR, &dt, 1, 1 );
			TEV_FIN;
		}
	}

	switch (dt) {

	case PVM_BYTE:
		len *= sizeof(char);
		break;

	case PVM_SHORT:
	case PVM_USHORT:
		len *= sizeof(short);
		break;

	case PVM_INT:
	case PVM_UINT:
		len *= sizeof(int);
		break;

	case PVM_LONG:
	case PVM_ULONG:
		len *= sizeof(long);
		break;

	case PVM_FLOAT:
		len *= sizeof(float);
		break;

	case PVM_CPLX:
		len *= sizeof(float) * 2;
		break;

	case PVM_DOUBLE:
		len *= sizeof(double);
		break;

	case PVM_DCPLX:
		len *= sizeof(double) * 2;
		break;

	case PVM_STR:
		cc = PvmNotImpl;
		break;

	default:
		cc = PvmBadParam;
		break;
	}

	if (!cc) {
		if (last_rbf > 0) {
			pvm_freebuf(last_rbf);
			last_rbf = 0;
		}
		rbf = pvm_setrbuf(0);
		cc = pvm_recv(tid, tag);
		if (cc > 0) {
			pvm_bufinfo(cc, &nb, &mc, &src);
			if (rlen)
				*rlen = nb;
			if (nb < len)
				len = nb;
			if (rtag)
				*rtag = mc;
			if (rtid)
				*rtid = src;
			pvm_upkbyte((char *)cp, len, 1);
			last_rbf = cc;
			cc = 0;
		}
		pvm_setrbuf(rbf);
	}

	if (TEV_AMEXCL) {
		if (TEV_DO_TRACE(TEV_PRECV,TEV_EVENT_EXIT)) {
			TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
			if ( cc < 0 )
				nb = mc = src = -1;
			TEV_PACK_INT( TEV_DID_MNB, TEV_DATA_SCALAR, &nb, 1, 1 );
			TEV_PACK_INT( TEV_DID_MC, TEV_DATA_SCALAR, &mc, 1, 1 );
			TEV_PACK_INT( TEV_DID_MCX, TEV_DATA_SCALAR,
					&pvmmyctx, 1, 1 );
			TEV_PACK_INT( TEV_DID_SRC, TEV_DATA_SCALAR, &src, 1, 1 );
			TEV_FIN;
		}
		TEV_ENDEXCL;
	}

	if (cc < 0)
		lpvmerr("pvm_precv", cc);
	return cc;
}


/*	dynbuf()
*
*	Find (or create) the dynamic buffer shared with task tid: a file
*	named from PVMSHMFILE in the pvm tmp directory, mmap'd shared.
*
*	- An existing mapping is dropped first when len is larger than both
*	  SHMBUFSIZE and the recorded length.
*	- With no mapping, the file is opened/created and max(len,SHMBUFSIZE)
*	  bytes are mapped.
*	- With a mapping whose header still has pg_ref set (previous message
*	  not yet received), wait in peer_wait() until it clears.
*
*	Returns the buffer address, or (char *)-1L on error (no shared-memory
*	connection to tid, open/mmap failure, peer_wait() failure).
*/
char *
dynbuf(tid, len)
	int tid;
	int len;
{
	struct peer *pp;
	int fd;
	char fname[32];
	struct shmpghdr *ph;
	char *pvmtmp;

	while (!(pp = peer_conn(tid, (int *)0)))
		;
	/* peer_conn() can also hand back -1, which is not a usable peer */
	if (pp == (struct peer *)-1L)
		return (char *)-1L;

	/*
	* Build the file name up front: besides the mapping branch, the
	* IMA_SUNMP grow branch below opens the file by name as well.
	* NOTE(review): fname is only 32 bytes; a long tmp path overflows it.
	*/
	pvmtmp = pvmgettmp();
	sprintf(fname, PVMSHMFILE, pvmtmp, tid);

	/* NOTE(review): the region was mapped with max(len,SHMBUFSIZE) but */
	/* only SHMBUFSIZE bytes are unmapped here - confirm. */
	if (len > SHMBUFSIZE && len > pp->p_dlen && pp->p_dbuf) {
		munmap((caddr_t)pp->p_dbuf, SHMBUFSIZE);
		pp->p_dbuf = 0;
	}

	if (!(ph = (struct shmpghdr *)pp->p_dbuf)) {
		if ((fd = open(fname, O_CREAT|O_RDWR, 0600)) == -1) {
			pvmlogperror(fname);
			return (char *)-1L;
		}
		pp->p_dbuf = (char *)mmap(0, max(len,SHMBUFSIZE), PROT_READ|PROT_WRITE,
#if defined(IMA_SGIMP) || defined(IMA_SGIMP6) || defined(IMA_SGIMP64)
			MAP_SHARED|MAP_AUTOGROW,
#else
			MAP_SHARED,
#endif
			fd, 0);
		if (pp->p_dbuf == (char *)-1L) {
			/* report first (close may disturb errno), then undo */
			pvmlogperror(fname);
			(void)close(fd);
			/* don't leave -1 behind to be mistaken for a mapping later */
			pp->p_dbuf = 0;
			return (char *)-1L;
		}
#ifdef IMA_SUNMP
		/* fill buffer with 0's */
		lseek(fd, len - 1, SEEK_SET);
		write(fd, fname, 1);
#endif
		close(fd);
		pp->p_dlen = len;
	} else if (ph->pg_ref) {	/* previous msg has not been recv'd */
		while (ph->pg_ref)
			if (peer_wait() == -1)
				return (char *)-1L;
#ifdef IMA_SUNMP
	} else if (pp->p_dlen < len) {
		/* extend the backing file so that len bytes are present */
		if ((fd = open(fname, O_CREAT|O_RDWR, 0600)) == -1) {
			pvmlogperror(fname);
			return (char *)-1L;
		}
		lseek(fd, len - 1, SEEK_SET);
		write(fd, fname, 1);
		close(fd);
		pp->p_dlen = len;
#endif
	}

	return pp->p_dbuf;
}

/*	pvm_psend()
*
*	Pack len items of type dt from cp into a temporary PvmDataRaw
*	buffer and send it to tid with tag.
*
*	len is converted to a byte count from dt; PVM_STR gives PvmNotImpl
*	and an unknown type PvmBadParam.  The caller's send buffer is
*	restored afterwards and the temporary one freed.
*
*	Returns 0 on success, else a negative code (also passed to lpvmerr()).
*/
int
pvm_psend(tid, tag, cp, len, dt)
	int tid;
	int tag;
	void *cp;
	int len;
	int dt;
{
	int saved_sbuf;
	int cc = 0;
	long ad;
	TEV_DECLS

	if (TEV_EXCLUSIVE) {
		if (TEV_DO_TRACE(TEV_PSEND,TEV_EVENT_ENTRY)) {
			TEV_PACK_INT( TEV_DID_DST, TEV_DATA_SCALAR, &tid, 1, 1 );
			TEV_PACK_INT( TEV_DID_MC, TEV_DATA_SCALAR, &tag, 1, 1 );
			TEV_PACK_INT( TEV_DID_MCX, TEV_DATA_SCALAR,
					&pvmmyctx, 1, 1 );
			ad = (long)cp;
			TEV_PACK_LONG( TEV_DID_PDA, TEV_DATA_SCALAR, &ad, 1, 1 );
			TEV_PACK_INT( TEV_DID_PC, TEV_DATA_SCALAR, &len, 1, 1 );
			TEV_PACK_INT( TEV_DID_PDT, TEV_DATA_SCALAR, &dt, 1, 1 );
			TEV_FIN;
		}
	}

	/* scale the item count to bytes; unsupported types set cc instead */
	switch (dt) {

	case PVM_STR:
		cc = PvmNotImpl;
		break;

	case PVM_BYTE:
		len *= sizeof(char);
		break;

	case PVM_USHORT:
	case PVM_SHORT:
		len *= sizeof(short);
		break;

	case PVM_UINT:
	case PVM_INT:
		len *= sizeof(int);
		break;

	case PVM_ULONG:
	case PVM_LONG:
		len *= sizeof(long);
		break;

	case PVM_FLOAT:
		len *= sizeof(float);
		break;

	case PVM_DOUBLE:
		len *= sizeof(double);
		break;

	case PVM_CPLX:
		len *= sizeof(float) * 2;
		break;

	case PVM_DCPLX:
		len *= sizeof(double) * 2;
		break;

	default:
		cc = PvmBadParam;
		break;
	}

	if (cc == 0) {
#if 0
		if ((tid & pvmtidhmask) == (pvmmytid & pvmtidhmask)
		&& (tid & ~pvmtidhmask) != TIDPVMD) {		/* to local task */
			char *dbuf;

			len += sizeof(struct shmpghdr);
			if ((dbuf = dynbuf(tid, len)) != (char *)-1L) {
				BCOPY(cp, dbuf, len);

			} else
				cc = PvmSysErr;

		} else
#endif /*0*/
		{
			int tmpbuf;

			/* pack into a scratch buffer installed as the send buffer */
			tmpbuf = pvm_mkbuf(PvmDataRaw);
			saved_sbuf = pvm_setsbuf(tmpbuf);
			pvm_pkbyte((char *)cp, len, 1);

			/* positive results from pvm_send() count as plain success */
			cc = pvm_send(tid, tag);
			if (cc > 0)
				cc = 0;

			/* put the caller's buffer back and free the scratch one */
			pvm_freebuf(pvm_setsbuf(saved_sbuf));
		}
	}

	if (TEV_AMEXCL) {
		if (TEV_DO_TRACE(TEV_PSEND,TEV_EVENT_EXIT)) {
			TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
			TEV_FIN;
		}
		TEV_ENDEXCL;
	}

	if (cc < 0)
		lpvmerr("pvm_psend", cc);
	return cc;
}


/* XXX copied from lpvm.c as needed by pvmmcast */
/* XXX duplicate code should be cleaned up in a support lpvm* file [GEF97] */

/*
*	int_compare()
*
*	qsort() comparison function for arrays of int (task ids).
*	Returns <0, 0 or >0 as *i is less than, equal to or greater than *j.
*
*	The operands are compared rather than subtracted: the difference of
*	two ints of opposite sign can overflow, which would give the wrong
*	sign (and is undefined behaviour).
*/
static int
int_compare(i, j)
#if defined(IMA_SGI) || defined(IMA_SGI5) || defined(IMA_SGI6) || defined(IMA_SGI64) || defined(IMA_SGIMP) || defined(IMA_SGIMP6) || defined(IMA_SGIMP64) || defined(IMA_SUN4SOL2)
	const void *i, *j;
#else
	void *i, *j;
#endif
{
	int a = *((const int *)i);
	int b = *((const int *)j);

	return (a > b) - (a < b);
}


/* XXX copied from lpvm.c as needed by pvmmcast again */
/* XXX duplicate code should be cleaned up in a support lpvm* file [GEF97] */
/* XXX, XXX!! note that although shmem is a MPP port I had to take out the */
/* XXX ISA_MPP section as MPP_TTPCB_FIND won't work for shmem machines yet! */

/*  ttpcb_find()
*
*   Look up the task-task control block for tid in ttlist.
*
*   The scan stops at the first entry whose tt_tid is not below tid
*   (or back at the list head); that entry is returned only if its
*   tt_tid equals tid, otherwise null.  With IMA_MPP, mpp_ttpcb_find()
*   is consulted first.
*/

struct ttpcb *
ttpcb_find(tid)
	int tid;        /* peer tid */
{
	struct ttpcb *cur;

/* GEF fire fighter (TM) */
/*     return (struct ttpcb*)0; */

#if defined(IMA_MPP)
	if ( cur = mpp_ttpcb_find(tid))
		return cur;
#endif

	cur = ttlist->tt_link;
	while (cur != ttlist && cur->tt_tid < tid)
		cur = cur->tt_link;

	if (cur->tt_tid == tid)
		return cur;
	return (struct ttpcb*)0;
}



/* XXX copied from lpvm.c as needed by lpvmgen.c */

/*  pvmmcast()
*
*   Multicast a message to a list of destinations.
*
*   mid    id of the message buffer to send
*   tids   destination task ids; may contain duplicates and our own tid
*   count  number of entries in tids
*   tag    message tag
*
*   A private copy of the list is sorted and stripped of duplicates and
*   of pvmmytid.  If anything is left, the address list is sent to the
*   pvmd (TM_MCA) and the message itself is then sent to
*   (pvmmytid | TIDGID).
*
*   Returns 0 on success or when there is nothing to send; otherwise
*   the non-positive result of the failing mroute() call.
*/

int
pvmmcast(mid, tids, count, tag)
	int mid;
	int *tids;
	int count;
	int tag;
{
	static struct timeval ztv = { 0, 0 };

	int *dst;
	int i, j;
	int cc = 0;
	int sbf;
	struct pmsg *mp;

	/*
	* An empty (or nonsensical negative) list means nothing to send.
	* Without this guard the duplicate-removal step below would turn a
	* count of 0 into 1 and read dst[0] from a zero-length allocation.
	*/
	if (count <= 0)
		return 0;

	/*
	* make sorted list of destinations
	*/
	dst = TALLOC(count, int, "mcal");
	BCOPY(tids, dst, count * sizeof(int));
	qsort((char*)dst, count, sizeof(int), int_compare);

	/* we are sending mesg mid, which may not be the same as pvmsbuf->m_mid */
	mp = midtobuf(mid);
	mp->m_ctx = pvmmyctx;

	/*
	* remove duplicates (equal tids are adjacent after the sort)
	*/
	j = 0;
	for (i = 1; i < count; i++) {
		if (dst[i] != dst[j])
			dst[++j] = dst[i];
	}
	count = j + 1;

	/*
	* remove self from list
	*
	* XXX direct routes are not used here: every remaining destination
	* XXX goes through the pvmd.
	* XXX we should attempt new routes here if RouteDirect is on.
	*/
	j = 0;
	for (i = 0; i < count; i++) {
		if (dst[i] == pvmmytid)
			continue;
		dst[j++] = dst[i];
	}
	count = j;      /* destinations still to go */

	/*
	* send rest of addresses to pvmd for distribution
	*/
	if (count > 0) {
		/* announce multicast address list to pvmd */
		sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
		pvm_pkint(&count, 1, 1);
		pvm_pkint(dst, count, 1);
		sbf = pvm_setsbuf(sbf);
		if ((cc = mroute(sbf, TIDPVMD, TM_MCA, &ztv)) > 0)
			cc = 0;
		pvm_freebuf(sbf);

		/* send message, but only if the address list went out */
		if (cc >= 0) {
			if ((cc = mroute(mid, pvmmytid | TIDGID, tag, &ztv)) > 0)
				cc = 0;
		}
	}

	PVM_FREE(dst);
	return cc;
}


/* XXX copied from lpvm.c as needed by lpvmgen.c */

/*  post_routedelete()
*
*   Build a notification message about peer tid and either queue it or
*   deliver it at once.
*
*   tid    peer task id the message is about
*   ctx    context stored in the message (m_ctx)
*   tag    tag stored in the message (m_tag)
*
*   The message body holds two ints: tid followed by -1.  When
*   ttpcb_find() has an entry for tid, the message is attached to a new
*   WT_ROUTED wait context (wa_tid = pvmmytid, wa_on = tid); otherwise
*   it is handed straight to mesg_input().
*
*   Always returns 0.
*/
int
post_routedelete(tid, ctx, tag)
	int tid;
	int ctx;
	int tag;
{
	int minus1 = -1;
	int mid;
	struct pmsg *note;
	struct waitc *waiter;

	/* pack the body in a temporary send buffer, then switch back */
	mid = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
	pvm_pkint(&tid, 1, 1);
	pvm_pkint(&minus1, 1, 1);
	mid = pvm_setsbuf(mid);

	note = midtobuf(mid);
	note->m_ctx = ctx;
	note->m_tag = tag;

	if (!ttpcb_find(tid)) {
		/* no entry for this peer: deliver right away */
		mesg_input(note);
		return 0;
	}

	/* an entry still exists: park the message on a wait context */
	waiter = wait_new(WT_ROUTED);
	waiter->wa_tid = pvmmytid;
	waiter->wa_on = tid;
	waiter->wa_mesg = note;

	return 0;
}



/*	shmemuptod()
 *
 *	Move message frags between task and pvmd using SOCKETs only.
 *	Returns when
 *		Outgoing message fully sent or
 *		(Timed out (tmout) OR errored.
 *
 *	Returns 0 if OK, or negative on error.
 */

#define HEADER_SIZE (TDFRAGHDR + MSGHDRLEN)

static int
shmemuptod(txup)
	struct pmsg *txup;			/* outgoing message */
{
	struct frag *txfp = 0;		/* cur ogm frag or null */
	struct timeval tin;
	struct timeval tnow;
	struct timeval *tvp;
#ifdef FDSETNOTSTRUCT
	fd_set rfds, wfds;
#else
	struct fd_set rfds, wfds;
#endif
	int ff;
	int n;

	char *txcp = 0;				/* point to remainder of txfp */
	int txtogo = 0;				/* len of remainder of txfp */
	struct sockaddr_in sad;
	int s;
	struct msgid *sendmsg = (struct msgid *) NULL;
	char errtxt[64];
	int upfd;					/* one-way socket to the daemon */

	if (txup) {
	/* transmitting a umsg */
		txfp = txup->m_frag->fr_link;
		if (!txfp->fr_buf) {
			if (!(txfp = fr_new(MAXHDR)))
				return PvmNoMem;
			txfp->fr_dat += MAXHDR;
			LISTPUTBEFORE(txup->m_frag, txfp, fr_link, fr_rlink);
		}

		/* To the daemon ? */
		upfd = pvmdsock;

		/* txup->m_ref++; */
	}

	if (pvmdebmask & PDMMESSAGE) {
		pvmlogprintf("shmemuptod() mid %d", (txup ? txup->m_mid : 0));
		if (txup) {
			pvmlogprintf(" ctx %d tag %s",
					txup->m_ctx,
					pvmnametag(txup->m_tag, (int *)0));
		}
	}

	/* must block sending to pvmd */
	tvp = (struct timeval *) NULL;

	/* We haven't sent anything, yet, Build Header */

	txcp = txfp->fr_dat; /* packed data */
	/* XXXX we assume thats its only packed data TMs */

	txtogo = txfp->fr_len;	/* actual data size */

	/*
	 * This is first and last frag. So prepend message header
	 */
	txcp -= MSGHDRLEN;
	txtogo += MSGHDRLEN;
	pvmput32(txcp,
			(txup->m_enc == 0x20000000 ? pvmmydsig : txup->m_enc));
	pvmput32(txcp + 4, txup->m_tag);
	pvmput32(txcp + 8, txup->m_ctx);
	pvmput32(txcp + 16, txup->m_wid);
#ifdef	MCHECKSUM
	pvmput32(txcp + 20, umbuf_crc(txup));
#else
	pvmput32(txcp + 20, 0);
#endif
	ff = FFSOM | FFEOM;	/* Alpha Omega: first and last frag man */

	/*
	 * prepend frag header
	 */
	txcp -= TDFRAGHDR;
	pvmput32(txcp, txup->m_dst);
	pvmput32(txcp + 4, pvmmytid);
	pvmput32(txcp + 8, txtogo);
	pvmput32(txcp + 12, 0);		/* to keep putrify happy */
	pvmput8(txcp + 12, ff);
	txtogo += TDFRAGHDR;

	/*
	 * Make sure we have a valid socket address 
	 * Prod will connect us to the daemon if we are not already
	 * connected.
	 */

	if ( pvmdsock < 0 ) {

		prodpvmd();			/* connect to the daemon setting pvmdsock */

		if ( pvmdsock < 0 ) {	/* nested yuck.. */
			pvmlogerror("shmemuptod() Can't get daemon socket open\n");
		    pmsg_unref(txup);   /* unref the message */
			return -1;
		}
	}

	if (pvmdebmask & PDMPACKET) {
		pvmlogprintf("shmemuptod() dst t%x n=%d\n",
				txup->m_dst, txtogo);
	}

	/* XXX yep... blocking loop forever!!!!! */
	/* If I can't get 100+ bytes up that socket we are */
	/* in trouble anyway! */
	while (txtogo) {

#if defined(IMA_RS6K) || defined(IMA_SP2MPI) || defined(IMA_AIX4) \
		|| defined (IMA_AIX4MP) || defined(IMA_AIX5MP) \
		|| defined(IMA_AIX4SP2) || defined(IMA_AIX5SP2)
		n = write(pvmdsock, txcp, min(txtogo, 4096));
#else
		n = write(pvmdsock, txcp, txtogo);
#endif

		if (pvmdebmask & PDMPACKET)
			pvmlogprintf("shmemuptod() wrote %d\n", n);

		if (n == -1 && errno != EWOULDBLOCK && errno != EINTR)
		{
			pvmlogperror("shmemuptod() write pvmd sock");
			return PvmSysErr;
		}

		if (n > 0)
		{
			txcp += n;
			txtogo -= n;
		}

	} /* while we have data to send */


	/* we made it here so the message went! */

	pmsg_unref(txup);	/* unref the message */

	return 0;
}


/*  send_my_pid()
*
*   Pack pvmmyupid into a fresh message addressed to the pvmd
*   (dst TIDPVMD, tag TM_SHMCONN, ctx SYSCTX_TM) and push it out with
*   shmemuptod().  The temporary buffer is freed afterwards.
*
*   Returns whatever shmemuptod() returned.
*/
int
send_my_pid()
{
	int prev;			/* send buffer that was active on entry */
	int mid;			/* our temporary message buffer */
	int rc;
	struct pmsg *msg;

	/* pack into a temporary send buffer, then put the old one back */
	prev = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
	pvm_pkint(&pvmmyupid, 1, 1);
	mid = pvm_setsbuf(prev);

	msg = midtobuf(mid);
	msg->m_dst = TIDPVMD;
	msg->m_tag = TM_SHMCONN;
	msg->m_ctx = SYSCTX_TM;

	/* mxfer() is not usable here, hence the custom one-off sender */
	rc = shmemuptod (msg);

	pvm_freebuf(mid);
	return rc;
}


/*  sleep_dammit()
*
*   Pause the calling process for a short time.
*
*   msecs  length of the pause.  NOTE: despite the name, the value is
*          used as MICROseconds - it is passed unchanged to usleep()
*          or split into the tv_sec / tv_usec fields of a timeval.
*
*   Returns the result of usleep() on the platforms listed below, and
*   the result of select() everywhere else.
*/
int
sleep_dammit( msecs )
int msecs;
{
	/* the "||" in front of IMA_AIX5MP was missing, which made this */
	/* #if a preprocessor error on every platform */
#if defined(IMA_SUNMP) || defined(IMA_RS6KMP) || defined(IMA_AIX4MP) \
		|| defined(IMA_AIX5MP)
	return( usleep( msecs ) );
#else
	struct timeval biteme;

	/* keep tv_usec below one million: exactly 1000000 used to be left */
	/* in tv_usec, which is out of range for select() */
	biteme.tv_sec = msecs / 1000000;
	biteme.tv_usec = msecs % 1000000;

	/* select() with no descriptors just waits for the timeout */
	return( select( 0,
#ifdef	FDSETISINT
				(int *) NULL, (int *) NULL, (int *) NULL,
#else
				(fd_set *) NULL, (fd_set *) NULL, (fd_set *) NULL,
#endif
				&biteme ) );
#endif
}



/* syntax highlighted by Code2HTML, v. 0.9.1 */