static char rcsid[] =
"$Id: ddpro.c,v 1.48 2004/01/14 18:50:55 pvmsrc Exp $";
/*
* PVM version 3.4: Parallel Virtual Machine System
* University of Tennessee, Knoxville TN.
* Oak Ridge National Laboratory, Oak Ridge TN.
* Emory University, Atlanta GA.
* Authors: J. J. Dongarra, G. E. Fagg, M. Fischer
* G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
* P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
* (C) 1997 All Rights Reserved
*
* NOTICE
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby granted
* provided that the above copyright notice appear in all copies and
* that both the copyright notice and this permission notice appear in
* supporting documentation.
*
* Neither the Institutions (Emory University, Oak Ridge National
* Laboratory, and University of Tennessee) nor the Authors make any
* representations about the suitability of this software for any
* purpose. This software is provided ``as is'' without express or
* implied warranty.
*
* PVM version 3 was funded in part by the U.S. Department of Energy,
* the National Science Foundation and the State of Tennessee.
*/
/*
* ddpro.c
*
* Entry points for messages from network.
*
* $Log: ddpro.c,v $
* Revision 1.48 2004/01/14 18:50:55 pvmsrc
* Added new AIX5* arches.
* (Spanker=kohl)
*
* Revision 1.47 2002/04/16 15:06:00 pvmsrc
* WIN32 Fixes for Multiple Domains.
* - submitted by Jigen Zhou <jigen@icemcfd.com>.
* (Spanker=kohl)
*
* Revision 1.46 2002/02/21 23:18:40 pvmsrc
* Added new (not to be documented!) PVM_MAX_TASKS env var support.
* - for Mahir Lokvancic <mahir@math.ufl.edu>.
* - forcefully limits the number of tasks that can attach to a
* pvmd, required on Solaris in rare circumstances when hard
* FD_SETSIZE limit is reached, and all hell breaks loose...
* - check return for task_new() call, can now produce NULL ptr,
* indicating PvmOutOfRes...
* (Spanker=kohl)
*
* Revision 1.45 2001/09/26 23:35:19 pvmsrc
* Added new hd_vmid to hostd struct.
* - use to override local PVM_VMID settings with hostfile "id=" option
* (Spanker=kohl)
*
* Revision 1.44 2001/09/26 21:22:16 pvmsrc
* Added Handling for Optional Virtual Machine ID.
* - extra vmid comes through with SM_STHOST message (after wincmd).
* - instruct user to type VMID on remote pvmd stdin for manual startup
* - in phase1(), after potential rsh/rexec, write VMID env string
* to remote pvmd's stdin.
* (Spanker=kohl)
*
* Revision 1.43 2001/05/11 17:31:56 pvmsrc
* Cast getcwd() call, in case header file sucks.
* - eliminates warning message on compile...
* (Spanker=kohl)
*
* Revision 1.42 2001/04/23 14:16:13 pvmsrc
* Added new working directory option to pvm_spawn().
* - use "where" argument to cram in working directory,
* e.g. where = "msr.epm.ornl.gov:/home/user/project/bozo"
* - TM_SPAWN strips out working directory & creates PVMSPAWNWD env var
* - exectasks() (called by DM_EXEC or SM_EXEC) checks for env var
* and does a chdir() (even uses getcwd() to reset directory :-).
* - should not introduce any run-time incompatibility with older
* PVM releases.
* - PvmTaskHost or PvmTaskArch flags need not be used, i.e.
* pvm_spawn( "foo", 0, PvmTaskDefault, ":/tmp", 1, &tid ) works.
* (Spanker=kohl)
*
* Revision 1.41 2001/02/07 23:14:02 pvmsrc
* First Half of CYGWIN Check-ins...
* (Spanker=kohl)
*
* Revision 1.40 2000/06/16 16:27:32 pvmsrc
* DAMN. The seemingly cool and efficient tp->t_flag / TF_MBNOTIFY
* solution is not as cool as previously thought.
* - each pvmd only keeps a *local* tasks table...! D-OH!
* - need to just search the wait context list for an existing
* WT_TASKX mbox notify, as previously feared.
* - already checking wait context list for WT_RECVINFO, so this
* is not too terrible...
* - Damn, though...
* Removed TF_MBNOTIFY contant - now fricking useless.
* (Spanker=kohl)
*
* Revision 1.39 2000/06/13 22:37:56 pvmsrc
* In dm_db():
* - on successful insert, check whether task already had an mbox
* notify (for mb_tidy()) set up.
* - after first such notify / waitc created, set tp->t_flag
* TF_MBNOTIFY bit.
* - this avoids a HUGE number of redundant notifies for multiple
* inserts by the same task. D-Oh!
* (Spanker=kohl)
*
* Revision 1.38 2000/02/17 23:12:08 pvmsrc
* *** Changes for new BEOLIN port ***
* - MPP-like, similar to SP2, etc.
* - submitted by Paul Springer <pls@smokeymt.jpl.nasa.gov>.
* - format-checked & cleaned up by Jeembo... :-)
* (Spanker=kohl)
*
* Revision 1.37 2000/02/16 21:59:38 pvmsrc
* Fixed up #include <sys/types.h> stuff...
* - use <bsd/sys/types.h> for IMA_TITN...
* - #include before any NEEDMENDIAN #includes...
* (Spanker=kohl)
*
* Revision 1.36 2000/02/10 23:53:35 pvmsrc
* Added new PvmIPLoopback error code:
* - Master Host IP Address tied to Loopback.
* - check for this case in addhosts(), don't even TRY to add hosts...
* (Spanker=kohl)
*
* Revision 1.35 1999/08/19 15:39:22 pvmsrc
* Whoa... New wincmd stuff was whacking addhost protocol!
* - *always* pack in something for wincmd, else the attempt to unpack
* "possible" wincmd will snag start of next host's startup info...
* - damn.
* (Spanker=kohl)
*
* Revision 1.34 1999/07/08 18:59:50 kohl
* Fixed "Log" keyword placement.
* - indent with " * " for new CVS.
*
* Revision 1.33 1999/03/05 17:21:23 pvmsrc
* improved to work with registry/environment on NT/win95-98
* and devstudio 5/6
* (Spanker=sscott)
*
* Revision 1.32 1999/02/10 22:38:24 pvmsrc
* Fixed passing of pvmdebmask to new / slave pvmds...
* - need to do -d0x%x to insure interpreted as hex (not just -d%x).
* - report submitted by Paul Springer <pls@volcanoes.jpl.nasa.gov>.
* (Spanker=kohl)
*
* Revision 1.31 1999/01/28 18:55:30 pvmsrc
* Modified logic to allow alternate WIN32 pvmd path.
* - use new PVM_WINDPATH env var if set, else new WINPVMDPATH define.
* - append alternate WIN32 command path to end of SM_STHOST message,
* so smart hosters (and pvmd' running hoster()) can unpack it
* and use it if the default Unix pvmd path fails to start a pvmd.
* (Spanker=kohl)
*
* Revision 1.30 1998/10/02 15:43:55 pvmsrc
* Single source code merge of Win32 and Unix code.
* (Spanker=sscott)
*
* Revision 1.29 1998/02/23 22:51:24 pvmsrc
* Added AIX4SP2 stuff.
* (Spanker=kohl)
*
* Revision 1.28 1998/01/12 21:13:22 pvmsrc
* Replaced inline constants with new task output op defines.
* - TO_NEW == -2.
* - TO_SPAWN == -1.
* - TO_EOF == 0.
* (Spanker=kohl)
*
* Revision 1.27 1997/08/29 13:34:59 pvmsrc
* OS2 Port Submitted by Bohumir Horeni, horeni@login.cz.
* (Spanker=kohl)
*
* Revision 1.26 1997/08/26 22:55:25 pvmsrc
* Make sure wp->wa_mesg is cleared after sendmessage() call.
* - otherwise wait_delete() will re-free the pmsg struct causing
* major bogusness...
* (Spanker=kohl)
*
* Revision 1.25 1997/08/18 19:08:37 pvmsrc
* Cleaned up dm_pstatack() routine.
* - had leftover pmsg_dump() calls in it, was cluttering
* up /tmp/pvml.<uid>.
* (Spanker=kohl)
*
* Revision 1.24 1997/06/27 18:03:30 pvmsrc
* Integrated WIN32 changes.
*
* Revision 1.23 1997/06/17 19:04:26 pvmsrc
* Enhanced WT_SPAWN case in hostfailentry().
* - for PvmTaskDefault, if the host that the pvmd picked has failed,
* go ahead and retry spawning it on another host.
*
* Revision 1.22 1997/06/16 13:39:44 pvmsrc
* Updated exectasks() to pass additional information during spawn ops.
*
* Revision 1.21 1997/05/19 21:18:13 pvmsrc
* Awwww Shit. I did it.
* - crunchzap() sux, can't free buf in startack() until ac/av used.
*
* Revision 1.20 1997/05/12 20:28:17 pvmsrc
* Removed duplicate #includes...
*
* Revision 1.19 1997/05/07 21:22:07 pvmsrc
* AAAIIIEEEEEEEEE!!!!
* - removed all static-limited string unpacking:
* * replaced with use of:
* upkstralloc() / PVM_FREE() (for pvmd stuff).
* new pvmupkstralloc() / PVM_FREE() (for lpvm stuff).
* * manual allocation of local buffers for sprintf() & packing.
* * alternate static string arrays to replace fixed-length cases.
* * I hope this shit works... :-Q
*
* Revision 1.18 1997/05/02 13:43:43 pvmsrc
* call mpp_load for SP2MPI
*
* Revision 1.17 1997/04/30 21:25:54 pvmsrc
* SGI Compiler Warning Cleanup.
*
* Revision 1.16 1997/04/10 18:59:06 pvmsrc
* Damn. Stupid typo (as opposed to a smart one?).
*
* Revision 1.15 1997/04/10 18:46:56 pvmsrc
* Bug fix in dm_db():
* - return value for mb_lookup() call was overwriting req index,
* need to save req in case PvmMboxWaitForInfo...
*
* Revision 1.14 1997/04/10 18:32:03 pvmsrc
* Damn. Forgot the stupid PvmMboxWaitForInfo flag check... D-Oh!
*
* Revision 1.13 1997/04/10 17:53:19 pvmsrc
* Added handling of blocking (PvmMboxWaitForInfo) calls to pvm_recvinfo().
* - in dm_db(), on TMDB_GET, if not found create WT_RECVINFO.
* - in dm_db(), on TMDB_PUT, check for matching WT_RECVINFO & release.
* - in hostfailentry(), wipe out any pending WT_RECVINFOs if host
* where task resided is now toast.
*
* Revision 1.12 1997/04/09 19:37:56 pvmsrc
* Added class name and index args to pvmreset() routine:
* - allow systematic elimination of leftover persistents...
* - heh, heh, heh...
* In dm_db(), check for name (class) and req (index) match
* for TMDB_RESET case (""/-1 for all, resp).
*
* Revision 1.11 1997/04/09 14:36:06 pvmsrc
* PVM patches from the base 3.3.10 to 3.3.11 versions where applicable.
* Originals by Bob Manchek. Altered by Graham Fagg where required.
*
* Revision 1.10 1997/04/08 19:57:47 pvmsrc
* Promoted mbox static "classes" to public global "pvmmboxclasses".
* - so pvmd can spank mboxes in ddpro.c... :-Q
* - renamed everywhere, moved decl / extern to global.[ch].
*
* Revision 1.9 1997/04/08 19:42:55 pvmsrc
* *** Added new system reset protocol / wait linkage.
* - new DM_RESET / dm_reset() & DM_RESETACK / dm_resetack().
* - new WT_RESET wait type.
* - modified dm_db() to include new TMDB_RESET(nnr, noresets) option:
* * clean up mboxes, except for "no-reset" tasks.
* * for persistent mboxes, set up WT_RESET to remove mbox on task
* exit, propagate to task's host via DM_RESET.
* * on task exit, WT_RESET wipes mbox out, DM_RESETACK passes
* word on to master pvmd (if necessary).
*
* Revision 1.8 1997/03/07 14:05:11 pvmsrc
* Phixed Phunky Phil #endif.
* - no preceding tab on some archs.
*
* Revision 1.7 1997/03/06 20:55:34 pvmsrc
* add call to mpp_load when loading onto compute partition.
*
* Revision 1.6 1997/02/13 19:05:05 pvmsrc
* Fixed mbox cleanup problem:
* - in dm_db() for TMDB_PUT case, if successful create master PVMD
* notify on inserting task (if task not local, forward DM_NOTIFY).
* - then on task exit, call mb_tidy() if WT_TASKX notify wait context
* exists (in hostfailentry() and task_cleanup()), or if empty
* notify propagates back to master PVMD via DM_NOTIFYACK.
*
* Revision 1.5 1997/02/13 15:10:00 pvmsrc
* Removed unnecessary extern for struct waitc *waitlist.
* - now in global.h.
*
* Revision 1.4 1997/01/28 19:26:14 pvmsrc
* New Copyright Notice & Authors.
*
* Revision 1.3 1996/10/25 13:57:16 pvmsrc
* Replaced old #includes for protocol headers:
* - <pvmsdpro.h>, "ddpro.h", "tdpro.h"
* With #include of new combined header:
* - <pvmproto.h>
*
* Revision 1.2 1996/10/24 21:50:47 pvmsrc
* Moved #include "global.h" below other #include's for typing.
* Added #include <pvmtev.h> for TEV_* constants.
* Added extern struct Pvmtracer pvmtracer for tracer info.
* Modified exectasks():
* - if tracer present, adjust settings for trace & output collect,
* and munge the environment to replace values for PVMTMASK,
* PVMTRCBUF & PVMTRCOPT.
* - replaced inline code with call to new tev_send_newtask().
* - modified checking of trctid / outtid, check for < 0, not non-zero
* in case task denies external collect.
* Added handling of new DM_SLCONF_TRACE in dm_slconf():
* - tracer update configuration msg.
* - also added packing up of same in startack().
*
* Revision 1.1 1996/09/23 23:44:06 pvmsrc
* Initial revision
*
* Revision 1.15 1996/05/13 20:23:21 manchek
* a few fixes to manual startup code
*
* Revision 1.14 1995/07/24 18:23:14 manchek
* spelling problem
*
* Revision 1.13 1995/07/19 21:27:20 manchek
* use new function pvmnametag instead of [dts]mname
*
* Revision 1.12 1995/07/03 19:03:35 manchek
* if shared memory, get status from global table in dm_task before replying
*
* Revision 1.11 1995/06/16 16:11:34 manchek
* hack to pass output and trace sink to mpp_load for PGON
*
* Revision 1.10 1995/05/30 17:26:30 manchek
* added ifdefs for SP2MPI architecture
*
* Revision 1.9 1995/05/17 16:04:36 manchek
* added HF_OVERLOAD flag.
* changed global mytid to pvmmytid
*
* Revision 1.8 1995/02/01 21:03:07 manchek
* addhosts returns PvmDSysErr instead of -1 if something breaks.
* mpp_load is called with environment so it can be passed to tasks.
* dm_execack processes received tids in order instead of backwards
*
* Revision 1.7 1994/10/15 19:02:36 manchek
* cast message tags for comparison as integer.
* send output and trace open messages for dmp and shmem ports.
* check newhosts when deleting host
*
* Revision 1.6 1994/07/18 19:18:10 manchek
* hostfailentry() no longer matches pvmd' to t0.
* wa_dep wasn't propogated from hoststart to htupd
*
* Revision 1.5 1994/06/30 21:33:46 manchek
* addhosts() uses PVM_DPATH envar
*
* Revision 1.4 1994/06/21 18:29:21 manchek
* subscript arith in dmname() broke with opt
*
* Revision 1.3 1994/06/04 22:33:16 manchek
* missed resetting busyadding in hostfailentry(WT_HTUPD)
*
* Revision 1.2 1994/06/03 20:38:12 manchek
* version 3.3.0
*
* Revision 1.1 1993/08/30 23:26:47 manchek
* Initial revision
*
*/
#ifdef IMA_TITN
#include <bsd/sys/types.h>
#else
#include <sys/types.h>
#endif
#ifdef NEEDMENDIAN
#include <machine/endian.h>
#endif
#ifdef NEEDENDIAN
#include <endian.h>
#endif
#ifdef NEEDSENDIAN
#include <sys/endian.h>
#endif
#ifdef WIN32
#include "pvmwin.h"
#endif
#if defined(WIN32) || defined(CYGWIN)
#include "..\xdr\types.h"
#include "..\xdr\xdr.h"
#else
#include <rpc/types.h>
#include <rpc/xdr.h>
#endif
#ifdef WIN32
#include <sys/stat.h>
#include <time.h>
#include <direct.h>
#include <errno.h>
#else
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#endif
#ifdef SYSVSTR
#include <string.h>
#else
#include <strings.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <pvm3.h>
#include <pvmproto.h>
#include "pvmalloc.h"
#include "host.h"
#include "pmsg.h"
#include "waitc.h"
#include "task.h"
#include "listmac.h"
#if defined(IMA_PGON) || defined(IMA_I860) || defined(IMA_CM5) \
|| defined(SHMEM) || defined(IMA_SP2MPI) || defined(IMA_AIX4SP2) \
|| defined(IMA_AIX5SP2) || defined(IMA_BEOLIN)
#include "pvmdmp.h"
#endif
#include "bfunc.h"
#include <pvmtev.h>
#include "msgbox.h"
#include "global.h"
void pvmbailout();
char *inadport_hex();
char *getenv();
void hex_inadport __ProtoGlarp__ (( char *, struct sockaddr_in * ));
void mesg_rewind __ProtoGlarp__ (( struct pmsg * ));
void tev_send_newtask();
char **colonsep();
char *varsub();
char *pvmnametag();
extern int pvmdebmask; /* from pvmd.c */
extern char **epaths; /* from pvmd.c */
extern char *debugger; /* from pvmd.c */
extern struct htab *filehosts; /* from pvmd.c */
extern struct htab *hosts; /* from pvmd.c */
extern int hostertid; /* from pvmd.c */
extern struct task *locltasks; /* from task.c */
extern char *myarchname; /* from pvmd.c */
extern int myhostpart; /* from pvmd.c */
extern int pvmmytid; /* from pvmd.c */
extern struct htab *newhosts; /* from pvmd.c */
extern int pprime; /* from pvmd.c */
extern int runstate; /* from pvmd.c */
extern int pvmschedtid; /* from pvmd.c */
extern struct Pvmtracer pvmtracer; /* from pvmd.c */
extern int tidhmask; /* from pvmd.c */
extern int tidlmask; /* from pvmd.c */
#ifndef IMA_WIN32_WATCOM
extern char **environ;
#endif
int busyadding = 0; /* lock for addhost op */
int hosterwid = 0; /* wid waiting on hoster */
/***************
** Private **
** **
***************/
int dm_add();
int dm_addack();
int dm_db();
int dm_dback();
int dm_delhost();
int dm_delhostack();
int dm_halt();
int dm_hostsync();
int dm_hostsyncack();
int dm_htcommit();
int dm_htdel();
int dm_htupd();
int dm_htupdack();
int dm_mca();
int dm_notify();
int dm_notifyack();
int dm_null();
int dm_pstat();
int dm_pstatack();
int dm_reset();
int dm_resetack();
int dm_sendsig();
int dm_slconf();
int dm_exec();
int dm_execack();
int dm_startack();
int dm_task();
int dm_taskack();
int dm_taskout();
int (*netswitch[])() = {
dm_add, dm_addack,
dm_exec, dm_execack,
dm_sendsig,
dm_htupd, dm_htupdack,
dm_htcommit,
dm_slconf,
dm_startack,
dm_task, dm_taskack,
dm_delhost, dm_delhostack,
dm_null,
dm_taskout,
dm_pstat, dm_pstatack,
dm_halt,
dm_mca,
dm_notify, dm_notifyack,
dm_db, dm_dback,
dm_reset, dm_resetack,
dm_htdel,
dm_hostsync, dm_hostsyncack,
};
/* hostfailentry()
*
* We've decided a remote pvmd has failed.
* Wake up any wait context waiting on that host (different action
* for each kind of wait).
* Send a HTDEL message to remaining hosts except us.
*/
int
hostfailentry(hp)
struct hostd *hp;
{
int hpart = hp->hd_hostpart;
struct waitc *wp, *wp2;
struct pmsg *mp;
struct pvmmentry *ep;
if (pvmdebmask & PDMHOST) {
pvmlogprintf("hostfailentry() host %s\n", hp->hd_name);
hd_dump(hp);
}
if (hp == hosts->ht_hosts[hosts->ht_master]) {
pvmlogerror("hostfailentry() lost master host, we're screwwwed\n");
pvmbailout(0);
}
/*
* if we're master pvmd, send HT_DEL message to all others
*/
if (hp->hd_hostpart && hosts->ht_master == hosts->ht_local) {
struct hostd *hp2;
int hh;
mp = mesg_new(0);
mp->m_tag = DM_HTDEL;
pkint(mp, hosts->ht_serial);
pkint(mp, hp->hd_hostpart);
for (hh = hosts->ht_last; hh > 0; hh--)
if (hh != hosts->ht_local
&& (hp2 = hosts->ht_hosts[hh]) && hp2 != hp) {
mp->m_ref++;
mp->m_dst = hp2->hd_hostpart | TIDPVMD;
sendmessage(mp);
}
pmsg_unref(mp);
/* inform the scheduler too */
if (pvmschedtid) {
mp = mesg_new(0);
mp->m_tag = SM_HOSTX;
mp->m_dst = pvmschedtid;
pkint(mp, hp->hd_hostpart | TIDPVMD);
sendmessage(mp);
}
}
for (wp = waitlist->wa_link; wp != waitlist; wp = wp->wa_link) {
if (wp->wa_on && (wp->wa_on & TIDHOST) == hpart) {
switch (wp->wa_kind) {
case WT_ADDHOST: /* the master must have died */
case WT_DELHOST: /* the master must have died */
pvmlogprintf("hostfailentry() can't deal with wait kind %d\n",
wp->wa_kind);
break;
case WT_HTUPD:
if (wp->wa_peer == wp) {
int hh;
mp = mesg_new(0);
mp->m_tag = DM_HTCOMMIT;
for (hh = hosts->ht_last; hh > 0; hh--)
if (hh != hosts->ht_local && (hp = hosts->ht_hosts[hh])) {
mp->m_ref++;
mp->m_dst = hp->hd_hostpart | TIDPVMD;
sendmessage(mp);
}
pmsg_unref(mp);
busyadding = 0;
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
}
break;
case WT_SPAWN:
{
struct waitc_spawn *wxp;
int retry = 0;
int v;
wxp = (struct waitc_spawn*)wp->wa_spec;
/* mark tasks assigned to this host as failed */
/* (or reset if PvmTaskDefault) */
for (v = wxp->w_veclen; v-- > 0; )
if (wxp->w_vec[v] == hp->hd_hostpart)
if (!(wxp->w_flags
& (PvmTaskHost|PvmTaskArch)))
{
wxp->w_vec[v] = 0;
retry++;
}
else
wxp->w_vec[v] = PvmHostFail;
ht_delete(wxp->w_ht, hp);
/* try assigning task again without failed host */
if ( retry )
assign_tasks(wp);
/* is this was the last wait, reply to task */
if (wp->wa_peer == wp)
assign_tasks(wp);
}
break;
case WT_TASK:
/* send message if we're the last waiter */
if (wp->wa_peer == wp) {
mp = wp->wa_mesg;
mp->m_ref++;
sendmessage(mp);
}
break;
case WT_HOSTSTART:
/* reply to waiter */
busyadding = 0;
if (wp->wa_spec) {
free_waitc_add((struct waitc_add *)wp->wa_spec);
wp->wa_spec = 0;
}
pkint(wp->wa_mesg, PvmDSysErr);
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
break;
case WT_TASKX:
if (wp->wa_tid && wp->wa_mesg) {
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
}
mb_tidy(wp->wa_on);
break;
case WT_RESET:
if (wp->wa_tid && wp->wa_mesg) {
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
}
mb_tidy_reset(wp->wa_on);
break;
case WT_RECVINFO:
/* clean up pending recvinfo */
ep = (struct pvmmentry *) wp->wa_spec;
if ( ep->me_msg ) /* class name (overload :-Q) */
PVM_FREE( ep->me_msg );
PVM_FREE( ep );
break;
case WT_HOSTF:
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
break;
case WT_PSTAT:
case WT_MSTAT:
case WT_HOSTSYNC:
pkint(wp->wa_mesg, PvmHostFail);
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
break;
default:
pvmlogprintf("hostfailentry() alien wait kind %d\n",
wp->wa_kind);
break;
}
wp2 = wp->wa_rlink;
wait_delete(wp);
wp = wp2;
}
}
return 0;
}
int
netentry(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int c = mp->m_tag;
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf(
"netentry() from host %s src t%x dst t%x tag %s wid %d\n",
hp->hd_name, mp->m_src, mp->m_dst, pvmnametag(c, (int *)0),
mp->m_wid);
/*
pvmhdump(mp->m_frag->fr_link->fr_dat, mp->m_frag->fr_link->fr_len,
"netentry() ");
*/
}
if (c < (int)DM_FIRST || c > (int)DM_LAST) {
pvmlogprintf("netentry() message from t%x with bogus code %s\n",
mp->m_src, pvmnametag(c, (int *)0));
goto bail;
}
c -= DM_FIRST;
(netswitch[c])(hp, mp);
bail:
pmsg_unref(mp);
return 0;
}
/*********************************
** Pvmd message entry points **
** **
*********************************/
/* hostids_new()
*
* Get a list of new host tids (ones not it user in host table).
* Wrap around to lowest number when we reach the maximum.
* Return as many as are available.
*/
int
hostids_new(num, tids)
int *num; /* count requested, returned */
int *tids; /* return tids */
{
static int lasthh = 1;
int oldhh;
int maxhostid = tidhmask >> (ffs(tidhmask) - 1);
int i;
oldhh = lasthh;
/* find next free hostid */
for (i = 0; i < *num; i++) {
if (++lasthh > maxhostid)
lasthh = 1;
while ((lasthh <= hosts->ht_last && hosts->ht_hosts[lasthh])) {
if (++lasthh > maxhostid)
lasthh = 1;
if (lasthh == oldhh)
goto done;
}
tids[i] = lasthh << (ffs(tidhmask) - 1);
/*
pvmlogprintf("hostids_new() tid %x\n", tids[i]);
*/
}
done:
return *num = i;
}
free_waitc_add(wxp)
struct waitc_add *wxp;
{
int i;
for (i = 0; i < wxp->w_num && wxp->w_hosts[i]; i++)
hd_unref(wxp->w_hosts[i]);
PVM_FREE(wxp->w_hosts);
PVM_FREE(wxp);
return 0;
}
/* addhosts()
*
* Add hosts requested with DM_ADD or SM_ADD. Call the hoster or
* start 'em.
*/
int
addhosts(mp, rmp)
struct pmsg *mp; /* the request message */
struct pmsg *rmp; /* reply message blank */
{
struct hostd *hp, *hp2;
struct pmsg *mp2;
struct waitc *wp = 0;
struct waitc_add *wxp = 0;
int i, j;
int count;
int ngood;
int ntid;
struct hostent *he;
int maxhostid = (tidhmask >> ffs(tidhmask) - 1);
int hh;
int pid;
int *tids;
char *winpvmdpath;
char *pvmdpath;
char *vmid;
char *buf;
int len;
/*
* have to lock this for 2 reasons:
* 1. system can't handle overlapping host table updates,
* 2. the new host tids aren't reserved
*/
if (busyadding) {
/*
pvmlogerror("addhosts() already adding new hosts\n");
*/
pkint(rmp, PvmAlready);
sendmessage(rmp);
return 0;
}
busyadding = 1;
/* sanity check count */
if (upkint(mp, &count) || count < 1 || count > maxhostid) {
pvmlogerror("addhosts() bad msg format\n");
goto bad;
}
/*
* make wait context, extract host list from message,
*/
wp = wait_new(WT_HOSTSTART);
wp->wa_tid = mp->m_src;
wp->wa_dep = mp->m_wid;
wxp = TALLOC(1, struct waitc_add, "waix");
wxp->w_num = count;
wxp->w_hosts = TALLOC(count, struct hostd *, "waiv");
BZERO((char*)wxp->w_hosts, count * sizeof(struct hostd *));
wp->wa_spec = (void *)wxp;
for (i = 0; i < count; i++) {
hp = hd_new(0);
wxp->w_hosts[i] = hp;
if (upkstralloc(mp, &buf)) {
pvmlogerror("addhosts() bad msg format\n");
goto bad;
}
if (parsehost(buf, hp)) {
hp->hd_err = PvmBadParam;
} else {
/* Set unspecified fields from hostfile if available */
if (filehosts &&
((hp2 = nametohost(filehosts, hp->hd_name))
|| (hp2 = filehosts->ht_hosts[0])))
applydefaults(hp, hp2);
}
PVM_FREE(buf);
}
/*
* verify we have a chance to add these babies...
* check whether our IP is "real" or just loopback
*/
hp = hosts->ht_hosts[hosts->ht_local];
if ( hp->hd_sad.sin_addr.s_addr == htonl(0x7f000001) ) {
/* damn, we're hosed. bail on host adds with new */
/* PvmIPLoopback error code... */
for (i = 0; i < count; i++) {
hp = wxp->w_hosts[i];
if (hp->hd_err)
continue;
hp->hd_err = PvmIPLoopback;
}
}
/*
* lookup IP addresses XXX we already have some of them
*/
ngood = 0;
for (i = 0; i < count; i++) {
hp = wxp->w_hosts[i];
if (hp->hd_err)
continue;
if (he = gethostbyname(hp->hd_aname ? hp->hd_aname : hp->hd_name)) {
BCOPY(he->h_addr_list[0], (char*)&hp->hd_sad.sin_addr,
sizeof(struct in_addr));
} else {
if (pvmdebmask & PDMSTARTUP) {
pvmlogprintf(
"start_slaves() can't gethostbyname: %s\n",
hp->hd_name);
}
hp->hd_err = PvmNoHost;
continue;
}
/* make sure it's not already configured */
if (!(hp->hd_flag & HF_OVERLOAD)) {
for (hh = hosts->ht_last; hh > 0; hh--)
if ((hp2 = hosts->ht_hosts[hh])
&& (hp2->hd_sad.sin_addr.s_addr == hp->hd_sad.sin_addr.s_addr)) {
hp->hd_err = PvmDupHost;
break;
}
if (hp->hd_err)
continue;
/* make sure new ones aren't duplicated */
for (j = i; j-- > 0; )
if (hp->hd_sad.sin_addr.s_addr
== wxp->w_hosts[j]->hd_sad.sin_addr.s_addr) {
hp->hd_err = PvmDupHost;
break;
}
if (hp->hd_err)
continue;
}
ngood++;
}
/*
* assign tids XXX these are unreserved until hosts are added...
*/
ntid = ngood;
tids = TALLOC(ngood, int, "xxx");
hostids_new(&ntid, tids);
if (ntid < ngood) {
pvmlogerror("addhosts() out of hostids\n");
ngood = ntid;
}
for (j = i = 0; i < count; i++) {
hp = wxp->w_hosts[i];
if (hp->hd_err)
continue;
if (j < ntid)
hp->hd_hostpart = tids[j++];
else
hp->hd_err = PvmOutOfRes;
}
PVM_FREE(tids);
/*
if (!ngood) {
XXX don't really need to send the message
}
*/
/* keep stub reply message to caller */
wp->wa_mesg = rmp;
/* make request message and send to hoster or pvmd' */
mp2 = mesg_new(0);
mp2->m_wid = wp->wa_wid;
pkint(mp2, ngood);
if (!(pvmdpath = getenv("PVM_DPATH")))
pvmdpath = PVMDPATH;
if (!(winpvmdpath = getenv("PVM_WINDPATH")))
winpvmdpath = WINPVMDPATH;
for (i = 0; i < count; i++) {
hp = wxp->w_hosts[i];
if (hp->hd_err)
continue;
pkint(mp2, hp->hd_hostpart);
pkstr(mp2, hp->hd_sopts ? hp->hd_sopts : "");
if (hp->hd_login) {
len = strlen(hp->hd_login)
+ strlen((hp->hd_aname ? hp->hd_aname : hp->hd_name))
+ 2;
buf = TALLOC( len, char, "hdl" );
sprintf(buf, "%s@%s", hp->hd_login,
(hp->hd_aname ? hp->hd_aname : hp->hd_name));
}
else
buf = STRALLOC( (hp->hd_aname
? hp->hd_aname : hp->hd_name) );
pkstr(mp2, buf);
PVM_FREE(buf);
/* default unix dpath */
len = strlen( (hp->hd_dpath ? hp->hd_dpath : pvmdpath) ) + 1
+ strlen( (hp->hd_sopts && !strcmp(hp->hd_sopts, "ms")
? "-S" : "-s") ) + 1
+ 2 + 16 + 1
+ 2 + strlen( hp->hd_name ) + 1
+ 5 * ( 16 + 1 );
buf = TALLOC( len, char, "hdall" );
(void)sprintf(buf, "%s %s -d0x%x -n%s %d %s %d",
(hp->hd_dpath ? hp->hd_dpath : pvmdpath),
(hp->hd_sopts && !strcmp(hp->hd_sopts, "ms")
? "-S" : "-s"),
pvmdebmask,
hp->hd_name,
hosts->ht_master,
inadport_hex(
&hosts->ht_hosts[hosts->ht_master]->hd_sad),
hosts->ht_hosts[hosts->ht_master]->hd_mtu);
(void)sprintf(buf + strlen(buf), " %d %s",
((hp->hd_hostpart & tidhmask) >> (ffs(tidhmask) - 1)),
inadport_hex(&hp->hd_sad));
pkstr(mp2, buf);
PVM_FREE(buf);
/* default WIN32 dpath - only if not set manually (a la dx=) */
if (!(hp->hd_dpath)){
len = strlen( winpvmdpath ) + 1
+ strlen( (hp->hd_sopts && !strcmp(hp->hd_sopts, "ms")
? "-S" : "-s") ) + 1
+ 2 + 16 + 1
+ 2 + strlen( hp->hd_name ) + 1
+ 5 * ( 16 + 1 );
buf = TALLOC( len, char, "hdallwin" );
(void)sprintf(buf, "%s %s -d0x%x -n%s %d %s %d",
winpvmdpath,
(hp->hd_sopts && !strcmp(hp->hd_sopts, "ms")
? "-S" : "-s"),
pvmdebmask,
hp->hd_name,
hosts->ht_master,
inadport_hex(
&hosts->ht_hosts[hosts->ht_master]->hd_sad),
hosts->ht_hosts[hosts->ht_master]->hd_mtu);
(void)sprintf(buf + strlen(buf), " %d %s",
((hp->hd_hostpart & tidhmask) >> (ffs(tidhmask) - 1)),
inadport_hex(&hp->hd_sad));
pkstr(mp2, buf);
PVM_FREE(buf);
}
/* be sure to pack SOMETHING, dammit */
else
pkstr(mp2, "");
/* Include VMID (If Any) */
if (hp->hd_vmid)
pkstr(mp2, hp->hd_vmid);
else if (vmid = getenv("PVM_VMID"))
pkstr(mp2, vmid);
/* be sure to pack SOMETHING, dammit */
else
pkstr(mp2, "");
}
mp2->m_tag = SM_STHOST;
#if !defined(WIN32) && !defined(IMA_OS2)
if (hostertid) {
hosterwid = wp->wa_wid;
mp2->m_dst = hostertid;
wp->wa_on = hostertid;
sendmessage(mp2);
} else {
wp->wa_on = TIDPVMD;
if (pid = fork()) { /* still us */
if (pid == -1) {
/* nack request if can't fork */
pvmlogperror("addhosts() fork");
goto bad;
} else {
pprime = pid;
pmsg_unref(mp2);
}
} else { /* pvmd' to do the startup */
beprime();
mesg_rewind(mp2);
hoster(mp2);
}
}
#else /* WIN32 */
if (!hostertid) { /* no hoster yet */
hostertid = tid_new(); /* give him a tid so that we can send
him the message */
if (start_hoster(hostertid) == -1) {
hostertid=0; /* you did not make it buddy */
pvmlogperror("addhosts(): could not start hoster \n");
goto bad;
}
}
hosterwid = wp->wa_wid;
mp2->m_dst = hostertid;
wp->wa_on = hostertid;
sendmessage(mp2);
#endif
return 0;
bad:
if (wxp)
free_waitc_add(wxp);
if (wp) {
wp->wa_mesg = 0; /* shared with rmp */
wait_delete(wp);
}
busyadding = 0;
pkint(rmp, PvmDSysErr);
sendmessage(rmp);
return 0;
}
#ifdef WIN32
/* start_hoster()
*
* forkexec the hoster process which
* will start other pvmd's.
* acts like pvmd'
*/
int
start_hoster(reserved_tid)
int reserved_tid;
{
char hosterpath[128];
(void) strcpy(hosterpath,(char *) pvmgetroot());
(void) strcat(hosterpath,"\\bin\\WIN32\\hoster.exe");
if (hostexectasker(hosterpath,reserved_tid))
return -1;
return 0;
}
int
hostexectasker(file, tid)
char *file;
int tid;
{
int tids = 0; /* tid from hosterforkexec */
struct pmsg *rmp;
struct task *tp = 0;
int err = 0; /* error code from forkexec */
rmp = mesg_new(0);
rmp->m_dst = pvmmytid;
rmp->m_src = pvmmytid;
rmp->m_tag = DM_EXECACK;
if (err = hosterforkexec(tid,file, &tp)) {
tids = err;
} else {
tp->t_ptid = 0;
tp->t_outtid = 0;
tp->t_trctid = 0;
tp->t_sched = 0;
tids = tp->t_tid;
}
pkint(rmp, 1);
pkint(rmp, tids);
sendmessage(rmp);
return err;
}
extern char **environ;
int
hosterforkexec(tid, name, tpp)
int tid; /* tid given by hosterexectask */
char *name; /* filename */
struct task **tpp; /* return task context */
{
int pid = -1; /* task pid */
char *argv[2];
struct task *tp; /* new task context */
char *expected_pid;
char buf[32];
char *myenv[100];
HANDLE hpid;
char **p, **q;
struct stat sb;
extern int *ptr_nfp; /* XXX fix this */
SECURITY_ATTRIBUTES saPipe;
PROCESS_INFORMATION pi;
STARTUPINFO si; /* for CreateProcess call */
if (stat(name, &sb) == -1)
return PvmNoFile;
if ((tp = task_new(tid)) == NULL) {
pvmlogperror("forkexec_hoster() too many tasks?\n");
return PvmOutOfRes;
}
p = myenv;
q = environ;
while (*q) {
*p++ = *q++;
}
/* copy all the environment for socket stuff and more */
expected_pid=malloc(20 * sizeof(char));
sprintf(expected_pid, "PVMEPID=%d", *ptr_nfp);
*p++ = expected_pid;
*p=0;
pvmputenv(expected_pid);
argv[0]=name;
argv[1]=0;
saPipe.nLength = sizeof(SECURITY_ATTRIBUTES);
saPipe.lpSecurityDescriptor = NULL;
saPipe.bInheritHandle = FALSE;
memset(&si, 0, sizeof(si));
si.cb = sizeof(si);
if (0) {
pid = CreateProcess(name, /* filename */
NULL, /* full command line for child */
NULL, /* process security descriptor */
NULL, /* thread security descriptor */
FALSE, /* inherit handles? */
DETACHED_PROCESS, /* creation flags */
NULL, /* inherited environment address */
NULL, /* startup dir; NULL = start in current */
&si, /* pointer to startup info (input) */
&pi); /* pointer to process info (output) */
/* fprintf(stderr,"yup \n"); */
}
else
pid = _spawnve(_P_NOWAIT,name,argv,myenv);
if (pid == -1) {
pvmlogperror("forkexec_hoster() _spawnve");
/* task_free(&tp); */
pvmbailout(0);
return PvmOutOfRes;
}
task_setpid(tp,*ptr_nfp);
*ptr_nfp=*ptr_nfp + 1;
tp->t_flag |= TF_FORKD;
tp->t_a_out = STRALLOC(name);
*tpp = tp;
return 0;
}
#endif
/* dm_add()
*
* (Master pvmd) gets request to add new hosts.
*DocThis
* DM_ADD(wid) {
* int count
* string name[count]
* }
*EndDocThis
*/
int
dm_add(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct pmsg *rmp;
rmp = mesg_new(0);
rmp->m_dst = mp->m_src;
rmp->m_ctx = mp->m_ctx;
rmp->m_tag = DM_ADDACK;
rmp->m_wid = mp->m_wid;
addhosts(mp, rmp);
return 0;
}
/* dm_addack()
*
* Reply to DM_ADD operation.
*
* DM_ADDACK(wid_rtn) {
* int nhosts // or error code
* int narches // if nhosts >= 0
* {
* int tid // or error code
* string name
* string arch
* int mtu
* int speed
* } [nhosts]
* }
*/
int
dm_addack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
if (!(wp = wait_get(hp, mp, WT_ADDHOST)))
return 0;
pmsg_packbody(wp->wa_mesg, mp);
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
wait_delete(wp);
return 0;
}
/* exectasks()
*
* Start tasks requested with DM_EXEC or SM_EXEC.
* Call the tasker or start 'em.
* We try to fork/exec given number of tasks. On first error, fill
* remainder of tid array with error code.
* Return tid array to caller.
*/
int
exectasks(mp, rmp, schtid)
struct pmsg *mp; /* the request message */
struct pmsg *rmp; /* reply message blank */
int schtid; /* scheduler for new tasks */
{
struct pmsg *mp2; /* reply message hdl */
int i;
struct timeval now;
struct waitc_spawn *wxp; /* new task parameters */
int munge_tenv = 0;
char tmp[255];
char *wd = 0;
char *savewd = 0;
wxp = TALLOC(1, struct waitc_spawn, "waix");
BZERO((char*)wxp, sizeof(struct waitc_spawn));
/* unpack message */
if (upkuint(mp, &wxp->w_ptid)
|| upkstralloc(mp, &wxp->w_file)
|| upkint(mp, &wxp->w_flags)
|| upkint(mp, &wxp->w_veclen)
|| upkint(mp, &wxp->w_argc))
goto bad;
if (wxp->w_veclen < 1)
goto bad;
wxp->w_vec = TALLOC(wxp->w_veclen, int, "tids");
wxp->w_argc += 2;
wxp->w_argv = TALLOC(wxp->w_argc + 1, char*, "argv");
BZERO((char*)wxp->w_argv, (wxp->w_argc + 1) * sizeof(char*));
wxp->w_argv++;
wxp->w_argv[0] = wxp->w_file;
wxp->w_file = 0;
wxp->w_argv[--wxp->w_argc] = 0;
for (i = 1; i < wxp->w_argc; i++)
if (upkstralloc(mp, &wxp->w_argv[i]))
goto bad;
if (upkuint(mp, &wxp->w_outtid)
|| upkuint(mp, &wxp->w_outctx)
|| upkuint(mp, &wxp->w_outtag)
|| upkuint(mp, &wxp->w_trctid)
|| upkuint(mp, &wxp->w_trcctx)
|| upkuint(mp, &wxp->w_trctag)
|| upkuint(mp, &wxp->w_nenv))
goto bad;
wxp->w_hosttotal = wxp->w_veclen; /* this is the task count! */
if (pvmtracer.trctid) {
if (!(wxp->w_trctid) && pvmtracer.trctag) {
wxp->w_trctid = pvmtracer.trctid;
wxp->w_trcctx = pvmtracer.trcctx;
wxp->w_trctag = pvmtracer.trctag;
munge_tenv++;
}
}
if (pvmtracer.outtid) {
if (!(wxp->w_outtid) && pvmtracer.outtag) {
wxp->w_outtid = pvmtracer.outtid;
wxp->w_outctx = pvmtracer.outctx;
wxp->w_outtag = pvmtracer.outtag;
}
}
wxp->w_env = TALLOC((wxp->w_nenv + 1), char*, "env");
BZERO(wxp->w_env, (wxp->w_nenv + 1) * sizeof(char*));
for (i = 0; i < wxp->w_nenv; i++)
if (upkstralloc(mp, &wxp->w_env[i]))
goto bad;
if ( upkuint(mp, &wxp->w_instance) || upkuint(mp, &wxp->w_outof))
goto bad;
/* check for spawn working directory */
for (i = 0; i < wxp->w_nenv; i++)
if ( !strncmp( "PVMSPAWNWD=", wxp->w_env[i],
strlen("PVMSPAWNWD=") ) )
wd = STRALLOC( wxp->w_env[i] + strlen("PVMSPAWNWD=") );
/* munge env for tracing stuff */
if ( munge_tenv ) {
sprintf( tmp, "PVMTMASK=%s", pvmtracer.tmask );
pvmenvinsert( &(wxp->w_env), tmp );
sprintf( tmp, "PVMTRCBUF=%d", pvmtracer.trcbuf );
pvmenvinsert( &(wxp->w_env), tmp );
sprintf( tmp, "PVMTRCOPT=%d", pvmtracer.trcopt );
pvmenvinsert( &(wxp->w_env), tmp );
for ( wxp->w_nenv = 0 ; wxp->w_env[ wxp->w_nenv ] ;
(wxp->w_nenv)++ );
}
/* call ppi_load to get tasks running */
wxp->w_sched = schtid;
/* change to desired working directory (if specified) */
if (wd) {
savewd = (char *) getcwd( (char *) NULL, 255 );
chdir( wd );
}
#if defined(IMA_PGON) || defined(IMA_SP2MPI) || defined(IMA_AIX4SP2) \
|| defined(IMA_AIX5SP2) || defined(IMA_BEOLIN)
if (!(wxp->w_flags & PvmMppFront))
{
mpp_load(wxp);
}
else
#endif
{
ppi_load(wxp);
}
/* go back to original directory (if getcwd() was successful) */
if (savewd)
chdir( savewd );
for (i = 0; i < wxp->w_veclen; i++) {
if (wxp->w_vec[i] > 0) {
if (wxp->w_trctid > 0) {
tev_send_newtask(
wxp->w_trctid, wxp->w_trcctx, wxp->w_trctag,
wxp->w_vec[i], wxp->w_ptid, wxp->w_flags,
wxp->w_argv[0] );
}
if (wxp->w_outtid > 0) {
mp2 = mesg_new(0);
mp2->m_dst = wxp->w_outtid;
mp2->m_ctx = wxp->w_outctx;
mp2->m_tag = wxp->w_outtag;
pkint(mp2, wxp->w_vec[i]);
pkint(mp2, TO_NEW);
pkint(mp2, wxp->w_ptid);
sendmessage(mp2);
}
}
}
pkint(rmp, wxp->w_veclen);
for (i = 0; i < wxp->w_veclen; i++)
pkint(rmp, wxp->w_vec[i]);
sendmessage(rmp);
goto cleanup;
bad:
pvmlogprintf("exectasks() from t%x bad msg format\n", mp->m_src);
cleanup:
if (wxp->w_argv)
*--wxp->w_argv = 0;
free_wait_spawn(wxp);
if (wd)
PVM_FREE(wd);
return 0;
}
/* dm_exec()
*
* Request to start task processes.
*
* DM_EXEC(wid) {
* int ptid
* string file
* int flags
* int count
* int nargs
* string argv[nargs]
* int outtid
* int outctx
* int outtag
* int trctid
* int trcctx
* int trctag
* int nenv
* string env[nenv]
* }
*/
int
dm_exec(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct pmsg *rmp;
hp = hp;
rmp = mesg_new(0);
rmp->m_dst = mp->m_src;
rmp->m_ctx = mp->m_ctx;
rmp->m_tag = DM_EXECACK;
rmp->m_wid = mp->m_wid;
exectasks(mp, rmp, pvmschedtid);
return 0;
}
/* dm_execack()
*
* Reply to DM_EXEC op.
*
* DM_EXECACK(wid_rtn) {
* int count
* int tids[count]
* }
*/
int
dm_execack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
struct waitc_spawn *wxp;
int rcnt; /* num of tids+errors returned */
int tid;
int v;
int err = 0;
int i;
if (!(wp = wait_get(hp, mp, WT_SPAWN)))
return 0;
wxp = (struct waitc_spawn*)wp->wa_spec;
if (upkint(mp, &rcnt))
goto bad;
v = wxp->w_veclen;
/*
* unpack tids and place in result vector where hostpart is now
*/
/*
pvmlogprintf("dm_execack() hp %x vec len %d, repl len %d\n",
hp->hd_hostpart, v, rcnt);
*/
i = 0;
while (rcnt-- > 0) {
if (upkint(mp, &tid))
goto bad;
if (tid < 0)
err++;
while (i < v && wxp->w_vec[i] != hp->hd_hostpart)
i++;
if (i == v) {
pvmlogerror("dm_execack() tids don't fit result vector?\n");
wait_delete(wp);
return 0;
}
wxp->w_vec[i++] = tid;
}
if (err)
ht_delete(wxp->w_ht, hp);
/*
* if everyone has checked in, either restart the failed ones
* or reply to the caller.
*/
if (wp->wa_peer == wp)
assign_tasks(wp);
wait_delete(wp);
return 0;
bad:
pvmlogprintf("dm_execack() from 0x%x bad msg format\n", mp->m_src);
wait_delete(wp);
return 0;
}
/* dm_sendsig()
*
* Request to send signal to local task.
*
* DM_SENDSIG {
* int tid
* int signum
* }
*/
int
dm_sendsig(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int tid;
int signum;
struct task *tp;
hp = hp;
if (upkuint(mp, &tid) || upkint(mp, &signum)) {
pvmlogerror("dm_sendsig() bad msg format\n");
return 0;
}
if (tp = task_find(tid)) {
ppi_kill(tp, signum);
} else
if (pvmdebmask & (PDMTASK|PDMAPPL)) {
pvmlogprintf("dm_sendsig() signal for t%x scrapped\n", tid);
}
return 0;
}
/* dm_htupd()
*
* Host table update phase 1 - (Non-master) pvmd is notified of new
* host table.
*
* DM_HTUPD(wid) {
* int serial
* int master
* int console
* int count
* {
* int index
* string name
* string arch
* string hexipaddr
* int mtu
* int speed
* int dsig
* } [count]
* }
*/
int
dm_htupd(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int count; /* number of hosts in message */
int hh;
char buf[16]; /* for converting sockaddr */
struct pmsg *mp2;
/* unpack new host table params */
newhosts = ht_new(1);
newhosts->ht_local = hosts->ht_local;
upkint(mp, &newhosts->ht_serial);
upkint(mp, &newhosts->ht_master);
upkint(mp, &newhosts->ht_cons);
/* add current hosts to the table */
ht_merge(newhosts, hosts);
/* unpack new hosts and add to table */
/* XXX if we find a host already in table we should kill it with
XXX hostfail and put the new one in its place */
upkint(mp, &count);
while (count-- > 0) {
upkint(mp, &hh);
hp = hd_new(hh);
upkstralloc(mp, &hp->hd_name);
upkstralloc(mp, &hp->hd_arch);
upkstr(mp, buf, sizeof(buf));
hex_inadport(buf, &hp->hd_sad);
upkint(mp, &hp->hd_mtu);
upkint(mp, &hp->hd_speed);
upkint(mp, &hp->hd_dsig);
ht_insert(newhosts, hp);
hd_unref(hp);
}
if (pvmdebmask & PDMHOST) {
pvmlogerror("dm_htupd() new host table:\n");
ht_dump(newhosts);
}
runstate = PVMDHTUPD;
/* reply to sender that we have new host table */
mp2 = mesg_new(0);
mp2->m_dst = mp->m_src;
mp2->m_ctx = mp->m_ctx;
mp2->m_tag = DM_HTUPDACK;
mp2->m_wid = mp->m_wid;
sendmessage(mp2);
return 0;
}
/* dm_htupdack()
*
* Reply to DM_HTUPD op.
*
* DM_HTUPDACK(wid_rtn) { }
*/
int
dm_htupdack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
if (!(wp = wait_get(hp, mp, WT_HTUPD)))
return 0;
/* is this is the last host checking in, send ht commit */
if (wp->wa_peer == wp) {
int hh;
mp = mesg_new(0);
mp->m_tag = DM_HTCOMMIT;
for (hh = hosts->ht_last; hh > 0; hh--)
if (hh != hosts->ht_local && (hp = hosts->ht_hosts[hh])) {
mp->m_ref++;
mp->m_dst = hp->hd_hostpart | TIDPVMD;
sendmessage(mp);
}
pmsg_unref(mp);
busyadding = 0;
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
}
wait_delete(wp);
return 0;
}
/* ht_diff()
*
* Generates a host table containing entries in ht2 but not in ht1.
*/
struct htab *
ht_diff(htp2, htp1)
struct htab *htp2; /* more */
struct htab *htp1; /* less */
{
struct htab *htp;
int hh;
htp = ht_new(1);
for (hh = htp2->ht_last; hh > 0; hh--)
if (htp2->ht_hosts[hh] && !htp2->ht_hosts[hh]->hd_err
&& (hh > htp1->ht_last || !htp1->ht_hosts[hh]))
ht_insert(htp, htp2->ht_hosts[hh]);
return htp;
}
/* gotnewhosts()
*
* Used to wake up any waitcs of kind WT_HOSTA.
* Sends a message containing count and list of new d-tids.
*/
gotnewhosts(htp2, htp1)
struct htab *htp2; /* new host table */
struct htab *htp1; /* old host table */
{
struct pmsg *mp;
struct htab *htp;
struct waitc *wp, *wp2;
int hh;
mp = 0;
for (wp = waitlist->wa_link; wp != waitlist; wp = wp2) {
wp2 = wp->wa_link;
if (wp->wa_kind == WT_HOSTA) {
if (!mp) {
mp = mesg_new(0);
htp = ht_diff(htp2, htp1);
pkint(mp, htp->ht_cnt);
for (hh = htp->ht_last; hh > 0; hh--)
if (htp->ht_hosts[hh])
pkint(mp, htp->ht_hosts[hh]->hd_hostpart);
ht_free(htp);
}
mp->m_ref++;
mp->m_dst = wp->wa_mesg->m_dst;
mp->m_ctx = wp->wa_mesg->m_ctx;
mp->m_tag = wp->wa_mesg->m_tag;
sendmessage(mp);
if (wp->wa_count != -1 && --wp->wa_count < 1)
wait_delete(wp);
}
}
if (mp)
pmsg_unref(mp);
return 0;
}
/* dm_htcommit()
*
* Host table update phase 2 - commit to new host table.
*
* DM_HTCOMMIT { }
*/
int
dm_htcommit(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct htab *htp;
if (hp != hosts->ht_hosts[hosts->ht_master]) {
pvmlogprintf("dm_htcommit() from t%x (not master)?\n",
mp->m_src);
}
if (newhosts) {
htp = hosts;
hosts = newhosts;
newhosts = 0;
if (pvmdebmask & PDMHOST) {
pvmlogprintf(
"dm_htcommit() committing from host table serial %d to %d\n",
htp->ht_serial, hosts->ht_serial);
}
gotnewhosts(hosts, htp);
ht_free(htp);
runstate = PVMDNORMAL;
} else {
pvmlogerror("dm_htcommit() no new host table pending?\n");
}
return 0;
}
/* dm_slconf()
*
* Pvmd gets config info (from master).
* One of these should arrive before a pvmd is fully up, even
* before the host table update, to set private params.
* More may be used later to set random knobs.
*
* DM_SLCONF {
* {
* int fieldtype // from ddpro.h
* string value
* } [] // implied
* }
*/
int
dm_slconf(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int t; /* field type */
char *s, *s2;
mp = mp;
hp = hp;
while (!upkint(mp, &t) && !upkstralloc(mp, &s)) {
switch (t) {
case DM_SLCONF_EP:
if (pvmdebmask & (PDMTASK|PDMSTARTUP)) {
pvmlogprintf("dm_slconf() ep<%s>\n", s);
}
epaths = colonsep(varsub(s));
PVM_FREE(s);
break;
case DM_SLCONF_BP:
if (pvmdebmask & PDMSTARTUP) {
pvmlogprintf("dm_slconf() bp<%s>\n", s);
}
debugger = varsub(s);
PVM_FREE(s);
break;
case DM_SLCONF_WD:
if (pvmdebmask & (PDMTASK|PDMSTARTUP)) {
pvmlogprintf("dm_slconf() wd<%s>\n", s);
}
s2 = varsub(s);
if (chdir(s2) == -1)
pvmlogperror(s2);
PVM_FREE(s);
PVM_FREE(s2);
break;
case DM_SLCONF_SCHED:
if (pvmdebmask & (PDMSCHED|PDMSTARTUP)) {
pvmlogprintf("dm_slconf() sched<t%x>\n", pvmschedtid);
}
pvmschedtid = pvmxtoi(s);
break;
case DM_SLCONF_TRACE: {
Pvmtmask tmask;
int ttid, tctx, ttag, otid, octx, otag, tbuf, topt;
if (pvmdebmask & (PDMTRACE|PDMSTARTUP)) {
pvmlogprintf("dm_slconf() tracer<t%x>\n",
pvmtracer.trctid);
}
if (sscanf(s, "%x %d %d %x %d %d %d %d %s",
&ttid, &tctx, &ttag, &otid, &octx, &otag,
&tbuf, &topt, tmask) != 9) {
pvmlogprintf("dm_slconf() bogus string<%s>\n", s);
}
else {
pvmtracer.trctid = ttid;
pvmtracer.trcctx = tctx;
pvmtracer.trctag = ttag;
pvmtracer.outtid = otid;
pvmtracer.outctx = octx;
pvmtracer.outtag = otag;
pvmtracer.trcbuf = tbuf;
pvmtracer.trcopt = topt;
BCOPY(tmask,pvmtracer.tmask,TEV_MASK_LENGTH);
}
break;
}
default:
pvmlogprintf("dm_slconf() ? type %d val <%s>\n", t, s);
PVM_FREE(s);
break;
}
}
return 0;
}
/* startack()
*
* Take results from pvmd' or hoster. Update the machine config
* if any were successful. Reply to the DM_ADD or SM_ADD request.
*/
int
startack(wp, mp)
struct waitc *wp; /* wait context on hoster */
struct pmsg *mp;
{
struct hostd *hp;
struct pmsg *mp2;
struct waitc *wp2; /* seed waitc for htupd peer group */
struct waitc *wp3;
int count; /* num of new hosts */
int happy; /* num of happy new hosts */
struct waitc_add *wxp;
char *av[16]; /* for reply parsing */
int ac;
int ver;
int i, j;
int t;
int hh;
char *buf;
char buf2[256];
/*
* unpack startup results, update hosts in wait context
*/
wxp = (struct waitc_add *)wp->wa_spec;
count = wxp->w_num;
upkint(mp, &j);
while (j-- > 0) {
if (upkuint(mp, &t) || upkstralloc(mp, &buf)) {
pvmlogerror("startack() bad message format\n");
pkint(wp->wa_mesg, PvmDSysErr);
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
wait_delete(wp);
busyadding = 0;
return 0;
}
for (i = count; i-- > 0 && wxp->w_hosts[i]->hd_hostpart != t; ) ;
if (i < 0) {
pvmlogprintf("startack() what? some random tid %x\n", t);
pkint(wp->wa_mesg, PvmDSysErr);
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
wait_delete(wp);
busyadding = 0;
PVM_FREE(buf);
return 0;
}
hp = wxp->w_hosts[i];
ac = sizeof(av)/sizeof(av[0]);
if (crunchzap(buf, &ac, av) || ac != 5) {
pvmlogprintf("startack() host %s expected version, got \"%s\"\n",
hp->hd_name, buf);
if (!(hp->hd_err = errnamecode(buf)))
hp->hd_err = PvmCantStart;
PVM_FREE(buf);
continue;
}
ver = atoi(av[0]);
if (ver != DDPROTOCOL) {
pvmlogprintf(
"slave_exec() host %s d-d protocol mismatch (%d/%d)\n",
hp->hd_name, ver, DDPROTOCOL);
hp->hd_err = PvmBadVersion;
continue;
}
hp->hd_arch = STRALLOC(av[1]);
hex_inadport(av[2], &hp->hd_sad);
hp->hd_mtu = atoi(av[3]);
hp->hd_dsig = atoi(av[4]);
PVM_FREE(buf);
}
/*
* update reply message to add-host requestor
*/
mp2 = wp->wa_mesg;
pkint(mp2, count);
pkint(mp2, 0); /* XXX narches = 0 for now */
for (i = 0; i < count; i++) {
hp = wxp->w_hosts[i];
if (hp->hd_err) {
pkint(mp2, hp->hd_err);
pkstr(mp2, "");
pkstr(mp2, "");
pkint(mp2, 0);
pkint(mp2, 0);
} else {
pkint(mp2, hp->hd_hostpart);
pkstr(mp2, hp->hd_name);
pkstr(mp2, hp->hd_arch);
pkint(mp2, hp->hd_speed);
pkint(mp2, hp->hd_dsig);
}
}
/*
* delete broken ones, done now if none succeeded,
* otherwise done when host table update is complete.
*/
for (j = i = 0; i < count; i++)
if (!wxp->w_hosts[i]->hd_err) {
hp = wxp->w_hosts[i];
wxp->w_hosts[i] = 0;
wxp->w_hosts[j++] = hp;
} else {
hd_unref(wxp->w_hosts[i]);
wxp->w_hosts[i] = 0;
}
count = j;
if (count < 1) {
busyadding = 0;
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
free_waitc_add(wxp);
wait_delete(wp);
return 0;
}
wp2 = wait_new(WT_HTUPD);
wp2->wa_dep = wp->wa_dep;
wp2->wa_mesg = wp->wa_mesg;
wp->wa_mesg = 0;
/*
* make next host table
*/
newhosts = ht_new(1);
newhosts->ht_serial = hosts->ht_serial + 1;
newhosts->ht_master = hosts->ht_master;
newhosts->ht_cons = hosts->ht_cons;
newhosts->ht_local = hosts->ht_local;
for (i = 0; i < count; i++)
ht_insert(newhosts, wxp->w_hosts[i]);
free_waitc_add(wxp);
wait_delete(wp);
wp = 0;
runstate = PVMDHTUPD;
/*
* send DM_SLCONF message to each new host
*/
for (hh = newhosts->ht_last; hh > 0; hh--)
if (hp = newhosts->ht_hosts[hh]) {
mp2 = mesg_new(0);
mp2->m_tag = DM_SLCONF;
mp2->m_dst = hp->hd_hostpart | TIDPVMD;
if (hp->hd_epath) {
pkint(mp2, DM_SLCONF_EP);
pkstr(mp2, hp->hd_epath);
}
if (hp->hd_bpath) {
pkint(mp2, DM_SLCONF_BP);
pkstr(mp2, hp->hd_bpath);
}
if (hp->hd_wdir) {
pkint(mp2, DM_SLCONF_WD);
pkstr(mp2, hp->hd_wdir);
}
if (pvmschedtid) {
sprintf(buf2, "%x", pvmschedtid);
pkint(mp2, DM_SLCONF_SCHED);
pkstr(mp2, buf2);
}
if (pvmtracer.trctid || pvmtracer.outtid) {
sprintf(buf2, "%x %d %d %x %d %d %d %d %s",
pvmtracer.trctid, pvmtracer.trcctx,
pvmtracer.trctag,
pvmtracer.outtid, pvmtracer.outctx,
pvmtracer.outtag,
pvmtracer.trcbuf, pvmtracer.trcopt,
pvmtracer.tmask);
pkint(mp2, DM_SLCONF_TRACE);
pkstr(mp2, buf2);
}
sendmessage(mp2);
}
/*
* create host table update message containing all current hosts
* plus new ones, send to each new host.
*/
mp2 = mesg_new(0);
mp2->m_tag = DM_HTUPD;
pkint(mp2, newhosts->ht_serial);
pkint(mp2, newhosts->ht_master);
pkint(mp2, newhosts->ht_cons);
pkint(mp2, hosts->ht_cnt + newhosts->ht_cnt);
for (hh = hosts->ht_last; hh > 0; hh--)
if (hp = hosts->ht_hosts[hh]) {
pkint(mp2, hh);
pkstr(mp2, hp->hd_name);
pkstr(mp2, hp->hd_arch);
pkstr(mp2, inadport_hex(&hp->hd_sad));
pkint(mp2, hp->hd_mtu);
pkint(mp2, hp->hd_speed);
pkint(mp2, hp->hd_dsig);
}
for (hh = newhosts->ht_last; hh > 0; hh--)
if (hp = newhosts->ht_hosts[hh]) {
pkint(mp2, hh);
pkstr(mp2, hp->hd_name);
pkstr(mp2, hp->hd_arch);
pkstr(mp2, inadport_hex(&hp->hd_sad));
pkint(mp2, hp->hd_mtu);
pkint(mp2, hp->hd_speed);
pkint(mp2, hp->hd_dsig);
}
for (hh = newhosts->ht_last; hh > 0; hh--)
if (hp = newhosts->ht_hosts[hh]) {
mp2->m_ref++;
mp2->m_dst = hp->hd_hostpart | TIDPVMD;
wp3 = wait_new(WT_HTUPD);
wp3->wa_dep = wp2->wa_dep;
wp2->wa_mesg->m_ref++;
wp3->wa_mesg = wp2->wa_mesg;
wp3->wa_on = hp->hd_hostpart;
LISTPUTBEFORE(wp2, wp3, wa_peer, wa_rpeer);
mp2->m_wid = wp3->wa_wid;
sendmessage(mp2);
}
pmsg_unref(mp2);
/*
* create host table update message containing happy new hosts,
* send to each old host.
*/
mp2 = mesg_new(0);
mp2->m_tag = DM_HTUPD;
pkint(mp2, newhosts->ht_serial);
pkint(mp2, newhosts->ht_master);
pkint(mp2, newhosts->ht_cons);
pkint(mp2, newhosts->ht_cnt);
for (hh = newhosts->ht_last; hh > 0; hh--)
if (hp = newhosts->ht_hosts[hh]) {
pkint(mp2, hh);
pkstr(mp2, hp->hd_name);
pkstr(mp2, hp->hd_arch);
pkstr(mp2, inadport_hex(&hp->hd_sad));
pkint(mp2, hp->hd_mtu);
pkint(mp2, hp->hd_speed);
pkint(mp2, hp->hd_dsig);
}
for (hh = hosts->ht_last; hh > 0; hh--)
if (hh != hosts->ht_local && (hp = hosts->ht_hosts[hh])) {
mp2->m_ref++;
mp2->m_dst = hp->hd_hostpart | TIDPVMD;
wp3 = wait_new(WT_HTUPD);
wp3->wa_dep = wp2->wa_dep;
wp2->wa_mesg->m_ref++;
wp3->wa_mesg = wp2->wa_mesg;
wp3->wa_on = hp->hd_hostpart;
LISTPUTBEFORE(wp2, wp3, wa_peer, wa_rpeer);
mp2->m_wid = wp3->wa_wid;
sendmessage(mp2);
}
pmsg_unref(mp2);
/*
* update our host table
*/
gotnewhosts(newhosts, hosts);
/* XXX returning to normal state right here is a hack, we should
XXX wait for all the htupdacks to come back but we need the
XXX regular message service, hostfail entry, etc. */
for (hh = newhosts->ht_last; hh > 0; hh--)
if ((hp = newhosts->ht_hosts[hh]) && !hp->hd_err)
ht_insert(hosts, hp);
hosts->ht_serial = newhosts->ht_serial;
if (pvmdebmask & PDMHOST) {
pvmlogerror("startack() committing to new host table:\n");
ht_dump(hosts);
}
runstate = PVMDNORMAL;
ht_free(newhosts);
newhosts = 0;
/* if peered waitcs on htupdack already finished, send the reply */
if (wp2->wa_peer == wp2) {
busyadding = 0;
sendmessage(wp2->wa_mesg);
wp2->wa_mesg = 0;
}
wait_delete(wp2);
return 0;
}
/* dm_startack()
*
* The results come back from the pvmd' hoster to pvmd[master].
*
* DM_STARTACK(wid_rtn) {
* int count
* {
* int tid
* string result // line from slave pvmd or error token
* } [count]
* }
*/
int
dm_startack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp; /* wait context on pvmd' */
if (!(wp = wait_get(hp, mp, WT_HOSTSTART)))
return 0;
finack_to_host(hp);
startack(wp, mp);
return 0;
}
/* dm_task()
*
* (Remote or local) pvmd requests information about local task(s).
*
* DM_TASK(wid) {
* int where // 0 for all, hostpart, or full tid
* }
*/
int
dm_task(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct task *tp;
struct pmsg *mp2;
int where;
hp = hp;
if (upkuint(mp, &where)) {
pvmlogerror("dm_task() bad msg format\n");
return 0;
}
#ifdef SHMEM
mpp_setstatus(0); /* get status from global info table */
#endif
/* pack list of local tasks and reply to waiter */
mp2 = mesg_new(0);
mp2->m_dst = mp->m_src;
mp2->m_tag = DM_TASKACK;
mp2->m_wid = mp->m_wid;
if (where & tidlmask) {
if (tp = task_find(where)) {
pkint(mp2, tp->t_tid);
pkint(mp2, tp->t_ptid);
pkint(mp2, myhostpart);
pkint(mp2, tp->t_flag);
pkstr(mp2, tp->t_a_out ? tp->t_a_out : "");
pkint(mp2, tp->t_pid);
}
} else {
for (tp = locltasks->t_link; tp != locltasks; tp = tp->t_link) {
pkint(mp2, tp->t_tid);
pkint(mp2, tp->t_ptid);
pkint(mp2, myhostpart);
pkint(mp2, tp->t_flag);
pkstr(mp2, (tp->t_a_out ? tp->t_a_out : ""));
pkint(mp2, tp->t_pid);
}
}
sendmessage(mp2);
return 0;
}
/* dm_taskack()
*
* Reply to DM_TASK op.
*
* DM_TASKACK(wid_rtn) {
* {
* int tid
* int ptid
* int hostpart
* int flag
* string a_out
* int pid
* } [] // implied
* }
*/
int
dm_taskack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
struct pmsg *mp2;
int i;
char *p;
if (!(wp = wait_get(hp, mp, WT_TASK)))
return 0;
/* append data to waiting message */
mp2 = wp->wa_mesg;
while (!upkint(mp, &i)) {
pkint(mp2, i); /* tid */
upkint(mp, &i); /* ptid */
pkint(mp2, i);
upkint(mp, &i); /* host */
pkint(mp2, i);
upkint(mp, &i); /* flag */
pkint(mp2, i);
upkstralloc(mp, &p); /* a.out name */
pkstr(mp2, p);
PVM_FREE(p);
upkint(mp, &i); /* pid */
pkint(mp2, i);
}
/* send message if we're the last waiter */
if (wp->wa_peer == wp) {
mp2->m_ref++;
sendmessage(mp2);
}
wait_delete(wp);
return 0;
}
/* dm_delhost()
*
* (Master pvmd) gets request to delete hosts.
*
* DM_DELHOST(wid) {
* int count
* string names[count]
* }
*/
int
dm_delhost(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int count;
char *buf;
struct pmsg *mp2; /* DELHOSTACK message */
struct pmsg *mp3; /* HTDEL message */
struct htab *ht_del; /* hosts to delete */
struct htab *ht_save; /* remaining hosts */
int hh;
/* XXX danger, this doesn't check if already doing a host add/delete */
/* sanity check count */
if (upkint(mp, &count)) {
pvmlogerror("dm_delhost() bad msg format\n");
return 0;
}
if (count < 1 || count > (tidhmask >> (ffs(tidhmask) - 1))) {
pvmlogerror("dm_delhost() bad count\n");
return 0;
}
/*
* read host names from message, generate delete and save sets
* and a DELHOSTACK reply message with result code for each host.
* set SHUTDOWN flag for each host in delete set.
*/
ht_del = ht_new(1);
ht_save = ht_new(1);
ht_merge(ht_save, hosts);
mp2 = mesg_new(0);
mp2->m_tag = DM_DELHOSTACK;
mp2->m_wid = mp->m_wid;
mp2->m_dst = mp->m_src;
mp3 = mesg_new(0);
mp3->m_tag = DM_HTDEL;
pkint(mp3, hosts->ht_serial);
pkint(mp2, count);
while (count-- > 0) {
upkstralloc(mp, &buf);
if (hp = nametohost(hosts, buf)) {
if (tidtohost(ht_del, hp->hd_hostpart)) {
pkint(mp2, PvmDupHost);
} else {
if (hp->hd_hostpart == myhostpart)
pkint(mp2, PvmBadParam);
else {
ht_insert(ht_del, hp);
ht_delete(ht_save, hp);
pkint(mp3, hp->hd_hostpart);
fin_to_host(hp);
pkint(mp2, 0);
}
}
} else
pkint(mp2, PvmNoHost);
PVM_FREE(buf);
}
/*
* send HTDEL message to all hosts in ht_save set except us
*/
for (hh = ht_save->ht_last; hh > 0; hh--)
if (hh != hosts->ht_local && (hp = ht_save->ht_hosts[hh])) {
mp3->m_ref++;
mp3->m_dst = hp->hd_hostpart | TIDPVMD;
sendmessage(mp3);
}
pmsg_unref(mp3);
/* reply to host that requested DELHOST operation */
sendmessage(mp2);
ht_free(ht_del);
ht_free(ht_save);
return 0;
}
/* dm_delhostack()
*
* Reply to DM_DELHOST operation.
*
* DM_DELHOSTACK(wid_rtn) {
* int count // or negative for error
* int status[count] // status of each host
* }
*/
int
dm_delhostack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
if (!(wp = wait_get(hp, mp, WT_DELHOST)))
return 0;
pmsg_packbody(wp->wa_mesg, mp);
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
wait_delete(wp);
return 0;
}
/* dm_null()
*
* No-op message.
*
* DM_NULL { }
*/
int
dm_null(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
hp = hp;
mp = mp;
/*
pvmlogprintf("dm_null() from %s\n", hp->hd_name);
*/
return 0;
}
/* dm_taskout()
*
* Stdout data from a task.
*
* DM_TASKOUT {
* int tid
* int length
* char data[length]
* }
*/
int
dm_taskout(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int tid;
int l;
char *p, *q, c;
char buf2[32];
char buf[4100]; /* XXX a bit bigger than in pvmd.c`loclstout() */
hp = hp;
if (upkuint(mp, &tid) || upkint(mp, &l) || l < 1)
return 0;
/* unpack data, leaving room at head of buffer */
p = buf + 32;
if (l > sizeof(buf) - (p - buf) - 2)
l = sizeof(buf) - (p - buf) - 2;
upkbyte(mp, p, l);
/* ends with "\n\0" */
if (p[l - 1] != '\n')
p[l++] = '\n';
p[l] = 0;
sprintf(buf2, "[t%x] ", tid);
l = strlen(buf2);
while (*p) {
for (q = p; *q++ != '\n'; ) ;
c = *q;
*q = 0;
BCOPY(buf2, p - l, l);
pvmlogerror(p - l);
*q = c;
p = q;
}
return 0;
}
/* dm_pstat()
*
* (Remote or local) pvmd requests status of a local task.
*
* DM_PSTAT(wid) {
* int tid
* }
*/
int
dm_pstat(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int tid;
struct pmsg *mp2;
hp = hp;
upkuint(mp, &tid);
if (tid == pvmmytid || task_find(tid))
tid = 0;
else
tid = PvmNoTask;
mp2 = mesg_new(0);
mp2->m_dst = mp->m_src;
mp2->m_tag = DM_PSTATACK;
mp2->m_wid = mp->m_wid;
pkint(mp2, tid);
sendmessage(mp2);
return 0;
}
/* dm_pstatack()
*
* Reply to DM_PSTAT op.
*
* DM_PSTATACK(wid_rtn) {
* int status
* }
*/
int
dm_pstatack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
if (!(wp = wait_get(hp, mp, 0)))
return 0;
pmsg_packbody(wp->wa_mesg, mp);
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
wait_delete(wp);
return 0;
}
/* dm_halt()
*
* Task has requested whole machine to halt.
*
* DM_HALT { }
*/
int
dm_halt(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int hh;
pvmlogprintf("dm_halt() from (%s), halting...\n", hp->hd_name);
for (hh = hosts->ht_last; hh > 0; hh--) {
if (hh == hosts->ht_local || !(hp = hosts->ht_hosts[hh]))
continue;
finack_to_host(hp);
}
runstate = PVMDHALTING;
return 0;
}
/* dm_mca()
*
* Remote pvmd defines a new (single-use) multicast address, to be used
* by a subsequent message.
*
* DM_MCA {
* int gtid // multicast tid
* int count // number of addresses
* int tids[count] // addresses
* }
*/
int
dm_mca(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct mca *mcap;
int i;
/* unpack struct mca from message */
mcap = mca_new();
upkuint(mp, &mcap->mc_tid);
upkint(mp, &mcap->mc_ndst);
mcap->mc_dsts = TALLOC(mcap->mc_ndst, int, "mcad");
for (i = 0; i < mcap->mc_ndst; i++)
upkuint(mp, &mcap->mc_dsts[i]);
/* put on list of mcas of src host */
LISTPUTBEFORE(hp->hd_mcas, mcap, mc_link, mc_rlink);
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf("dm_mca() mca %x from %s\n", mcap->mc_tid, hp->hd_name);
}
return 0;
}
/* dm_notify()
*
* Remote pvmd requests to be notified on an event.
*
* DM_NOTIFY(wid) {
* int what // event type, currenty only PvmTaskExit
* int tid // address
* }
*/
int
dm_notify(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int what, tid;
struct waitc *wp;
struct pmsg *mp2;
hp = hp;
upkint(mp, &what);
upkuint(mp, &tid);
if (what != PvmTaskExit) {
pvmlogprintf("dm_notify() what = %d?\n", what);
return 0;
}
mp2 = mesg_new(0);
mp2->m_dst = mp->m_src;
mp2->m_tag = DM_NOTIFYACK;
mp2->m_wid = mp->m_wid;
pkint(mp2, tid);
if (task_find(tid)) {
wp = wait_new(WT_TASKX);
wp->wa_on = tid;
wp->wa_tid = mp->m_src;
wp->wa_dep = mp->m_wid;
wp->wa_mesg = mp2;
} else {
sendmessage(mp2);
}
return 0;
}
/* dm_notifyack()
*
* Reply to DM_NOTIFY op.
*
* DM_NOTIFYACK(wid_rtn) { }
*/
int
dm_notifyack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
hp = hp;
if (!(wp = wait_get((struct hostd*)0, mp, 0)))
return 0;
if (wp->wa_tid && wp->wa_mesg) {
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
}
if ( wp->wa_tid == pvmmytid )
mb_tidy(wp->wa_on);
wait_delete(wp);
return 0;
}
/* dm_db()
*
* (Remote or local) pvmd requests a database op.
*
* DM_DB(wid) {
* int opcode // TMDB_PUT, TMDB_REMOVE, TMDB_GET, TMDB_NAMES
* int tid // owner task (XXX we're just taking its word, hahaha)
* string name
* int index
* int flags
* msg data // if TMDB_PUT
* }
*/
int
dm_db(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int opcode; /* op requested */
int tid;
int req; /* index requested */
int flags;
char *name = 0; /* class name */
struct pmsg *mp2 = 0; /* reply */
struct pmsg *mp3 = 0; /* data message */
struct waitc *wp, *wp2; /* wait ctx ptrs (notify, recvinfo */
struct pmsg *mp4 = 0; /* notify forward message */
struct hostd *hp2; /* remote notify host */
struct pvmmclass *np, *np2; /* reset pointers */
struct pvmmentry *ep, *ep2; /* reset pointers */
int *noresets; /* noreset tids */
int nnr; /* # of noreset tasks */
int found;
int cc;
int i;
int notified;
hp = hp;
if (upkint(mp, &opcode) || upkint(mp, &tid)
|| upkstralloc(mp, &name) || upkint(mp, &req) || upkint(mp, &flags))
goto badformat;
mp2 = mesg_new(0);
mp2->m_dst = mp->m_src;
mp2->m_tag = DM_DBACK;
mp2->m_wid = mp->m_wid;
switch (opcode) {
case TMDB_PUT:
mp3 = mesg_new(0);
if (pmsg_unpack(mp, mp3))
goto badformat;
if ((req = mb_insert(tid, name, req, flags, mp3)) < 0)
pmsg_unref(mp3);
else {
/* check for any pending requests for this mbox entry */
notified = 0;
for (wp = waitlist->wa_link; wp != waitlist; wp = wp2) {
wp2 = wp->wa_link;
if (wp->wa_kind == WT_RECVINFO) {
ep = (struct pvmmentry *) wp->wa_spec;
if ( !strcmp( (char *) ep->me_msg, name ) ) {
cc = mb_lookup(ep->me_tid, (char *) ep->me_msg,
ep->me_ind, ep->me_flags, &mp3);
if ( cc != PvmNotFound ) {
pkint(wp->wa_mesg, cc);
if (mp3) {
pmsg_pack(wp->wa_mesg, mp3);
pmsg_unref(mp3);
}
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
PVM_FREE(ep->me_msg);
PVM_FREE(ep);
wait_delete(wp);
}
}
}
/* check if task needs mbox notify for mb_tidy()... */
else if (wp->wa_kind == WT_TASKX) {
if ( wp->wa_on == tid && wp->wa_tid == pvmmytid )
notified++;
}
}
/* create mbox notify for mb_tidy() cleanup... */
if ( !notified ) {
/* dummy notify for clean up */
wp = wait_new(WT_TASKX);
wp->wa_on = tid;
wp->wa_tid = pvmmytid;
wp->wa_dep = 0;
wp->wa_mesg = (struct pmsg *) NULL;
/* pass on to non-master host */
hp2 = tidtohost(hosts, tid);
if ( hp2 && hp2->hd_hostpart != myhostpart ) {
mp4 = mesg_new(0);
pkint(mp4, PvmTaskExit);
pkint(mp4, tid);
mp4->m_dst = hp2->hd_hostpart | TIDPVMD;
mp4->m_tag = DM_NOTIFY;
mp4->m_wid = wp->wa_wid;
sendmessage(mp4);
}
}
}
pkint(mp2, req);
break;
case TMDB_REMOVE:
req = mb_delete(tid, name, req, flags);
pkint(mp2, req);
break;
case TMDB_GET:
cc = mb_lookup(tid, name, req, flags, &mp3);
if ( cc == PvmNotFound && (flags & PvmMboxWaitForInfo) ) {
ep = me_new(req);
ep->me_tid = tid;
ep->me_msg = (struct pmsg *) name; /* XXX ouch, overload */
ep->me_flags = flags;
wp = wait_new(WT_RECVINFO);
wp->wa_on = tid;
wp->wa_tid = pvmmytid;
wp->wa_dep = mp->m_wid;
wp->wa_mesg = mp2;
wp->wa_spec = (void *) ep;
return 0;
} else {
pkint(mp2, cc);
if (mp3) {
pmsg_pack(mp2, mp3);
pmsg_unref(mp3);
}
}
break;
case TMDB_NAMES:
pkint(mp2, 0);
req = mb_names(tid, name, mp2);
break;
case TMDB_RESET:
if ( upkint(mp, &nnr) )
goto badformat;
noresets = TALLOC( nnr, int, "int" );
for ( i=0 ; i < nnr ; i++ ) {
if ( upkint(mp, &(noresets[i])) ) {
PVM_FREE(noresets);
goto badformat;
}
}
pkint(mp2, 0);
for (np = pvmmboxclasses->mc_link; np != pvmmboxclasses;
np = np2) {
np2 = np->mc_link;
/* If name passed in, only wipe mboxes in that class */
if ( *name == '\0' || !strcmp( np->mc_name, name ) ) {
for (ep = np->mc_ent->me_link; ep != np->mc_ent; ep = ep2) {
ep2 = ep->me_link;
/* If index passed in, only wipe that mbox */
/* -1 == All Entries */
if ( req < 0 || req == ep->me_ind ) {
/* Check for Persistency (that thorn in my "side" :-) */
if ( ep->me_flags & PvmMboxPersistent ) {
/* Task Already Gone? Spank It. */
if (!(ep->me_tid)) {
me_free(np, ep);
if (np2->mc_rlink != np)
break;
}
/* Check for No-Reset Task */
/* (Only if actually killing tasks...) */
else if ( flags ) { /* killtasks */
for ( i=0, found=0 ; i < nnr && !found ; i++ )
if ( noresets[i] == ep->me_tid )
found++;
/* Not a No-Reset, It WILL die soon. */
/* Wait for cleanup. */
if ( !found ) {
wp = wait_new(WT_RESET);
wp->wa_on = ep->me_tid;
wp->wa_tid = pvmmytid;
wp->wa_dep = 0;
wp->wa_mesg = (struct pmsg *) NULL;
/* pass on to non-master host */
hp2 = tidtohost(hosts, ep->me_tid);
if ( hp2 && hp2->hd_hostpart != myhostpart )
{
mp4 = mesg_new(0);
pkint(mp4, ep->me_tid);
mp4->m_dst = hp2->hd_hostpart | TIDPVMD;
mp4->m_tag = DM_RESET;
mp4->m_wid = wp->wa_wid;
sendmessage(mp4);
}
}
}
}
}
}
}
}
PVM_FREE(noresets);
break;
default:
goto badformat;
}
sendmessage(mp2);
PVM_FREE(name);
return 0;
badformat:
pvmlogerror("dm_db() bad msg format\n");
if (name)
PVM_FREE(name);
if (mp2)
pmsg_unref(mp2);
return 0;
}
/* dm_dback()
*
* Reply to DM_DB op.
*
* DM_DBACK(wid_rtn) {
* int status // and index
* if TMDB_PUT, TMDB_REMOVE, TMDB_GET
* msg data // if TMDB_GET
* else if TMDB_NAMES
* string names[] // list of names, length implied
* }
*/
int
dm_dback(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
if (!(wp = wait_get(hp, mp, WT_DB)))
return 0;
pmsg_packbody(wp->wa_mesg, mp);
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
wait_delete(wp);
return 0;
}
/* dm_reset()
*
* Remote pvmd requests to be notified when task is reset.
* - for mbox cleanup...
*
* DM_RESET(wid) {
* int tid // task id of persistent mbox owner
* }
*/
int
dm_reset(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int tid;
struct waitc *wp;
struct pmsg *mp2;
hp = hp;
upkuint(mp, &tid);
mp2 = mesg_new(0);
mp2->m_dst = mp->m_src;
mp2->m_tag = DM_RESETACK;
mp2->m_wid = mp->m_wid;
pkint(mp2, tid);
if (task_find(tid)) {
wp = wait_new(WT_RESET);
wp->wa_on = tid;
wp->wa_tid = mp->m_src;
wp->wa_dep = mp->m_wid;
wp->wa_mesg = mp2;
} else {
sendmessage(mp2);
}
return 0;
}
/* dm_resetack()
*
* Reply to DM_RESET op.
*
* DM_RESETACK(wid_rtn) { }
*/
int
dm_resetack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
hp = hp;
if (!(wp = wait_get((struct hostd*)0, mp, 0)))
return 0;
if (wp->wa_tid && wp->wa_mesg) {
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
}
mb_tidy_reset(wp->wa_on);
wait_delete(wp);
return 0;
}
int
dm_htdel(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
int serial;
int tid;
if (hp != hosts->ht_hosts[hosts->ht_master]) {
pvmlogprintf("dm_htdel() from t%x (not master)?\n", mp->m_src);
return 0;
}
if (upkint(mp, &serial)) {
pvmlogerror("dm_htdel() bad format\n");
return 0;
}
if (serial != hosts->ht_serial) {
pvmlogprintf("dm_htdel() for serial %d, current is %d?\n",
serial, hosts->ht_serial);
return 0;
}
while (!upkuint(mp, &tid)) {
if (hp = tidtohost(hosts, tid)) {
if (pvmdebmask & PDMHOST) {
pvmlogprintf("dm_htdel() host %s\n", hp->hd_name);
}
hostfailentry(hp);
ht_delete(hosts, hp);
if (newhosts)
ht_delete(newhosts, hp);
}
}
return 0;
}
/* dm_hostsync()
*
* Request time of day clock sample
*
* DM_HOSTSYNC(wid) { }
*/
int
dm_hostsync(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct pmsg *mp2;
struct timeval now;
mp = mp;
hp = hp;
mp2 = mesg_new(0);
mp2->m_dst = mp->m_src;
mp2->m_tag = DM_HOSTSYNCACK;
mp2->m_wid = mp->m_wid;
gettimeofday(&now, (struct timezone *)0);
pkint(mp2, (int)now.tv_sec);
pkint(mp2, (int)now.tv_usec);
sendmessage(mp2);
return 0;
}
/* dm_hostsyncack()
*
* Clock sample comes back
*
* DM_HOSTSYNCACK(wid_rtn) {
* int sec
* int usec
* }
*/
int
dm_hostsyncack(hp, mp)
struct hostd *hp;
struct pmsg *mp;
{
struct waitc *wp;
int i;
if (!(wp = wait_get(hp, mp, WT_HOSTSYNC)))
return 0;
pkint(wp->wa_mesg, 0);
upkuint(mp, &i);
pkint(wp->wa_mesg, i);
upkuint(mp, &i);
pkint(wp->wa_mesg, i);
sendmessage(wp->wa_mesg);
wp->wa_mesg = 0;
wait_delete(wp);
return 0;
}
struct mca *
mca_new()
{
struct mca *mcap;
if (mcap = TALLOC(1, struct mca, "mca")) {
mcap->mc_link = mcap->mc_rlink = mcap;
mcap->mc_tid = mcap->mc_ndst = 0;
mcap->mc_dsts = 0;
}
return mcap;
}
void
mca_free(mcap)
struct mca *mcap;
{
LISTDELETE(mcap, mc_link, mc_rlink);
if (mcap->mc_dsts)
PVM_FREE(mcap->mc_dsts);
PVM_FREE(mcap);
}
struct pmsg *
mesg_new(master)
int master;
{
struct pmsg *mp;
if (mp = pmsg_new(master)) {
mp->m_src = pvmmytid;
pmsg_setenc(mp, 0x10000000); /* PvmDataDefault */
(mp->m_codef->enc_init)(mp);
}
return mp;
}
syntax highlighted by Code2HTML, v. 0.9.1