static char rcsid[] =
	"$Id: tdpro.c,v 1.34 2002/02/21 23:18:42 pvmsrc Exp $";

/*
 *         PVM version 3.4:  Parallel Virtual Machine System
 *               University of Tennessee, Knoxville TN.
 *           Oak Ridge National Laboratory, Oak Ridge TN.
 *                   Emory University, Atlanta GA.
 *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
 *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
 *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
 *                   (C) 1997 All Rights Reserved
 *
 *                              NOTICE
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby granted
 * provided that the above copyright notice appear in all copies and
 * that both the copyright notice and this permission notice appear in
 * supporting documentation.
 *
 * Neither the Institutions (Emory University, Oak Ridge National
 * Laboratory, and University of Tennessee) nor the Authors make any
 * representations about the suitability of this software for any
 * purpose.  This software is provided ``as is'' without express or
 * implied warranty.
 *
 * PVM version 3 was funded in part by the U.S. Department of Energy,
 * the National Science Foundation and the State of Tennessee.
 */

/*
 *	tdpro.c
 *
 *	Entry points for messages from local tasks.
 *
 * $Log: tdpro.c,v $
 * Revision 1.34  2002/02/21 23:18:42  pvmsrc
 * Added new (not to be documented!) PVM_MAX_TASKS env var support.
 * 	- for Mahir Lokvancic <mahir@math.ufl.edu>.
 * 	- forcefully limits the number of tasks that can attach to a
 * 		pvmd, required on Solaris in rare circumstances when hard
 * 		FD_SETSIZE limit is reached, and all hell breaks loose...
 * 	- check return for task_new() call, can now produce NULL ptr,
 * 		indicating PvmOutOfRes...
 * (Spanker=kohl)
 *
 * Revision 1.33  2001/09/25 21:20:20  pvmsrc
 * Minor TMPNAMFUN()/tmpnam() cleanup.
 * 	- moved macro def to pvm3.h, renamed PVMTNPMAN().
 * 	- same for LEN_OF_TMP_NAM -> PVMTMPNAMLEN.
 * 	- mostly a huge waste of time, since *both* tmpnam() & mktemp()
 * 		produce the same "dangerous" warning message in Linux/gcc...
 * 	- damn.
 * (Spanker=kohl)
 *
 * Revision 1.32  2001/04/23 14:16:14  pvmsrc
 * Added new working directory option to pvm_spawn().
 * 	- use "where" argument to cram in working directory,
 * 		e.g. where = "msr.epm.ornl.gov:/home/user/project/bozo"
 * 	- TM_SPAWN strips out working directory & creates PVMSPAWNWD env var
 * 	- exectasks() (called by DM_EXEC or SM_EXEC) checks for env var
 * 		and does a chdir() (even uses getcwd() to reset directory :-).
 * 	- should not introduce any run-time incompatibility with older
 * 		PVM releases.
 * 	- PvmTaskHost or PvmTaskArch flags need not be used, i.e.
 * 		pvm_spawn( "foo", 0, PvmTaskDefault, ":/tmp", 1, &tid ) works.
 * (Spanker=kohl)
 *
 * Revision 1.31  2001/02/07 23:15:58  pvmsrc
 * 2nd Half of CYGWIN Check-ins...
 * (Spanker=kohl)
 *
 * Revision 1.30  2000/02/17 23:12:20  pvmsrc
 * *** Changes for new BEOLIN port ***
 * 	- MPP-like, similar to SP2, etc.
 * 	- submitted by Paul Springer <pls@smokeymt.jpl.nasa.gov>.
 * 	- format-checked & cleaned up by Jeembo...  :-)
 * (Spanker=kohl)
 *
 * Revision 1.29  2000/02/16 22:00:30  pvmsrc
 * Fixed up #include <sys/types.h> stuff...
 * 	- use <bsd/sys/types.h> for IMA_TITN...
 * 	- #include before any NEEDMENDIAN #includes...
 * (Spanker=kohl)
 *
 * Revision 1.28  1999/07/08 19:00:16  kohl
 * Fixed "Log" keyword placement.
 * 	- indent with " * " for new CVS.
 *
 * Revision 1.27  1998/11/20  20:06:49  pvmsrc
 * Changes so that win32 will compile & build. Also, common
 * Changes so that compiles & builds on NT. Also
 * common source on win32 & unix.
 * (spanker=sscott)
 *
 * Revision 1.26  1998/01/12  21:13:31  pvmsrc
 * Replaced inline constants with new task output op defines.
 * 	- TO_NEW == -2.
 * 	- TO_SPAWN == -1.
 * 	- TO_EOF == 0.
 * (Spanker=kohl)
 *
 * Revision 1.25  1997/09/22  21:14:02  pvmsrc
 * Modified TM_CONN2/tm_conn2() linkage for shell-spawned tasks.
 * 	- check for additional task name packed in message, save it in
 * 		task structure, use it for tev_newtask().
 * (Spanker=kohl)
 *
 * Revision 1.24  1997/09/18  14:14:44  pvmsrc
 * Fixed bug in tm_mca():
 * 	- loop to filter out bogus dests could hang, if a tid was not
 * 		local and on a failed host, index i would not get incremented,
 * 		resulting in an infinite loop...
 * 	- added second index to count dsts.
 * 	- submitted by Kamran Kazemi (kamran@cs.uwa.edu.au).
 * (Spanker=kohl)
 *
 * Revision 1.23  1997/08/29  13:35:20  pvmsrc
 * OS2 Port Submitted by Bohumir Horeni, horeni@login.cz.
 * (Spanker=kohl)
 *
 * Revision 1.22  1997/08/26  22:55:28  pvmsrc
 * Make sure wp->wa_mesg is cleared after sendmessage() call.
 * 	- otherwise wait_delete() will re-free the pmsg struct causing
 * 		major bogusness...
 * (Spanker=kohl)
 *
 * Revision 1.21  1997/08/11  14:40:13  pvmsrc
 * Attempt to remove CRAY compiler warning.
 * 	- if ( 1 ) got bail caused unreachables...
 * 	- replaced with "dummy_true" var set to 1, to trick compiler?!  :-)
 *
 * Revision 1.20  1997/07/02  20:27:35  pvmsrc
 * 	Fixed startup race on shmem so that a shmem task can get fully
 * 	configured before getting any messages.
 * 	This involved adding two states
 * 	TF_PRESHMCONN and TF_SHM.  TF_PRESHMCONN indicates that messages
 * 	with MM_PRIO set can be sent to a task, but regular messages are
 * 	queued. This allows shmem tasks to be completely configured
 * 	before any messages flow.  When the daemon changes the state from
 * 	TF_PRESHMCONN to TF_SHMCONN it calls shm_wrt_pkts to write any
 * 	packets that were queued before task state changed to TF_SHMCONN.
 *
 * Revision 1.19  1997/06/27  17:32:56  pvmsrc
 * Updated for WIN32 header files & Authors.
 *
 * Revision 1.18  1997/06/27  16:10:01  pvmsrc
 * 		Spawning tasks with option PvmNoSpawnParent get the
 * 		parent tid set to PvmParentNotSet in spawned tasks.
 *
 * Revision 1.17  1997/06/24  20:39:22  pvmsrc
 * Eliminated unnecessary global externs.
 *
 * Revision 1.16  1997/06/23  21:26:46  pvmsrc
 * Added checking for master host in tm_hoster().
 * 	- actually check hd_hostpart instead of ht_local/ht_master indices,
 * 		in case freaking host table has copies/duplicates.
 * 		(always wear 2 condoms...  :-)
 * 	- return new PvmHostrNMstr error code.
 *
 * Revision 1.15  1997/06/16  13:41:44  pvmsrc
 * assign_tasks() now passes distribution info in DM_EXEC message.
 *
 * Revision 1.14  1997/06/02  13:20:48  pvmsrc
 * tm_shmconn() marks old task w/ TF_CLOSE
 * 	- so work() doesn't complain about a dead socket.
 *
 * Revision 1.13  1997/05/28  14:04:05  pvmsrc
 * Cleaned up TM_SHMCONN() handler routine.
 * 	- formatted like hell, and didn't compile clean on SGI64.
 *
 * Revision 1.12  1997/05/27  14:45:53  pvmsrc
 * Added new TM message TM_SHMCONN with routine tm_shmconn():
 * 	which merges records and sets TF_SHMCONN flag.
 * Updated entry point in loclentry() as well:
 * 	to accept this type of connection.
 *
 * Revision 1.11  1997/05/12  20:28:22  pvmsrc
 * Removed duplicate #includes...
 *
 * Revision 1.10  1997/05/07  21:22:12  pvmsrc
 * AAAIIIEEEEEEEEE!!!!
 * 	- removed all static-limited string unpacking:
 * 		* replaced with use of:
 * 			upkstralloc() / PVM_FREE() (for pvmd stuff).
 * 			new pvmupkstralloc() / PVM_FREE() (for lpvm stuff).
 * 		* manual allocation of local buffers for sprintf() & packing.
 * 		* alternate static string arrays to replace fixed-length cases.
 * 		* I hope this shit works...  :-Q
 *
 * Revision 1.9  1997/04/30  21:26:31  pvmsrc
 * SGI Compiler Warning Cleanup.
 *
 * Revision 1.8  1997/03/27  19:57:10  pvmsrc
 * Spanked in PvmNoSpawnParent flag usage.
 * 	- in tm_spawn(), don't set wxp->w_ptid if PvmNoSpawnParent set.
 * 	- in assign_tasks(), use wxp->w_ptid as parent tid instead of
 * 		snarfing it from wp->wa_tid...  (Bob Hack) D-Oh...
 *
 * Revision 1.7  1997/02/13  15:10:05  pvmsrc
 * Removed unnecessary extern for struct waitc *waitlist.
 * 	- now in global.h.
 *
 * Revision 1.6  1997/01/28  19:27:49  pvmsrc
 * New Copyright Notice & Authors.
 *
 * Revision 1.5  1996/12/30  13:37:49  pvmsrc
 * Fixed closing comment in last log entry...
 *
 * Revision 1.4  1996/12/20  15:02:07  pvmsrc
 * Fixed missing comment end *-/ ...  D-Oh!
 *
 * Revision 1.3  1996/10/25  13:58:08  pvmsrc
 * Replaced old #includes for protocol headers:
 * 	- <pvmsdpro.h>, "ddpro.h", "tdpro.h"
 * With #include of new combined header:
 * 	- <pvmproto.h>
 *
 * Revision 1.2  1996/10/24  22:11:04  pvmsrc
 * Moved #include "global.h" below other #include's for typing.
 * Added #include <pvmtev.h> for tracing constants.
 * Added extern struct Pvmtracer pvmtracer for tracer info.
 * Added new tm_tracer() routine to handle tracer registration requests
 * 	from pvm_reg_tracer().
 * In tm_conn2(), insert tracer info for tasks spawned from the shell.
 * 	- can't get env vars set here, so rely on mailbox for that info.
 * 	- use new tev_send_newtask() routine.
 * In assign_tasks():
 * 	- replace inline trace event code with new tev_send_spntask().
 * 	- check trctid / outtid with > 0, not non-zero, for new case where
 * 		task denies external collection.
 * Modified change_trace() to use new event routines:
 * 	- tev_send_newtask(), tev_send_spntask() & tev_send_endtask().
 *
 * Revision 1.1  1996/09/23  23:44:45  pvmsrc
 * Initial revision
 *
 * Revision 1.9  1995/07/19  21:27:26  manchek
 * use new function pvmnametag instead of [dts]mname
 *
 * Revision 1.8  1995/07/03  19:11:40  manchek
 * added tickle #100
 *
 * Revision 1.7  1995/05/17  16:55:20  manchek
 * changed global mytid to pvmmytid.
 * added TM_GETOPT message class
 *
 * Revision 1.6  1994/10/15  19:37:46  manchek
 * cast message tags for comparison as integers.
 * check newhosts when deleting host.
 * unset TF_FORKD if task reconnects with a different pid
 *
 * Revision 1.5  1994/06/21  18:30:00  manchek
 * subscript arith in tmname() broke with opt
 *
 * Revision 1.4  1994/06/04  21:45:54  manchek
 * misc
 *
 * Revision 1.3  1994/06/03  20:38:31  manchek
 * version 3.3.0
 *
 * Revision 1.2  1993/11/30  15:56:23  manchek
 * tm_conn() complains if it can't write t-auth file (fs full?)
 *
 * Revision 1.1  1993/08/30  23:26:52  manchek
 * Initial revision
 *
 */

#include <stdio.h>

#ifdef IMA_TITN
#include <bsd/sys/types.h>
#else
#include <sys/types.h>
#endif

#ifdef NEEDMENDIAN
#include <machine/endian.h>
#endif
#ifdef NEEDENDIAN
#include <endian.h>
#endif
#ifdef NEEDSENDIAN
#include <sys/endian.h>
#endif

#include <pvm3.h>

#if defined(WIN32) || defined(CYGWIN)
#include "..\xdr\types.h"
#include "..\xdr\xdr.h"
#else
#include <rpc/types.h>
#include <rpc/xdr.h>
#endif

#ifdef WIN32
#include <time.h>
#else
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#endif

#include <fcntl.h>
#ifdef	SYSVSTR
#include <string.h>
#else
#include <strings.h>
#endif
#include <errno.h>

#include <pvmproto.h>
#include "pvmalloc.h"
#include "pmsg.h"
#include "task.h"
#include "host.h"
#include "waitc.h"
#include "listmac.h"
#include "bfunc.h"
#include <pvmtev.h>
#include "global.h"
#ifdef IMA_BEOLIN
#include "pvmdmp.h"
#endif

#ifndef	min
#define	min(a,b)	((a)<(b)?(a):(b))
#endif


/***************
 **  Globals  **
 **           **
 ***************/

extern void task_dump();
char *debug_flags();
char *inadport_hex();
void i_dump();
void tev_send_newtask();
void tev_send_spntask();
void tev_send_endtask();

extern int pvmdebmask;				/* from pvmd.c */
extern int hostertid;				/* from pvmd.c */
extern struct htab *hosts;			/* from pvmd.c */
extern int pvmmydsig;				/* from pvmd.c */
extern int myhostpart;				/* from pvmd.c */
extern int pvmmytid;				/* from pvmd.c */
extern struct htab *newhosts;		/* from pvmd.c */
extern int nopax;					/* from pvmd.c */
extern int pvmudpmtu;				/* from pvmd.c */
extern int pvmschedtid;				/* from pvmd.c */
extern int taskertid;				/* from pvmd.c */
extern struct Pvmtracer pvmtracer;	/* from pvmd.c */
extern int tidhmask;				/* from pvmd.c */


/***************
 **  Private  **
 **           **
 ***************/

/* Forward declarations of the per-tag request handlers defined in this file. */
int tm_addhost();
int tm_config();
int tm_conn2();
int tm_connect();
int tm_context();
int tm_db();
int tm_delhost();
int tm_exit();
int tm_getopt();
int tm_halt();
int tm_hoster();
int tm_hostsync();
int tm_mca();
int tm_mstat();
int tm_notify();
int tm_pstat();
int tm_sendsig();
int tm_sched();
int tm_setopt();
int tm_spawn();
int tm_task();
int tm_tasker();
int tm_tickle();
int tm_tracer();
int tm_shmconn();

/*
*	Dispatch table used by loclentry(): a message with tag c is handled
*	by loclswitch[c - TM_FIRST], so the position of each entry matters.
*	NOTE(review): the entry order presumably mirrors the numbering of the
*	TM_* tags in <pvmproto.h>, and the table must have
*	(TM_LAST - TM_FIRST + 1) entries -- confirm against that header
*	before adding, removing or reordering handlers.
*/
int (*loclswitch[])() = {
	tm_connect,
	tm_conn2,
	tm_exit,
	tm_addhost,
	tm_delhost,
	tm_config,
	tm_mstat,
	tm_halt,
	tm_tickle,
	tm_spawn,
	tm_pstat,
	tm_sendsig,
	tm_task,
	tm_mca,
	tm_notify,
	tm_db,
	tm_sched,
	tm_tasker,
	tm_hoster,
	tm_tracer,
	tm_hostsync,
	tm_setopt,
	tm_getopt,
	tm_context,
	tm_shmconn,
};


int
loclentry(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	/*
	*	Entry point for a message received from a local task.
	*
	*	tp	task context the message arrived on
	*	mp	the message; one reference is dropped before returning
	*
	*	Validates the tag range and the connection state of the
	*	sender, then hands the message to the matching handler in
	*	loclswitch[].  Always returns 0.
	*/
	int tag = mp->m_tag;
	int is_conn_tag;		/* tag is one of the connect requests */
	int refuse;				/* sender may not send this tag now */

	if (pvmdebmask & PDMMESSAGE) {
		pvmlogprintf("loclentry() from t%x tag %s\n", mp->m_src,
				pvmnametag(tag, (int *)0));
	}

	/* tag must index into loclswitch[] */
	if (tag < (int)TM_FIRST || tag > (int)TM_LAST) {
		pvmlogprintf("loclentry() message from t%x with bogus tag %d\n",
				tp->t_tid, tag);
		pmsg_unref(mp);
		return 0;
	}

	/*
	*	a task that is not yet connected may only send connect
	*	requests, and one that is mid-authentication may only
	*	send TM_CONN2
	*/
	is_conn_tag = (tag == TM_CONNECT || tag == TM_CONN2
			|| tag == TM_SHMCONN);
	refuse = (!(tp->t_flag & TF_CONN) && !is_conn_tag)
			|| ((tp->t_flag & TF_AUTH) && tag != TM_CONN2);

	if (refuse) {
		pvmlogerror("loclentry() non-connect message from anon task\n");
		tp->t_flag |= TF_CLOSE;
	} else {
		(loclswitch[tag - TM_FIRST])(tp, mp);
	}

	pmsg_unref(mp);
	return 0;
}


/*	replymessage()
*
*	Construct reply message to sender with matching context, tag, wait id
*/

struct pmsg *
replymessage(mp)
	struct pmsg *mp;
{
	/*
	*	Build an empty reply for request mp: the new message is
	*	addressed to mp's source and copies its context, tag and
	*	wait id.  Returns whatever mesg_new() returned, i.e. a null
	*	pointer if no message could be created.
	*/
	struct pmsg *reply = mesg_new(0);

	if (!reply)
		return reply;

	reply->m_dst = mp->m_src;
	reply->m_ctx = mp->m_ctx;
	reply->m_tag = mp->m_tag;
	reply->m_wid = mp->m_wid;
	return reply;
}


/*********************************
 **  Task request entry points  **
 **                             **
 *********************************/


/*	tm_connect()
*
*	Task connecting to pvmd phase 1.
*	We assign it a context, write in t-auth file to prove our ident,
*	then make d-auth file for task to prove itself.
*
*	TM_CONNECT
*	call {
*		int tdprotocol		// t-d protocol number compiled into libpvm
*		string authfile		// t-auth file pvmd must write
*	}
*	ret {
*		int tdprotocol		// pvmd's td protocol version
*		int acknack			// 1 if pvmd accepts connection
*		string authfile		// d-auth file task must write
*	}
*/

int
tm_connect(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	/*
	*	Phase 1 of the task connect handshake.
	*
	*	tp	anonymous task context the request arrived on
	*	mp	TM_CONNECT request (protocol version, t-auth file name)
	*
	*	On success the task is flagged TF_AUTH and is sent the name
	*	of the d-auth file it has to write.  On any failure the
	*	context is flagged TF_CLOSE.  Always returns 0.
	*/
	int taskver;					/* task's libpvm t-d proto version */
	char tauth[PVMTMPNAMLEN];		/* t-auth file name */
	int fd;
	int nwr;
	struct pmsg *rmp;				/* reply to the task */

	if (upkint(mp, &taskver) || upkstr(mp, tauth, sizeof(tauth))) {
		pvmlogerror("tm_connect() bad msg format\n");
		tp->t_flag |= TF_CLOSE;
		return 0;
	}

	/*
	*	incompatible protocol: send a nack, then mark the context
	*	for closing
	*/

	if (taskver != TDPROTOCOL) {
		pvmlogprintf("tm_connect() t-d protocol mismatch (%d/%d)\n",
			taskver, TDPROTOCOL);

		rmp = replymessage(mp);
		pkint(rmp, TDPROTOCOL);
		pkint(rmp, 0);
		pkstr(rmp, "");
		rmp->m_flag |= MM_PRIO;
		mesg_to_task(tp, rmp);
		pmsg_unref(rmp);

		tp->t_flag |= TF_CLOSE;
		return 0;
	}

#ifdef NOPROT
	/* no file handshake: just pick a name to hand back */
	tp->t_authnam = TALLOC(PVMTMPNAMLEN, char, "auth");
	(void)PVMTMPNAMFUN(tp->t_authnam);
#else

	/*
	*	prove our identity by writing one byte into the task's
	*	t-auth file; a failed write is logged but not fatal
	*/

	fd = open(tauth, O_WRONLY, 0);
	if (fd == -1) {
		pvmlogperror("tm_connect() can't open t-auth file");
		tp->t_flag |= TF_CLOSE;
		return 0;
	}
	nwr = write(fd, tauth, 1);
	if (nwr != 1) {
		if (nwr == -1)
			pvmlogperror(tauth);
		pvmlogerror("tm_connect() can't write t-auth file\n");
	}
	(void)close(fd);

	/*
	*	create the empty d-auth file the task must write;
	*	tm_conn2() reads it back through t_authfd
	*/

	tp->t_authnam = TALLOC(PVMTMPNAMLEN, char, "auth");
	(void)PVMTMPNAMFUN(tp->t_authnam);

#ifndef IMA_OS2
	tp->t_authfd = open(tp->t_authnam, O_RDONLY|O_CREAT|O_TRUNC, 0600);
#else
	tp->t_authfd = open(tp->t_authnam, O_RDWR|O_CREAT|O_TRUNC, 0600);
#endif
	if (tp->t_authfd == -1) {
		pvmlogperror("tm_connect() can't create d-auth file");
		PVM_FREE(tp->t_authnam);
		tp->t_authnam = 0;
		tp->t_flag |= TF_CLOSE;
		return 0;
	}

#endif

	/*
	*	accepted so far; now the task has to authenticate itself
	*/

	tp->t_flag |= TF_AUTH;

	rmp = replymessage(mp);
	pkint(rmp, TDPROTOCOL);
	pkint(rmp, 1);
	pkstr(rmp, tp->t_authnam);
	rmp->m_flag |= MM_PRIO;
	mesg_to_task(tp, rmp);
	pmsg_unref(rmp);
	return 0;
}


/*	tm_conn2()
*
*	Task connecting to pvmd phase 2.
*	We check d-auth file and give it config info, attach to real context.
*
*	TM_CONN2
*	call {
*		int unixpid			// real pid
*		int cookie			// magic cookie inherited from pvmd (or 0)
*		(char *taskname)	// (optional) task name specification
*	}
*	ret {
*		int acknack			// 1 if pvmd accepts connection
*		int tid				// task tid
*		int ptid			// parent tid
*		int outtid			// output dst
*		int outctx
*		int outtag
*		int trctid			// trace dst
*		int trcctx
*		int trctag
*		int udpmax
*		int nativecode
*		string inaddr
*		int schedtid		// scheduler tid
*	}
*/

int
tm_conn2(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int tid;
	int pid;						/* real pid of task */
	int cookie;						/* cookie to identify task or 0 */
	struct task *tp2;				/* to search for existing context */
	struct pmsg *mp2;
	int cc;
	char c;
	char *taskname = (char *) NULL;

	if (upkint(mp, &pid) || upkint(mp, &cookie)) {
		pvmlogerror("tm_conn2() bad msg format\n");
		goto bail;
	}
	if (!cookie)
		cookie = pid;

	if (!(tp->t_flag & TF_AUTH)) {
		pvmlogprintf("tm_conn2() message from t%x, TF_AUTH not set\n",
				tp->t_tid);
		return 0;
	}

	/*
	*	check that task could write d-auth file
	*/

#ifndef NOPROT

	if ((cc = read(tp->t_authfd, &c, 1)) == -1) {
		pvmlogperror("tm_conn2() can't read d-auth file");
		return 0;
	}

	if (cc != 1) {
		pvmlogerror("tm_conn2() task didn't validate itself\n");
		goto bail;
	}

	(void)close(tp->t_authfd);
	tp->t_authfd = -1;
	(void)unlink(tp->t_authnam);

#endif

	PVM_FREE(tp->t_authnam);
	tp->t_authnam = 0;

	/*
	*	if task spawned by us, already has a context,
	*	else make it one
	*/

	if ((tp2 = task_findpid(cookie)) && !(tp2->t_flag & (TF_AUTH|TF_CONN))) {
		if (pvmdebmask & PDMTASK) {
			pvmlogprintf("tm_conn2() reconnect task t%x\n", tp2->t_tid);
		}
		tp->t_sched = tp2->t_sched;

#ifdef IMA_BEOLIN
	} else if ( (tp2 = mpp_find(tp))
			&& !(tp2->t_flag & (TF_AUTH|TF_CONN)) ) {
		if (pvmdebmask & PDMTASK) {
			pvmlogprintf(
					"tm_conn2() reconnect task t%x pid=%d name=%s\n",
					tp2->t_tid,tp2->t_pid, tp2->t_a_out );
		}
#endif

	} else {
		if ((tid = tid_new()) < 0) {
			pvmlogerror("tm_conn2() out of tids?\n");
			goto bail;		/* XXX should disconnect nicely */
		}
		if ((tp2 = task_new(tid)) == NULL) {
			pvmlogerror("tm_conn2() too many tasks?\n");
			goto bail;		/* XXX should disconnect nicely */
		}

		if ( !upkstralloc(mp, &taskname) ) {
			tp2->t_a_out = taskname;
		}

		if (pvmschedtid) {
			/* inform the scheduler about the new task */

			tp->t_sched = pvmschedtid;
			mp2 = mesg_new(0);
			pkint(mp2, 1);
			pkint(mp2, tid);
			mp2->m_tag = SM_EXECACK;		/* XXX yecch, overload */
			mp2->m_dst = pvmschedtid;
			sendmessage(mp2);
		}

		/* check for tracer */
		if (pvmtracer.trctid && pvmtracer.trctag) {
			tp2->t_trctid = pvmtracer.trctid;
			tp2->t_trcctx = pvmtracer.trcctx;
			tp2->t_trctag = pvmtracer.trctag;
			/* Note:  can't get the trace mask or trace buffer size
				to task here, but this will be handled using mailbox */

			/* trace start message */
			/* XXX Note:  we don't really know if we should send this
				because this task could have prohibited tracing
				by setting PvmSelfTraceTid to -1... */
			tev_send_newtask(
				pvmtracer.trctid, pvmtracer.trcctx, pvmtracer.trctag,
				tid, -1, -1, tp->t_a_out ? tp->t_a_out : "-" );
		}
		if (pvmtracer.outtid && pvmtracer.outtag) {
			tp2->t_outtid = pvmtracer.outtid;
			tp2->t_outctx = pvmtracer.outctx;
			tp2->t_outtag = pvmtracer.outtag;

			/* output start message */
			/* Note:  same deal here as for TEV_NEWTASK above... */
			mp2 = mesg_new(0);
			mp2->m_dst = pvmtracer.outtid;
			mp2->m_ctx = pvmtracer.outctx;
			mp2->m_tag = pvmtracer.outtag;
			pkint(mp2, tid);
			pkint(mp2, TO_NEW);
			pkint(mp2, -1);
			sendmessage(mp2);
		}

		if (pvmdebmask & PDMTASK) {
			pvmlogprintf("tm_conn2() new task t%x\n", tp2->t_tid);
		}
	}

	/*
	*	brundle-fly the contexts together
	*/

	tp2->t_sock = tp->t_sock;
	tp2->t_sad = tp->t_sad;
	tp2->t_salen = tp->t_salen;

#ifdef NOPROT
	if (!TIDISNODE(tp2->t_tid)) {
		if (pvmdebmask & PDMTASK) {
			pvmlogprintf( "Setting pid of t%x to %d, same as t%x\n",
					tp2->t_tid, tp->t_pid, tp->t_tid );
		}
		task_setpid(tp2, tp->t_pid);
	}
#else
	if (tp2->t_pid != pid)
		task_setpid(tp2, pid);
#endif

	tp2->t_rxp = tp->t_rxp;
	tp2->t_sched = tp->t_sched;
	tp->t_sock = -1;	/* tp will be freed by loclinput() */
	tp->t_rxp = 0;
	tp = tp2;
	if (cookie != pid)
		tp->t_flag &= ~TF_FORKD;

	/*
	*	kick it in the butt; it's ready to go
	*/

	tp->t_flag &= ~TF_AUTH;
	tp->t_flag |= TF_CONN;

	mp = replymessage(mp);
	pkint(mp, 1);
	pkint(mp, tp->t_tid);
	pkint(mp, tp->t_ptid);
	pkint(mp, tp->t_outtid);
	pkint(mp, tp->t_outctx);
	pkint(mp, tp->t_outtag);
	pkint(mp, tp->t_trctid);
	pkint(mp, tp->t_trcctx);
	pkint(mp, tp->t_trctag);
	pkint(mp, pvmudpmtu);
	pkint(mp, pvmmydsig);
	pkstr(mp, inadport_hex(&(hosts->ht_hosts[hosts->ht_local]->hd_sad)));
	pkint(mp, pvmschedtid);
	mp->m_dst = tp->t_tid;
	mp->m_flag |= MM_PRIO;
	sendmessage(mp);
	return 0;

bail:
	tp->t_flag |= TF_CLOSE;
	return 0;
}


/*	tm_exit()
*
*	Last message from a task.  This is the nice way of disconnecting.
*
*	TM_EXIT
*	call { }
*	ret { }
*/

int
tm_exit(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	/*
	*	Handle TM_EXIT: flag the task context TF_CLOSE and send an
	*	empty reply.  The context is cleaned up here unless the
	*	task is flagged TF_FORKD.  Always returns 0.
	*/
	struct pmsg *ack = replymessage(mp);

	tp->t_flag |= TF_CLOSE;
	sendmessage(ack);

	if (tp->t_flag & TF_FORKD)
		return 0;

	task_cleanup(tp);
	return 0;
}


/*	tm_pstat()
*
*	Task wants status of another task.
*
*	TM_PSTAT
*	call {
*		int tid
*	}
*	ret {
*		int status
*	}
*/

int
tm_pstat(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	/*
	*	Handle TM_PSTAT: a task asks for the status of another task.
	*
	*	tp	requesting task
	*	mp	request holding the tid to query
	*
	*	If the tid maps to no known host, PvmNoTask is returned to
	*	the task right away.  Otherwise a DM_PSTAT query is sent to
	*	the pvmd of that host and the prepared reply is parked on a
	*	WT_PSTAT wait context.  Malformed requests are logged and
	*	get no reply.  Always returns 0.
	*/
	int target;					/* tid being asked about */
	struct hostd *hp;
	struct waitc *wp;
	struct pmsg *out;			/* message we send (reply or query) */

	/* unpack and sanity check tid */

	if (upkuint(mp, &target)) {
		pvmlogerror("tm_pstat() bad msg format\n");
		return 0;
	}
	if (!TIDISTASK(target)) {
		pvmlogprintf("tm_pstat() bad tid %x\n", target);
		return 0;
	}

	/* nack if no such host */

	hp = tidtohost(hosts, target);
	if (!hp) {
		out = replymessage(mp);
		pkint(out, PvmNoTask);
		sendmessage(out);
		return 0;
	}

	/* else make a wait context holding the reply ... */

	wp = wait_new(WT_PSTAT);
	wp->wa_tid = tp->t_tid;
	wp->wa_on = hp->hd_hostpart;
	wp->wa_mesg = replymessage(mp);

	/* ... and send the query to the pvmd on that host */

	out = mesg_new(0);
	out->m_dst = hp->hd_hostpart | TIDPVMD;
	out->m_tag = DM_PSTAT;
	out->m_wid = wp->wa_wid;
	pkint(out, target);
	sendmessage(out);
	return 0;
}


/*	tm_addhost()
*
*	Task requesting to add hosts to virtual machine.  Exit point is
*	here or dm_addack().
*
*	TM_ADDHOST
*	call {
*		int nhosts
*		string names[nhosts]
*	}
*	ret {
*		int nhosts			// or error code
*		int narches			// if nhosts >= 0
*		{
*			int tid			// or error code
*			string name
*			string arch
*			int speed
*			int dsig
*		} [nhosts]
*	}
*/

int
tm_addhost(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int count;
	char *buf;
	struct waitc *wp;

	/* sanity check the message */

	if (upkint(mp, &count))
		goto bad;
	if (count < 1 || count > (tidhmask >> (ffs(tidhmask) - 1)))
		goto bad;
	while (count-- > 0) {
		if (upkstralloc(mp, &buf))
			goto bad;
		else
			PVM_FREE(buf);
	}

	/* make a wait channel for the task */

	wp = wait_new(WT_ADDHOST);
	wp->wa_tid = tp->t_tid;
	wp->wa_on = hosts->ht_hosts[hosts->ht_master]->hd_hostpart;
	wp->wa_mesg = replymessage(mp);

	/* forward message to master pvmd */

	mp->m_ref++;
	mp->m_src = pvmmytid;
	mp->m_dst = hosts->ht_hosts[hosts->ht_master]->hd_hostpart | TIDPVMD;
	mp->m_tag = DM_ADD;
	mp->m_wid = wp->wa_wid;
	sendmessage(mp);
	return 0;

bad:
	pvmlogprintf("tm_addhost() from t%x bad msg format\n", mp->m_src);
	return 0;
}


int
free_wait_spawn(wxp)
	struct waitc_spawn *wxp;
{
	/*
	*	Release a spawn parameter block and everything it owns:
	*	file name, argument vector, environment vector, host table
	*	and result vector.  Members that are null are skipped.
	*	wxp itself must not be null.  Always returns 0.
	*/
	char **strs;
	int k;

	if (wxp->w_file) {
		PVM_FREE(wxp->w_file);
	}

	strs = wxp->w_argv;
	if (strs) {
		for (k = 0; k < wxp->w_argc; k++) {
			if (strs[k]) {
				PVM_FREE(strs[k]);
			}
		}
		PVM_FREE(wxp->w_argv);
	}

	strs = wxp->w_env;
	if (strs) {
		for (k = 0; k < wxp->w_nenv; k++) {
			if (strs[k]) {
				PVM_FREE(strs[k]);
			}
		}
		PVM_FREE(wxp->w_env);
	}

	if (wxp->w_ht) {
		ht_free(wxp->w_ht);
	}
	if (wxp->w_vec) {
		PVM_FREE(wxp->w_vec);
	}

	PVM_FREE(wxp);
	return 0;
}


/*	tm_spawn()
*
*	Task requesting to spawn other tasks.  Exit point for this
*	request is here or dm_execack().
*
*	TM_SPAWN
*	call {
*		string file
*		int flags
*		string where
*		int count
*		int nargs
*		string argv[nargs]
*		int outtid
*		int outctx
*		int outtag
*		int trctid
*		int trcctx
*		int trctag
*		int nenv
*		string env[nenv]
*	}
*	ret {
*		int ntids  // or nack
*		int tids[acknack]
*	}
*/

int
tm_spawn(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	/*
	*	Handle TM_SPAWN: a task asks to start other tasks.
	*
	*	tp	requesting task
	*	mp	request (file, flags, where, count, argv, output and
	*		trace destinations, environment)
	*
	*	Unpacks the request into a waitc_spawn block, splits an
	*	optional ":<dir>" working directory off "where" and passes
	*	it on as a PVMSPAWNWD environment string, builds the set of
	*	candidate hosts, then hands the operation to assign_tasks().
	*	A malformed request is logged and gets no reply.
	*	Always returns 0.
	*/
	char *where = 0;				/* location from req */
	struct waitc *wp;				/* 'seed' waitc */
	struct waitc_spawn *wxp = 0;	/* new task parameters */
	struct htab *htp;				/* set of usable hosts */
	struct hostd *hp;
	int hh;
	int i;
	char *wd = 0;					/* points into "where", not owned */
	char *wdenv = 0;				/* scratch buffer for PVMSPAWNWD */
	char *ptr;
	int sz;

	/*
	* unpack spawn command from task
	*/

	wxp = TALLOC(1, struct waitc_spawn, "waix");
	BZERO((char*)wxp, sizeof(struct waitc_spawn));

	if (upkstralloc(mp, &wxp->w_file)
	|| upkint(mp, &wxp->w_flags)
	|| upkstralloc(mp, &where)
	|| upkint(mp, &wxp->w_veclen)
	|| upkint(mp, &wxp->w_argc))
		goto bad;

	/*
	* counts come from the task; a negative argc must not reach
	* the size arithmetic of the allocation below
	*/

	if (wxp->w_veclen < 1 || wxp->w_argc < 0)
		goto bad;

	wxp->w_argv = TALLOC(wxp->w_argc + 1, char*, "argv");
	BZERO((char*)wxp->w_argv, (wxp->w_argc + 1) * sizeof(char*));
	for (i = 0; i < wxp->w_argc; i++)
		if (upkstralloc(mp, &wxp->w_argv[i]))
			goto bad;

	if (upkuint(mp, &wxp->w_outtid)
	|| upkuint(mp, &wxp->w_outctx)
	|| upkuint(mp, &wxp->w_outtag)
	|| upkuint(mp, &wxp->w_trctid)
	|| upkuint(mp, &wxp->w_trcctx)
	|| upkuint(mp, &wxp->w_trctag))
		goto bad;

	/*
	* extract any working directory string from "where"
	*/

	ptr = where;
	while ( *ptr != ':' && *ptr != '\0' )
		ptr++;
	if ( *ptr == ':' ) {
		*ptr++ = '\0';    /* close off actual "where" string */
		wd = ptr;         /* save ptr to working directory */
	}

	/*
	* unpack environment from task
	*/

	if (upkuint(mp, &wxp->w_nenv))
		goto bad;
	if (wxp->w_nenv < 0)	/* same reasoning as for argc above */
		goto bad;
	/* room for the task's strings, the PVMSPAWNWD string if any,
	   and a terminating null entry */
	sz = wxp->w_nenv + 1 + (wd ? 1 : 0);
	wxp->w_env = TALLOC(sz, char*, "env");
	BZERO((char*)wxp->w_env, sz * sizeof(char*));
	for (i = 0; i < wxp->w_nenv; i++)
		if (upkstralloc(mp, &wxp->w_env[i]))
			goto bad;

	/*
	* add extra env string for working directory
	*/

	if ( wd ) {
		sz = strlen(wd) + strlen("PVMSPAWNWD=") + 1;
		wdenv = TALLOC(sz, char, "wdenv");
		sprintf( wdenv, "PVMSPAWNWD=%s", wd );
		wxp->w_env[ (wxp->w_nenv)++ ] = STRALLOC( wdenv );
		/* the env vector holds its own copy; drop the scratch buffer */
		PVM_FREE( wdenv );
		wdenv = 0;
	}

	/*
	* make host set containing hosts (matching where option)
	*/

	if ((wxp->w_flags & (PvmTaskHost|PvmTaskArch)) && !where)
		goto bad;

	htp = ht_new(1);

	if (wxp->w_flags & PvmTaskHost) {			/* given host */
		if (hp = nametohost(hosts, where))
			ht_insert(htp, hp);

	} else {
		if (wxp->w_flags & PvmTaskArch) {		/* given arch */
			for (hh = hosts->ht_last; hh > 0; hh--)
				if ((hp = hosts->ht_hosts[hh])
				&& !strcmp(where, hp->hd_arch))
					ht_insert(htp, hp);

#ifdef IMA_BEOLIN
		/* local pvmd for now */
		} else if (wxp->w_flags & PvmMppFront) {
			hp = hosts->ht_hosts[hosts->ht_local];
			ht_insert(htp, hp);
#endif

		} else {						/* anywhere */
			ht_merge(htp, hosts);
		}
	}

	/* PvmHostCompl: use every host that is NOT in the set built above */
	if (wxp->w_flags & PvmHostCompl) {
		for (hh = hosts->ht_last; hh > 0; hh--) {
			if (hh <= htp->ht_last && (hp = htp->ht_hosts[hh]))
				ht_delete(htp, hp);
			else
				if (hp = hosts->ht_hosts[hh])
					ht_insert(htp, hp);
		}
	}

	if (pvmdebmask & PDMTASK) {
		pvmlogerror("tm_spawn() host set:\n");
		ht_dump(htp);
	}

	if ( !(wxp->w_flags & PvmNoSpawnParent) ) {
		wxp->w_ptid = tp->t_tid;
	}
	else
		wxp->w_ptid = PvmParentNotSet;	/* indicate task unset parent */

	/*
	* assign each task to a host
	*/

	wxp->w_ht = htp;
	wxp->w_vec = TALLOC(wxp->w_veclen, int, "vec");
	BZERO((char*)wxp->w_vec, wxp->w_veclen * sizeof(int));
	wxp->w_togo = wxp->w_veclen;

	wp = wait_new(WT_SPAWN);
	wp->wa_tid = tp->t_tid;
	wp->wa_spec = (void*)wxp;
	wxp = 0;			/* now owned by wp; keep cleanup from freeing it */
	wp->wa_mesg = replymessage(mp);

	assign_tasks(wp);

	/* if already done, reply to task */

	if (wp->wa_peer == wp) {
		assign_tasks(wp);
	}

	wait_delete(wp);
	goto cleanup;

bad:
	pvmlogprintf("tm_spawn() from t%x bad msg format\n", mp->m_src);

cleanup:
	if (where)
		PVM_FREE(where);
	if (wxp)
		free_wait_spawn(wxp);
	return 0;
}


/*	assign_tasks()
*
*	This is only called when no replies are pending (at the beginning
*	of the operation or when all pvmds have checked back in).
*
*	wp is any waitc in the peer group of a spawn operation; its wa_spec
*	points to the waitc_spawn state for the operation.  If wa_spec is
*	already clear the call does nothing.  Otherwise one of two things
*	happens:
*
*	- No entry of w_vec is still zero: the reply held in wp->wa_mesg is
*	  packed with the vector and sent, a spawn notice goes to the trace
*	  and output sinks for every entry that is a task tid, and the spawn
*	  state is freed.
*
*	- Some entries are still zero: they are spread over the hosts in
*	  w_ht.  For every host that receives a share, a new WT_SPAWN waitc
*	  is linked into wp's peer group and a DM_EXEC message is sent to
*	  that host's pvmd.
*
*	Always returns 0.
*/

assign_tasks(wp)
	struct waitc *wp;		/* (any) waitc in peer group for this op */
{
	/* kept between calls so each spawn continues the host rotation */
	static int lasthh = -1;			/* for assigning hosts */

	struct waitc_spawn *wxp = (struct waitc_spawn*)wp->wa_spec;
	struct htab *htp;				/* set of hosts to use */
	int *vec;						/* result/status vector */
	int veclen;						/* length of vector */
	int count = 0;					/* num of tasks to be assigned */
	int nh;							/* num of hosts to assign tasks */
	int a = 0;						/* accum for finding hosts */
	int na = 0;						/* num tasks assigned to a host */
	struct waitc *wp2;
	struct hostd *hp;
	struct pmsg *mp;
	int t;
	int i;
	int tid;
	struct timeval now;				/* NOTE(review): not referenced below */

	/* spawn state already released (see the reply branch below) */
	if (!wxp)
		return 0;

	htp = wxp->w_ht;
	vec = wxp->w_vec;
	veclen = wxp->w_veclen;

	/*
	* if no hosts left, fill unassigned entries with PvmNoHost
	*/

	if (!htp->ht_cnt)
		for (t = veclen; t-- > 0; )
			if (!vec[t])
				vec[t] = PvmNoHost;

	/*
	* count tasks to be assigned, if none left reply to task
	*/

	for (t = veclen; t-- > 0; )
		if (!vec[t])
			count++;

	if (!count) {
		/* reply body: vector length followed by every vector entry */
		pkint(wp->wa_mesg, wxp->w_veclen);
		for (t = 0; t < wxp->w_veclen; t++) {
			tid = wxp->w_vec[t];
			pkint(wp->wa_mesg, tid);
			/* entries that are not task tids get no notices */
			if (TIDISTASK(tid) && wxp->w_trctid > 0) {
				tev_send_spntask(
					wxp->w_trctid, wxp->w_trcctx, wxp->w_trctag,
					tid, wxp->w_ptid );
			}
			if (TIDISTASK(tid) && wxp->w_outtid > 0) {
				/* tell the output sink: new tid, TO_SPAWN, parent tid */
				mp = mesg_new(0);
				mp->m_dst = wxp->w_outtid;
				mp->m_ctx = wxp->w_outctx;
				mp->m_tag = wxp->w_outtag;
				pkint(mp, tid);
				pkint(mp, TO_SPAWN);
				pkint(mp, wxp->w_ptid);
				sendmessage(mp);
			}
		}
		sendmessage(wp->wa_mesg);
		wp->wa_mesg = 0;

		/* clearing wa_spec makes any later call on wp return at once */
		free_wait_spawn(wxp);
		wp->wa_spec = 0;
		return 0;
	}

	/*
	* assign tasks to hosts
	*/

	nh = min(htp->ht_cnt, count);

	/* find first host to assign */

	/*
	* NOTE(review): the out-of-range reset below uses slot 0 while both
	* wrap-arounds use slot 1; presumably ht_hosts[0] is never populated
	* so the scan simply steps past it -- confirm.
	*/
	if (lasthh == -1)
		lasthh = hosts->ht_local + 1;
	if (lasthh > htp->ht_last)
		lasthh = 0;
	while (!htp->ht_hosts[lasthh])
		if (++lasthh > htp->ht_last)
			lasthh = 1;
	hp = htp->ht_hosts[lasthh];

	/* position t on the first unassigned (zero) entry */
	for (t = 0; t < veclen && vec[t]; t++);

	while (t < veclen) {
/*
		pvmlogprintf("assign_tasks() %s <- %d\n", hp->hd_name, t);
*/

		/* provisionally mark the slot with the chosen host's hostpart */
		vec[t] = hp->hd_hostpart;
		na++;

	/* when enough tasks for this host, move to next */

		/*
		* a grows by nh per task and a host's batch is flushed whenever
		* it reaches count.  Over count tasks it advances nh*count in
		* total, so the final task always lands on a flush.
		*/
		if ((a += nh) >= count) {
			a -= count;

			/* peer waitc for this host, sharing spawn state and reply */
			wp2 = wait_new(WT_SPAWN);
			wp2->wa_tid = wp->wa_tid;
			wp2->wa_on = hp->hd_hostpart;
			wp2->wa_spec = wp->wa_spec;
			wp->wa_mesg->m_ref++;
			wp2->wa_mesg = wp->wa_mesg;
			LISTPUTBEFORE(wp, wp2, wa_peer, wa_rpeer);

			/* DM_EXEC request carrying the spawn parameters */
			mp = mesg_new(0);
			pkint(mp, wxp->w_ptid);
			pkstr(mp, wxp->w_file);
			pkint(mp, wxp->w_flags);
			pkint(mp, na);
			pkint(mp, wxp->w_argc);
			for (i = 0; i < wxp->w_argc; i++)
				pkstr(mp, wxp->w_argv[i]);
			pkint(mp, wxp->w_outtid);
			pkint(mp, wxp->w_outctx);
			pkint(mp, wxp->w_outtag);
			pkint(mp, wxp->w_trctid);
			pkint(mp, wxp->w_trcctx);
			pkint(mp, wxp->w_trctag);
			pkint(mp, wxp->w_nenv);
			for (i = 0; i < wxp->w_nenv; i++)
				pkstr(mp, wxp->w_env[i]);
			pkint(mp, (t+1)-na);	/* start proc location 0..count-1 */
			pkint(mp, count);		/* how many are there in total */
			mp->m_dst = hp->hd_hostpart | TIDPVMD;
			mp->m_tag = DM_EXEC;
			mp->m_wid = wp2->wa_wid;

			/* advance the rotation to the next populated slot */
			do {
				if (++lasthh > htp->ht_last)
					lasthh = 1;
			} while (!htp->ht_hosts[lasthh]);

			sendmessage(mp);

			hp = htp->ht_hosts[lasthh];
			na = 0;
		}
		/* step to the next unassigned entry */
		for (t++ ; t < veclen && vec[t]; t++);
	}
	return 0;
}


/*	tm_sendsig()
*
*	Task sending a signal to another task.
*
*	TM_SENDSIG
*	call {
*		int tid
*		int signum
*	}
*	ret { }
*/

int
tm_sendsig(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int tid;					/* task to be signalled */
	struct pmsg *ack;			/* empty reply to the caller */

	/* only the tid is examined here; the rest of mp stays packed */
	if (upkuint(mp, &tid)) {
		pvmlogerror("tm_sendsig() bad msg format\n");
		return 0;
	}

	if (TIDISTASK(tid)) {

		/* acknowledge the requesting task */

		ack = replymessage(mp);
		sendmessage(ack);

		/* reuse the request itself as a DM_SENDSIG to tid's pvmd */

		mp->m_ref++;
		mp->m_wid = 0;
		mp->m_tag = DM_SENDSIG;
		mp->m_src = pvmmytid;
		mp->m_dst = (tid & TIDHOST) | TIDPVMD;
		sendmessage(mp);

	} else {
		pvmlogprintf("tm_sendsig() bad tid %x\n", tid);
	}
	return 0;
}


/*	tm_config()
*
*	Task wants machine configuration.
*
*	TM_CONFIG
*	call { }
*	ret {
*		int nhosts
*		int narches
*		{
*			int tid
*			string name
*			string arch
*			int speed
*			int dsig
*		} [nhosts]
*	}
*/

int
tm_config(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	struct pmsg *reply;
	struct hostd *hp;
	int slot;

	reply = replymessage(mp);

	/* header: number of hosts, then number of architectures */

	pkint(reply, hosts->ht_cnt);
	pkint(reply, hosts->ht_narch);

	/* one record per occupied host table slot, lowest slot first */

	slot = 1;
	while (slot <= hosts->ht_last) {
		hp = hosts->ht_hosts[slot++];
		if (!hp)
			continue;
		pkint(reply, hp->hd_hostpart);
		pkstr(reply, hp->hd_name);
		if (hp->hd_arch)
			pkstr(reply, hp->hd_arch);
		else
			pkstr(reply, "");
		pkint(reply, hp->hd_speed);
		pkint(reply, hp->hd_dsig);
	}

	sendmessage(reply);
	return 0;
}


/*	tm_halt()
*
*	Command from task to stop master pvmd.
*
*	TM_HALT
*	call { }
*	ret { }
*/

int
tm_halt(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	mp = mesg_new(0);
	mp->m_tag = DM_HALT;
	mp->m_dst = hosts->ht_hosts[hosts->ht_master]->hd_hostpart | TIDPVMD;
	sendmessage(mp);
	return 0;
}


/*	tm_task()
*
*	Task wants a list of tasks.
*	If where is a host or task tid, give a list of matching tasks.
*	If where is zero, give all running tasks.
*
*	TM_TASK
*	call {
*		int where
*	}
*	ret {
*		int errcode
*		{
*			int tid
*			int ptid
*			int hostpart
*			int flag
*			string a_out
*			int pid
*		} [count]    // implied
*	}
*/

int
tm_task(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int where;					/* host/task tid, or 0 for everything */
	int hh;
	struct hostd *hp;
	struct pmsg *reply;			/* TM_TASK answer for the caller */
	struct pmsg *query;			/* DM_TASK request to a pvmd */
	struct waitc *waiter;
	struct waitc *master = 0;	/* master waitc of peer group */

	if (upkuint(mp, &where)) {
		pvmlogerror("tm_task() bad msg format\n");
		return 0;
	}

	reply = replymessage(mp);
	reply->m_dst = tp->t_tid;
	reply->m_tag = TM_TASK;

	if (!where) {

		/*
		* all tasks requested: ask every pvmd, with one waitc per host
		* linked as a peer of the master waitc.  The reply gains one
		* reference for each waitc that holds it.
		*/

		pkint(reply, 0);

		master = wait_new(WT_TASK);
		reply->m_ref++;
		master->wa_mesg = reply;
		master->wa_tid = tp->t_tid;

		query = mesg_new(0);
		query->m_tag = DM_TASK;
		pkint(query, 0);

		for (hh = hosts->ht_last; hh > 0; hh--) {
			hp = hosts->ht_hosts[hh];
			if (hp) {
				waiter = wait_new(WT_TASK);
				reply->m_ref++;
				waiter->wa_mesg = reply;
				waiter->wa_tid = tp->t_tid;
				waiter->wa_on = hp->hd_hostpart;

				LISTPUTBEFORE(master, waiter, wa_peer, wa_rpeer);

				/* the same query is re-addressed and sent per host */
				query->m_dst = hp->hd_hostpart | TIDPVMD;
				query->m_wid = waiter->wa_wid;
				query->m_ref++;
				sendmessage(query);
			}
		}

		/* drop the references taken when the two messages were made */

		pmsg_unref(query);
		pmsg_unref(reply);

		/* master has no peers: nothing is outstanding, answer now */

		if (master->wa_peer == master) {
			reply->m_ref++;
			sendmessage(reply);
		}
		wait_delete(master);
		return 0;
	}

	/* specific host or task requested */

	hp = tidtohost(hosts, where);
	if (!hp) {
		pkint(reply, PvmNoHost);
		sendmessage(reply);
		return 0;
	}

	pkint(reply, 0);
	waiter = wait_new(WT_TASK);
	waiter->wa_mesg = reply;
	waiter->wa_tid = tp->t_tid;
	waiter->wa_on = hp->hd_hostpart;

	query = mesg_new(0);
	query->m_tag = DM_TASK;
	query->m_dst = hp->hd_hostpart | TIDPVMD;
	query->m_wid = waiter->wa_wid;
	pkint(query, where);
	sendmessage(query);
	return 0;
}


/*	tm_tickle()
*
*	Task wants to poke at pvmd.
*
*	TM_TICKLE
*	call {
*		int nargs
*		int args[nargs]
*	}
*	ret {
*		int nresult
*		int results[nresult]
*	}
*/

int
tm_tickle(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int nar;						/* number of args in the request */
	int arg[10];					/* args, zero-padded past nar */
	int maxarg = sizeof(arg) / sizeof(arg[0]);
	int i;
	struct hostd *hp;

	/*
	* Every unpack is checked: a short message must not let nar or an
	* arg[] entry be used without having been written.
	*/

	if (upkint(mp, &nar) || nar < 1 || nar > maxarg) {
		pvmlogprintf("tm_tickle() bad msg format\n");
		return 0;
	}
	for (i = 0; i < nar; i++)
		if (upkint(mp, &arg[i])) {
			pvmlogprintf("tm_tickle() bad msg format\n");
			return 0;
		}
	while (i < maxarg)
		arg[i++] = 0;

	pvmlogprintf("tm_tickle() #");
	for (i = 0; i < nar; i++)
		pvmlogprintf(" %d", arg[i]);
	pvmlogprintf("\n");

	mp = replymessage(mp);

	/*
	* arg[0] selects the operation.  The reply starts with the number
	* of result ints that follow it.
	*/

	switch (arg[0]) {

	case 0:
		i_dump(1);
		pkint(mp, 0);
		break;

	case 1:
		ht_dump(hosts);
		pkint(mp, 0);
		break;

	case 2:
		task_dump();
		pkint(mp, 0);
		break;

	case 3:
		wait_dumpall();
		pkint(mp, 0);
		break;

	case 4:
		mb_dumpall();
		pkint(mp, 0);
		break;

	case 5:							/* report debug mask */
		pkint(mp, 1);
		pkint(mp, pvmdebmask);
		break;

	case 6:							/* set debug mask from arg[1] */
		pvmdebmask = arg[1];
		pvmlogprintf("tm_tickle() debug mask is %x (%s)\n",
				pvmdebmask, debug_flags(pvmdebmask));
		pkint(mp, 0);
		break;

	case 7:							/* set nopax, accepted range 1..49 */
		if (arg[1] > 0 && arg[1] < 50) {
			nopax = arg[1];
			pvmlogprintf("tm_tickle() nopax is %d\n", nopax);
		} else
			pvmlogprintf("tm_tickle() bogus nopax %d\n", arg[1]);
		pkint(mp, 0);
		break;

	case 8:							/* fail host arg[1], never the local one */
		pkint(mp, 1);
		if ((hp = tidtohost(hosts, arg[1]))
		&& hp != hosts->ht_hosts[hosts->ht_local]) {
			pvmlogprintf("tm_tickle() failing %s\n", hp->hd_name);
			hostfailentry(hp);
			ht_delete(hosts, hp);
			if (newhosts)
				ht_delete(newhosts, hp);
			pkint(mp, 1);
		} else {
			if (hp)
				pvmlogprintf("tm_tickle() can't fail %s\n", hp->hd_name);
			else
				pvmlogprintf("tm_tickle() no such host %x\n", arg[1]);
			pkint(mp, 0);
		}
		break;

	case 9:							/* dump statistics, reset if arg[1] */
#ifdef	STATISTICS
		dump_statistics();
		if (arg[1])
			reset_statistics();
#else
		pvmlogerror("tm_tickle() statistics not compiled in\n");
#endif
		pkint(mp, 0);
		break;

	case 100:
#ifdef SHMEM
		peer_dump();
		pidtid_dump();
#else
		pvmlogerror("tm_tickle() not using shared memory\n");
#endif
		pkint(mp, 0);
		break;

	case 20:						/* send arg[1] DM_NULL messages to arg[2] */
		{
			struct pmsg *mp2;
			int j, k;

			for (j = 0; j < arg[1]; j++) {
				mp2 = mesg_new(0);
				for (k = 0; k < 1000; k++)
					pkint(mp2, k);
				mp2->m_dst = arg[2];
				mp2->m_tag = DM_NULL;
				sendmessage(mp2);
			}
			pkint(mp, 0);
		}
		break;

	default:
		pvmlogprintf("tm_tickle() don't know #%d\n", arg[0]);
		pkint(mp, 0);
		break;
	}

	sendmessage(mp);
	return 0;
}


/*	tm_delhost()
*
*	Task requesting to delete hosts from virtual machine.  Exit point is
*	here or dm_delhostack().
*
*	TM_DELHOST
*	call {
*		int nhosts
*		string names[nhosts]
*	}
*	ret {
*		int nhosts			// or negative for error
*		int status[nhosts]	// status of each host
*	}
*/

int
tm_delhost(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int count;
	char *buf;
	struct waitc *wp;

	/* sanity check the message */

	if (upkint(mp, &count))
		goto bad;
	if (count < 1 || count > (tidhmask >> (ffs(tidhmask) - 1)))
		goto bad;
	while (count-- > 0)
		if (upkstralloc(mp, &buf))
			goto bad;
		else
			PVM_FREE(buf);

	/* make a wait channel for the task */

	wp = wait_new(WT_DELHOST);
	wp->wa_tid = tp->t_tid;
	wp->wa_on = hosts->ht_hosts[hosts->ht_master]->hd_hostpart;
	wp->wa_mesg = replymessage(mp);

	/* forward message to master pvmd */

	mp->m_ref++;
	mp->m_src = pvmmytid;
	mp->m_dst = hosts->ht_hosts[hosts->ht_master]->hd_hostpart | TIDPVMD;
	mp->m_tag = DM_DELHOST;
	mp->m_wid = wp->wa_wid;
	sendmessage(mp);
	return 0;

bad:
	pvmlogprintf("tm_delhost() from t%x bad msg format\n", mp->m_src);
	return 0;
}


/*	tm_mca()
*
*	Task announces multicast address list for subsequent message.
*
*	TM_MCA
*	call {
*		int count			// number of addresses
*		int tids[count]		// addresses
*	}
*	// No Return
*/

int
tm_mca(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	struct mca *mcap;			/* mca descriptor */
	int ndst;					/* num of dst tids */
	int *dsts = 0;				/* dst tids */
	int tid;
	int i, j;

	/*
	* unpack list of dst tids from message (expect it to be sorted).
	* discard tids to nonexistent foreign hosts.
	*
	* The message comes from a task, so every unpack is checked.  A bad
	* or negative count is treated as an empty list, and a list that
	* ends early is cut at the last tid actually read; the descriptor
	* is still attached to the task in both cases.
	*/

	mcap = mca_new();
	mcap->mc_tid = TIDGID | tp->t_tid;

	if (upkint(mp, &ndst) || ndst < 0) {
		pvmlogprintf("tm_mca() from t%x bad msg format\n", tp->t_tid);
		ndst = 0;
	}
	if (ndst > 0)
		dsts = TALLOC(ndst, int, "dsts");

	for ( i=0, j=0 ; i < ndst ; i++ ) {
		if (upkuint(mp, &tid)) {
			pvmlogprintf("tm_mca() from t%x bad msg format\n", tp->t_tid);
			break;
		}
		if ((tid & tidhmask) == myhostpart || tidtohost(hosts, tid))
			dsts[j++] = tid;
	}
	ndst = j;

	if (ndst < 1)
		goto noguys;

	/*
	* send DM_MCA messages containing tids to destination hosts
	* make list of destination hosts for us
	*/

	mcap->mc_dsts = TALLOC(ndst, int, "mcal");	/* XXX cheat, too much space */
	mcap->mc_ndst = 0;

	for (j = 0; j < ndst; ) {

		/* [i, j) is the run of consecutive tids on the same host */

		i = j;
		tid = dsts[i] & tidhmask;
		while (++j < ndst && tid == (dsts[j] & tidhmask)) ;
		mp = mesg_new(0);
		mp->m_dst = (tid |= TIDPVMD);
		mp->m_tag = DM_MCA;
		pkint(mp, mcap->mc_tid);
		pkint(mp, j - i);
		while (i < j)
			pkint(mp, dsts[i++]);
		sendmessage(mp);
		mcap->mc_dsts[mcap->mc_ndst++] = tid;
	}

noguys:
	if (dsts)
		PVM_FREE(dsts);

	/*
	* tag task descriptor with mca desc.  Nothing is sent back to the
	* task.
	*/

	tp->t_mca = mcap;

	if (pvmdebmask & PDMMESSAGE) {
		pvmlogprintf("tm_mca() made mca %x for t%x\n",
				mcap->mc_tid, tp->t_tid);
	}
	return 0;
}


/*	tm_notify()
*
*	Task wants to be notified when an event happens.
*
*	The low 8 bits of "what" select the event (PvmTaskExit,
*	PvmHostDelete or PvmHostAdd); the full value is kept as flags and
*	tested for PvmNotifyCancel.  For PvmTaskExit and PvmHostDelete one
*	tid is unpacked per count.  For PvmHostAdd no tids are read and
*	count is stored in the waitc instead.
*
*	Always returns 0; malformed requests are logged and dropped.
*
*	TM_NOTIFY
*	call {
*		int what			// event type and flags
*		int ctx				// message context to use in reply
*		int tag				// message tag to use in reply
*		int count			// number of addresses
*		int tids[count]		// addresses
*	}
*	// No Return
*/

int
tm_notify(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int what, flags, ctx, tag, count, tid;
	struct hostd *hp;
	struct pmsg *mp2;
	struct waitc *wp;
	int i;

	if (upkint(mp, &what)
	|| upkint(mp, &ctx)
	|| upkint(mp, &tag)
	|| upkint(mp, &count)) {
		pvmlogerror("tm_notify() bad msg format\n");
		return 0;
	}

	/* keep the full word for flag tests, reduce what to the event code */
	flags = what;
	what &= 0xff;
	switch (what) {

	/*
	* TaskExit: make a wait context that hangs out until the task is
	* cleaned up.
	*/

	case PvmTaskExit:
		for (i = 0; i < count; i++) {
			if (upkuint(mp, &tid)) {
				pvmlogerror("tm_notify() bad msg format\n");
				return 0;
			}

			if (flags & PvmNotifyCancel) {
				/* find this task's pending notify on tid with same ctx/tag */
				FORLIST (wp, waitlist, wa_link)
					if (wp->wa_kind == WT_TASKX
					&& wp->wa_on == tid
					&& wp->wa_tid == tp->t_tid
					&& wp->wa_mesg->m_ctx == ctx
					&& wp->wa_mesg->m_tag == tag)
						break;
				/* the scan ends on the list head when nothing matched */
				if (wp != waitlist) {
					/* cancelling delivers the stored notify message now */
					sendmessage(wp->wa_mesg);
					wp->wa_mesg = 0;
					/*
					* for a tid on another known host the waitc is kept
					* but detached from the task; otherwise it is deleted
					*/
					if ((hp = tidtohost(hosts, tid))
					&& hp->hd_hostpart != myhostpart)
						wp->wa_tid = 0;
					else
						wait_delete(wp);
				}

			} else {
				/* make reply message */

				mp2 = mesg_new(0);
				mp2->m_dst = tp->t_tid;
				mp2->m_ctx = ctx;
				mp2->m_tag = tag;
				mp2->m_wid = mp->m_wid;
				pkint(mp2, tid);

				/* if task doesn't exist, reply immediately */

				if (!(hp = tidtohost(hosts, tid))
				|| (hp->hd_hostpart == myhostpart && !task_find(tid)))
					sendmessage(mp2);

				else {

					/* otherwise make a wc for it */

					wp = wait_new(WT_TASKX);
					wp->wa_on = tid;
					wp->wa_tid = tp->t_tid;
					wp->wa_dep = mp->m_wid;
					wp->wa_mesg = mp2;

					/* and if not on this host, pass on the request */

					if (hp->hd_hostpart != myhostpart) {
						mp2 = mesg_new(0);
						pkint(mp2, what);
						pkint(mp2, tid);
						mp2->m_dst = hp->hd_hostpart | TIDPVMD;
						mp2->m_tag = DM_NOTIFY;
						mp2->m_wid = wp->wa_wid;
						sendmessage(mp2);
					}
				}
			}
		}
		break;

	/*
	* HostDelete: a WT_HOSTF waitc on the host's hostpart holds the
	* notify message; nothing is forwarded to other pvmds here.
	*/

	case PvmHostDelete:
		for (i = 0; i < count; i++) {
			if (upkuint(mp, &tid)) {
				pvmlogerror("tm_notify() bad msg format\n");
				return 0;
			}
			if (flags & PvmNotifyCancel) {
				/* an unknown host has nothing to cancel */
				if (hp = tidtohost(hosts, tid)) {
					FORLIST (wp, waitlist, wa_link)
						if (wp->wa_kind == WT_HOSTF
						&& wp->wa_on == hp->hd_hostpart
						&& wp->wa_tid == tp->t_tid
						&& wp->wa_mesg->m_ctx == ctx
						&& wp->wa_mesg->m_tag == tag)
							break;
					if (wp != waitlist) {
						sendmessage(wp->wa_mesg);
						wp->wa_mesg = 0;
						wait_delete(wp);
					}
				}

			} else {
				/*
				* NOTE(review): addressed to mp->m_src here but to
				* tp->t_tid in the TaskExit case -- presumably the same
				* task; confirm.
				*/
				mp2 = mesg_new(0);
				mp2->m_dst = mp->m_src;
				mp2->m_ctx = ctx;
				mp2->m_tag = tag;
				pkint(mp2, tid);
				if (hp = tidtohost(hosts, tid)) {
					wp = wait_new(WT_HOSTF);
					wp->wa_tid = tp->t_tid;
					wp->wa_on = hp->hd_hostpart;
					wp->wa_mesg = mp2;

				} else {
					/* host not in the table: notify right away */
					sendmessage(mp2);
				}
			}
		}
		break;

	/*
	* HostAdd: at most one WT_HOSTA waitc per task/ctx/tag; count is
	* recorded in it rather than used to read tids.
	*/

	case PvmHostAdd:

		/* look for an existing waitc belonging to this request */
		FORLIST (wp, waitlist, wa_link)
			if (wp->wa_kind == WT_HOSTA
			&& wp->wa_tid == tp->t_tid
			&& wp->wa_mesg->m_ctx == ctx
			&& wp->wa_mesg->m_tag == tag)
				break;

	/* if cancelling, delete possible existing waitc */

		if (count == 0 || (flags & PvmNotifyCancel)) {
			if (wp != waitlist)
				wait_delete(wp);

	/* otherwise, update existing waitc or create new one */
		} else {
			if (wp == waitlist) {
				wp = wait_new(WT_HOSTA);
				wp->wa_tid = tp->t_tid;
				wp->wa_on = tp->t_tid;
				wp->wa_mesg = mesg_new(0);
				wp->wa_mesg->m_dst = tp->t_tid;
				wp->wa_mesg->m_ctx = ctx;
				wp->wa_mesg->m_tag = tag;
			}
			wp->wa_count = count;
		}
		break;

	default:
		pvmlogprintf("tm_notify() unknown what=%d\n", what);
		break;
	}
	return 0;
}


/*	tm_mstat()
*
*	Task wants status of a host.
*
*	TM_MSTAT
*	call {
*		string hostname
*	}
*	ret {
*		int status
*	}
*/

int
tm_mstat(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	char *name;					/* host name from the request */
	struct hostd *hp;
	struct waitc *wp;
	struct pmsg *out;

	if (upkstralloc(mp, &name)) {
		pvmlogerror("tm_mstat() bad msg format\n");
		return 0;
	}

	/* the name is only needed for the lookup */

	hp = nametohost(hosts, name);
	PVM_FREE(name);

	if (hp) {

		/* park the reply in a waitc and query the host's pvmd */

		wp = wait_new(WT_MSTAT);
		wp->wa_tid = tp->t_tid;
		wp->wa_on = hp->hd_hostpart;
		wp->wa_mesg = replymessage(mp);

		out = mesg_new(0);
		out->m_dst = hp->hd_hostpart | TIDPVMD;
		out->m_tag = DM_PSTAT;
		out->m_wid = wp->wa_wid;
		pkint(out, hp->hd_hostpart | TIDPVMD);
		sendmessage(out);

	} else {

		/* unknown host: answer immediately */

		out = replymessage(mp);
		pkint(out, PvmNoHost);
		sendmessage(out);
	}
	return 0;
}


/*	tm_db()
*
*	Task wants a database op.
*
*	TM_DB
*	call {
*		int opcode		// insert, delete, lookup
*		string name
*		int index
*		int data		// if 'insert'
*	}
*	ret {
*		int index		// and status
*		int data		// if 'lookup'
*	}
*/

int
tm_db(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	struct waitc *wp;

	wp = wait_new(WT_DB);
	wp->wa_tid = tp->t_tid;
	wp->wa_on = hosts->ht_hosts[hosts->ht_master]->hd_hostpart;
	wp->wa_mesg = replymessage(mp);

	mp->m_ref++;
	mp->m_src = pvmmytid;
	mp->m_dst = hosts->ht_hosts[hosts->ht_master]->hd_hostpart | TIDPVMD;
	mp->m_tag = DM_DB;
	mp->m_wid = wp->wa_wid;
	sendmessage(mp);
	return 0;
}


/*	tm_sched()
*
*	Task wants to register as scheduler.
*
*	TM_SCHED
*	call {
*	}
*	ret {
*		int oldschedtid
*		int tid			// of master pvmd
*		string name
*		string arch
*		int speed
*		int dsig
*	}
*/

int
tm_sched(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	struct pmsg *reply;
	struct hostd *master;

	/* reply: previous scheduler tid, then the master host's record */

	reply = replymessage(mp);
	pkint(reply, pvmschedtid);

	master = hosts->ht_hosts[hosts->ht_master];
	pkint(reply, master->hd_hostpart);
	pkstr(reply, master->hd_name);
	if (master->hd_arch)
		pkstr(reply, master->hd_arch);
	else
		pkstr(reply, "");
	pkint(reply, master->hd_speed);
	pkint(reply, master->hd_dsig);
	sendmessage(reply);

	if (pvmdebmask & PDMSCHED) {
		pvmlogprintf("tm_sched() old t%x new t%x\n", pvmschedtid, tp->t_tid);
	}

	/* the caller replaces whatever scheduler was registered before */

	tp->t_flag |= TF_ISSCHED;
	pvmschedtid = tp->t_tid;
	return 0;
}


/*	tm_tasker()
*
*	Task registers as local task starter.
*
*	TM_TASKER
*	call {
*		int regme	// true to register, false to unregister
*	}
*	ret {
*		int code	// 0 if okay, else error
*	}
*/

int
tm_tasker(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int regme;					/* nonzero to register, zero to unregister */
	int cc;						/* status code for the reply */
	struct pmsg *reply;

	if (upkint(mp, &regme)) {
		pvmlogerror("tm_tasker() bad msg format\n");
		return 0;
	}
	reply = replymessage(mp);

	if (regme && taskertid) {

		/* somebody already holds the tasker slot */
		cc = PvmAlready;

	} else if (regme) {
		taskertid = tp->t_tid;
		tp->t_flag |= TF_ISTASKER;
		if (pvmdebmask & PDMTASK) {
			pvmlogprintf("tm_tasker() register t%x \n", tp->t_tid);
		}
		cc = 0;

	} else if (taskertid == tp->t_tid) {
		taskertid = 0;
		tp->t_flag &= ~TF_ISTASKER;
		if (pvmdebmask & PDMTASK) {
			pvmlogprintf("tm_tasker() unregister t%x \n", tp->t_tid);
		}
		cc = 0;

	} else {

		/* only the registered tasker may unregister */
		if (pvmdebmask & PDMTASK) {
			pvmlogprintf("tm_tasker() t%x tries to unregister?\n",
					tp->t_tid);
		}
		cc = PvmNoTask;
	}

	pkint(reply, cc);
	sendmessage(reply);
	return 0;
}


/*	tm_hoster()
*
*	Task registers as slave pvmd starter.
*
*	TM_HOSTER
*	call {
*		int regme	// true to register, false to unregister
*	}
*	ret {
*		int code	// 0 if okay, else error
*	}
*/

int
tm_hoster(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int regme;
	struct pmsg *mp2;

	if (upkint(mp, &regme)) {
		pvmlogerror("tm_hoster() bad msg format\n");
		return 0;
	}

	mp2 = replymessage(mp);

	/* always wear 2 condoms, who knows if indices are enough... :) */
	if (hosts->ht_hosts[hosts->ht_local]->hd_hostpart
			!= hosts->ht_hosts[hosts->ht_master]->hd_hostpart) {
		pkint(mp2, PvmHostrNMstr);
		sendmessage(mp2);
		return 0;
	}

	if (regme) {
		if (hostertid) {
			pkint(mp2, PvmAlready);

		} else {
			hostertid = tp->t_tid;
			tp->t_flag |= TF_ISHOSTER;
			if (pvmdebmask & PDMSTARTUP) {
				pvmlogprintf("tm_hoster() register t%x \n", tp->t_tid);
			}
			pkint(mp2, 0);
		}

	} else {
		if (hostertid == tp->t_tid) {
			hostertid = 0;
			tp->t_flag &= ~TF_ISHOSTER;
			if (pvmdebmask & PDMSTARTUP) {
				pvmlogprintf("tm_hoster() unregister t%x \n", tp->t_tid);
			}
			pkint(mp2, 0);

		} else {
			if (pvmdebmask & PDMSTARTUP) {
				pvmlogprintf("tm_hoster() t%x tries to unregister?\n",
						tp->t_tid);
			}
			pkint(mp2, PvmNoTask);
		}
	}

	sendmessage(mp2);
	return 0;
}


/*	tm_tracer()
*
*	Task registers as task trace collector.
*
*	TM_TRACER
*	call {
*		int regme		// true to register, false to unregister
*		int tctx		// if regme true, message ctxt for trace events
*		int ttag		// if regme true, message tag for trace events
*		int octx		// if regme true, message ctxt for output events
*		int otag		// if regme true, message tag for output events
*		Pvmtmask tmask	// if regme true, trace mask for tasks
*		int tbuf		// if regme true, trace buffer size for tasks
*		int topt		// if regme true, trace options for tasks
*	}
*	ret {
*		int code	// 0 if okay, else error
*	}
*/

int
tm_tracer(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int regme;					/* nonzero to register, zero to unregister */
	int tctx = 0;				/* trace event context */
	int ttag = 0;				/* trace event tag */
	int octx = 0;				/* output event context */
	int otag = 0;				/* output event tag */
	Pvmtmask tmask;				/* trace mask for tasks */
	int tbuf = 0;				/* trace buffer size for tasks */
	int topt = 0;				/* trace options for tasks */
	struct pmsg *mp2;			/* reply to the requesting task */
	struct pmsg *mpd;			/* DM_SLCONF update for one pvmd */
	int slconf;					/* set when tracer config changed */
	int hh;
	struct hostd *hp;
	char buf[512];

	if (upkint(mp, &regme)) {
		pvmlogerror("tm_tracer() bad msg format\n");
		return 0;
	}

	/*
	* When this request is going to install a new tracer, read and
	* validate all of its parameters before any state is changed.  A
	* truncated request is dropped like a missing regme, so that unset
	* locals never reach pvmtracer or the other pvmds.
	* NOTE(review): assumes upkstr() returns nonzero on failure like
	* the other upk* routines -- confirm.
	*/

	if (regme && !pvmtracer.trctid
	&& (upkint(mp, &tctx)
		|| upkint(mp, &ttag)
		|| upkint(mp, &octx)
		|| upkint(mp, &otag)
		|| upkstr(mp, tmask, TEV_MASK_LENGTH)
		|| upkint(mp, &tbuf)
		|| upkint(mp, &topt))) {
		pvmlogerror("tm_tracer() bad msg format\n");
		return 0;
	}

	mp2 = replymessage(mp);

	slconf = 0;

	if (regme) {
		if (pvmtracer.trctid) {

			/* a tracer is already registered */
			pkint(mp2, PvmAlready);

		} else {

			/* the caller becomes both trace and output collector */
			tp->t_flag |= TF_ISTRACER;
			pvmtracer.trctid = tp->t_tid;
			pvmtracer.trcctx = tctx;
			pvmtracer.trctag = ttag;
			pvmtracer.outtid = tp->t_tid;
			pvmtracer.outctx = octx;
			pvmtracer.outtag = otag;
			BCOPY(tmask,pvmtracer.tmask,TEV_MASK_LENGTH);
			pvmtracer.trcbuf = tbuf;
			pvmtracer.trcopt = topt;
			slconf++;
			if (pvmdebmask & PDMTRACE) {
				pvmlogprintf("tm_tracer() register t%x \n", tp->t_tid);
			}
			pkint(mp2, 0);
		}

	} else {
		if (pvmtracer.trctid == tp->t_tid) {

			/* registered tracer steps down: clear the whole config */
			tp->t_flag &= ~TF_ISTRACER;
			pvmtracer.trctid = 0;
			pvmtracer.trcctx = 0;
			pvmtracer.trctag = 0;
			pvmtracer.outtid = 0;
			pvmtracer.outctx = 0;
			pvmtracer.outtag = 0;
			TEV_MASK_INIT(pvmtracer.tmask);
			pvmtracer.trcbuf = 0;
			pvmtracer.trcopt = 0;
			slconf++;
			if (pvmdebmask & PDMTRACE) {
				pvmlogprintf("tm_tracer() unregister t%x \n",
					tp->t_tid);
			}
			pkint(mp2, 0);

		} else {

			/* only the registered tracer may unregister */
			if (pvmdebmask & PDMTRACE) {
				pvmlogprintf("tm_tracer() t%x tries to unregister?\n",
						tp->t_tid);
			}
			pkint(mp2, PvmNoTask);
		}
	}

	/* Update slave pvmds */
	if ( slconf ) {

		/* every host in the table gets the new config as one string */
		for (hh = hosts->ht_last; hh > 0; hh--) {
			if (hp = hosts->ht_hosts[hh]) {
				mpd = mesg_new(0);
				mpd->m_tag = DM_SLCONF;
				mpd->m_dst = hp->hd_hostpart | TIDPVMD;
				pkint(mpd, DM_SLCONF_TRACE);
				sprintf(buf, "%x %d %d %x %d %d %d %d %s",
					pvmtracer.trctid, pvmtracer.trcctx,
						pvmtracer.trctag,
					pvmtracer.outtid, pvmtracer.outctx,
						pvmtracer.outtag,
					pvmtracer.trcbuf, pvmtracer.trcopt,
					pvmtracer.tmask);
				pkstr(mpd, buf);
				sendmessage(mpd);
			}
		}
	}

	/* Send response to requesting task */
	sendmessage(mp2);

	return 0;
}


/*	tm_hostsync()
*
*	Task wants clock sample from host.
*
*	TM_HOSTSYNC
*	call {
*		int where	// pvmd tid
*	}
*	ret {
*		int status	// 0 or error code
*		int sec		// time of day clock
*		int usec
*	}
*/

int
tm_hostsync(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int where;					/* tid naming the host to sample */
	struct hostd *hp;
	struct pmsg *reply;
	struct pmsg *query;
	struct waitc *wp;

	if (upkuint(mp, &where)) {
		pvmlogerror("tm_hostsync() bad msg format\n");
		return 0;
	}

	reply = replymessage(mp);
	hp = tidtohost(hosts, where);

	if (hp) {

		/* park the reply in a waitc until the pvmd answers */

		wp = wait_new(WT_HOSTSYNC);
		wp->wa_tid = mp->m_src;
		wp->wa_on = hp->hd_hostpart;
		wp->wa_mesg = reply;

		/* the DM_HOSTSYNC request has no body */

		query = mesg_new(0);
		query->m_dst = where | TIDPVMD;
		query->m_tag = DM_HOSTSYNC;
		query->m_wid = wp->wa_wid;
		sendmessage(query);

	} else {

		/* no such host: answer at once */

		pkint(reply, PvmNoHost);
		sendmessage(reply);
	}
	return 0;
}


/*	tm_setopt()
*
*	Set task parameters
*
*	TM_SETOPT
*	call {
*		{
*			int what		// which option
*			string value	// option value
*		} []				// implied
*	}
*	ret { }
*/

int
tm_setopt(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	int what;					/* option selector */
	char *val;					/* option value, as a string */

	/*
	* The request is a run of (what, value) pairs with no count; the
	* first unpack that fails ends the run.
	*/

	for (;;) {
		if (upkint(mp, &what) || upkstralloc(mp, &val))
			break;

		/* each option replaces one field and keeps the other two */

		switch (what) {

		case TS_OUTTID:
			change_output(tp, pvmxtoi(val), tp->t_outctx, tp->t_outtag);
			break;

		case TS_OUTCTX:
			change_output(tp, tp->t_outtid, pvmxtoi(val), tp->t_outtag);
			break;

		case TS_OUTTAG:
			change_output(tp, tp->t_outtid, tp->t_outctx, pvmxtoi(val));
			break;

		case TS_TRCTID:
			change_trace(tp, pvmxtoi(val), tp->t_trcctx, tp->t_trctag);
			break;

		case TS_TRCCTX:
			change_trace(tp, tp->t_trctid, pvmxtoi(val), tp->t_trctag);
			break;

		case TS_TRCTAG:
			change_trace(tp, tp->t_trctid, tp->t_trcctx, pvmxtoi(val));
			break;

		default:
			pvmlogprintf("tm_setopt() ? option %d val <%s>\n", what, val);
			break;
		}

		if (val)
			PVM_FREE(val);
	}

	/* empty acknowledgement */

	mp = replymessage(mp);
	sendmessage(mp);
	return 0;
}


/*	change_output()
*
*	Task sets its output sink parameters.
*	If new values are different we have to send close and open
*	messages, because the collector won't know what's going on.
*/

int
change_output(tp, outtid, outctx, outtag)
	struct task *tp;
	int outtid, outctx, outtag;
{
	struct pmsg *mp;

	if (tp->t_outtid != outtid
	|| tp->t_outctx != outctx
	|| tp->t_outtag != outtag) {
		if (tp->t_outtid > 0) {
			mp = mesg_new(0);
			mp->m_dst = tp->t_outtid;
			mp->m_ctx = tp->t_outctx;
			mp->m_tag = tp->t_outtag;
			pkint(mp, tp->t_tid);
			pkint(mp, TO_EOF);
			sendmessage(mp);
		}
		if (pvmdebmask & PDMTASK) {
			pvmlogprintf("t%x changes output from <t%x %d %d> to <t%x %d %d>\n",
					tp->t_tid,
					tp->t_outtid, tp->t_outctx, tp->t_outtag,
					outtid, outctx, outtag);
		}
		tp->t_outtid = outtid;
		tp->t_outctx = outctx;
		tp->t_outtag = outtag;
		if (tp->t_outtid > 0) {
			mp = mesg_new(0);
			mp->m_dst = tp->t_outtid;
			mp->m_ctx = tp->t_outctx;
			mp->m_tag = tp->t_outtag;
			pkint(mp, tp->t_tid);
			pkint(mp, TO_SPAWN);
			pkint(mp, tp->t_ptid);
			sendmessage(mp);

			mp = mesg_new(0);
			mp->m_dst = tp->t_outtid;
			mp->m_ctx = tp->t_outctx;
			mp->m_tag = tp->t_outtag;
			pkint(mp, tp->t_tid);
			pkint(mp, TO_NEW);
			pkint(mp, tp->t_ptid);
			sendmessage(mp);
		}
	}
	return 0;
}


/*	tm_getopt()
*
*	Connect message for shared memory apps to get trace & output parameters.
*	XXX hack, will be merged into something else in v3.4.
*
*	TM_GETOPT
*	call { }
*	ret {
*		int tid				// task tid
*		int ptid			// parent tid
*		int outtid			// output dst
*		int outctx
*		int outtag
*		int trctid			// trace dst
*		int trcctx
*		int trctag
*		int udpmax
*		int nativecode
*		string inaddr
*		int schedtid		// scheduler tid
*	}
*/

int
tm_getopt(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	mp = replymessage(mp);
	pkint(mp, tp->t_tid);
	pkint(mp, tp->t_ptid);
	pkint(mp, tp->t_outtid);
	pkint(mp, tp->t_outctx);
	pkint(mp, tp->t_outtag);
	pkint(mp, tp->t_trctid);
	pkint(mp, tp->t_trcctx);
	pkint(mp, tp->t_trctag);
	pkint(mp, pvmudpmtu);
	pkint(mp, pvmmydsig);
	pkstr(mp, inadport_hex(&(hosts->ht_hosts[hosts->ht_local]->hd_sad)));
	pkint(mp, pvmschedtid);
	mp->m_flag |= MM_PRIO;  /* set priority handling for this message */
	sendmessage(mp);

#if defined(SHMEM)

	/* now change connection to TF_SHMCONN and try send packets that have
		gathered in this tasks sendq */
	tp->t_flag &= ~TF_PRESHMCONN;  
	tp->t_flag |= TF_SHMCONN;
	
	shm_wrt_pkts(tp); /* write  any waiting packets */
	
#endif
	return 0;
}


/*	change_trace()
*
*	Task sets its trace sink parameters.
*	If new values are different we have to send close and open
*	messages, because the collector won't know what's going on.
*/

int
change_trace(tp, trctid, trcctx, trctag)
	struct task *tp;
	int trctid, trcctx, trctag;
{
	/* nothing to do when the sink is unchanged */

	if (tp->t_trctid == trctid
	&& tp->t_trcctx == trcctx
	&& tp->t_trctag == trctag)
		return 0;

	/* tell the old collector, if any, that this task is finished */

	if (tp->t_trctid > 0) {
		tev_send_endtask(
			tp->t_trctid, tp->t_trcctx, tp->t_trctag,
			tp->t_tid, tp->t_status,
			tp->t_utime.tv_sec, tp->t_utime.tv_usec,
			tp->t_stime.tv_sec, tp->t_stime.tv_usec );
	}

	if (pvmdebmask & PDMTASK) {
		pvmlogprintf( "t%x changes trace from <t%x %d %d> to <t%x %d %d>\n",
				tp->t_tid,
				tp->t_trctid, tp->t_trcctx, tp->t_trctag,
				trctid, trcctx, trctag);
	}

	tp->t_trctid = trctid;
	tp->t_trcctx = trcctx;
	tp->t_trctag = trctag;

	/* announce the task to the new collector: spawn event, then new-task */

	if (tp->t_trctid > 0) {
		tev_send_spntask(
			tp->t_trctid, tp->t_trcctx, tp->t_trctag,
			tp->t_tid, tp->t_ptid );
		tev_send_newtask(
			tp->t_trctid, tp->t_trcctx, tp->t_trctag,
			tp->t_tid, tp->t_ptid, 0 /* XXX lie */,
			tp->t_a_out ? tp->t_a_out : "" );
	}
	return 0;
}


/*	tm_context()
*
*	Assign task a communication context, or free one in use
*
*	TM_CONTEXT
*	call {
*		int cid			// to free, or zero if requesting one
*	}
*	ret {
*		int cid			// new cid or status
*	}
*/

int
tm_context(tp, mp)
	struct task *tp;
	struct pmsg *mp;
{
	struct pmsg *reply;
	struct ccon *cp;
	int cid;					/* context to free, or 0 to allocate */
	int cc;						/* new cid or status for the reply */

	reply = replymessage(mp);

	if (upkuint(mp, &cid)) {
		cc = PvmBadParam;

	} else if (cid == 0) {

		/* allocate a fresh context for the task */

		cp = ccon_new(tp);
		if (cp)
			cc = cp->c_cid;
		else
			cc = PvmOutOfRes;

	} else {

		/* free: unlink the matching context from both of its lists */

		cc = PvmNotFound;
		if (tp->t_ccs) {
			FORLIST (cp, tp->t_ccs, c_peer) {
				if (cp->c_cid == cid) {
					LISTDELETE(cp, c_link, c_rlink);
					LISTDELETE(cp, c_peer, c_rpeer);
					PVM_FREE(cp);
					cc = 0;
					break;
				}
			}
		}
	}

	pkint(reply, cc);
	sendmessage(reply);
	return 0;
}


/*	tm_shmconn()
 *
 *	Task connecting to pvmd under the shmem system.
 *	This is used to tie the task to the pvmd socket into the correct
 *	task struct.  This socket is a one-way only socket that is used
 *	by the task to kick the daemon when the task has put a message
 *	for it in its shared memory inbox segment.  The socket is also
 *	used to catch when the task dies....
 *	
 *	There is no d-auth file checking.
 *	Only data passed is the task's pid so we can match it with our
 *	own table.
 *
 *	TM_SHMCONN()
 *	call {
 *		int unixpid			// real pid
 *	}
 *	ret {
 *	}
 */

int
tm_shmconn(tp, mp)
	struct task *tp;
	struct pmsg *mp;	/* remember no return message */
{
	int ppid;						/* pid claimed by the connecting task */
	struct task *real;				/* task struct already owning that pid */
	int dummy_true = 1;				/* not so harsh "always true"? */

	if ( upkint( mp, &ppid ) ) {
		pvmlogerror(
				"tm_shmconn() bad msg format. Missing Unix pid??\n" );
		goto retire;
	}

#ifndef	SHMEM
	if ( dummy_true ) {
		pvmlogerror( "tm_shmconn() shmem conn on non shmem sys? " );
		pvmlogerror( "Bit bucket...\n" );
		goto retire;
	}
#endif

	/*
	*	The task should already have a task struct, because TM_SHMCONN
	*	is passed after the shmem connect phase in pvmbeatask() in
	*	lpvmshmem.c.  Look it up by pid.
	*/

	real = task_findpid( ppid );
	if ( !real ) {
		pvmlogerror( "tm_shmconn() conn from non shmem connd task?\n");
		goto retire;
	}

	if ( pvmdebmask & PDMTASK ) {
		pvmlogprintf(
				"tm_shmconn() reconnect task t%x via sockets\n",
				real->t_tid);
	}

	/*
	*	Hand the socket to the real task.  TF_PRESHMCONN means that only
	*	MM_PRIO messages may go over shared memory for now; tm_getopt
	*	later switches the flag to TF_SHMCONN.
	*/

	real->t_sock = tp->t_sock;
	real->t_flag |= TF_PRESHMCONN;

	/*
	*	Success and failure both finish by marking the shadow task (tp)
	*	closed; tp will be freed by loclinput().
	*/

retire:
	tp->t_sock = -1;
	tp->t_rxp = 0;
	tp->t_flag |= TF_CLOSE;	/* stop work() complaining - dead socket? */
	return 0;
}



/* syntax highlighted by Code2HTML, v. 0.9.1 */