static char rcsid[] =
"$Id: pvmdmimd.c,v 1.14 2002/02/21 23:19:41 pvmsrc Exp $";
/*
* PVM version 3.4: Parallel Virtual Machine System
* University of Tennessee, Knoxville TN.
* Oak Ridge National Laboratory, Oak Ridge TN.
* Emory University, Atlanta GA.
* Authors: J. J. Dongarra, G. E. Fagg, M. Fischer
* G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
* P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
* (C) 1997 All Rights Reserved
*
* NOTICE
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby granted
* provided that the above copyright notice appear in all copies and
* that both the copyright notice and this permission notice appear in
* supporting documentation.
*
* Neither the Institutions (Emory University, Oak Ridge National
* Laboratory, and University of Tennessee) nor the Authors make any
* representations about the suitability of this software for any
* purpose. This software is provided ``as is'' without express or
* implied warranty.
*
* PVM version 3 was funded in part by the U.S. Department of Energy,
* the National Science Foundation and the State of Tennessee.
*/
/*
* pvmdmimd.c
*
* MPP interface.
*
* void mpp_init(int argc, char **argv):
* Initialization. Create a table to keep track of active nodes.
* argc, argv: passed from main.
*
* int mpp_load( struct waitc_spawn *wxp )
*
* Load executable onto nodes; create new entries in task table,
* encode node number and process type into task IDs, etc.
*
* Construction of Task ID:
*
* 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0
* +-+-+-----------------------+-+-----+--------------------------+
* |s|g| host index |n| prt | node # (16384) |
* +-+-+-----------------------+-+-----+--------------------------+
*
* The "n" bit is set for node task but clear for host task.
*
* flags: exec options;
* name: executable to be loaded;
* argv: command line argument for executable
* count: number of tasks to be created;
* tids: array to store new task IDs;
* ptid: parent task ID.
*
* mpp_new(int count, int ptid):
* Allocate a set of nodes. (called by mpp_load())
* count: number of nodes; ptid: parent task ID.
*
* int mpp_output():
* Send all pending packets to nodes via native send. Node number
* and process type are extracted from task ID.
*
* int mpp_mcast(int src, struct pkt pp, int tids[], int ntask):
* Global send.
* src: source task ID;
* pp: packet;
* tids: list of destination task IDs;
* ntask: how many.
*
* int mpp_probe():
* Probe for pending packets from nodes (non-blocking). Returns
* 1 if packets are detected, otherwise 0.
*
* void mpp_input():
* Receive pending packets (from nodes) via native recv.
*
* struct task *mpp_find(int pid):
* Find a task in task table by its Unix pid.
*
* void mpp_free(struct task *tp):
* Remove node/process-type from active list.
* tp: task pointer.
*
* $Log: pvmdmimd.c,v $
* Revision 1.14 2002/02/21 23:19:41 pvmsrc
* Added new (not to be documented!) PVM_MAX_TASKS env var support.
* - for Mahir Lokvancic <mahir@math.ufl.edu>.
* - forcefully limits the number of tasks that can attach to a
* pvmd, required on Solaris in rare circumstances when hard
* FD_SETSIZE limit is reached, and all hell breaks loose...
* - check return for task_new() call, can now produce NULL ptr,
* indicating PvmOutOfRes...
* (Spanker=kohl)
*
* Revision 1.13 2001/09/25 21:19:28 pvmsrc
* Minor TMPNAMFUN()/tmpnam() cleanup.
* - moved macro def to pvm3.h, renamed PVMTNPMAN().
* - same for LEN_OF_TMP_NAM -> PVMTMPNAMLEN.
* - mostly a huge waste of time, since *both* tmpnam() & mktemp()
* produce the same "dangerous" warning message in Linux/gcc...
* - damn.
* (Spanker=kohl)
*
* Revision 1.12 2000/02/17 21:10:44 pvmsrc
* Cleaned up comments... mpp_load() args...
* (Spanker=kohl)
*
* Revision 1.11 1999/07/08 19:00:28 kohl
* Fixed "Log" keyword placement.
* - indent with " * " for new CVS.
*
* Revision 1.10 1997/10/01 15:36:06 pvmsrc
* Removed unnecessary #include "fromlib.h" header.
* - all consts now included in pvm3.h...
* - header file eliminated.
* (Spanker=kohl)
*
* Revision 1.9 1997/08/27 20:18:58 pvmsrc
* Added blank args (0,0,0) in mpp_load() to make it call forkexec correctly.
* Protocol needs to be changed to allow taskers to access IBM poe directly.
* (Spanker=fagg)
*
* Revision 1.8 1997/07/09 13:54:58 pvmsrc
* Fixed Author Header.
*
* Revision 1.7 1997/06/02 13:48:38 pvmsrc
* Moved #include host.h above #include waitc.h.
* Removed old mesg.h include... gone baby.
*
* Revision 1.6 1997/05/06 20:14:36 pvmsrc
* Catch stdout/stderr correctly and redirect to outtid,tag,ctx
*
* Revision 1.5 1997/05/05 20:08:56 pvmsrc
* Pass outctx and trcctx to mpp tasks.
*
* Revision 1.4 1997/05/02 13:52:50 pvmsrc
* Start up MPI jobs correctly and get them configured.
*
* Revision 1.3 1997/03/25 15:52:21 pvmsrc
* PVM patches from the base 3.3.10 to 3.3.11 versions where applicable.
* Originals by Bob Manchek. Altered by Graham Fagg where required.
* -IP enabled over switch
* -RMPOOL env can be used instead of host list
*
* Revision 1.2 1997/01/28 19:30:57 pvmsrc
* New Copyright Notice & Authors.
*
* Revision 1.1 1996/09/23 23:15:09 pvmsrc
* Initial revision
*
* Revision 1.3 1996/05/14 14:35:59 manchek
* inc'd changes from chulho@kgn.ibm.com
*
* Revision 1.2 1995/07/25 17:41:27 manchek
* mpp_output returns int
*
* Revision 1.1 1995/05/30 17:23:56 manchek
* Initial revision
*
* Revision 1.3 1994/06/03 20:54:24 manchek
* version 3.3.0
*
* Revision 1.2 1993/12/20 15:39:47 manchek
* patch 6 from wcj
*
* Revision 1.1 1993/08/30 23:35:09 manchek
* Initial revision
*
*/
#include <stdlib.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifdef SYSVSTR
#include <string.h>
#define CINDEX(s,c) strchr(s,c)
#else
#include <strings.h>
#define CINDEX(s,c) index(s,c)
#endif
#include <pvm3.h>
#include <pvmproto.h>
#include "global.h"
#include "host.h"
#include "waitc.h"
#include "pvmalloc.h"
#include "pkt.h"
#include "task.h"
#include "listmac.h"
#include "pvmdmp.h"
#include "pvmmimd.h"
#include "bfunc.h"
/*
 * Launcher command and the option strings that mpp_load() places on
 * its command line.
 */
#define MPICOMM "/usr/bin/poe"
#define MPIOPT1 "-procs"
#define MPIOPT2 "-euilib"
#define MPIOPT3 "-hfile"
#define MPIOPT4 "-rmpool"
#define MPIOPARG2 "us" /* options: -procs # -euilib us */
#define MPIOPARG3 "ip" /* Enable IP over switch */
#define MPIARGC 7 /* number of command line arguments */
/* old-style declaration; <stdlib.h> is also included above */
char *getenv();
/* Global */
extern int pvmdebmask; /* from pvmd.c */
extern char **epaths; /* from pvmd.c */
extern int myhostpart; /* from pvmd.c */
extern int tidhmask; /* from pvmd.c */
extern int ourudpmtu; /* from pvmd.c */
extern struct htab *hosts; /* from pvmd.c */
extern struct task *locltasks; /* from task.c */
int tidtmask = TIDPTYPE; /* mask for ptype field of tids */
int tidnmask = TIDNODE; /* mask for node field of tids */
/* private */
/* copied into sp2pvminfo[4] by mpp_load(); only read in the code visible here */
static int myndf = 0;
static struct nodeset *busynodes; /* active nodes; ordered by proc type */
static char pvmtxt[512]; /* scratch for error log */
static int ptypemask; /* mask; we use these bits of ptype in tids */
static char nodefile[PVMTMPNAMLEN]; /* tmp node file */
static char **nodelist = 0; /* default poe node list */
static int partsize = 0; /* number of nodes allocated */
static int hostfileused = TRUE; /* Check if MP_HOSTFILE used */
static char defaultpool[64]="1"; /* default MP_POOL if not set */
/*
 * Sized by its initializer, so it holds exactly MPIOPARG2 plus the NUL.
 * mpp_init() overwrites it in place with MPIOPARG3, which therefore must
 * not be longer than MPIOPARG2.
 */
static char mpiadapter[]={MPIOPARG2}; /* default User-Space */
/* filled in by mpp_load(); written to pvmhost's socket by mpp_conn() */
static int sp2pvminfo[SIZEHINFO];
/*
 * mpp_init()
 *
 * Determine the size of the node partition from the environment and
 * create the (empty) circular list of busy node sets.
 *
 * The partition is described by the first of these that is set:
 *   LOADLBATCH=yes   partsize is the number of blanks in
 *                    LOADL_PROCESSOR_LIST; no host file is used.
 *   MP_PROCS         partsize is its numeric value; MP_RMPOOL, when
 *                    set, replaces the default pool; no host file is used.
 *   MP_HOSTFILE      every whitespace-separated word of the file becomes
 *                    one entry of nodelist[]; a host file is used.
 * If none is usable the process exits with PvmHostFail.
 *
 * MP_EUILIB=ip switches the adapter argument from "us" to "ip".
 *
 * argc, argv: not referenced.
 */
void
mpp_init(argc, argv)
	int *argc;
	char **argv;
{
	char *hfn;				/* environment value / host file name */
	char nname[128];		/* node name */
	FILE *hfp;
	int i;

	if ((hfn = getenv("LOADLBATCH"))) {
		if (strcmp(hfn, "yes") == 0) {
			if ((hfn = getenv("LOADL_PROCESSOR_LIST"))) {
				/* precision keeps an arbitrarily long list inside pvmtxt[] */
				sprintf(pvmtxt,
					"LOADL_PROCESSOR_LIST=%.400s.\n", hfn);
				pvmlogperror(pvmtxt);
				hostfileused = FALSE;
				for (; *hfn != '\0'; hfn++) {
					if (*hfn == ' ')
						partsize++;
				}
			} else {
				/* hfn is a null pointer here and must not be given to %s */
				pvmlogperror("LOADL_PROCESSOR_LIST - is not set\n");
				exit(PvmHostFail);
			}
		} else {
			sprintf(pvmtxt, "LOADLBATCH=%.400s - not set to yes\n", hfn);
			pvmlogperror(pvmtxt);
			exit(PvmHostFail);
		}

	} else if ((hfn = getenv("MP_PROCS"))) {
		if ((partsize = atoi(hfn)) < 1) {
			sprintf(pvmtxt, "MP_PROCS=%d must be >= to 1\n", partsize);
			pvmlogperror(pvmtxt);
			exit(PvmHostFail);
		}
		hostfileused = FALSE;
		if ((hfn = getenv("MP_RMPOOL"))) {
			if (atoi(hfn) < 0) {
				/* report the rejected value, not the defaultpool buffer */
				sprintf(pvmtxt,
					"MP_RMPOOL=%d must be >= to 0\n", atoi(hfn));
				pvmlogperror(pvmtxt);
				exit(PvmHostFail);
			}
			if (strlen(hfn) >= sizeof(defaultpool)) {
				sprintf(pvmtxt, "MP_RMPOOL is longer than %d characters\n",
					(int)sizeof(defaultpool) - 1);
				pvmlogerror(pvmtxt);
				exit(PvmHostFail);
			}
			strcpy(defaultpool, hfn);
		}

	} else if ((hfn = getenv("MP_HOSTFILE"))) {
		if (!(hfp = fopen(hfn, "r"))) {
			sprintf(pvmtxt, "sp2hostfile() fopen %.400s\n", hfn);
			pvmlogperror(pvmtxt);
			exit(PvmHostFail);
		}
		hostfileused = TRUE;

		/* first pass: count the words; width keeps each inside nname[] */
		while (fscanf(hfp, "%127s", nname) == 1)
			partsize++;

		if (partsize > 0) {
			if (!(nodelist = TALLOC(partsize, char*, "nname"))) {
				pvmlogerror("mpp_init() can't get memory\n");
				pvmbailout(0);
				return;
			}
			/* second pass: store the names */
			rewind(hfp);
			for (i = 0; i < partsize; i++) {
				if (fscanf(hfp, "%127s", nname) != 1) {
					/* fewer words than counted: keep what was read */
					partsize = i;
					break;
				}
				nodelist[i] = STRALLOC(nname);
			}
		}
		fclose(hfp);

	} else {
		pvmlogerror("mpp_init() no POE host file.\n");
		pvmlogerror("mpp_init() MP_PROCS, MP_RMPOOL or MP_HOSTFILE must be set.\n");
		exit(PvmHostFail);
	}

	if ((hfn = getenv("MP_EUILIB"))) {
		if (strcmp(hfn, "ip") == 0) {
			/* same length as the initial "us", so it fits mpiadapter[] */
			strcpy(mpiadapter, MPIOPARG3);	/* IP over switch */
		}
	}

	sprintf(pvmtxt, "%d nodes allocated.\n", partsize);
	pvmlogerror(pvmtxt);

	/* list head of the busy node sets; an empty list points at itself */
	if (!(busynodes = TALLOC(1, struct nodeset, "nsets"))) {
		pvmlogerror("mpp_init() can't get memory\n");
		pvmbailout(0);
		return;
	}
	BZERO((char*)busynodes, sizeof(struct nodeset));
	busynodes->n_link = busynodes;
	busynodes->n_rlink = busynodes;

	/* ptype field mask shifted down so that it starts at bit 0 */
	ptypemask = tidtmask >> (ffs(tidtmask) - 1);
}
/*
 * sp2hostfile()
 *
 * Create a temporary POE host file from the default node list. The file
 * contains nodelist[first] .. nodelist[first + count - 1], one per line.
 *
 * first: index of the first node in the set.
 * count: number of nodes requested.
 *
 * Returns the file name (the static buffer nodefile), or a null pointer
 * if the range does not fit in the node list or the file cannot be
 * written completely.
 */
char *
sp2hostfile(first, count)
	int first;		/* first node in the set */
	int count;		/* number of nodes requested */
{
	int i;
	FILE *tmpfp;
	int failed = 0;

	if (partsize < count) {
		sprintf(pvmtxt, "sp2hostfile() need at least %d nodes\n", count+1);
		pvmlogperror(pvmtxt);
		return (char *)0;
	}
	/* the whole range, not just its length, must lie inside nodelist[] */
	if (!nodelist || first < 0 || count < 0 || first > partsize - count) {
		sprintf(pvmtxt, "sp2hostfile() bad node range first=%d count=%d\n",
			first, count);
		pvmlogerror(pvmtxt);
		return (char *)0;
	}

	(void)PVMTMPNAMFUN(nodefile);
	if (!(tmpfp = fopen(nodefile, "w"))) {
		sprintf(pvmtxt, "sp2hostfile() fopen %s", nodefile);
		pvmlogperror(pvmtxt);
		return (char *)0;
	}
	if (pvmdebmask & PDMNODE) {
		sprintf(pvmtxt, "sp2hostfile() POE host file: %s\n", nodefile);
		pvmlogerror(pvmtxt);
	}

	for (i = first; i < count + first; i++)
		if (fprintf(tmpfp, "%s\n", nodelist[i]) < 0)
			failed = 1;
	/* fclose flushes; a failure here means the file is incomplete */
	if (fclose(tmpfp) == EOF)
		failed = 1;

	if (failed) {
		sprintf(pvmtxt, "sp2hostfile() write %s", nodefile);
		pvmlogperror(pvmtxt);
		(void)unlink(nodefile);
		return (char *)0;
	}
	return nodefile;
}
/*
 * mpp_new()
 *
 * Find a set of free nodes from nodelist; assign ptype sequentially,
 * only tasks spawned together get the same ptype.
 *
 * The busy list is kept ordered by first node. The new set is placed in
 * the first gap large enough for count nodes, or after the last set if
 * the remainder of the partition is large enough. When a host file is
 * in use, a temporary host file for the chosen nodes is written.
 *
 * count: number of nodes requested.
 * ptid:  parent's tid (stored in the set's n_ptid).
 *
 * Returns the new node set, already linked into the busy list, or a
 * null pointer if there are not enough nodes, no ptype is free, or the
 * host file cannot be created.
 */
struct nodeset *
mpp_new(count, ptid)
	int count;		/* number of nodes requested */
	int ptid;		/* parent's tid */
{
	struct nodeset *sp, *newp, *sp2;
	int last = -1;		/* last node of the set preceding the gap */
	int ptype = 0;

	if (!(newp = TALLOC(1, struct nodeset, "nsets"))) {
		pvmlogerror("mpp_new() can't get memory\n");
		pvmbailout(0);
	}
	BZERO((char*)newp, sizeof(struct nodeset));
	newp->n_size = count;

	/* stop at the first set that leaves room for count nodes before it */
	for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link) {
		if (sp->n_first - last > count)
			break;
		last = sp->n_first + sp->n_size - 1;
		if (ptype <= sp->n_ptype)
			ptype = sp->n_ptype + 1;
	}
	if (sp == busynodes && partsize - last <= count) {
		pvmlogerror("mpp_new() not enough nodes in partition\n");
		PVM_FREE(newp);
		return (struct nodeset *)0;
	}

	/* is the candidate ptype (within the tid field width) already taken? */
	for (sp2 = busynodes->n_link; sp2 != busynodes; sp2 = sp2->n_link)
		if ((sp2->n_ptype & ptypemask) == (ptype & ptypemask))
			break;
	if (sp2 != busynodes || ptype == NPARTITIONS) {
		/* yes: search for the lowest ptype no active set is using */
		for (ptype = 0; ptype < NPARTITIONS; ptype++) {
			for (sp2 = busynodes->n_link; sp2 != busynodes; sp2 = sp2->n_link)
				if ((sp2->n_ptype & ptypemask) == (ptype & ptypemask))
					break;
			if (sp2 == busynodes)
				break;
		}
		if (ptype == NPARTITIONS) {
			pvmlogerror("mpp_new() out of ptypes: too many spawns\n");
			/* newp is not linked anywhere yet; release it like the other
			   failure paths do */
			PVM_FREE(newp);
			return (struct nodeset *)0;
		}
	}

	if (pvmdebmask & PDMNODE) {
		sprintf(pvmtxt, "mpp_new() %d nodes %d ... ptype=%d ptid=%x\n",
			count, last+1, ptype, ptid);
		pvmlogerror(pvmtxt);
	}
	newp->n_first = last + 1;
	if (hostfileused) {
		if (!sp2hostfile(newp->n_first, count)) {
			PVM_FREE(newp);
			return (struct nodeset *)0;
		}
	}
	newp->n_ptype = ptype;
	newp->n_ptid = ptid;
	newp->n_alive = count - 1;
	/* insert in front of the set that follows the gap; keeps list order */
	LISTPUTBEFORE(sp, newp, n_link, n_rlink);
	return newp;
}
/*
 * mpp_free()
 *
 * Account for the departure of node task tp. Tasks that are not node
 * tasks are ignored. The task's txq pointer is cleared because the
 * queue belongs to pvmhost. When the last task of a node set goes,
 * pvmhost is flagged TF_CLOSE and the sending side of its socket is
 * shut down; the node set itself stays on the busy list (it is removed
 * in mpp_output() once pvmhost is gone).
 *
 * tp: task pointer.
 */
void
mpp_free(tp)
	struct task *tp;
{
	struct nodeset *set;
	struct task *host;
	int tid = tp->t_tid;
	int ptype;

	if (!TIDISNODE(tid))
		return;

	ptype = TIDTOTYPE(tid);
	tp->t_txq = 0;		/* don't free pvmhost's txq */

	/* locate the node set this task's ptype belongs to */
	set = busynodes->n_link;
	while (set != busynodes && (set->n_ptype & ptypemask) != ptype)
		set = set->n_link;

	if (set == busynodes) {
		sprintf(pvmtxt, "mpp_free() t%x not active\n", tid);
		pvmlogerror(pvmtxt);
		return;
	}

	if (pvmdebmask & PDMNODE) {
		sprintf(pvmtxt, "mpp_free() t%x type=%ld alive=%d\n",
			tid, set->n_ptype, set->n_alive);
		pvmlogerror(pvmtxt);
	}

	set->n_alive -= 1;
	if (set->n_alive == 0) {
		host = task_find(set->n_ptid);
		if (host) {
			host->t_flag |= TF_CLOSE;
			if (host->t_sock != -1) {
				/* the socket is only shut down, not closed */
				shutdown(host->t_sock, 1);
				/* close stdout after pvmhost dies */
				host->t_out = tp->t_out;
			}
		}
	}

	tp->t_out = -1;		/* don't free shared stdout if alive > 0 */
}
/*
 * mpp_load()
 *
 * Load an executable onto a newly allocated set of nodes.
 *
 * The executable wxp->w_argv[0] is looked up along epaths (or used as
 * is when it contains a '/'). For the first regular, executable file
 * found, a node set of w_veclen + 1 nodes is reserved, a pvmhost is
 * started via
 *     poe path [args...] -procs N -euilib us|ip -hfile file | -rmpool pool
 * and w_veclen task structs are created for the node tasks. Node tasks
 * share pvmhost's txq, stdout and Unix pid.
 *
 * Returns 0 on success with the new tids stored in wxp->w_vec. On
 * failure returns a negative PVM error code (PvmNoFile, PvmOutOfRes or
 * the forkexec() result) and stores that code in every w_vec entry.
 */
int
mpp_load( wxp )
	struct waitc_spawn *wxp;
{
	int flags = 0;				/* exec options */
	char *name;					/* executable */
	char **argv;				/* arg list (argv[-1] must be there) */
	int count;					/* how many */
	int *tids;					/* array to store new tids */
	int ptid;					/* parent task ID */
	int ptypepart;				/* type field */
	int j;
	struct task *tp;
	struct pkt *hosttxq;		/* out-going queue of pvmhost */
	int err = 0;
	struct nodeset *sp;
	char c[128];				/* buffer to store count, name.host */
	int nargs;
	char **ep, **eplist;
	char path[MAXPATHLEN];
	struct stat sb;
	char **av;
	int hostout;				/* stdout of pvmhost */
	int hostpid;				/* Unix pid of pvmhost */
	char htid[128];				/* buffer to store pvmhost tid */
	char *p;
	static char *nullep[] = { "", 0 };

	/* -- initialize some variables from the waitc_spawn struct -- */
	name = wxp->w_argv[0];
	argv = wxp->w_argv;
	count = wxp->w_veclen;
	tids = wxp->w_vec;
	ptid = wxp->w_ptid;

	/* a name containing '/' is not searched for along epaths */
	eplist = CINDEX(name, '/') ? nullep : epaths;

	for (ep = eplist; *ep; ep++) {
		/* directory + '/' + name + NUL must fit in path[] */
		if (strlen(*ep) + strlen(name) + 2 > sizeof(path)) {
			if (pvmdebmask & PDMTASK) {
				sprintf(pvmtxt, "mpp_load() path too long <%.200s/%.200s>\n",
					*ep, name);
				pvmlogerror(pvmtxt);
			}
			continue;
		}
		/* search for file */
		(void)strcpy(path, *ep);
		if (path[0])
			(void)strcat(path, "/");
		(void)strcat(path, name);

		if (stat(path, &sb) == -1
				|| ((sb.st_mode & S_IFMT) != S_IFREG)
				|| !(sb.st_mode & S_IEXEC)) {
			if (pvmdebmask & PDMTASK) {
				/* path may be longer than pvmtxt[]; bound what is logged */
				sprintf(pvmtxt, "mpp_load() stat failed <%.400s>\n", path);
				pvmlogerror(pvmtxt);
			}
			continue;
		}

		/* one extra node for pvmhost */
		if (!(sp = mpp_new(count+1, ptid))) {
			err = PvmOutOfRes;
			goto done;
		}
		ptypepart = (sp->n_ptype << (ffs(tidtmask) - 1)) | TIDONNODE;

		if (argv)
			for (nargs = 0; argv[nargs]; nargs++);
		else
			nargs = 0;

		/* av[-1], poe, -procs, #, -euilib, us, -hfile fname */
		nargs += MPIARGC + 1;
		if (!(av = TALLOC(nargs + 1, char*, "argv"))) {
			err = PvmOutOfRes;
			goto done;
		}
		av++;					/* reserve room for debugger */
		BZERO((char*)av, nargs * sizeof(char*));
		av[0] = MPICOMM;
		av[1] = path;

		/* fill the poe options in from the end of the vector backwards */
		av[--nargs] = 0;
		if (hostfileused) {
			av[--nargs] = nodefile;
			av[--nargs] = MPIOPT3;
		} else {
			av[--nargs] = defaultpool;
			av[--nargs] = MPIOPT4;
		}
		av[--nargs] = mpiadapter;
		av[--nargs] = MPIOPT2;
		sprintf(c, "%d", count+1);
		av[--nargs] = c;
		av[--nargs] = MPIOPT1;
		/* the caller's own arguments go between the path and the options */
		for (j = 2; j < nargs; j++)
			av[j] = argv[j - 1];	/* poe name argv -procs # -euilib us */

		if (flags & PvmTaskDebug)
			av++;					/* pdbx name -procs # -euilib us */

		if ((err = forkexec(flags, av[0], av, 0, (char **)0, 0,
				0, 0, &tp)))
			goto done;

		/* tp is now pvmhost; record what the node tasks will share */
		tp->t_ptid = ptid;
		PVM_FREE(tp->t_a_out);
		/* leave room in c[] for ".host" and the NUL */
		sprintf(c, "%.*s.host", (int)sizeof(c) - 6, name);
		tp->t_a_out = STRALLOC(c);
		sp->n_ptid = tp->t_tid;		/* pvmhost's tid */
		hosttxq = tp->t_txq;
		hostout = tp->t_out;
		hostpid = tp->t_pid;
		tp->t_out = -1;

		sprintf(htid, "PVMHTID=%d", tp->t_tid);
		p = STRALLOC(htid);
		pvmputenv(p);

		/* host info block that mpp_conn() writes to pvmhost */
		sp2pvminfo[0] = TDPROTOCOL;
		sp2pvminfo[1] = myhostpart + ptypepart;
		sp2pvminfo[2] = ptid;
		sp2pvminfo[3] = MAXFRAGSIZE;
		sp2pvminfo[4] = myndf;
		sp2pvminfo[5] = partsize;
		sp2pvminfo[6] = wxp->w_outtid;
		sp2pvminfo[7] = wxp->w_outtag;
		sp2pvminfo[8] = wxp->w_outctx;
		sp2pvminfo[9] = wxp->w_trctid;
		sp2pvminfo[10] = wxp->w_trctag;
		sp2pvminfo[11] = wxp->w_trcctx;

		/* XXX task may not be on same host; can't do auth with tmp file */
		tp->t_flag |= TF_CONN;

		if (pvmdebmask & PDMTASK) {
			sprintf(pvmtxt, "mpp_load() %d type=%d ptid=%x t%x...\n",
				count, sp->n_ptype, ptid, myhostpart + ptypepart);
			pvmlogerror(pvmtxt);
		}

		/* create new task structs */
		for (j = 0; j < count; j++) {
			if ((tp = task_new(myhostpart + ptypepart + j)) == NULL) {
				err = PvmOutOfRes;
				goto done;
			}
			tp->t_a_out = STRALLOC(name);
			tp->t_ptid = ptid;
			tp->t_flag |= TF_CONN;		/* no need for the auth crap */
			tids[j] = tp->t_tid;
			PVM_FREE(tp->t_txq);
			tp->t_txq = hosttxq;		/* node tasks share pvmhost's txq */
			tp->t_out = hostout;		/* and stdout */
			tp->t_pid = hostpid;		/* pvm_kill should kill pvmhost */
			tp->t_outtid = wxp->w_outtid;	/* catch stdout/stderr */
			tp->t_outtag = wxp->w_outtag;
			tp->t_outctx = wxp->w_outctx;
		}
		return 0;
	}

	if (pvmdebmask & PDMTASK) {
		sprintf(pvmtxt, "mpp_load() didn't find <%.400s>\n", name);
		pvmlogerror(pvmtxt);
	}
	err = PvmNoFile;

done:
	/* report the failure through every slot of the tid vector */
	for (j = 0; j < count; j++)
		tids[j] = err;
	return err;
}
#if 0
/*
 * sp2kill()
 *
 * Kill poe process: look up the name of node number `node` (0-based) in
 * the host file ($MP_HOSTFILE, else "host.list") and run
 * "rsh <node> poekill <a_out>" in a child process.
 *
 * a_out: executable name handed to poekill.
 * node:  index of the node in the host file.
 */
void
sp2kill(a_out, node)
	char *a_out;
	int node;
{
	FILE *fp;
	int i;
	char nname[128];		/* node name */
	char *hfn;				/* host file name */
	struct stat sb;
	char *av[8];			/* for rsh args */
	int ac;
	int pid = -1;			/* pid of rsh */

	node++;			/* number of words to read to reach the node's name */
	if ((hfn = getenv("MP_HOSTFILE")) || stat(hfn = "host.list", &sb) != -1) {
		if ((fp = fopen(hfn, "r"))) {
			/* width keeps each word inside nname[] */
			for (i = 0; i < node; i++)
				if (fscanf(fp, "%127s", nname) != 1)
					break;
			/* close the stream, not the file name */
			fclose(fp);
			if (i < node) {
				pvmlogerror("sp2kill() node not in host file\n");
				return;
			}
			if ((pid = fork()) == -1) {
				pvmlogperror("sp2kill() fork");
				return;
			}
			if (!pid) {
				ac = 0;
				av[ac++] = "/usr/bin/rsh";
				av[ac++] = nname;
				av[ac++] = "poekill";
				av[ac++] = a_out;
				av[ac++] = 0;
				/* keep only stdin, stdout and stderr open in the child */
				for (i = getdtablesize(); --i > 2; )
					(void)close(i);
				execvp(av[0], av);
				_exit(1);
			}
		} else
			pvmlogperror("sp2kill() fopen");
		return;
	}
	pvmlogerror("sp2kill() no host file");
}
#endif /*0*/
/*
 * mpp_kill()
 *
 * Deliver a signal to a task. A task that is not a node task simply
 * gets the signal. For a node task only SIGTERM and SIGKILL are acted
 * on: the signal goes to the task's t_pid, after which the task is
 * released and freed. Any other signal to a node task is logged and
 * ignored.
 *
 * tp:     task to signal.
 * signum: signal number.
 */
void
mpp_kill(tp, signum)
	struct task *tp;
	int signum;
{
	if (!TIDISNODE(tp->t_tid)) {
		(void)kill(tp->t_pid, signum);
		return;
	}

	switch (signum) {
	case SIGTERM:
	case SIGKILL:
		(void)kill(tp->t_pid, signum);
		mpp_free(tp);
		task_cleanup(tp);
		task_free(tp);
		break;

	default:
		sprintf(pvmtxt,"mpp_kill() signal %d to node t%x ignored\n",
			signum, tp->t_tid);
		pvmlogerror(pvmtxt);
		break;
	}
}
/*
* Add pvmhost's socket to wfds if there are packets waiting to
* be sent to a related node task. Node tasks have no sockets;
* they share pvmhost's packet queue (txq). Pvmhost simply
* forwards any packets it receives to the appropriate node.
*
* Also reaps node sets whose pvmhost task can no longer be found:
* if such a set still counts live node tasks, those tasks are cleaned
* up and freed first, then the set is unlinked and freed.
*
* dummy1, dummy2: not referenced.
* Always returns 0.
*/
int
mpp_output(dummy1, dummy2)
struct task *dummy1;
struct pkt *dummy2;
{
struct nodeset *sp, *sp2;
struct task *tp;
int ptype;
/* walk every active node set; n_ptid is looked up as the set's pvmhost */
for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link)
if ((tp = task_find(sp->n_ptid))) {
/* pvmhost exists: request a write if its queue has a packet and it has a socket */
if (tp->t_txq->pk_link->pk_buf && tp->t_sock != -1)
wrk_fds_add(tp->t_sock, 2);
} else {
/* no task with that tid any more */
if (sp->n_alive) {
sprintf(pvmtxt, "mpp_output() pvmhost %d died!\n", sp->n_ptype);
pvmlogerror(pvmtxt);
/* clean up tasks it serves */
ptype = sp->n_ptype & ptypemask;
for (tp = locltasks->t_link; tp != locltasks; tp = tp->t_link)
if (TIDISNODE(tp->t_tid) && TIDTOTYPE(tp->t_tid) == ptype) {
/* the txq was shared with pvmhost (see mpp_load); drop the reference */
tp->t_txq = 0;
/*
 * Step back to the predecessor before the task is freed, so the
 * loop's tp = tp->t_link continues from a live entry.
 * NOTE(review): presumably task_free() unlinks the task - confirm.
 */
tp = tp->t_rlink;
task_cleanup(tp->t_link);
task_free(tp->t_link);
}
}
/* pvmhost has died, destroy the node set */
sp2 = sp;
/* step back first so the outer loop's sp = sp->n_link stays on the list */
sp = sp->n_rlink;
LISTDELETE(sp2, n_link, n_rlink);
PVM_FREE(sp2);
}
return 0;
}
/* replace tm_connect and tm_conn2 */
int
mpp_conn(tp, tp2)
struct task *tp;
struct task *tp2;
{
struct nodeset *sp;
int pvminfo[SIZEHINFO]; /* host info */
int ptypepart; /* type field */
if (pvmdebmask & PDMNODE) {
sprintf(pvmtxt, "mpp_conn() pvmhost %x", tp2->t_tid);
pvmlogerror(pvmtxt);
}
tp2->t_sock = tp->t_sock;
tp2->t_sad = tp->t_sad;
tp2->t_salen = tp->t_salen;
tp2->t_flag |= TF_CONN;
tp->t_sock = -1;
for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link)
if (sp->n_ptid == tp2->t_tid)
break;
if (sp == busynodes) {
pvmlogerror("mpp_conn() task is not pvmhost\n");
return -1;
}
ptypepart = (sp->n_ptype << (ffs(tidtmask) - 1)) | TIDONNODE;
if (write(tp2->t_sock, sp2pvminfo, sizeof(sp2pvminfo))
!= sizeof(sp2pvminfo)) {
pvmlogperror("mpp_conn() write");
return -1;
}
return 0;
}
#if 0
/*
 * mksock()
 *
 * Create socket to talk to pvmhost: a listening TCP socket bound to the
 * local host's address. The address and port it ends up with are
 * exported in the environment as PVMSOCK=<inadport_hex() string>.
 *
 * Return socket descriptor if successful, -1 otherwise.
 */
int
mksock()
{
	struct hostd *hp = hosts->ht_hosts[hosts->ht_local];
	struct sockaddr_in sin;
	char buf[128];
	char *p;
	char *failed;		/* log tag of the step that failed, or 0 */
	int cc;
	int sock;

	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock == -1) {
		pvmlogperror("mksock() socket");
		return -1;
	}

	sin = hp->hd_sad;
	cc = sizeof(sin);

	/* each step runs only if the previous one succeeded */
	if (bind(sock, (struct sockaddr*)&sin, sizeof(sin)) == -1)
		failed = "mksock() bind";
	else if (getsockname(sock, (struct sockaddr*)&sin, &cc) == -1)
		failed = "mksock() getsockname";
	else if (listen(sock, 1) == -1)
		failed = "mksock() listen";
	else
		failed = 0;

	if (failed) {
		/* log before close() so errno still belongs to the failed call */
		pvmlogperror(failed);
		(void)close(sock);
		return -1;
	}

	p = inadport_hex(&sin);
	sprintf(buf, "PVMSOCK=%s", p);
	p = STRALLOC(buf);
	pvmputenv(p);

	return sock;
}
/*
 * sockconn()
 *
 * Wait for connect request from pvmhost and establish connection.
 * After a successful accept the listening socket is closed, the host
 * info block is written to the new socket, the socket is switched to
 * non-blocking mode and registered with wrk_fds_add().
 *
 * sock:  listening post.
 * tp:    pvmhost task; receives the accepted socket and peer address.
 * hinfo: SIZEHINFO ints of host info to pass along.
 *
 * Return 0 if successful, -1 if accept or the write fails.
 */
int
sockconn(sock, tp, hinfo)
	int sock;			/* listening post */
	struct task *tp;	/* pvm host */
	int hinfo[];		/* host info to pass along */
{
	int v;

	tp->t_sock = accept(sock, (struct sockaddr*)&tp->t_sad, &tp->t_salen);
	if (tp->t_sock == -1) {
		pvmlogperror("sockconn() accept");
		return -1;
	}

	if (pvmdebmask & (PDMPACKET|PDMTASK)) {
		sprintf(pvmtxt, "sockconn() accept from %s sock %d\n",
			inadport_decimal(&tp->t_sad), tp->t_sock);
		pvmlogerror(pvmtxt);
	}
	close(sock);

#ifndef NOSOCKOPT
	v = 1;
	if (setsockopt(tp->t_sock, IPPROTO_TCP, TCP_NODELAY,
			(char*)&v, sizeof(int)) == -1)
		pvmlogperror("sockconn() setsockopt");
#endif

	if (write(tp->t_sock, hinfo, SIZEHINFO*sizeof(int))
			!= SIZEHINFO*sizeof(int)) {
		pvmlogperror("sockconn: write");
		return -1;
	}

	/* failure to read the flags is logged but is not fatal */
	v = fcntl(tp->t_sock, F_GETFL, 0);
	if (v != -1)
		(void)fcntl(tp->t_sock, F_SETFL, v | O_NDELAY);
	else
		pvmlogperror("sockconn: fcntl");

	wrk_fds_add(tp->t_sock, 1);
	return 0;
}
#endif /*0*/
/* syntax highlighted by Code2HTML, v. 0.9.1 */