static char rcsid[] =
	"$Id: lpvmmpp.c,v 1.10 1997/07/21 14:31:57 pvmsrc Exp $";

/*
 *         PVM version 3.4:  Parallel Virtual Machine System
 *               University of Tennessee, Knoxville TN.
 *           Oak Ridge National Laboratory, Oak Ridge TN.
 *                   Emory University, Atlanta GA.
 *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
 *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
 *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
 *                   (C) 1997 All Rights Reserved
 *
 *                              NOTICE
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby granted
 * provided that the above copyright notice appear in all copies and
 * that both the copyright notice and this permission notice appear in
 * supporting documentation.
 *
 * Neither the Institutions (Emory University, Oak Ridge National
 * Laboratory, and University of Tennessee) nor the Authors make any
 * representations about the suitability of this software for any
 * purpose.  This software is provided ``as is'' without express or
 * implied warranty.
 *
 * PVM version 3 was funded in part by the U.S. Department of Energy,
 * the National Science Foundation and the State of Tennessee.
 */

/*
 *	lpvmmpp.c
 *
 *	support routines for the MPP environment
 *
 */

#include <stdio.h>
#include <rpc/types.h>
#include <rpc/xdr.h>
#ifdef	SYSVSTR
#include <string.h>
#else
#include <strings.h>
#endif
#include <errno.h>

#include <pvm3.h>
#include "global.h"
#include "pvmalloc.h"
#include "pvmfrag.h"
#include "listmac.h"
#include "bfunc.h"
#include <pvmtev.h>
#include <pvmproto.h>
#include "tevmac.h"
#include "mppmsg.h"
#include "mppchunk.h"
#include "lmsg.h"
#include "pmsg.h"
#include "pvmmimd.h"
#include "global.h"
#include "lpvm.h"
#ifndef max
#define max(a,b)	((a)>(b)?(a):(b))
#endif
/***************
 **  Globals  **
 **           **
 ***************/

/* Indicate that we are able to receive directly into a user's buffer */

/* Set by pvm_mppprecv() to the msgid describing the precv in progress and
 * reset to NULL when that precv finishes. */
extern struct msgid *pvm_inprecv;		/* from lpvm.c */
/* Filled in by pvm_mpp_message_init() during pvm_mpp_beatask(); used as the
 * destination node for traffic that has to go through the pvmd. */
int pvmhostnode;

/***************
 **  Private  **
 **           **
 ***************/

/* This task's node number, as reported by pvm_mpp_message_init() */
static int pvmmynode;
/* Partition size taken from word 5 of the configuration message; it is the
 * number of per-peer entries allocated in peerdirect and peerpcbs */
static int pvmpartsize;
/* Word 4 of the configuration message (labelled "NDF" in pvm_mpp_beatask) */
static int pvmmyndf;

/* Masks used to pick the node, partition-type and host fields out of a tid */
static int pvmtidnmask = TIDNODE;
static int pvmtidpmask = TIDPTYPE;
static int pvmtidhmask = TIDHOST;
/* Partition type reported by pvm_mpp_message_init(); passed as the partition
 * id on node-to-node sends */
static int pvmmyptype;

/* NOTE(review): no definition of pvm_frReady is visible in this file */
static struct frag * pvm_frReady __ProtoGlarp__((
MSG_INFO_PTR, int, MSGFUNC_PTR, MPP_DIRECTI_PTR, int, int *, struct frag ** ));

/* NOTE(review): the routine defined further down is the non-static
 * pvm_find_direct(); nothing named find_direct is defined in view */
static MPP_DIRECTI_PTR find_direct __ProtoGlarp__((MPP_DIRECTI_PTR, int, int ));
/* header of posted precvs */

static struct msgid *precvIds = (struct msgid *) NULL; 

/* Receive lists built by init_recv_list() in pvm_mpp_beatask(): one for
 * traffic from peer nodes, one for traffic from the pvmd */
static MSG_INFO_PTR nodefrags = (MSG_INFO_PTR) NULL; /* pre-posted recv bufs */
static MSG_INFO_PTR pvmdfrags = (MSG_INFO_PTR) NULL; /* pre-posted recv bufs */
/* Packet-ordering structures: a single one for the pvmd and an array with
 * one entry per node of the partition */
static MPP_DIRECTI_PTR pvmddirect = (MPP_DIRECTI_PTR) NULL;
static MPP_DIRECTI_PTR peerdirect = (MPP_DIRECTI_PTR) NULL;

/* Control blocks: peerpcbs has pvmpartsize + 1 entries; the last one is the
 * pvmd's and pvmdpcb points at it */
static struct ttpcb *peerpcbs = (struct ttpcb *) NULL;
static struct ttpcb *pvmdpcb = (struct ttpcb *) NULL;
/* Head of the received-message list (empty circular list after init) */
static struct pmsg *rxpmsgs = (struct pmsg *) NULL;
/* Microseconds to wait between an inplace header and its body; taken from
 * the PVMINPLACEDELAY environment variable when set */
static int inplaceDelay = 0;

/* Old-style declarations of routines defined outside this file */
struct pmsg *  midtobuf();
char * getenv();

/**************************
 **  Internal Functions  **
 **                      **
 **************************/


/* ------------ pvm_mpp_beatask --------- */
/*
*
*	Initialize libpvm, config process as a task.
*	This is called as the first step of each libpvm function so no
*	explicit initialization is required.
*
*	Steps:
*	  1. start the native message layer (pvm_mpp_message_init)
*	  2. post a receive for the PMTCONF configuration message and poll
*	     until it has arrived
*	  3. copy the configuration words into the caller's out-parameters
*	     and into this file's statics
*	  4. build the receive lists, the packet-ordering structures and the
*	     table of peer/pvmd control blocks
*
*	Outputs (written only once the configuration message is accepted):
*	  mytid, myptid, udpmtu, outtid, outtag, outctx,
*	  trctid, trctag, trcctx	- taken from the configuration message
*	  topvmd					- control block to use for the pvmd
*	  schedtid					- not written by this routine
*
*	Returns 0 if okay (also when already configured), else PvmSysErr.
*/

int
pvm_mpp_beatask(mytid, myptid, 
				outtid, outctx, outtag,
				trctid, trcctx, trctag,
				udpmtu, schedtid, topvmd)
int *mytid, *myptid; 
int *outtid, *outctx, *outtag;
int *trctid, *trcctx, *trctag;
int *udpmtu, *schedtid;
struct ttpcb **topvmd;
{
	/* The "bad init" line can need 78 bytes when all three ints print at
	 * full width, so 64 was not enough. */
	char errtxt[128];
	char *s;

	int hostid = 0;
	int i;
	int myhost;
	int mynode, mysetpart;
	int partsize;
	int pvminfo[SIZEHINFO];		/* proto, hostpart, ptid, MTU, NDF, ... */

	msgmid_t rmid;				/* msg ID returned by imsgrecv() */

	info_t minfo[MPPINFOSIZE];	/* info that might be returned by msgdone */

	MSGFUNC_PTR mfunc;

	if (pvmmytid != -1)			/* already configured */
		return 0;

	mfunc = pvm_hostmsgfunc();

	pvm_mpp_message_init(&mynode, &partsize, &pvmhostnode, &pvmmyptype);

	myhost = pvmhostnode;

	if (mynode < 0 || partsize < 0 || myhost < 0)
	{
		sprintf(errtxt, "mppbeatask(): bad init, node %d, part %d, host %d\n",
				mynode, partsize, myhost);
		pvmlogerror(errtxt);
		return PvmSysErr;
	}

	if (pvmdebmask & PDMPACKET)
	{
		sprintf(errtxt," %d Posting receive for config message \n", mynode);
		pvmlogerror(errtxt);
	}

	/* Post receive for the configuration message */
	if ((*mfunc->imsgrecv)(hostid, MPPANY, PMTCONF, (char *) pvminfo,
			 sizeof(pvminfo), MPPANY, (int *) NULL, &rmid) < 0)
	{
		pvmlogperror("beatask() recv pvminfo");
		return PvmSysErr;
	}

	if (pvmdebmask & PDMPACKET) {
		sprintf(errtxt," %d Receive posted for config message \n", mynode);
		pvmlogerror(errtxt);
	}

	/* NOTE(review): the early returns between here and the msgdone loop
	 * leave the receive posted above outstanding - confirm that is
	 * acceptable for the native message layer. */
	if ((pvm_useruid = getuid()) == -1) {
		pvmlogerror("can't getuid()\n");
		return PvmSysErr;
	}

	pvmmyupid = getpid();

	/*
	*	initialize received-message list (empty circular list)
	*/

	rxpmsgs = TALLOC(1, struct pmsg, "pmsgs");
	if (!rxpmsgs)
	{
		pvmlogerror("mppbeatask(): can't allocate message list\n");
		return PvmSysErr;
	}
	BZERO((char*)rxpmsgs, sizeof(struct pmsg));
	rxpmsgs->m_link = rxpmsgs->m_rlink = rxpmsgs;

	if (pvmdebmask & PDMPACKET) {
		sprintf(errtxt," %d Waiting  for config message \n", mynode);
		pvmlogerror(errtxt);
	}

	/* spin until the posted receive has completed */
	while (!((*mfunc->msgdone)(0, &rmid, minfo)))
		;

	if (pvmdebmask & PDMPACKET) {
		sprintf(errtxt," %d Got config message \n", mynode);
		pvmlogerror(errtxt);
	}

	if (pvminfo[0] != TDPROTOCOL) {
		sprintf(errtxt, "beatask() t-d protocol mismatch (%d/%d)\n",
			TDPROTOCOL, pvminfo[0]);
		pvmlogerror(errtxt);
		return PvmSysErr;
	}

	/* We now have the configuration message -- set up various parameters
		based on the message.
		NOTE(review): words 0..11 are read, so SIZEHINFO is assumed to
		be at least 12 - confirm against its definition. */

	pvmmynode = mynode;
	mysetpart = pvminfo[1];
	*myptid = pvminfo[2];
	*udpmtu = pvminfo[3];
	pvmmyndf = pvminfo[4];
	pvmpartsize = pvminfo[5];
	*outtid = pvminfo[6];
	*outtag = pvminfo[7];
	*outctx = pvminfo[8];
	*trctid = pvminfo[9];
	*trctag = pvminfo[10];
	*trcctx = pvminfo[11];
	*mytid = mysetpart + pvmmynode;

	/* NOTE(review): this reads the global pvmudpmtu, not *udpmtu;
	 * presumably the caller passes &pvmudpmtu - confirm. */
	pvmfrgsiz = pvmudpmtu;

	/* ---- set up the pre-allocated receive buffers ---- */

	pvmddirect = new_directstruct( NSBUFS, NRBUFS );
	peerdirect = new_vdirectstruct( pvmpartsize, NSBUFS, NRBUFS);

	pvmdfrags = init_recv_list(NSBUFS, PMTDBASE, MAXFRAGSIZE, 0, MPPANY, 
				pvm_hostmsgfunc());
	nodefrags = init_recv_list(NSBUFS, PMTPBASE, MAXFRAGSIZE, 0, MPPANY,
				pvm_nodemsgfunc()); 

	/* initialize the packet numbering for packets from daemon.
	 * NOTE(review): pvmmytid is read here; it only holds the new tid if
	 * the caller passed &pvmmytid as mytid - confirm. */
	fill_directstruct (pvmddirect, NRBUFS, pvm_tidtohost(pvmmytid), 
		0, PMTDBASE, 0, MPPANY);
	init_chunkostruct( pvmddirect->ordering, NSBUFS);

	/* initialize the packet numbering for packets from peers */
	for (i = 0; i < pvmpartsize; i ++)
	{
		fill_directstruct (peerdirect + i, NRBUFS, i, 
			0, PMTPBASE, 0, MPPANY);
		init_chunkostruct( (peerdirect+i)->ordering, NSBUFS);
	}

	/* ---- Create Task, pvmd  PCBs so that they can be found easily ----- */

	/* one entry per peer node plus a trailing entry for the pvmd */
	peerpcbs = TALLOC(pvmpartsize + 1, struct ttpcb, "pcbs");
	if (!peerpcbs)
	{
		pvmlogerror("mppbeatask(): can't allocate pcbs\n");
		return PvmSysErr;
	}
	for (i = 0; i <= pvmpartsize; i ++)
	{
		BZERO((char *)(peerpcbs + i), sizeof(struct ttpcb));
		(peerpcbs + i) -> tt_tid = i;
		(peerpcbs + i) -> tt_state = TTOPEN;
		/* the last entry's mpdirect is replaced just below */
		(peerpcbs + i) -> mpdirect = peerdirect + i;
	}

	pvmdpcb = peerpcbs + pvmpartsize;
	pvmdpcb -> mpdirect = pvmddirect;

	*topvmd = pvmdpcb;

	/* empty circular list of posted precvs */
	precvIds = msgid_new();
	if (!precvIds)
	{
		pvmlogerror("mppbeatask(): can't allocate precv msgid\n");
		return PvmSysErr;
	}
	precvIds->ms_link = precvIds->ms_rlink = precvIds;

	if ((s = getenv("PVMINPLACEDELAY")) != NULL)
	{
		inplaceDelay = atoi(s);
		pvmlogprintf("setting in place delay to %d \n", inplaceDelay);
	}
	else
	{
#if !defined(IMA_PGON)
		inplaceDelay = 0;
#else
		inplaceDelay = 250;				/* uSec. Seems to work well */
#endif
	}

	pvm_setopt(PvmRoute, PvmDontRoute); /* Deny direct routing */
	return 0;
}

/* ------ pvm_node_send ------- */
/* Send a single array of data (a pvm fragment) to a process.
 * The send is asynchronous; the native message id of the send is recorded
 * on the caller's list of outstanding send msgids.
 *
 * Routing: a destination that is a node on this host and in this partition
 * is sent to directly with the node-node routines; anything else is sent
 * to the host node (the pvmd) with the host-node routines.
 *
 * Tagging: a fragment sent right after an inplace header (inPlaceHeader
 * NULL on this call, non-NULL on the previous one) is the inplace body and
 * is tagged with this task's node number.  Every other fragment is tagged
 * tagbase + sequence number, the sequence wrapping at nbufs.
 *
 * inputs:
 *	cp		- array of data to be sent
 *	len		- length of data
 *	ttpcbp	- pointer to the task control structure of the destination
 *	smsglist - list of outstanding send msgids to add this one to (or to
 *			   merge with when the message layer provides msgmerge)
 *	inPlaceHeader - if this is an inplace header, this is == cp
 *	inPlaceBodyLen - length of the inplace body that will follow this header
 *
 * outputs:
 *	returns len on success, PvmSysErr if the native send fails or a msgid
 *	cannot be allocated
 *	*smsglist is initialized if it was NULL, otherwise the message is
 *	added to (or merged into) the list of outstanding send messages
*/
int
pvm_node_send(cp, len, ttpcbp, smsglist, inPlaceHeader, inPlaceBodyLen)
	char *cp;				/* this is what we are supposed to be sending */
	int len;				/* this is the length of the frag */
	struct ttpcb * ttpcbp;	/* info about where this is going */
	struct msgid **smsglist; /* msgid for this send */
	char *inPlaceHeader;	/* This is an inplace header? */
	int inPlaceBodyLen;		/* Length of inplace fragment */
{
	int cc;
	int dtid;
	msgmid_t mid;
	int node;						/* destination node */
	int partid;						/* destination partition id */
	int tag;

	MSGFUNC_PTR	mfunc;	

#if defined(IMA_PGON)
	double dclock();
#endif
	/* Both values are written on the header call and read on the following
	 * body call, so they have to survive between calls. */
	static double lastClock;
	static double delay;

	static int savelen = -1;/* save the length of inplace body for next call */

	MPP_DIRECTI_PTR tdirect;

	struct msgid *cmsgid = (struct msgid *) NULL;

	dtid = ttpcbp -> tt_tid;

	if (TIDISNODE(dtid) && (dtid & pvmtidhmask) == (pvmmytid & pvmtidhmask)
		&& (dtid & pvmtidpmask) == (pvmmytid & pvmtidpmask)) 
	{
		node = dtid & pvmtidnmask;
		partid = pvmmyptype;			/* send to node directly */
		mfunc = pvm_nodemsgfunc();		/* point to node-node routines */
	} 
	else 
	{
		node = pvmhostnode;
		partid = PVMDPTYPE;			/* send to pvmd first */
		mfunc = pvm_hostmsgfunc();		/* point to host-node routines */
	}

	tdirect = ttpcbp->mpdirect;

	if (inPlaceHeader)
	{
		/* remember the body length so the next call is recognised as
		 * the inplace body */
		savelen = inPlaceBodyLen;
#if defined(IMA_PGON)
		if (savelen > 4096 )
		{
			lastClock = dclock();	
			delay = (double) inplaceDelay / 1e6;
		}
#endif
	}

	if ( savelen >= 0  && inPlaceHeader == (char *) NULL) 
	{
		tag = pvmmynode;		/* send inplace body with my physical node # */
#if defined(IMA_PGON)
		/* large bodies: busy-wait until the configured delay has passed
		 * since the header went out */
		if (savelen > 4096 )
			while ((dclock() - lastClock) < delay)
				;
#endif
		savelen = -1;		  	/* as the tag */
	}
	else
	{
		tag = tdirect->tagbase + tdirect->sseq;

		if (++(tdirect->sseq) >= tdirect->nbufs) 
			tdirect->sseq = 0;
	}

	cc = (*mfunc->imsgsend)(0,tag, cp, len, node, partid, &mid);
	if (cc < 0)
	{
		pvmlogperror("node_send() IMSGSEND \n ");
		return PvmSysErr;
	}

	/* can't merge id's  add to the list */
	if (!(mfunc->msgmerge)  || !(*smsglist)) 
	{
		if ( (cmsgid = msgid_new()) == (struct msgid *) NULL)
		{
			pvmlogerror("node_send(): couldn't allocate send msgid\n");
			return PvmSysErr;
		}
		if (! (*smsglist))  	/* initialize the list */
		{
			*smsglist = cmsgid;
			cmsgid->ms_link = cmsgid->ms_rlink = cmsgid;
		}
		else				/* put at the end of the list of message id's */
			LISTPUTBEFORE(*smsglist, cmsgid, ms_link, ms_rlink);
	}
	else
	{
		/* fold this send into the id already at the head of the list */
		cmsgid = *smsglist;
		cmsgid -> id = (*mfunc->msgmerge)(&(cmsgid->id), &mid);
	}

	/* NOTE(review): the msgid is (re)filled only when its recorded
	 * destination differs from this one; whether a fresh msgid from
	 * msgid_new() can already carry otid == node is not visible here. */
	if (cmsgid -> otid != node)
	{
		cmsgid -> id = mid;
		cmsgid -> otid = node;
		cmsgid -> complete = 0;
		cmsgid -> len += len;
		cmsgid -> mfunc = mfunc;
	}

	return len;
}


/* ---------- pvm_node_mcast --------- */
/* This multicasts to tasks - it takes an array of tids
 * and sends directly to tasks that are on the local host using
 * pvm_send.
 * It returns a list of off-host tids and the number of off-host
 * tids. The off-host tids should then be sent to using pvm_mcast()
 * semantics.
 *
 * A tid counts as local when it is a node tid whose host field matches
 * this task's host field.
 *
 * On return *offhtids is NULL and *offhcnt is 0 when every tid was local
 * (or when there was nothing to do).  Otherwise *offhtids is a freshly
 * allocated array of count ints whose first *offhcnt entries are the
 * off-host tids; this routine does not free it, so the caller has to
 * (presumably with PVM_FREE - confirm at the call site).
 *
 * Returns PvmOk, or PvmNoMem if the work array cannot be allocated.
 * The results of the individual pvm_send() calls are not checked.
*/
int
pvm_node_mcast(tids, count, code, offhtids, offhcnt)
int *tids; 	/* array of tids, on and off host */
int count;	/* #of tids */
int code;	/* msgtag tag */
int **offhtids; /* array of tids going off host, returns null if none */
int *offhcnt; /* # of tids going off host, returns 0 if none */
{

	int i;
	int mask = pvmtidhmask;		/* host */
	int nlocal = 0;
	int localidx = count;
	int nremote = 0;
	int *tmptids = (int *) NULL;

	*offhtids = (int *) NULL;
	*offhcnt = 0;	

	/* no tids, empty list, or bogus count: nothing to send.  An empty
	 * list must not reach the allocator, because a zero-sized request
	 * may legitimately return NULL and would be reported as PvmNoMem. */
	if (!tids || count <= 0)
		return PvmOk;

	if (! (tmptids = TALLOC(count, int, "nmcast")))
	{
		pvmlogerror("node_mcast(): couldn't alloc memory\n");
		return PvmNoMem;
	}

	/* partition: remote tids grow up from the front of the array, local
	 * tids grow down from the back */
	for (i = 0; i < count; i++)
	{
		if ( TIDISNODE(tids[i]) && (tids[i] & mask) == (pvmmytid & mask))
		{
			/* put the local tids starting at the back of the array */
			tmptids[--localidx] = tids[i];	/* tid local to us */
			nlocal ++;
		}
		else
		{
			/* put non-local tids at the front of the array */
			tmptids[nremote++] = tids[i];
		}

	}

	/* multicast to the local tasks; localidx now indexes the first
	 * local tid */
	for (i = 0; i < nlocal; i++)
		pvm_send(tmptids[localidx++], code);      /* loop of sends for now */

	if (nlocal == count)
	{
		/* nothing left for the caller */
		PVM_FREE(tmptids);
		tmptids = (int *) NULL;
	}
	else
	{
		*offhtids = tmptids;
		*offhcnt = nremote;	
	}

	return PvmOk;	
}


	
	


/* Leave the virtual machine: if this process is currently configured as a
 * task, mark it unconfigured and shut the native message layer down.
 * Does nothing when not configured.  Always returns 0.
 */
int
mpp_pvmendtask()
{
	if (pvmmytid == -1)			/* not a task, nothing to undo */
		return 0;

	pvmmytid = -1;
	pvm_mpp_message_stop();

	return 0;
}


/* mpp-specific stuff on precv */
/* Receive a message straight into the caller's buffer when possible.
 *
 * A single msgid, allocated on the first call and reused afterwards,
 * describes the wanted message (source, tag, context, user buffer and its
 * length).  It is published through pvm_inprecv for the duration of the
 * pvm_recv() call, so the receive path can see that a user buffer is
 * available, and withdrawn again afterwards.
 *
 * If the msgid comes back flagged complete, the data is already in cp.
 * Otherwise the message arrived in an ordinary PVM buffer and is unpacked
 * into cp, truncated to len bytes.
 *
 * inputs:
 *	tid, tag - source and message tag handed to pvm_recv()
 *	cp, len	 - user buffer and its size in bytes
 *	dt		 - not used by this routine
 *
 * outputs:
 *	rlen	- if non-NULL, receives the message length
 *	rtid, rtag - passed to pvm_bufinfo(); filled in only when the
 *			  message went through a PVM buffer
 *
 * Returns 0 on success, PvmNoMem if the msgid cannot be allocated, or the
 * negative code from pvm_recv() (after reporting it through lpvmerr).
 */
int
pvm_mppprecv(tid, tag, cp ,len, dt, rtid, rtag, rlen)
	int tid;
	int tag;
	void *cp;
	int len;
	int dt;
	int *rtid;
	int *rtag;
	int *rlen;
{
	char errtxt[64];

	int cc;
	int l = 0;

	static int first = 1;
	static struct msgid * thisprecv;

	/* allocate the message id for precv on first use */

	if (first)
	{
		if ( !(thisprecv = msgid_new()))
		{
			pvmlogerror("mppprecv(): can't get memory \n");
			return PvmNoMem;
		}
		first = 0;
	}

	thisprecv -> otid =  tid;
	thisprecv -> tag = tag;
	thisprecv -> ctxt = pvm_getcontext();
	thisprecv -> ubuf = cp;
	thisprecv -> len = len; 
	thisprecv -> complete = 0;

	/* advertise the user buffer while the receive is in progress */
	pvm_inprecv = thisprecv;

	cc = pvm_recv(tid, tag);

	if (cc >= 0) {

		if (!(thisprecv->complete)) /* receive went into pvm buffers */
		{		
			pvm_bufinfo(cc, &l, rtag, rtid);
			if 	(rlen)
				*rlen = l;
			if (l < len)
				len = l;
			if (pvmdebmask & PDMMESSAGE) {
				sprintf(errtxt, "precv() unpacking message len = %d\n", l);
				pvmlogerror(errtxt);
			}
			pvm_upkbyte((char *)cp, len, 1);
		}
		else /* receive went straight into user memory */
		{
			if(rlen)
				*rlen = thisprecv -> len;
			if (pvmdebmask & PDMMESSAGE) {
				/* report the length stored in the msgid; the local
				 * l is never assigned on this path */
				sprintf(errtxt, "precv() short ckted into user buf len = %d\n",
						thisprecv -> len);
				pvmlogerror(errtxt);
			}
		}
		cc = 0;
	}

	pvm_inprecv = (struct msgid *) NULL;

	if (cc < 0)
		lpvmerr("pvm_mppprecv", cc);
	return cc;
}

/* ---------- pvm_mpppsend ------------- */
/* mpp_psend does short cuts on buffer management, 
	o This is a reasonable hack that greatly decreases (overhead)
	  latency.
	o It smacks the message structure directly. 

   inputs:
	cp	- user data to send (not copied; the fragment points at it)
	len	- length of the data in bytes; with len == 0 the fragment is
		  pointed at a private dummy byte instead of cp
	tid	- destination task
	tag	- message tag

   Returns the result of mroute(), or the negative code from pvm_mkbuf()
   if the private send buffer cannot be created.
*/
int 
pvm_mpppsend(cp, len, tid, tag)
char *cp;
int len;
int tid;
int tag;
{

	static char nullusrmsg;
	static int first = 1;
	static int psbuf;
	static struct frag *dfrag;
	static struct pmsg *psmsg;
	static struct timeval ztv = { 0, 0 };
	int savebuf;
	int cc = 0;

	/* First call to psend will create a regular pvm buffer for packing.
	 *	o	A new message is created as a by-product of pvm_mkbuf.
	 * 	o	A single (inplace) data buffer is created and put in the frag chain 
	 *	o	Later calls will simply twiddle the message buffer and the frag
	 */
	if (first)
	{
		psbuf = pvm_mkbuf(PvmDataInPlace); /* Make an inplace data buffer */

		/* Without a buffer the set-up below would pack into whatever
		 * send buffer is currently active; bail out and retry the
		 * set-up on the next call instead. */
		if (psbuf < 0)
			return psbuf;

		savebuf = pvm_setsbuf(psbuf);
		pvm_pkint(&cc, 1, 1);			/* create a single frag for msg */ 
		pvm_setsbuf(savebuf);
		psmsg = midtobuf(psbuf);		/* save a pointer to the msg struct */
		dfrag = psmsg->m_frag->fr_link;
		first = 0;
	}

	psmsg->m_ctx = pvmmyctx;			/* set the context */

	/* aim the one data fragment at the user's bytes (or at the dummy byte
	 * for an empty message) */
	dfrag->fr_buf = dfrag->fr_dat = len ? cp : &nullusrmsg;
	dfrag->fr_max = dfrag->fr_len = len;

	cc = mroute(psbuf, tid, tag, &ztv); /* route the message */ 

	return cc;
}

	

	



/* ----------- pvm_readfrompvmd ----------- */
/* Ask pvm_chunkReady() for a fragment from the pvmd and return whatever it
 * hands back.  The pvmd receive list, the host-node message functions and
 * the single pvmd ordering structure are passed in.  The buffer index and
 * the ready list are statics passed by address, so they persist from one
 * call to the next.
 */
struct frag *
pvm_readfrompvmd()
{
	static int curbuf = 0; 					/* buffer we are working on */
	static CHUNK_PTR pending = (CHUNK_PTR) NULL;

	MSGFUNC_PTR hostfuncs = pvm_hostmsgfunc();
	struct frag *fp;

	/* check to see if any fragments are ready to be processed */
	fp = (struct frag *) pvm_chunkReady( pvmdfrags, NRBUFS, hostfuncs,
		pvmddirect, 1, &curbuf, &pending);

	return fp;
}

/* ----------- pvm_readfrompeer ----------- */
struct frag *
pvm_readfrompeer()
{
	static int nodebuf = 0; 				/* buffer we are working on */

	static CHUNK_PTR readyFrags = (CHUNK_PTR) NULL;

	int mxbufs = NRBUFS;

	MSGFUNC_PTR mfunc;

	mfunc = pvm_nodemsgfunc();

	/* check to see if any fragments are ready to be processed */

	return  (struct frag *) pvm_chunkReady( nodefrags, mxbufs, mfunc, 
			peerdirect, pvmpartsize, &nodebuf, &readyFrags);   
}


/* ------- init_precvMsgs ----- */
/* Hand back a new msgid obtained from msgid_new(); the result is returned
 * unchanged, including a NULL if msgid_new() produced one.
 */
struct msgid *
init_precvMsgs()
{
	struct msgid *fresh = msgid_new();

	return fresh;
}

/* ---------- mpp_ttpcb_find() ------- */
struct ttpcb *
mpp_ttpcb_find(dtid) 
int dtid;
{
	int i;

	if (TIDISNODE(dtid) && (dtid & pvmtidhmask) == (pvmmytid & pvmtidhmask)
		&& (dtid & pvmtidpmask) == (pvmmytid & pvmtidpmask)) 
	{
		i = dtid & pvmtidnmask;
		(peerpcbs + i)->tt_tid = dtid;
		return peerpcbs + i;
	}
	else
		return (struct ttpcb *) NULL;
}


/* ---------- pvm_mpp_pmsgs() ------- */
/* Accessor for the head of this file's received-message list (NULL until
 * pvm_mpp_beatask() has allocated it).
 */
struct pmsg *
pvm_mpp_pmsgs()
{
	struct pmsg *head = rxpmsgs;

	return head;
}

/* ----------- pvm_find_direct ---------- */
/* this is a hack to find the correct ordering structure for
	a node. 

	dlist	- either the pvmd's ordering structure or the start of an
			  array of per-node ordering structures
	nstruct	- not used
	node	- node number or tid; only its node field is looked at

	Returns the pvmd structure itself when dlist is the pvmd structure,
	otherwise the element of dlist selected by the node field.
*/
MPP_DIRECTI_PTR 
pvm_find_direct (dlist, nstruct, node)
MPP_DIRECTI_PTR dlist;
int nstruct;
int node;
{
	if (dlist != pvmddirect)
		return dlist + (node & pvmtidnmask);	/* make sure this is a node */

	return pvmddirect;
}
		 
/* Accessor for the msgid of the precv currently in progress: returns the
 * value of pvm_inprecv (not the precvIds list head), which pvm_mppprecv()
 * sets while it is receiving and resets to NULL afterwards.
 */
struct msgid *
pvm_mpp_get_precvids()
{
	struct msgid *active = pvm_inprecv;

	return active;
}



/* syntax highlighted by Code2HTML, v. 0.9.1 */