static char rcsid[] =
"$Id: lpvm.c,v 1.69 2004/01/14 18:50:55 pvmsrc Exp $";
/*
* PVM version 3.4: Parallel Virtual Machine System
* University of Tennessee, Knoxville TN.
* Oak Ridge National Laboratory, Oak Ridge TN.
* Emory University, Atlanta GA.
* Authors: J. J. Dongarra, G. E. Fagg, M. Fischer
* G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
* P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
* (C) 1997 All Rights Reserved
*
* NOTICE
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby granted
* provided that the above copyright notice appear in all copies and
* that both the copyright notice and this permission notice appear in
* supporting documentation.
*
* Neither the Institutions (Emory University, Oak Ridge National
* Laboratory, and University of Tennessee) nor the Authors make any
* representations about the suitability of this software for any
* purpose. This software is provided ``as is'' without express or
* implied warranty.
*
* PVM version 3 was funded in part by the U.S. Department of Energy,
* the National Science Foundation and the State of Tennessee.
*/
/*
* lpvm.c
*
* Libpvm core for unix environment.
*
* $Log: lpvm.c,v $
* Revision 1.69 2004/01/14 18:50:55 pvmsrc
* Added new AIX5* arches.
* (Spanker=kohl)
*
* Revision 1.68 2003/02/04 20:03:04 pvmsrc
* Got rid of Jigen's goofy my_errno...
* - never resolved anywhere...
* (Spanker=kohl)
*
* Revision 1.67 2002/04/16 15:06:01 pvmsrc
* WIN32 Fixes for Multiple Domains.
* - submitted by Jigen Zhou <jigen@icemcfd.com>.
* (Spanker=kohl)
*
* Revision 1.66 2002/02/18 19:18:33 pvmsrc
* Added "|| defined(IMA_SUN4SOL2)" to "const void *i, *j" #if
* for int_compare() / qsort()...
* (needed for sure for 64 bit) Solaris)
* (Spanker=kohl)
*
* Revision 1.65 2001/12/07 16:06:09 pvmsrc
* Oops! Some new Unix fixes & features break Windows (of course!).
* - #ifdef-ed away fcntl() calls for non-blocking sockets / stdin.
* (one for bug fix in lpvm.c, one for check_ext_input() in pvmd.c)
* (Spanker=kohl)
*
* Revision 1.64 2001/09/25 21:20:17 pvmsrc
* Minor TMPNAMFUN()/tmpnam() cleanup.
* - moved macro def to pvm3.h, renamed PVMTNPMAN().
* - same for LEN_OF_TMP_NAM -> PVMTMPNAMLEN.
* - mostly a huge waste of time, since *both* tmpnam() & mktemp()
* produce the same "dangerous" warning message in Linux/gcc...
* - damn.
* (Spanker=kohl)
*
* Revision 1.63 2001/09/25 17:28:09 pvmsrc
* Fixed leftover tmpnam() usage -> use TMPNAMFUN() instead...
* - also got rid of "double" call to tmpnam()...
* (Spanker=kohl)
*
* Revision 1.62 2001/05/11 19:40:50 pvmsrc
* Fixed direct-routing bug, removed blocking socket deadlock.
* - fix submitted by Kamil Iskra <kamil@wins.uva.nl> (as well as
* <jc@esi.fr>, Dick van Albada <dick@wins.uva.nl>, and
* Zeger Hendrikse <zegerh@wins.uva.nl>).
* (Spanker=kohl)
*
* Revision 1.61 2001/05/11 19:21:29 pvmsrc
* Added "&& errno != EAGAIN" to select() return code check in mxfer().
* - reported by "Johannes Hennecke" <Johannes.Hennecke@ELSA.de>
* (Spanker=kohl)
*
* Revision 1.60 2001/05/11 17:32:26 pvmsrc
* Eliminated references to sys_errlist & sys_nerr.
* - unnecessary, and we're whacking that crap anyway.
* (Spanker=kohl)
*
* Revision 1.59 2001/02/07 23:14:04 pvmsrc
* First Half of CYGWIN Check-ins...
* (Spanker=kohl)
*
* Revision 1.58 2000/06/15 17:51:49 pvmsrc
* Fixed bug in WIN32 direct routing.
* - stupid #endif in the wrong place, pvm_fd_add() call whacked.
* - turned back on direct routing default and setopt.
* (Spanker=kohl)
*
* Revision 1.57 2000/02/17 23:12:10 pvmsrc
* *** Changes for new BEOLIN port ***
* - MPP-like, similar to SP2, etc.
* - submitted by Paul Springer <pls@smokeymt.jpl.nasa.gov>.
* - format-checked & cleaned up by Jeembo... :-)
* (Spanker=kohl)
*
* Revision 1.56 2000/02/16 21:59:40 pvmsrc
* Fixed up #include <sys/types.h> stuff...
* - use <bsd/sys/types.h> for IMA_TITN...
* - #include before any NEEDMENDIAN #includes...
* (Spanker=kohl)
*
* Revision 1.55 2000/02/07 22:22:07 pvmsrc
* Hack to help with select()/fd_sets in WIN32:
* - fd_set is *NOT* a bit field in WIN32, is a circular buffer.
* - must check for !FD_ISSET() before FD_SET() else duplicate
* entries possible, and fd_set buffer overflow/overwrite.
* - similary, FD_ISSET() check before FD_CLR().
* - patch submitted by "Bruce W. Church" <bwc1@cornell.edu>.
* (Spanker=kohl)
*
* Revision 1.54 1999/11/08 17:44:29 pvmsrc
* SGI compiler cleanup.
* (Spanker=kohl)
*
* Revision 1.53 1999/07/08 18:59:53 kohl
* Fixed "Log" keyword placement.
* - indent with " * " for new CVS.
*
* Revision 1.52 1999/03/15 19:05:58 pvmsrc
* Replaced sleep() calls with pvmsleep().
* - Unix / Win32, C / Fortran compat...
* (Spanker=kohl)
*
* Revision 1.51 1999/03/04 22:09:33 pvmsrc
* New SHMD #ifdefs for pvm_psend() & pvm_precv().
* More direct conn debugging output.
* (Spanker=kohl)
*
* Revision 1.50 1999/02/12 17:43:38 pvmsrc
* Improved error messages in mksocs() for connect().
* - dump out the socket file being tried, in case some BOZOS (like
* the people at Harris :-) put an echo statement into the
* pvm3/lib/pvmd script... D-OH!
* (Spanker=kohl)
*
* Revision 1.49 1998/11/20 20:04:00 pvmsrc
* Changes so that win32 will compile & build. Also, common
* Changes so that compiles & builds on NT. Also
* common source on win32 & unix.
* (Spanker=sscott)
*
* Revision 1.48 1998/10/02 15:43:58 pvmsrc
* Single source code merge of Win32 and Unix code.
* (Spanker=sscott)
*
* Revision 1.47 1998/09/30 18:08:32 pvmsrc
* Fixed (I think :-) the direct connection initialization protocol.
* - if two tasks tried to direct connect to each at the same time,
* the CONREQ control messages could cross, causing each task
* to fail on a connect().
* - instead, independent of Unix Domain / TCP (local / remote)
* socket, *always* have greater (numerically) task ID handle
* the direct connection, and have lower task ID just drop the
* CONREQ on the floor.
* - seems to work after extensive testing, both local & remote,
* here's hoping... :-)
* (Spanker=kohl)
*
* Revision 1.46 1998/09/23 15:25:23 pvmsrc
* Need to make char *username non-extern here.
* - username storage has to be somewhere for pvmd & libpvm.
* - pvmwin.c can't have the storage, because it wouold clash with
* pvmd.c, which needs to define username for non-WIN32 archs...
* - beat me with a shovel dammit.
* (Spanker=kohl)
*
* Revision 1.45 1998/08/13 18:32:45 pvmsrc
* Added special case for AIX4SP2 arch with SOCKLENISUINT.
* - on AIX 4.3 systems, I guess unsigned int is really
* unsigned int, and not size_t... D-Oh!
* - probably need a better constant name (or two) here...
* (Spanker=kohl)
*
* Revision 1.44 1998/07/24 17:11:44 pvmsrc
* Cleaned up use of SOCKLENISUINT / oslen.
* - use oslen for every socket-related call:
* * bind(), recvfrom(), getsockname() and accept().
* (Spanker=kohl)
*
* Revision 1.43 1998/03/02 19:27:32 pvmsrc
* Fixed PGON typo seg fault parentheses d-oh. (Phil P.)
* (Spanker=kohl)
*
* Revision 1.42 1998/02/23 22:51:29 pvmsrc
* Added AIX4SP2 stuff.
* (Spanker=kohl)
*
* Revision 1.41 1998/01/23 20:28:41 pvmsrc
* Changed arguments to mxinput so that it will return number of frags
* read and the number of messages.
* mxfer will now continue reading frags if called by probe or nrecv and
* mxinput returns frags > 0 but messages == 0.
* (Spanker=phil)
*
* Revision 1.40 1997/12/31 20:50:02 pvmsrc
* Cleaned Up System Message Handlers.
* - current send / recv buffers now saved before invocation of
* message handler functs.
* - removed manual rbf = setrbuf(mid) saving & resetting in
* handlers...
* (Spanker=kohl)
*
* Revision 1.39 1997/12/18 21:22:53 pvmsrc
* Applied perf patch for IBMs from Anders Alund / Nils Jonsson (PDC).
* - nissej@pdc.kth.se
* (Spanker=kohl)
*
* Revision 1.38 1997/12/01 19:20:36 pvmsrc
* Replaced #ifdef IMA_OS2 fd_set declarations:
* - new #ifdef FDSETNOTSTRUCT.
* - choose between "fd_set foo" and "struct fd_set foo"...
* (Spanker=kohl)
*
* Revision 1.37 1997/11/04 23:20:00 pvmsrc
* Cleaned up fd_set stuff (hopefully).
* Removed unused CINDEX() define.
* (Spanker=kohl)
*
* Revision 1.36 1997/10/24 15:17:46 pvmsrc
* Added TEV_DID_RCX to trace events for Receive Message Context.
* - in pvm_recv(), pvm_trecv(), pvm_nrecv(), and pvm_precv().
* (Spanker=kohl)
*
* Revision 1.35 1997/10/24 14:29:23 pvmsrc
* Added TEV_DID_MCX / pvmmyctx trace event info to:
* - pvm_send(), pvm_mcast(), pvm_recv(), pvm_trecv(), pvm_nrecv().
* - pvm_psend(), pvm_precv().
* (Spanker=kohl)
*
* Revision 1.34 1997/09/22 21:13:25 pvmsrc
* Added new pvmsettaskname() linkage (for shell-spawned tasks only!).
* - call pvmsettaskname() before joining PVM, sends task name
* (stored in new char *pvmmytaskname global) to pvmd in
* TM_CONN2 message.
* - appears in trace events and console ps.
* (Spanker=kohl)
*
* Revision 1.33 1997/09/10 21:37:43 pvmsrc
* putting socket startup to pvmbeatask. Markus
* (Spanker=fischer)
*
* Revision 1.32 1997/08/29 13:35:06 pvmsrc
* OS2 Port Submitted by Bohumir Horeni, horeni@login.cz.
* (Spanker=kohl)
*
* Revision 1.31 1997/08/06 22:43:10 pvmsrc
* Added new SGI6 and SGIMP6 arches.
*
* Revision 1.30 1997/06/27 18:04:17 pvmsrc
* Integrated WIN32 changes.
*
* Revision 1.29 1997/06/12 20:10:37 pvmsrc
* Made sure all communications for TC_* task control messages
* use the SYSCTX_TC system context.
* - some messages being sent in default context... D-Oh...
*
* Revision 1.28 1997/05/07 21:19:23 pvmsrc
* swapped logic around SOCKLENISINT so that it is now
* the default path and SOCKLENISUINT is the compile-time
* selected path.
*
* Now, AIX 4.1 will have to remove SOCKLENISUINT from
* their AIXxxx.def configure files.
*
* Revision 1.27 1997/05/07 18:37:13 pvmsrc
* AIX 3.2 vs 4.1 vs 4.2 problems resolved (I hope)
* Define oslen as size_t with compile flag SOCKLENISINT
* for AIX 4.1 to explicitly set to int.
* To use flag - set in conf/AIX46K.def.
* Tested with cc and gcc on all 3 OS versions.
*
* Revision 1.26 1997/05/05 15:19:40 pvmsrc
* context was not being set in pvmmcast.
*
* Revision 1.25 1997/05/01 14:11:46 pvmsrc
* SGI Compiler Warning Cleanup.
*
* Revision 1.24 1997/04/30 21:26:00 pvmsrc
* SGI Compiler Warning Cleanup.
*
* Revision 1.23 1997/04/29 20:29:12 pvmsrc
* Support new calling sequences defined in mppmsg.h
*
* Revision 1.22 1997/04/25 19:21:18 pvmsrc
* Handle inplace bodies of zero length correctly
*
* Revision 1.21 1997/04/24 21:06:59 pvmsrc
* Excised unused variables.
* Support for pvm_psend/precv constructs
*
* Revision 1.20 1997/04/21 14:58:28 pvmsrc
* Changed #ifdefs that checked IMA_RS6K,IMA_SP2MPI & IMA_AIX46K
* to see if select.h was needed into single define NEEDSSELECTH.
* New archs need to set this in conf/
*
* Revision 1.19 1997/04/07 21:09:17 pvmsrc
* pvm_addmhf() - new paramter interface
*
* Revision 1.18 1997/04/03 19:23:13 pvmsrc
* Added context for TM_xxx messages
*
* Revision 1.17 1997/04/01 21:28:11 pvmsrc
* Damn Damn Damn.
* - pvm_recvinfo() returns a bufid, not an index. Damn.
*
* Revision 1.16 1997/04/01 20:48:17 pvmsrc
* Fixed tracer mbox usage:
* - pvm_getinfo() -> pvm_recvinfo(), new semantics handled (recvinfo
* sets rbuf implicitly, a la pvm_recv, need to save rbuf).
*
* Revision 1.15 1997/03/27 19:55:27 pvmsrc
* Fixed up pvmbeatask() to go get tracer info if spawned from shell:
* - env var info including trace mask, trace buffer size, trace opts.
* - use PVMTRACERCLASS mbox entry to fill in values, if matches
* on trctid, trcctx, and trctag.
*
* Revision 1.14 1997/03/07 15:12:11 pvmsrc
* Fixed annoying SGI warnings for qsort()'s int_compare() function...
*
* Revision 1.13 1997/03/06 21:50:17 pvmsrc
* Yanked out #includes for <netinet/in.h> and <netinet/tcp.h>.
* - dups with lpvm.h #includes...
*
* Revision 1.12 1997/03/06 21:06:10 pvmsrc
* - added include for lmsg.h, host.h
* - for mpp's added pvmmimd.h, mppmsg.h
* - moved struct ttpcb definition to header file lpvm.h
* - for mpps added call to mpp_ttpcb_find inside of ttpcb_find
* - mxinput rewritten to handle mpp input streams when ifdef'ed on
* - Added a stack of inplace headers instead of just one dummy header.
* so we can do asynch header transmission
* - in mxfer:
* logic written in to handle mpp streams, including bypass of
* reading routes if writing, posting a complete message for
* asynch send before checking for read routes, pass information
* to pvm_node_send (on mpps) about inplace sending so that
* pvm_node_send can be optimized for inplace (not optimized yet :-))
* - in mroute:
* direct routing socket setup turned off for MPPs
* - in pvmbeatask:
* call pvm_mpp_beatask for mpps
* don't do socket auth dance for mpps (sockets not used)
* - added routine ogm_complete to test if an outgoing message is
* complete (when sent asychronously). Returns TRUE for workstations
* since writes are synchronous
*
* Revision 1.11 1997/01/28 19:26:21 pvmsrc
* New Copyright Notice & Authors.
*
* Revision 1.10 1996/12/18 22:27:43 pvmsrc
* Extracted duplicate versions of routines from lpvm/mimd/shmem.c,
* inserted into shared lpvmgen.c:
* - pvmbailout().
* - pvmlogerror().
* - vpvmlogprintf(), pvmlogprintf(). (hope these work on MPP & shmem)
* - pvmlogperror().
*
* Revision 1.9 1996/10/25 13:57:20 pvmsrc
* Replaced old #includes for protocol headers:
* - <pvmsdpro.h>, "ddpro.h", "tdpro.h"
* With #include of new combined header:
* - <pvmproto.h>
*
* Revision 1.8 1996/10/24 22:44:31 pvmsrc
* Modified for New Tracing Facility:
* - moved #include "global.h" below other #include's for typing.
* - removed extra #include <pvm3.h> in lpvm.c...
* - added #include of new "lpvm.h" to replace explicit externs.
* - removed common control message handlers from lpvm.c:
* * extracted to lpvmgen.c for general usage.
* * pvm_tc_noop(), pvm_tc_settmask(), pvm_tc_siblings().
* -> lpvmmimd.c & lpvmshmem.c still need remainder of pvmmctl()
* replaced with control message handlers.
* - arg typing hassles with int_compare() / qsort() exacerbated...
* - modified pvmbeatask():
* * handle new tracing info, unpack tracing and output collection
* parameters into temp storage, and then check for local task
* override before applying.
* * read in new tracing env vars PVMTRCBUF & PVMTRCOPT.
* * install new common message handlers.
* * call new tev_init() routine to set up tracing stuff.
* * use new Pvmtracer structures (pvmtrc & pvmctrc) to store info.
* - removed pvm_getopt() & pvm_setopt() -> moved to common lpvmgen.c.
* - removed old tev_begin(), tev_fin() & tev_do_trace() routines.
* - updated trace event generation for pvm_getfds(), pvm_start_pvmd(),
* pvm_precv(), pvm_psend().
*
* Revision 1.7 1996/10/14 19:17:06 pvmsrc
* Used ARCHFLAG SOCKLENISUINT where socket length is unsigned int
*
* Revision 1.6 1996/10/09 21:48:19 pvmsrc
* Problem:
* --------
* The problem was the result of AIX4.2 changing the expected datatype of
* the XxxxLength parameter in the following system function calls:
*
* int bind (Socket, Name, NameLength)
* int getsockname (Socket, Name, NameLength)
* int recvfrom (Socket, Buffer, Length, Flags, From, FromLength)
* int accept (Socket, Address, AddressLength)
*
* The compiler warning message generated was:
* Function argument assignment between types "unsigned long*" and "int*"
* is not allowed.
*
* Action:
* -------
* In PVM functions where oslen was already declared_
* changed declaration from: int oslen;
* to: unsigned int oslen;
*
* In PVM functions where a local "temporary" variable such as i, j, etc.
* was used_
* added declaration of: unsigned int oslen;
* and then changed to use oslen where necessary.
*
* Revision 1.5 1996/10/04 13:17:23 pvmsrc
* New context was generated on each spawn. This is bad.
*
* Revision 1.4 1996/09/24 15:15:46 pvmsrc
* Test failure... fixed.
*
* Revision 1.3 1996/09/24 15:12:19 kohl
* Test check in...
*
* Revision 1.2 1996/09/23 23:30:53 pvmsrc
* Initial Creation - original lpvm.c.
*
* Revision 1.32 1995/11/02 16:07:10 manchek
* added NEEDSENDIAN switch.
* must declare ptr to sys_errlist const in pvmlogperror for FREEBSD.
* free replies to control messages in mxfer
*
* Revision 1.31 1995/09/06 17:37:24 manchek
* aargh, forgot pvm_precv
*
* Revision 1.30 1995/09/06 17:30:32 manchek
* pvm_psend returns not implemented instead of bad param for string type
*
* Revision 1.29 1995/07/28 16:40:58 manchek
* wrap HASERRORVARS around errno declarations
*
* Revision 1.28 1995/07/28 16:04:05 manchek
* switch endian includes on flag, not arch name
*
* Revision 1.27 1995/07/19 21:43:04 manchek
* better mxfer logging
*
* Revision 1.26 1995/07/19 21:27:49 manchek
* use pvmnametag to give symbolic message tags
*
* Revision 1.25 1995/07/18 16:59:03 manchek
* added code to generate and check crc on each message (MCHECKSUM)
*
* Revision 1.24 1995/07/03 19:05:35 manchek
* removed POWER4 arch ifdefs
*
* Revision 1.23 1995/06/28 18:18:07 manchek
* do-nothing check_for_exit so one can be in lpvmshmem.c
*
* Revision 1.22 1995/06/19 17:35:05 manchek
* addr sometimes wasn't set in pvm_tc_conreq, causing segfault
*
* Revision 1.21 1995/06/12 15:56:14 manchek
* added PGON partition size support
*
* Revision 1.20 1995/06/02 17:19:14 manchek
* added check in mxfer() for when outgoing route closed during input
*
* Revision 1.19 1995/06/01 14:43:54 manchek
* pvm_start_pvmd ignores INT, QUIT, TSTP signals
*
* Revision 1.18 1995/05/30 17:28:08 manchek
* added ifdefs for SP2MPI arch
*
* Revision 1.17 1995/05/17 17:06:54 manchek
* added PVMTASKDEBUG envar
*
* Revision 1.16 1995/05/17 16:16:36 manchek
* use FDSETISINT.
* use umbuf_get and put instead of ALLOC.
* in mksocs, read pvmd sockaddr file only once.
* in pvmbeatask, read altpid from environment each time.
* add PvmPollTime and Type, NotImplemented.
* pvm_start_pvmd reads sockaddr from pvmd instead of sleeping on addr file,
* sets PVMSOCK envar from return.
* allows better synchronization, master host overloading.
*
* Revision 1.15 1995/02/01 21:08:46 manchek
* error 4 is now PvmOverflow.
* connect is retried in case of EINTR
*
* Revision 1.14 1994/12/20 16:28:19 manchek
* added PvmShowTids case to setopt.
* removed EOF message on pvmd socket close
*
* Revision 1.13 1994/10/15 19:07:31 manchek
* don't use fd_sets when select returns -1; will hang reading.
* cast message tags for comparison as integers
*
* Revision 1.12 1994/09/02 15:23:37 manchek
* fixed segfaults in setopt when task not connected
*
* Revision 1.11 1994/07/18 19:20:18 manchek
* fix to call write() with max 4096 length for RS6K
*
* Revision 1.10 1994/06/21 19:36:23 manchek
* forgot to ifdef tt_spath
*
* Revision 1.9 1994/06/21 18:31:31 manchek
* connect in mksocs() tries several times before giving up.
* don't setsockopt() at TCP level on Unix task-task sockets
*
* Revision 1.8 1994/06/04 21:44:57 manchek
* added unix domain sockets
*
* Revision 1.7 1994/06/03 20:38:15 manchek
* version 3.3.0
*
* Revision 1.6 1993/11/30 23:50:01 manchek
* set nodelay and snd, rcv buffer sizes on t-t sockets
*
* Revision 1.5 1993/11/30 15:51:30 manchek
* beatask complains if it can't write d-auth file (fs full?)
*
* Revision 1.4 1993/10/04 20:29:05 manchek
* mksocks() and pvm_start_pvmd() now use pvmdsockfile(), not TDSOCKNAME.
* made pvm_useruid global for pvmcruft.c
*
* Revision 1.3 1993/10/04 19:10:22 manchek
* mctl() conflicts with mctl(2) - declare static for now
*
* Revision 1.2 1993/09/16 21:36:20 manchek
* pvm_start_pvmd() was freeing the return string from pvmgetpvmd()
*
* Revision 1.1 1993/08/30 23:26:48 manchek
* Initial revision
*
*/
#ifdef WIN32
#include "pvmwin.h"
#include <process.h>
#include <stdlib.h>
#include <search.h>
#endif
#include <stdio.h>
#ifdef IMA_TITN
#include <bsd/sys/types.h>
#else
#include <sys/types.h>
#endif
#ifdef NEEDMENDIAN
#include <machine/endian.h>
#endif
#ifdef NEEDENDIAN
#include <endian.h>
#endif
#ifdef NEEDSENDIAN
#include <sys/endian.h>
#endif
#include <pvm3.h>
#if defined(WIN32) || defined(CYGWIN)
#include "..\xdr\types.h"
#include "..\xdr\xdr.h"
#else
#include <rpc/types.h>
#include <rpc/xdr.h>
#endif
#ifndef WIN32
#include <sys/socket.h>
#endif
#ifndef NOUNIXDOM
#include <sys/un.h>
#endif
#ifdef NEEDSSELECTH
#include <sys/select.h>
#endif
#include <sys/stat.h>
#include <fcntl.h>
#ifdef SYSVSTR
#include <string.h>
#else
#include <strings.h>
#endif
#include <errno.h>
#include <signal.h>
#ifdef IMA_BEOLIN
#include <netdb.h>
#endif
#include <pvmproto.h>
#include "pvmalloc.h"
#include "pvmfrag.h"
#include "pmsg.h"
#include "listmac.h"
#include "tvdefs.h"
#include "bfunc.h"
#include "lpvm.h"
#include <pvmtev.h>
#include "tevmac.h"
#include "host.h"
#include "waitc.h"
#include "global.h"
#include "lmsg.h"
#ifdef IMA_MPP
#include "pvmmimd.h"
#include "mppmsg.h"
#endif
#ifdef PVM_SHMD
/* need the prototypes for the pvm_shmd send and receive calls */
#include "../shmd/shmdproto.h"
#endif
#ifndef max
#define max(a,b) ((a)>(b)?(a):(b))
#endif
#ifndef min
#define min(a,b) ((a)<(b)?(a):(b))
#endif
#ifndef TTSOCKBUF
#define TTSOCKBUF 0x8000
#endif
/***************
** Globals **
** **
***************/
char *getenv();
void hex_inadport __ProtoGlarp__ (( char *, struct sockaddr_in * ));
extern char *inadport_decimal();
extern char *inadport_hex();
char *pvmgetpvmd();
char *pvmdsockfile();
char *pvmnametag();
struct pmsg *midtobuf();
struct pmsg *umbuf_new();
char *debug_flags();
struct msgid *pvm_inprecv = (struct msgid *) NULL;
#ifndef HASERRORVARS
extern int errno; /* from libc */
#endif
#if defined(IMA_PGON)
int pvmpgonpartsize = -1; /* FE prog, no partition */
#endif
/***************
** Private **
** **
***************/
static int mxfersingle = 1; /* mxfer returns after single frag */
static struct sockaddr_in pvmourinet; /* our host ip addr */
static struct ttpcb *ttlist = 0; /* dll of connected tasks */
static struct ttpcb *topvmd = 0; /* default route (to pvmd) */
static int pvmnfds = 0; /* 1 + highest bit set in fds */
#ifdef FDSETNOTSTRUCT
static fd_set pvmrfds; /* rd fdset for mxfer() */
#else
static struct fd_set pvmrfds; /* rd fdset for mxfer() */
#endif
static struct pmsg *txlist[100]; /* queue for when mroute reentered */
static int txrp = 0; /* txlist read pointer */
static int txwp = 0; /* txlist write pointer */
#ifdef WIN32
char *username = 0;
int system_loser_win;
#ifndef IMA_WIN32_WATCOM
extern char **environ;
#endif
#ifndef SOCK_DEFINES
WSADATA WSAData;
extern int nAlert;
#define SOCK_DEFINES
#endif
#endif
#ifdef WIN32
int int_compare(const void *i, const void *j);
#endif
/**************************
** Internal Functions **
** **
**************************/
int
pvm_fd_add(fd, sets)
int fd; /* the fd */
int sets; /* which sets */
{
#ifdef SANITY
#ifndef WIN32
if (fd < 0 || fd >= FD_SETSIZE) {
pvmlogprintf("pvm_fd_add() bad fd %d\n", fd);
return 1;
}
#endif
#endif
if (sets & 1)
if ( !FD_ISSET(fd, &pvmrfds) ) {
FD_SET(fd, &pvmrfds);
#ifdef WIN32
pvmnfds++;
#endif
}
/*
if (sets & 2)
if ( !FD_ISSET(fd, &pvmwfds) ) {
FD_SET(fd, &pvmwfds);
#ifdef WIN32
pvmnwfds++;
#endif
}
if (sets & 4)
if ( !FD_ISSET(fd, &pvmefds) ) {
FD_SET(fd, &pvmefds);
#ifdef WIN32
pvmnefds++;
#endif
}
*/
#ifndef WIN32
/* if this is new highest, adjust nfds */
if (fd >= pvmnfds)
pvmnfds = fd + 1;
#endif
return 0;
}
pvm_fd_delete(fd, sets)
int fd; /* the fd */
int sets; /* which sets */
{
#ifdef SANITY
#ifndef WIN32
if (fd < 0 || fd >= FD_SETSIZE) {
pvmlogprintf("pvm_fd_delete() bad fd %d\n", fd);
return 1;
}
#endif
#endif
if (sets & 1)
if ( FD_ISSET(fd, &pvmrfds) ) {
FD_CLR(fd, &pvmrfds);
#ifdef WIN32
pvmnfds--;
#endif
}
/*
if (sets & 2)
if ( FD_ISSET(fd, &pvmwfds) ) {
FD_CLR(fd, &pvmwfds);
#ifdef WIN32
pvmnwfds--;
#endif
}
if (sets & 4)
if ( FD_ISSET(fd, &pvmefds) ) {
FD_CLR(fd, &pvmefds);
#ifdef WIN32
pvmnefds--;
#endif
}
*/
#ifndef WIN32
/* if this was highest, may have to adjust nfds to new highest */
if (fd + 1 == pvmnfds)
while (pvmnfds > 0) {
pvmnfds--;
if (FD_ISSET(pvmnfds, &pvmrfds)
/*
|| FD_ISSET(pvmnefds, &pvmefds)
|| FD_ISSET(pvmnwfds, &pvmwfds)
*/
) {
pvmnfds++;
break;
}
}
#endif
return 0;
}
/* ttpcb_new()
*
* Create a new, blank ttpcb.
*/
struct ttpcb *
ttpcb_new()
{
struct ttpcb *pcbp;
if (pcbp = TALLOC(1, struct ttpcb, "tpcb")) {
BZERO((char*)pcbp, sizeof(struct ttpcb));
pcbp->tt_fd = -1;
pcbp->tt_rxfrag = pmsg_new(1);
BZERO((char*)pcbp->tt_rxfrag, sizeof(struct pmsg));
pcbp->tt_rxfrag->m_link = pcbp->tt_rxfrag->m_rlink = pcbp->tt_rxfrag;
}
return pcbp;
}
/* ttpcb_delete()
*
* Get rid of a ttpcb. Delete it from any list, close the socket,
* free any rx frag heap.
*/
void
ttpcb_delete(pcbp)
struct ttpcb *pcbp;
{
struct pmsg *up;
if (pcbp->tt_link) {
LISTDELETE(pcbp, tt_link, tt_rlink);
}
if (pcbp->tt_fd != -1) {
pvm_fd_delete(pcbp->tt_fd, 3);
(void)close(pcbp->tt_fd);
}
if (up = pcbp->tt_rxfrag) {
while (up->m_link != up)
umbuf_free(up->m_link);
pmsg_unref(up);
}
if (pcbp->tt_rxf)
fr_unref(pcbp->tt_rxf);
#ifndef NOUNIXDOM
if (pcbp->tt_spath)
(void)unlink(pcbp->tt_spath);
#endif
PVM_FREE(pcbp);
}
/* ttpcb_creat()
*
* Create a new task-task entry. Allocates a ttpcb, inserts it in
* ttlist.
*/
struct ttpcb *
ttpcb_creat(tid)
int tid; /* peer tid */
{
struct ttpcb *pcbp, *pcbp2;
if (pcbp = ttpcb_new()) {
pcbp->tt_tid = tid;
for (pcbp2 = ttlist->tt_link; pcbp2 != ttlist; pcbp2 = pcbp2->tt_link)
if (pcbp2->tt_tid > tid)
break;
LISTPUTBEFORE(pcbp2, pcbp, tt_link, tt_rlink);
}
return pcbp;
}
/* ttpcb_find()
*
* Find a task-task entry by tid in ttlist.
*/
struct ttpcb *
ttpcb_find(tid)
int tid; /* peer tid */
{
struct ttpcb *pcbp;
#if defined(IMA_MPP)
if ( pcbp = mpp_ttpcb_find(tid))
return pcbp;
#endif
for (pcbp = ttlist->tt_link; pcbp != ttlist; pcbp = pcbp->tt_link)
if (pcbp->tt_tid >= tid)
break;
return (pcbp->tt_tid == tid) ? pcbp : (struct ttpcb*)0;
}
/* ttpcb_dead()
*
* Mark ttpcb dead, close socket, remove it from fd sets.
*/
void
ttpcb_dead(pcbp)
struct ttpcb *pcbp;
{
struct pmsg *up;
int sbf;
pcbp->tt_state = TTDEAD;
if (pcbp->tt_fd != -1) {
pvm_fd_delete(pcbp->tt_fd, 3);
(void)close(pcbp->tt_fd);
/* notify routing socket closed */
/*
XXX problems with the route notify messages:
XXX they can be delayed / cause deadlock because gotem isn't incremented
*/
check_routedelete(pcbp);
pcbp->tt_fd = -1;
}
#ifndef NOUNIXDOM
if (pcbp->tt_spath) {
(void)unlink(pcbp->tt_spath);
pcbp->tt_spath = 0;
}
#endif
if (pcbp->tt_rxf) {
fr_unref(pcbp->tt_rxf);
pcbp->tt_rxf = 0;
}
if (up = pcbp->tt_rxfrag) {
while (up->m_link != up)
umbuf_free(up->m_link);
}
}
int
ttpcb_dump(pcbp)
struct ttpcb *pcbp;
{
pvmlogprintf("ttpcb_dump() t%x fd=%d sad=%s",
pcbp->tt_tid,
pcbp->tt_fd,
inadport_decimal(&pcbp->tt_sad));
pvmlogprintf(" osad=%s state=%d\n",
inadport_decimal(&pcbp->tt_osad), pcbp->tt_state);
return 0;
}
int
ttpcb_dumpall()
{
struct ttpcb *pcbp;
pvmlogerror("ttpcb_dumpall()\n");
ttpcb_dump(topvmd);
for (pcbp = ttlist->tt_link; pcbp != ttlist; pcbp = pcbp->tt_link)
ttpcb_dump(pcbp);
return 0;
}
/* check_routeadd()
*
* Check if a notify is posted for new route creation, and send us
* a message if so.
*/
int
check_routeadd(pcbp)
struct ttpcb *pcbp;
{
struct waitc *wp, *wp2;
struct pmsg *up;
int sbf;
wp = waitlist->wa_link;
while (wp != waitlist) {
wp2 = wp->wa_link;
if (wp->wa_kind == WT_ROUTEA) {
sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
pvm_pkint(&pcbp->tt_tid, 1, 1);
pvm_pkint(&pcbp->tt_fd, 1, 1);
sbf = pvm_setsbuf(sbf);
up = midtobuf(sbf);
up->m_ctx = wp->wa_mesg->m_ctx;
up->m_tag = wp->wa_mesg->m_tag;
mesg_input(up);
if (wp->wa_count != -1 && --wp->wa_count < 1)
wait_delete(wp);
}
wp = wp2;
}
return 0;
}
/* check_routedelete()
*
* Check if a notify is posted for route deletion, and send us
* a message if so.
*/
int
check_routedelete(pcbp)
struct ttpcb *pcbp;
{
struct waitc *wp, *wp2;
struct pmsg *up;
int tid = pcbp->tt_tid;
wp = waitlist->wa_link;
while (wp != waitlist) {
wp2 = wp->wa_link;
if (wp->wa_kind == WT_ROUTED && tid == wp->wa_on) {
up = wp->wa_mesg;
wp->wa_mesg = 0;
mesg_input(up);
wait_delete(wp);
}
wp = wp2;
}
return 0;
}
int
post_routedelete(tid, ctx, tag)
int tid;
int ctx;
int tag;
{
int sbf;
struct waitc *wp;
struct pmsg *up;
int i;
sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
pvm_pkint(&tid, 1, 1);
i = -1;
pvm_pkint(&i, 1, 1);
sbf = pvm_setsbuf(sbf);
up = midtobuf(sbf);
up->m_ctx = ctx;
up->m_tag = tag;
if (ttpcb_find(tid)) {
wp = wait_new(WT_ROUTED);
wp->wa_tid = pvmmytid;
wp->wa_on = tid;
wp->wa_mesg = up;
} else {
mesg_input(up);
}
return 0;
}
/* pvm_tc_conreq()
*
* Another task requests a connection with us.
* Reply with a TC_CONACK message.
*
* TC_CONREQ() {
* int tdprotocol // t-d protocol revision number
* string sockaddr // address of other socket
* }
*/
static int
pvm_tc_conreq(mid)
int mid;
{
int src; /* sender of request */
int sbf = 0; /* reply message mid */
int ttpro; /* protocol revision */
struct ttpcb *pcbp; /* pcb for connection */
int ackd; /* allow connection (0) */
char *addr = ""; /* socket address */
int i;
int ictx;
#ifdef SOCKLENISUINT
#if defined(IMA_AIX4SP2) || defined(IMA_AIX5SP2)
unsigned int oslen;
#else
size_t oslen;
#endif
#else
int oslen;
#endif
#ifndef NOUNIXDOM
struct sockaddr_un uns;
char spath[PVMTMPNAMLEN];
#endif
char buf[256];
pvm_bufinfo(mid, (int *)0, (int *)0, &src);
pvm_upkint(&ttpro, 1, 1);
pvm_upkstr(buf);
if (pcbp = ttpcb_find(src)) {
if (pvmdebmask & PDMROUTE) {
pvmlogprintf(
"pvm_tc_conreq() crossed CONREQ from t%x\n", src);
}
if (pcbp->tt_state == TTCONWAIT) {
/* must handle simultaneous connect from both ends:
* lower tid of the two just ignores the CONREQ.
* higher tid pretends it didn't send a CONREQ,
* sends a CONACK back.
* this makes the connection single-sided.
*/
if (pvmdebmask & PDMROUTE)
pvmlogerror( "pvmmctl() handling crossed CONREQ\n");
if (pvmmytid > src) {
if (listen(pcbp->tt_fd, 1) == -1)
pvmlogperror("pvm_tc_conreq() listen");
else {
pcbp->tt_state = TTGRNWAIT;
pvm_fd_add(pcbp->tt_fd, 1);
ackd = 0;
if (buf[0] == '/') {
#ifdef NOUNIXDOM
pvmlogprintf( "%s CONREQ from t%x, ",
"pvm_tc_conreq()", src );
pvmlogprintf(
"Unix domain socket unsupported\n" );
#else
addr = pcbp->tt_spath;
#endif
} else {
hex_inadport(buf, &pcbp->tt_osad);
addr = inadport_hex(&pcbp->tt_sad);
}
check_routeadd(pcbp);
sbf=pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
ttpro = TDPROTOCOL;
pvm_pkint(&ttpro, 1, 1);
pvm_pkint(&ackd, 1, 1);
pvm_pkstr(addr);
i = pvmrescode;
pvmrescode = 1;
ictx = pvm_setcontext(SYSCTX_TC);
pvm_send(src, TC_CONACK);
pvm_setcontext(ictx);
pvmrescode = i;
pvm_freebuf(pvm_setsbuf(sbf));
}
}
} else {
pvmlogprintf(
"pvm_tc_conreq() CONREQ from t%x but state=%d ?\n",
src, pcbp->tt_state);
}
} else {
if (pvmdebmask & PDMROUTE)
pvmlogprintf("pvm_tc_conreq() CONREQ from t%x\n", src);
ackd = 1;
pcbp = ttpcb_creat(src);
if (pvmrouteopt != PvmDontRoute) {
if (buf[0] == '/') {
#ifdef NOUNIXDOM
pvmlogprintf( "%s CONREQ from t%x, ",
"pvm_tc_conreq()", src );
pvmlogprintf( "Unix domain socket unsupported\n" );
#else /*NOUNIXDOM*/
if ((pcbp->tt_fd = socket(AF_UNIX, SOCK_STREAM, 0))
== -1) {
pvmlogperror("pvm_tc_conreq() socket");
} else {
BZERO((char*)&uns, sizeof(uns));
uns.sun_family = AF_UNIX;
spath[0] = 0;
(void)PVMTMPNAMFUN(spath);
strcpy(uns.sun_path, spath);
oslen = sizeof(uns);
if (bind(pcbp->tt_fd, (struct sockaddr*)&uns, oslen)
== -1) {
pvmlogperror("pvm_tc_conreq() bind");
} else {
if (listen(pcbp->tt_fd, 1) == -1)
pvmlogperror("pvm_tc_conreq() listen");
else {
pcbp->tt_state = TTGRNWAIT;
pvm_fd_add(pcbp->tt_fd, 1);
ackd = 0;
addr = pcbp->tt_spath = STRALLOC(spath);
if (pvmdebmask & PDMROUTE)
pvmlogprintf( "%s: %s (t%x)\n",
"pvm_tc_conreq()",
"new route socket listening", src );
/* new route socket listening, */
/* will be accepted later */
check_routeadd(pcbp);
}
}
}
#endif /*NOUNIXDOM*/
} else {
if ((pcbp->tt_fd = socket(AF_INET, SOCK_STREAM, 0))
== -1) {
pvmlogperror("pvm_tc_conreq() socket");
} else {
pcbp->tt_sad = pvmourinet;
oslen = sizeof(pcbp->tt_sad);
if (bind(pcbp->tt_fd,
(struct sockaddr*)&pcbp->tt_sad, oslen)
== -1) {
pvmlogperror("pvm_tc_conreq() bind");
} else {
if (getsockname(pcbp->tt_fd,
(struct sockaddr*)&pcbp->tt_sad, &oslen)
== -1) {
pvmlogperror("pvm_tc_conreq() getsockname");
} else {
if (listen(pcbp->tt_fd, 1) == -1)
pvmlogperror("pvm_tc_conreq() listen");
else {
hex_inadport(buf, &pcbp->tt_osad);
pcbp->tt_state = TTGRNWAIT;
pvm_fd_add(pcbp->tt_fd, 1);
ackd = 0;
addr = inadport_hex(&pcbp->tt_sad);
if (pvmdebmask & PDMROUTE)
pvmlogprintf( "%s: %s (t%x)\n",
"pvm_tc_conreq()",
"new route socket listening",
src );
/* new route socket listening, */
/* will be accepted later */
check_routeadd(pcbp);
}
}
}
}
}
}
if (pvmdebmask & PDMROUTE)
pvmlogprintf( "%s: sending CONACK to t%x\n",
"pvm_tc_conreq()", src );
sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
ttpro = TDPROTOCOL;
pvm_pkint(&ttpro, 1, 1);
pvm_pkint(&ackd, 1, 1);
pvm_pkstr(addr);
i = pvmrescode;
pvmrescode = 1;
ictx = pvm_setcontext(SYSCTX_TC);
pvm_send(src, TC_CONACK);
pvm_setcontext(ictx);
pvmrescode = i;
pvm_freebuf(pvm_setsbuf(sbf));
if (ackd)
ttpcb_delete(pcbp);
}
pvm_freebuf(mid);
return 0;
}
/* pvm_tc_conack()
*
* Another task replies to our connection request.
*
* TC_CONACK() {
* int tdprotocol // t-d protocol revision number
* int ack // 0 ok, 1 denied
* string sockaddr // address of other socket
* }
*/
static int
pvm_tc_conack(mid)
int mid;
{
int src; /* sender of reply */
int ttpro; /* protocol revision */
int ackd; /* connection allowed (0) */
struct ttpcb *pcbp; /* pcb for connection */
static int linger[2] = { 1, 60 }; /* XXX arbitrary time */
int i;
#ifndef NOUNIXDOM
struct sockaddr_un uns;
#endif
char buf[256];
pvm_bufinfo(mid, (int *)0, (int *)0, &src);
pvm_upkint(&ttpro, 1, 1);
pvm_upkint(&ackd, 1, 1);
pvm_upkstr(buf);
if (pcbp = ttpcb_find(src)) {
if (pcbp->tt_state == TTCONWAIT) {
if (pvmdebmask & PDMROUTE) {
pvmlogprintf(
"pvm_tc_conack() CONACK from t%x ackd=%d\n",
src, ackd );
}
if (ttpro != TDPROTOCOL) {
pvmlogprintf("pvm_tc_conack() t-t protocol mismatch with t%x\n",
pcbp->tt_tid);
ackd = 1;
} else {
if (ackd != 0) {
if (pvmdebmask & PDMROUTE) {
pvmlogprintf("pvm_tc_conack() route to t%x denied\n",
pcbp->tt_tid);
}
} else {
if (buf[0] == '/') {
#ifdef NOUNIXDOM
pvmlogprintf(
"pvm_tc_conack() CONREQ from t%x, Unix domain socket unsupported\n",
src);
ackd = 1;
#else /*NOUNIXDOM*/
BZERO((char*)&uns, sizeof(uns));
uns.sun_family = AF_UNIX;
strcpy(uns.sun_path, buf);
while ((i = connect(pcbp->tt_fd, (struct sockaddr*)&uns,
sizeof(uns))) == -1
&& errno == EINTR)
;
if (i == -1) {
pvmlogperror("pvm_tc_conack() connect");
ackd = 1;
} else {
pcbp->tt_state = TTOPEN;
#ifndef WIN32
if ((i = fcntl(pcbp->tt_fd, F_GETFL, 0)) == -1)
pvmlogperror("pvm_tc_conack() fcntl");
else {
#ifdef O_NDELAY
i |= O_NDELAY;
#else
i |= FNDELAY;
#endif
(void)fcntl(pcbp->tt_fd, F_SETFL, i);
}
#endif
pvm_fd_add(pcbp->tt_fd, 1);
}
#endif /*NOUNIXDOM*/
} else {
pcbp->tt_osad.sin_family = AF_INET;
hex_inadport(buf, &pcbp->tt_osad);
while ((i = connect(pcbp->tt_fd,
(struct sockaddr*)&pcbp->tt_osad,
sizeof(pcbp->tt_osad))) == -1
&& errno == EINTR)
;
if (i == -1) {
pvmlogperror("pvm_tc_conack() connect");
ackd = 1;
} else {
pcbp->tt_state = TTOPEN;
#ifndef NOSOCKOPT
if (setsockopt(pcbp->tt_fd, SOL_SOCKET, SO_LINGER,
(char*)linger, sizeof(linger)) == -1)
pvmlogperror("pvm_tc_conack() setsockopt");
#endif /*NOSOCKOPT*/
#ifndef WIN32
if ((i = fcntl(pcbp->tt_fd, F_GETFL, 0)) == -1)
pvmlogperror("pvm_tc_conack() fcntl");
else {
#ifdef O_NDELAY
i |= O_NDELAY;
#else
i |= FNDELAY;
#endif
(void)fcntl(pcbp->tt_fd, F_SETFL, i);
}
#endif
pvm_fd_add(pcbp->tt_fd, 1);
}
}
}
}
if (ackd != 0) {
pcbp->tt_state = TTDENY;
(void)close(pcbp->tt_fd);
pcbp->tt_fd = -1;
}
else if (pvmdebmask & PDMROUTE) {
pvmlogprintf( "%s: connection accepted to t%x\n",
"pvm_tc_conack()", src );
}
} else {
pvmlogprintf("pvm_tc_conack() CONACK from t%x but state=%d\n",
src, pcbp->tt_state);
}
} else {
pvmlogprintf("pvm_tc_conack() suprious CONACK from t%x\n", src);
}
pvm_freebuf(mid);
return 0;
}
/* pvm_tc_taskexit()
*
* We are notified that another task (to which we have a direct route)
* has exited.
*/
static int
pvm_tc_taskexit(mid)
int mid;
{
int tid; /* exited task id */
struct ttpcb *pcbp;
pvm_upkint(&tid, 1, 1);
if (pvmdebmask & PDMROUTE) {
pvmlogprintf("pvm_tc_taskexit() TASKEXIT for t%x\n", tid);
}
/*
* v3.3: for now, don't close fully open route as we may lose
* messages. rely on socket getting EOF
*/
if ((pcbp = ttpcb_find(tid))
&& pcbp->tt_state != TTOPEN)
ttpcb_dead(pcbp);
pvm_freebuf(mid);
return 0;
}
/* mxinput()
*
* Input from a connection.
* Reassemble packets into messages and receive them.
* Returns >= 0 the number of messages added to pvmrxlist,
* or negative on error.
*/
static int
mxinput(pcbp, nfr)
struct ttpcb *pcbp;
int *nfr; /* number of frags read */
{
int gotem = 0; /* num msgs added to pvmrxlist */
struct frag *fp; /* shadows pcbp->tt_rxf */
struct frag *fp2;
int n; /* bytes received */
int m; /* length of fragment */
struct pmsg *rxup; /* message containing this frag */
struct pmsg *hdfrgpile;
char *cp; /* gp */
int src;
int dst;
int ff;
static int fairprobe = 0;
*nfr = 0;
#if defined(IMA_MPP)
/* Messages from Tasks and Messages from the daemon are probed
* fairprobe makes sure that neither interface gets starved.
* fairprobe is toggled when an EOM is found
*/
switch (fairprobe)
{
/* pvm_readfrom<xxxx> returns a pointer to a frag, with its
* length properly filled in. cp returns a pointer to the
* the start of the data
*/
case 0:
if ((fp = pvm_readfrompeer()) == (struct frag *) NULL)
fp = pvm_readfrompvmd();
break;
default:
if ((fp = pvm_readfrompvmd()) == (struct frag *) NULL)
fp = pvm_readfrompeer();
break;
}
if (!fp)
return gotem; /* didn't read anything */
#else
if (!pcbp->tt_rxf)
if (!(pcbp->tt_rxf = fr_new(pvmfrgsiz)))
return PvmNoMem;
fp = pcbp->tt_rxf;
/* read fragment header separately from body */
n = (fp->fr_len < TDFRAGHDR) ? 0 : pvmget32(fp->fr_dat + 8);
n += TDFRAGHDR - fp->fr_len;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("mxinput() pcb t%x fr_len=%d fr_dat=+%d n=%d\n",
pcbp->tt_tid, fp->fr_len, fp->fr_dat - fp->fr_buf, n);
}
#ifndef WIN32
n = read(pcbp->tt_fd, fp->fr_dat + fp->fr_len, n);
#else
n = win32_read_socket( pcbp->tt_fd,fp->fr_dat+fp->fr_len,n);
#endif
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("mxinput() read=%d\n", n);
}
/* deal with errors and closes */
if (n == -1 &&
#ifndef WIN32
errno != EWOULDBLOCK &&
#endif
errno != EINTR)
{
if (pvmdebmask & PDMPACKET) {
pvmlogperror("mxinput() read");
pvmlogprintf("mxinput() t%x\n", pcbp->tt_tid);
}
return PvmSysErr;
}
if (!n) {
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("mxinput() t%x read EOF\n", pcbp->tt_tid);
}
return -1;
}
if (n < 1)
return gotem;
if ((fp->fr_len += n) < TDFRAGHDR)
return gotem;
/* realloc buffer if frag won't fit */
m = TDFRAGHDR + pvmget32(fp->fr_dat + 8);
if (fp->fr_len == TDFRAGHDR) {
if (m > fp->fr_max - (fp->fr_dat - fp->fr_buf)) {
fp2 = fr_new(m);
BCOPY(fp->fr_dat, fp2->fr_dat, TDFRAGHDR);
fp2->fr_len = fp->fr_len;
fr_unref(fp);
fp = pcbp->tt_rxf = fp2;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("mxinput() realloc frag max=%d\n", m);
}
}
}
#endif /* defined (IMA_MPP) */
/* Now get information from the frag header to determine
* src, destination, length, etc.
*/
#if !defined(IMA_MPP)
if (fp->fr_len == m)
#else
if (fp) /* got a frag */
#endif
{
(*nfr)++;
#if !defined (IMA_MPP)
pcbp->tt_rxf = 0;
#endif
cp = fp->fr_dat;
dst = pvmget32(cp);
src = pvmget32(cp + 4);
ff = pvmget8(cp + 12);
fp->fr_len -= TDFRAGHDR;
fp->fr_dat += TDFRAGHDR;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("mxinput() pkt src t%x len %d ff %d\n",
src, fp->fr_len, ff);
}
#if defined(IMA_MPP)
if (fp -> fr_rip ) /* frag was received inplace */
{
gotem++;
fr_unref(fp); /* free frag -- mppprecv knows that the msg completed */
return gotem;
}
hdfrgpile = pvm_mpp_pmsgs();
#else
hdfrgpile = pcbp->tt_rxfrag ;
#endif
/*
* if start of message, make new umbuf, add to frag pile
*/
if (ff & FFSOM) {
cp += TDFRAGHDR;
fp->fr_len -= MSGHDRLEN;
fp->fr_dat += MSGHDRLEN;
rxup = umbuf_new();
rxup->m_enc = pvmget32(cp);
rxup->m_tag = pvmget32(cp + 4);
rxup->m_ctx = pvmget32(cp + 8);
rxup->m_wid = pvmget32(cp + 16);
rxup->m_crc = pvmget32(cp + 20);
rxup->m_src = src;
rxup->m_dst = dst;
LISTPUTBEFORE(hdfrgpile, rxup, m_link, m_rlink);
} else {
/* locate frag's message */
for (rxup = hdfrgpile->m_link; rxup != hdfrgpile;
rxup = rxup->m_link)
if (rxup->m_src == src)
break;
}
if (rxup == hdfrgpile) { /* uh oh, no message for it */
pvmlogerror("mxinput() frag with no message\n");
fr_unref(fp);
} else {
LISTPUTBEFORE(rxup->m_frag, fp, fr_link, fr_rlink);
rxup->m_len += fp->fr_len;
/*
* if end of message, move to pvmrxlist and count it
*/
if (ff & FFEOM) {
#if defined(IMA_MPP)
fairprobe = (fairprobe ? 0 : 1);
#endif
LISTDELETE(rxup, m_link, m_rlink);
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf(
"mxinput() src t%x route t%x ctx %d tag %s len %d\n",
rxup->m_src, (pcbp != 0 ? pcbp->tt_tid: -1),
rxup->m_ctx,
pvmnametag(rxup->m_tag, (int *)0), rxup->m_len);
}
#ifdef MCHECKSUM
if (rxup->m_crc != umbuf_crc(rxup)) {
pvmlogprintf(
"mxinput() message src t%x route t%x tag %s bad checksum\n",
rxup->m_src, pcbp->tt_tid,
pvmnametag(rxup->m_tag, (int *)0));
umbuf_free(rxup);
} else {
#endif
pmsg_setenc(rxup, rxup->m_enc);
mesg_input(rxup);
gotem++;
#ifdef MCHECKSUM
}
#endif
}
}
}
return gotem;
}
/* mxfer()
*
* Move message frags between task and pvmd or other tasks.
* Returns when
* Outgoing message (if any) fully sent AND
* (Timed out (tmout) OR
* At least one message fully received) AND
* No message from pvmd partially received.
* Returns >= 0 the number of complete messages downloaded, or
* negative on error.
*/
/* define a preallocated pool of inplace headers */
#define SHORT_PAYLOAD 128
#define HEADER_SIZE (TDFRAGHDR + MSGHDRLEN)
#define NINPLACE_HDRS (8096/(SHORT_PAYLOAD + HEADER_SIZE))
static char inPlaceHdrPool[NINPLACE_HDRS * ( HEADER_SIZE )];
static int hdrPoolStk = 0;
static int
mxfer(txup, tmout)
struct pmsg *txup; /* outgoing message or null */
struct timeval *tmout; /* time limit to get at least one message */
{
struct ttpcb *txpcbp = 0; /* route for ogm */
struct frag *txfp = 0; /* cur ogm frag or null */
struct timeval tin;
struct timeval tnow;
struct timeval *tvp;
#ifdef FDSETNOTSTRUCT
fd_set rfds, wfds;
#else
struct fd_set rfds, wfds;
#endif
int wantmore = 1; /* nothing rxd yet and not timed out */
int gotem = 0; /* count of received msgs downloaded */
int ff;
int n;
#ifdef SOCKLENISUINT
#if defined(IMA_AIX4SP2) || defined(IMA_AIX5SP2)
unsigned int oslen;
#else
size_t oslen;
#endif
#else
int oslen;
#endif
char *txcp = 0; /* point to remainder of txfp */
int txtogo = 0; /* len of remainder of txfp */
struct sockaddr_in sad;
int s;
struct ttpcb *pcbp;
char *inPlaceHeader = (char *) NULL; /* for inplace data */
int inPlaceBodyLen = 0; /* len of inplace body */
int bypassRead = FALSE; /* bypass reading packets before send?*/
int waitForOgmToComplete = FALSE;
int probedForIncomingPkts = FALSE; /* need to probe for incoming pkts */
struct msgid *sendmsg = (struct msgid *) NULL;
char errtxt[64];
int nfr = 0, totfr = 0; /* number frags read my mxinput */
int probe = 0; /* called at a probe (nrecv) */
if (tmout)
{
pvmgetclock(&tin);
probe = ( tmout -> tv_sec == 0 && tmout -> tv_usec == 0);
}
if (txup) { /* transmitting a umsg */
txfp = txup->m_frag->fr_link;
if (!txfp->fr_buf) {
if (!(txfp = fr_new(MAXHDR)))
return PvmNoMem;
txfp->fr_dat += MAXHDR;
LISTPUTBEFORE(txup->m_frag, txfp, fr_link, fr_rlink);
}
/* Get connection info for this dest. If none, go to pvmd */
if (!(txpcbp = ttpcb_find(txup->m_dst)) || txpcbp->tt_state != TTOPEN)
txpcbp = topvmd;
txup->m_ref++;
bypassRead = TRUE; /* bypass reading the first time through */
}
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf("mxfer() mid %d", (txup ? txup->m_mid : 0));
if (txup)
pvmlogprintf(" ctx %d tag %s dst t%x route t%x",
txup->m_ctx,
pvmnametag(txup->m_tag, (int *)0),
txup->m_dst, txpcbp->tt_tid);
if (tmout)
pvmlogprintf(" tmout %d.%06d\n",
tmout->tv_sec, tmout->tv_usec);
else
pvmlogprintf(" tmout inf\n");
}
/* XXX we go an extra time through select because wantmore gets set
XXX inside loop. no big deal. also, could avoid calling gettimeofday
XXX if tmout is 0.0 */
/* Do this loop if
* 1) Sending a frag
* OR 2) Received *part* of an incoming frag from the daemon
* OR 3) Wantmore is true
* OR 4) Need the current OGM to complete its asynch send
* OR 5) We haven't probed at least once for incoming packets
*
* wantmore is initialized to TRUE -> loop exec'ed at least once
*/
while (txfp || topvmd->tt_rxf || wantmore || waitForOgmToComplete
|| !probedForIncomingPkts
)
{
totfr = 0;
/* Either we are sending something, or have a partial recv'd frag */
if (txfp || topvmd->tt_rxf) /* gotta block */
{
/* must block if sending any message, or receiving from pvmd */
tvp = (struct timeval *) NULL;
}
else
{
if (wantmore)
{
/* Calculate how long to wait inside the "select"
* tval = NULL means wait forever
* tval = (0,0) means probe
* tval = (x,y) means wait at most x sec + y usec
*/
if (tmout) /* finite time to block */
{
pvmgetclock(&tnow);
TVXSUBY(&tnow, &tnow, &tin);
if (TVXLTY(tmout, &tnow)) /* time's up */
{
TVCLEAR(&tnow);
wantmore = 0;
}
else
{
TVXSUBY(&tnow, tmout, &tnow);
}
tvp = &tnow;
}
else
{ /* forever to block */
tvp = (struct timeval *) NULL;
}
}
else
{ /* return asap */
TVCLEAR(&tnow);
tvp = &tnow;
}
}
/* Here we are setting the read and write file descriptors for
* the following select statement.
*/
#if !defined(IMA_MPP)
rfds = pvmrfds;
FD_ZERO(&wfds);
if (txfp)
FD_SET(txpcbp->tt_fd, &wfds);
#endif
if (pvmdebmask & PDMSELECT) {
if (tvp)
pvmlogprintf("mxfer() select timeout %d.%06d\n",
tvp->tv_sec, tvp->tv_usec);
else
pvmlogprintf("mxfer() select timeout inf\n");
#if !defined(IMA_MPP)
print_fdset("mxfer() rfds=", pvmnfds, &rfds);
print_fdset("mxfer() wfds=", pvmnfds, &wfds);
#endif
}
#if !defined(IMA_MPP)
probedForIncomingPkts = TRUE;
if ((n = select(pvmnfds,
#ifdef FDSETISINT
(int *)&rfds, (int *)&wfds, (int *)0,
#else
(fd_set *)&rfds, (fd_set *)&wfds, (fd_set*)0,
#endif
tvp)) == -1
&& errno != EINTR
#ifdef IMA_LINUX
&& errno != EAGAIN
#endif
) {
pvmlogperror("mxfer() select");
return PvmSysErr;
}
if (pvmdebmask & PDMSELECT) {
pvmlogprintf("mxfer() select returns %d\n", n);
}
if (n == -1)
continue;
/*
* if pvmd conn has data ready, read packets
*/
if (FD_ISSET(topvmd->tt_fd, &rfds)
&& !(mxfersingle && gotem))
{
if ((n = mxinput(topvmd, &nfr)) < 0) {
if (n != -1)
pvmlogerror("mxfer() mxinput bad return on pvmd sock\n");
else if (pvmdebmask & (PDMSELECT|PDMMESSAGE|PDMPACKET))
pvmlogerror("mxfer() EOF on pvmd sock\n");
return PvmSysErr;
}
totfr += nfr;
if (gotem += n)
wantmore = 0;
}
/*
* if task conn has data ready, read packets
*/
/* This goes through *all* of the pcbs that have been defined to
* check for messages. This is too much overhead for IMA_MPP
* machines. Instead all local routes must be probed at one time.
*/
for (pcbp = ttlist->tt_link; pcbp != ttlist; pcbp = pcbp->tt_link) {
if (pcbp->tt_state == TTOPEN && FD_ISSET(pcbp->tt_fd, &rfds)) {
if ((n = mxinput(pcbp, &nfr)) < 0)
ttpcb_dead(pcbp);
else {
totfr += nfr;
if (gotem += n)
wantmore = 0;
}
} else /* Try and complete a direct (socket) Connection rqst */
if (pcbp->tt_state == TTGRNWAIT
&& FD_ISSET(pcbp->tt_fd, &rfds)) {
oslen = sizeof(sad);
s = accept(pcbp->tt_fd, (struct sockaddr*)&sad, &oslen);
if (s == -1) {
pvmlogperror("mxfer() accept");
ttpcb_dead(pcbp);
} else {
#ifndef NOSOCKOPT
#ifndef NOUNIXDOM
if (!pcbp->tt_spath) { /* not unix sock */
#endif
n = 1;
if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
(char*)&n, sizeof(int)) == -1)
pvmlogperror("mxfer() setsockopt");
#if (!defined(IMA_RS6K)) \
&& (!defined(IMA_AIX46K)) && (!defined(IMA_AIX46K64)) \
&& (!defined(IMA_AIX56K)) && (!defined(IMA_AIX56K64))
n = TTSOCKBUF;
if (setsockopt(s, SOL_SOCKET, SO_SNDBUF,
(char*)&n, sizeof(int)) == -1
|| setsockopt(s, SOL_SOCKET, SO_RCVBUF,
(char*)&n, sizeof(n)) == -1)
pvmlogperror(
"mxfer() setsockopt SO_SNDBUF");
#endif
#ifndef NOUNIXDOM
}
#endif
#endif /*NOSOCKOPT*/
if (pvmdebmask & PDMROUTE) {
pvmlogprintf("mxfer() accept from t%x = %d\n",
pcbp->tt_tid, s);
}
(void)close(pcbp->tt_fd);
/*
(void)dup2(s, pcbp->tt_fd);
(void)close(s);
*/
pvm_fd_delete(pcbp->tt_fd, 1);
/* notify route socket closed */
check_routedelete(pcbp);
pcbp->tt_fd = s;
pcbp->tt_state = TTOPEN;
#ifndef WIN32
{
int i;
if ((i = fcntl(pcbp->tt_fd, F_GETFL, 0))
== -1)
pvmlogperror("mxfer() fcntl");
else {
#ifdef O_NDELAY
i |= O_NDELAY;
#else
i |= FNDELAY;
#endif
(void)fcntl(pcbp->tt_fd, F_SETFL, i);
}
}
#endif
pvm_fd_add(s, 1);
/* new route socket created for communication */
check_routeadd(pcbp);
}
}
}
bypassRead = bypassRead; /* sgi compiler */
#else /* !IMA_MPP */
/* For Native Messaging mxinput is used to probe pvmd AND tasks when
* given a NULL ttpcb pointer
*/
if (!bypassRead)
{
probedForIncomingPkts = TRUE;
if ((n = mxinput((struct ttpcb *) NULL, &nfr)) < 0) {
pvmlogerror("mxfer() mxinput bad return on probe\n");
return PvmSysErr;
}
totfr += nfr;
if (gotem += n)
wantmore = 0;
}
#endif /* !IMA_MPP */
/*
* if sending and socket ready to write, send packets
*/
#if !defined(IMA_MPP)
/* XXX Native message direct routes are alway open, if they exist */
if (txfp && txpcbp->tt_state != TTOPEN)
{
/* make sure our route hasn't just been closed */
txpcbp = topvmd;
txcp = 0;
txtogo = 0;
txfp = txup->m_frag->fr_link;
}
else
#endif
{
if (txfp
&& !waitForOgmToComplete
#if !defined(IMA_MPP)
&& FD_ISSET(txpcbp->tt_fd, &wfds)
#endif
){
inPlaceHeader = (char *) NULL; /*sending inplace header ?*/
inPlaceBodyLen = 0; /* length of inplace body, if inplace*/
if (!txtogo) /* We haven't sent anything, yet, Build Header */
{
if (txfp->fr_u.dab) /* packed data */
{
txcp = txfp->fr_dat;
}
else /* inplace data */
{
txcp = inPlaceHeader = inPlaceHdrPool +
hdrPoolStk*(HEADER_SIZE+SHORT_PAYLOAD) + HEADER_SIZE;
if (pvmdebmask & PDMPACKET)
{
sprintf(errtxt,"mxfer() hdrPoolStk is %d\n", hdrPoolStk);
pvmlogerror(errtxt);
}
if (hdrPoolStk++ >= NINPLACE_HDRS)
{
sprintf(errtxt, "mxfer(): > %d inplace frags \n",
NINPLACE_HDRS);
pvmlogerror(errtxt);
return PvmSysErr;
}
}
txtogo = txfp->fr_len;
/*
* if this is first frag, prepend message header
*/
ff = 0;
if (txfp->fr_rlink == txup->m_frag)
{
txcp -= MSGHDRLEN;
txtogo += MSGHDRLEN;
pvmput32(txcp,
(txup->m_enc == 0x20000000 ? pvmmydsig : txup->m_enc));
pvmput32(txcp + 4, txup->m_tag);
pvmput32(txcp + 8, txup->m_ctx);
pvmput32(txcp + 16, txup->m_wid);
#ifdef MCHECKSUM
pvmput32(txcp + 20, umbuf_crc(txup));
#else
pvmput32(txcp + 20, 0);
#endif
ff = FFSOM;
}
if (txfp->fr_link == txup->m_frag)
ff |= FFEOM;
/*
* prepend frag header
*/
txcp -= TDFRAGHDR;
pvmput32(txcp, txup->m_dst);
pvmput32(txcp + 4, pvmmytid);
pvmput32(txcp + 8, txtogo);
pvmput32(txcp + 12, 0); /* to keep putrify happy */
pvmput8(txcp + 12, ff);
txtogo += TDFRAGHDR;
if (!txfp->fr_u.dab) /* inplace data */
{
txtogo -= txfp->fr_len;
inPlaceBodyLen = txfp->fr_len;
}
}
if (pvmdebmask & PDMPACKET)
{
pvmlogprintf("mxfer() dst t%x n=%d\n", txup->m_dst, txtogo);
}
#if defined(IMA_MPP)
/* node_send will do an async send. Feed it a midlist struct.
* when we get to the end of a message we call ogm_done
* to make sure message is really gone, and we can free
* memory
*
*/
n = pvm_node_send(txcp, txtogo, txpcbp, &sendmsg,
inPlaceHeader, inPlaceBodyLen);
#else
inPlaceBodyLen = inPlaceBodyLen; /* sgi compiler */
#if defined(IMA_RS6K) || defined(IMA_SP2MPI) || defined(IMA_AIX4SP2) \
|| defined(IMA_AIX5SP2)
n = write(txpcbp->tt_fd, txcp, min(txtogo, 4096));
#else
#ifndef WIN32
n = write(txpcbp->tt_fd, txcp, txtogo);
#else
n = win32_write_socket(txpcbp->tt_fd,txcp,txtogo);
#endif
#endif
#endif
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("mxfer() wrote %d\n", n);
}
if (n == -1 &&
#ifndef WIN32
errno != EWOULDBLOCK &&
#endif
errno != EINTR)
{
if (txpcbp == topvmd)
{
pvmlogperror("mxfer() write pvmd sock");
return PvmSysErr;
}
else
{
pvmlogperror("mxfer() write tt sock");
/* reset message and route to pvmd */
ttpcb_dead(txpcbp);
txpcbp = topvmd;
txcp = 0;
txtogo = 0;
txfp = txup->m_frag->fr_link;
}
}
if (n > 0)
{
txcp += n;
if ((txtogo -= n) <= 0)
{
if (txcp == inPlaceHeader && txfp->fr_len )
/* inplace, body length > 0 */
{
txcp = txfp->fr_dat;
txtogo = txfp->fr_len;
bypassRead = TRUE; /* try to send body without
doing another read */
}
else /* packed */
{
txcp = 0;
txfp = txfp->fr_link;
if (!txfp->fr_buf) {
txfp = 0;
waitForOgmToComplete = TRUE;
bypassRead = FALSE;
}
}
}
}
else /* we couldn't send, might be a head-head send so ... */
{
bypassRead = FALSE; /* enable reading if couldn't send */
}
}
}
/* We are waiting for an outgoing message to complete. We
can't send the next message until it does complete */
if (waitForOgmToComplete)
{
if (ogm_complete(&sendmsg))
{
pmsg_unref(txup); /* unref the message */
txup = 0;
waitForOgmToComplete = FALSE;
hdrPoolStk = 0; /* recover all the inplace headers */
}
}
/* if not sending, and not waiting for ogm to complete,
flush anything waiting in the reentered-send queue */
if (!txfp && txrp != txwp && !waitForOgmToComplete) {
txup = txlist[txrp];
if (++txrp >= sizeof(txlist)/sizeof(txlist[0]))
txrp = 0;
txfp = txup->m_frag->fr_link;
txfp = txfp->fr_buf ? txfp : 0;
if (!(txpcbp = ttpcb_find(txup->m_dst))
|| txpcbp->tt_state != TTOPEN)
txpcbp = topvmd;
}
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf("mxfer() txfp %d gotem %d tt_rxf %d\n",
(txfp ? 1 : 0), gotem, (topvmd->tt_rxf ? 1 : 0));
}
if (probe && !gotem && totfr) /* got frags, keep reading */
wantmore = 1;
}
return gotem;
}
/* mroute()
*
* Route a message to a destination / Receive incoming messages.
* Returns when
* Outgoing message (if any) fully sent AND
* (Timed out waiting (tmout) OR
* At least one message fully received)
* Returns >= 0 the number of complete messages downloaded
* (although some may have been consumed by message-handlers),
* or negative on error.
*
* If mroute() is reentered (mroute -> mxfer -> handler -> pvm_send),
* it queues the message and returns immediately.
* The enclosing mxfer() later sends any messages in this queue.
*/
int
mroute(mid, dtid, tag, tmout)
int mid; /* message (or 0) */
int dtid; /* dest */
int tag; /* type tag */
struct timeval *tmout; /* get at least one message */
{
static int alreadyhere = 0; /* have i called myself? */
struct ttpcb *pcbp; /* pcb for foreign task */
int s; /* socket */
int sbf; /* temp TC_CONREQ message */
struct pmsg *up, *up2;
struct sockaddr_in sad;
int l;
#ifdef SOCKLENISUINT
#if defined(IMA_AIX4SP2) || defined(IMA_AIX5SP2)
unsigned int oslen;
#else
size_t oslen;
#endif
#else
int oslen;
#endif
int cc = 0;
int gotem = 0; /* count of received messages */
static struct timeval ztv = { 0, 0 };
char *addr = 0;
#ifndef NOUNIXDOM
struct sockaddr_un uns;
char spath[PVMTMPNAMLEN];
#endif
if (up = midtobuf(mid)) {
up->m_dst = dtid;
up->m_tag = tag;
}
if (alreadyhere) {
if (up) {
up->m_ref++;
txlist[txwp] = up;
if (++txwp >= sizeof(txlist)/sizeof(txlist[0]))
txwp = 0;
if (txwp == txrp)
abort();
}
/* we can't wait around if reentered */
if (!tmout || tmout->tv_sec || tmout->tv_usec)
return PvmNotImpl;
return 0;
}
alreadyhere = 1;
/* if sending a message, decide whether we need to route it. */
/* Direct Routing is being set up in this section of code
* For workstations (not MPP, SHMEM)
* either Unix domain or TCP sockets are used.
* task exit notifies are set up for route closure
*
* For MPPs and SHMEM,
* connections to local tasks are always direct and require no
* handshake negotiation.
*/
if (up) { /* We've got something to send */
#if !defined(IMA_MPP) /* This is the workstation direct route setup */
/* Checks made (Workstation):
* o route option is direct
* o destination is given
* o destination is a task
* o destination is not me
* o could not find task process control block
*/
if (pvmrouteopt == PvmRouteDirect
&& dtid
&& TIDISTASK(dtid)
&& dtid != pvmmytid
&& !ttpcb_find(dtid))
{
#if !defined(NOUNIXDOM)
/* Create a UNIX domain socket to build the direct route */
if ((pvmmytid & TIDHOST) == (dtid & TIDHOST)) {
if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
pvmlogperror("mroute() socket");
} else {
BZERO((char*)&uns, sizeof(uns));
uns.sun_family = AF_UNIX;
spath[0] = 0;
(void)PVMTMPNAMFUN(spath);
strcpy(uns.sun_path, spath);
oslen = sizeof(uns);
if (bind(s, (struct sockaddr*)&uns, oslen) == -1) {
pvmlogperror("mroute() bind");
(void)close(s);
s = -1;
} else {
pcbp = ttpcb_creat(dtid);
pcbp->tt_fd = s;
addr = pcbp->tt_spath = STRALLOC(spath);
pcbp->tt_state = TTCONWAIT;
}
}
} else {
#endif /* !NOUNIXDOM */
/* Open a TCP direct Connection here, instead */
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
pvmlogperror("mroute() socket");
} else {
sad = pvmourinet;
oslen = sizeof(sad);
if (bind(s, (struct sockaddr*)&sad, oslen) == -1) {
pvmlogperror("mroute() bind");
(void)close(s);
s = -1;
} else {
if (getsockname(s, (struct sockaddr*)&sad, &oslen) == -1) {
pvmlogperror("mroute() getsockname");
(void)close(s);
s = -1;
} else {
#ifndef NOSOCKOPT
l = 1;
if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
(char*)&l, sizeof(int)) == -1)
pvmlogperror("mroute() setsockopt");
#if (!defined(IMA_RS6K)) \
&& (!defined(IMA_AIX46K)) && (!defined(IMA_AIX46K64)) \
&& (!defined(IMA_AIX56K)) && (!defined(IMA_AIX56K64))
l = TTSOCKBUF;
if (setsockopt(s, SOL_SOCKET, SO_SNDBUF,
(char*)&l, sizeof(l)) == -1
|| setsockopt(s, SOL_SOCKET, SO_RCVBUF,
(char*)&l, sizeof(l)) == -1)
pvmlogperror("mroute() setsockopt SO_SNDBUF");
#endif
#endif
pcbp = ttpcb_creat(dtid);
pcbp->tt_fd = s;
pcbp->tt_sad = sad;
pcbp->tt_state = TTCONWAIT;
addr = inadport_hex(&sad);
}
}
}
#if !defined(NOUNIXDOM)
}
#endif
/* 1. Request direct connection (TC_CONREQ)
* 2. Request Notify so that we can close dead routes
*/
if (s >= 0) { /* Socket was opened correctly */
/* Send Connection Request */
sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
l = TDPROTOCOL;
pvm_pkint(&l, 1, 1);
pvm_pkstr(addr);
sbf = pvm_setsbuf(sbf);
up2 = midtobuf(sbf);
up2->m_dst = dtid;
up2->m_tag = TC_CONREQ;
up2->m_ctx = SYSCTX_TC;
cc = mxfer(up2, &ztv);
pvm_freebuf(sbf);
/* now send a task exit notify to my pvmd */
if (cc >= 0) {
gotem += cc;
sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
l = PvmTaskExit;
pvm_pkint(&l, 1, 1);
l = SYSCTX_TC;
pvm_pkint(&l, 1, 1);
l = TC_TASKEXIT;
pvm_pkint(&l, 1, 1);
l = 1;
pvm_pkint(&l, 1, 1);
pvm_pkint(&dtid, 1, 1);
sbf = pvm_setsbuf(sbf);
up2 = midtobuf(sbf);
up2->m_dst = TIDPVMD;
up2->m_tag = TM_NOTIFY;
up2->m_ctx = SYSCTX_TM;
cc = mxfer(up2, &ztv);
pvm_freebuf(sbf);
}
if (pvmdebmask & PDMROUTE) {
pvmlogprintf("mroute() CONREQ to t%x\n", dtid);
}
/* wait until the route pcb changes state */
while (pcbp->tt_state == TTCONWAIT) {
if ((cc = mxfer((struct pmsg *)0, (struct timeval *)0)) < 0)
break;
gotem += cc;
}
if (pcbp->tt_state == TTOPEN)
check_routeadd(pcbp);
}
}
#endif /* IMA_MPP */
}
/* now send the original message with whatever route we have */
if (cc >= 0)
if ((cc = mxfer(up, tmout)) > 0)
gotem += cc;
/* clean up any stale route pcbs */
#if !defined(IMA_MPP)
pcbp = ttlist->tt_link;
while (pcbp != ttlist)
if (pcbp->tt_state == TTDEAD) {
if (pvmdebmask & PDMROUTE) {
pvmlogprintf("mroute() freeing pcb t%x\n", pcbp->tt_tid);
}
pcbp = pcbp->tt_link;
ttpcb_delete(pcbp->tt_rlink);
} else
pcbp = pcbp->tt_link;
#endif /* IMA_MPP */
alreadyhere = 0;
return (cc < 0 ? cc : gotem);
}
/* msendrecv()
*
* Single op to send a system message (usually to our pvmd) and get
* the reply.
* Returns message handle or negative if error.
*/
int
msendrecv(other, tag, context)
int other; /* dst, src tid */
int tag; /* message tag */
int context; /* context to use for the message */
{
static int nextwaitid = 1;
int cc;
struct pmsg *up;
if (!pvmsbuf)
return PvmNoBuf;
/* send message to other */
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf("msendrecv() to t%x tag %s\n", other,
pvmnametag(tag, (int *)0));
}
pvmsbuf->m_wid = nextwaitid++;
pvmsbuf->m_ctx = context;
/*
pvmlogprintf("msendrecv() ctx=%d\n", pvmsbuf->m_ctx);
*/
if ((cc = mroute(pvmsbuf->m_mid, other, tag, (struct timeval *)0)) < 0)
return cc;
/* recv matching message from other */
for (up = pvmrxlist->m_link; 1; up = up->m_link) {
if (up == pvmrxlist) {
up = up->m_rlink;
if ((cc = mroute(0, 0, 0, (struct timeval *)0)) < 0)
return cc;
up = up->m_link;
if (up == pvmrxlist)
continue;
}
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf("msendrecv() from t%x tag %s\n",
up->m_src, pvmnametag(up->m_tag, (int *)0));
}
if (up->m_src == other && up->m_tag == tag) {
if (up->m_ctx != pvmsbuf->m_ctx) {
pvmlogprintf("msendrecv() tag %s, context doesn't match\n",
pvmnametag(up->m_tag, (int *)0));
}
if (up->m_wid != pvmsbuf->m_wid) {
pvmlogprintf("msendrecv() tag %s, waitid doesn't match\n",
pvmnametag(up->m_tag, (int *)0));
}
break;
}
}
LISTDELETE(up, m_link, m_rlink);
if (pvmrbuf)
umbuf_free(pvmrbuf);
pvmrbuf = 0;
if (cc = pvm_setrbuf(up->m_mid))
return cc;
return up->m_mid;
}
static int
int_compare(i, j)
#if defined (WIN32) || defined(IMA_SGI) || defined(IMA_SGI5) || defined(IMA_SGI6) || defined(IMA_SGI64) || defined(IMA_SGIMP) || defined(IMA_SGIMP6) || defined(IMA_SGIMP64) || defined(IMA_SUN4SOL2)
const void *i, *j;
#else
void *i, *j;
#endif
{
return *((int *)i) - *((int *)j);
}
/* pvmmcast()
*
* Multicast a message to a list of destinations.
*/
int
pvmmcast(mid, tids, count, tag)
int mid;
int *tids;
int count;
int tag;
{
static struct timeval ztv = { 0, 0 };
int *dst;
int i, j;
int cc = 0;
struct ttpcb *pcbp;
int sbf;
/*
* make sorted list of destinations
*/
dst = TALLOC(count, int, "mcal");
BCOPY(tids, dst, count * sizeof(int));
qsort(
(void *)dst,
(size_t)count,
sizeof(int),
int_compare);
/*
* remove duplicates
*/
j = 0;
for (i = 1; i < count; i++)
if (dst[i] != dst[j])
dst[++j] = dst[i];
count = j + 1;
/* Need to set the context of the message buffer */
pvmsbuf->m_ctx = pvmmyctx;
/*
* remove self from list
* send over any direct routes we have
*
* XXX we should attempt new routes here if RouteDirect is on.
*/
j = 0;
for (i = 0; i < count; i++) {
if (dst[i] == pvmmytid)
continue;
if ((pcbp = ttpcb_find(dst[i])) && pcbp->tt_state == TTOPEN)
mroute(pvmsbuf->m_mid, dst[i], tag, &ztv);
else
dst[j++] = dst[i];
}
count = j; /* destinations still to go */
/*
* send rest of addresses to pvmd for distribution
*/
if (count > 0) {
/* announce multicast address list to pvmd */
sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
pvm_pkint(&count, 1, 1);
pvm_pkint(dst, count, 1);
sbf = pvm_setsbuf(sbf);
if ((cc = mroute(sbf, TIDPVMD, TM_MCA, &ztv)) > 0)
cc = 0;
pvm_freebuf(sbf);
/* send message */
if (cc >= 0)
if ((cc = mroute(pvmsbuf->m_mid, pvmmytid | TIDGID, tag, &ztv)) > 0)
cc = 0;
}
PVM_FREE(dst);
return cc;
}
/* mksocs()
*
* Make socket for talking to pvmd. Connect to address advertized
* in pvmd socket-name file.
*
* Returns 0 if okay, else error code.
*/
static int
mksocs()
{
char buf[128];
#ifndef WIN32
int d;
#else
HANDLE d;
#endif
#ifdef SOCKLENISUINT
#if defined(IMA_AIX4SP2) || defined(IMA_AIX5SP2)
unsigned int oslen;
#else
size_t oslen;
#endif
#else
int oslen;
#endif
int n;
int try;
char *p;
#ifndef NOUNIXDOM
struct sockaddr_un uns;
#endif
if (topvmd)
return 0;
/*
* get addr of pvmd, make socket to talk.
* first try envar PVMSOCK, then try sockaddr file.
*/
if (!(p = getenv("PVMSOCK"))) {
if (!(p = pvmdsockfile())) {
pvmlogerror("mksocs() pvmdsockfile() failed\n");
goto bail;
}
#ifndef WIN32
if ((d = open(p, O_RDONLY, 0)) == -1) {
#else
d = win32_open_file(p);
if (d == (HANDLE) -2) {
system_loser_win=TRUE;
d = (HANDLE) _open(p,O_RDONLY,0);
}
if (d== (HANDLE) -1) {
#endif
pvmlogperror(p);
goto bail;
}
#ifdef WIN32
if (!system_loser_win) {
n = win32_read_file(d,buf,sizeof(buf));
win32_close_file(d);
}
else {
n = (int)_read((int)d, (void *)buf, (unsigned int)sizeof(buf));
(void)_close((int)d);
}
#else
n = read(d, buf, sizeof(buf));
(void)close(d);
#endif
if (n == -1) {
pvmlogperror("mksocs() read addr file");
goto bail;
}
if (n == 0) {
pvmlogerror("mksocs() read addr file: wrong length read\n");
goto bail;
}
buf[n] = 0;
p = buf;
}
FD_ZERO(&pvmrfds);
/*
FD_ZERO(&pvmwfds);
*/
pvmnfds = 0;
topvmd = ttpcb_new();
topvmd->tt_tid = TIDPVMD;
if (p[0] == '/') {
#ifdef NOUNIXDOM
pvmlogerror("mksocs() no support for Unix domain socket\n");
goto bail;
#else /*NOUNIXDOM*/
if ((topvmd->tt_fd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
pvmlogperror("mksocs() socket");
goto bail;
}
/*
* XXX these 2 loops are a hack -
* XXX it keeps too many tasks from overflowing the pvmd listen q.
* XXX maybe the pvmd should make a socket pair for the task instead.
*/
try = 5;
while (1) {
BZERO((char*)&uns, sizeof(uns));
uns.sun_family = AF_UNIX;
strcpy(uns.sun_path, p);
n = sizeof(uns);
if (connect(topvmd->tt_fd, (struct sockaddr*)&uns, n) == -1) {
if (--try <= 0) {
pvmlogperror("mksocs() connect");
pvmlogprintf("\tsocket address tried: %s\n",p);
goto bail;
}
pvmsleep(1); /* XXX hmm again */
} else
break;
}
#endif /*NOUNIXDOM*/
} else {
if ((topvmd->tt_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
pvmlogperror("mksocs() socket");
goto bail;
}
try = 5;
while (1) { /* goto is at the root of all good programming style */
hex_inadport(p, &topvmd->tt_osad);
topvmd->tt_osad.sin_family = AF_INET;
n = sizeof(topvmd->tt_osad);
if (connect(topvmd->tt_fd, (struct sockaddr*)&topvmd->tt_osad, n)
== -1) {
if (--try <= 0) {
pvmlogperror("mksocs() connect");
pvmlogprintf("\tsocket address tried: %s\n",p);
goto bail;
}
pvmsleep(1); /* XXX hmm again */
} else
break;
}
#ifndef NOSOCKOPT
#ifdef WIN32
d = (HANDLE)1;
#else
d = 1;
#endif
if (setsockopt(topvmd->tt_fd, IPPROTO_TCP, TCP_NODELAY, (char*)&d, sizeof(int))
== -1) {
pvmlogperror("mksocs() setsockopt");
goto bail;
}
#endif
oslen = sizeof(topvmd->tt_sad);
if (getsockname(topvmd->tt_fd, (struct sockaddr*)&topvmd->tt_sad, &oslen) == -1) {
pvmlogperror("mksocs() getsockname");
goto bail;
}
}
topvmd->tt_state = TTOPEN;
pvm_fd_add(topvmd->tt_fd, 1);
return 0;
bail:
if (topvmd)
ttpcb_delete(topvmd);
topvmd = 0;
return PvmSysErr;
}
/* unmksocs()
*
* Close socket to local pvmd.
*/
static int
unmksocs()
{
if (!topvmd)
return 1;
while (ttlist->tt_link != ttlist)
ttpcb_delete(ttlist->tt_link);
ttpcb_delete(topvmd);
topvmd = 0;
return 0;
}
/* pvmbeatask()
*
* Initialize libpvm, config process as a task.
* This is called as the first step of each libpvm function so no
* explicit initialization is required.
*
* Makes socket to talk to local pvmd, connects to pvmd, trades
* config information and authentication.
*
* Returns 0 if okay, else error code.
*/
int
pvmbeatask()
{
int sbf = 0, rbf = 0; /* saved rx and tx message handles */
int prver; /* protocol version */
int cookie; /* cookie assigned by pvmd for ident */
int cc;
char **ep=0;
char authfn[PVMTMPNAMLEN]; /* auth file name */
int authfd = -1; /* auth fd to validate pvmd ident */
int i;
char buf[16]; /* for converting sockaddr */
char *p;
struct pvmminfo minfo;
int outtid, outctx, outtag;
int trctid, trcctx, trctag;
int need_trcinfo = 0;
int new_tracer = 0;
char tmask[ 2 * TEV_MASK_LENGTH ];
int tbuf, topt;
int mid;
#ifdef IMA_BEOLIN
struct hostent *hostaddr;
char namebuf[128];
#endif
TEV_DECLS
#ifdef WIN32
/* WSAStartup has to be called before any socket command
can be executed. Why ? Ask Bill
*/
for (ep=environ;*ep;ep++)
pvmputenv(*ep);
if (WSAStartup(0x0101, &WSAData) != 0)
{
fprintf(stderr,"\nWSAStartup() failed\n");
ExitProcess(1);
}
/* setsockopt(INVALID_SOCKET,SOL_SOCKET,SO_OPENTYPE, */
/* (char *)&nAlert,sizeof(int)); */
#endif
/*
pvm_setopt(PvmDebugMask, -1);
*/
if (pvmmytid != -1)
return 0;
pvmmydsig = pvmgetdsig();
TEV_EXCLUSIVE;
authfn[0] = 0;
#ifndef WIN32
pvmmyupid = getpid();
#else
pvmmyupid = _getpid(); /* .... */
#endif
/*
* get expected pid from environment in case we were started by
* the pvmd and someone forked again
*/
if (p = getenv("PVMEPID"))
cookie = atoi(p);
else
cookie = 0;
/* if ((pvm_useruid = getuid()) == -1) */
#ifndef WIN32
if ((pvm_useruid = geteuid()) == -1) {
pvmlogerror("can't getuid()\n");
cc = PvmSysErr;
goto bail2;
}
#else
if (!username)
username = MyGetUserName();
#endif
if (p = getenv("PVMTASKDEBUG")) {
pvmdebmask = pvmstrtoi(p);
if (pvmdebmask) {
pvmlogprintf("task debug mask is 0x%x (%s)\n",
pvmdebmask, debug_flags(pvmdebmask));
}
}
#ifndef IMA_MPP
if (cc = mksocs()) /* make socket to talk to pvmd */
goto bail2;
#endif
/*
* initialize received-message list
*/
pvmrxlist = pmsg_new(1);
BZERO((char*)pvmrxlist, sizeof(struct pmsg));
pvmrxlist->m_link = pvmrxlist->m_rlink = pvmrxlist;
ttlist = TALLOC(1, struct ttpcb, "tpcb");
BZERO((char*)ttlist, sizeof(struct ttpcb));
ttlist->tt_link = ttlist->tt_rlink = ttlist;
sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
rbf = pvm_setrbuf(0);
/*
* create empty t-auth file that pvmd must write and we'll check later
*/
#ifndef IMA_MPP
#ifndef NOPROT
(void)PVMTMPNAMFUN(authfn);
#ifdef IMA_OS2
if ((authfd = open(authfn, O_RDWR|O_CREAT|O_TRUNC, 0600)) == -1) {
#else
if ((authfd = open(authfn, O_RDONLY|O_CREAT|O_TRUNC, 0600)) == -1) {
#endif
pvmlogperror(authfn);
pvmlogerror("pvmbeatask() can't creat t-auth file\n");
cc = PvmSysErr;
goto bail2;
}
#endif
/*
* send first connect message to pvmd
*/
mxfersingle = 1;
prver = TDPROTOCOL;
pvm_pkint(&prver, 1, 1);
pvm_pkstr(authfn);
if ((cc = msendrecv(TIDPVMD, TM_CONNECT, SYSCTX_TM)) <= 0)
goto bail;
pvm_upkint(&prver, 1, 1);
if (prver != TDPROTOCOL) {
pvmlogprintf("pvmbeatask() t-d protocol mismatch (%d/%d)\n",
TDPROTOCOL, prver);
cc = PvmSysErr;
goto bail;
}
pvm_upkint(&cc, 1, 1);
if (!cc) {
pvmlogerror("pvmbeatask() pvmd refuses connection\n");
goto bail;
}
/*
* check our t-auth file; write in pvmd d-auth file
*/
#ifndef NOPROT
#ifdef IMA_BEOLIN
sleep(15); /* because of race condition over NFS--yuck! (PLS) */
#endif
if ((cc = read(authfd, (char*)&cc, 1)) == -1) {
pvmlogperror("pvmbeatask() read authfile");
cc = PvmSysErr;
goto bail;
}
if (cc != 1) {
pvmlogerror("pvmbeatask() pvmd didn't validate itself\n");
cc = PvmSysErr;
goto bail;
}
(void)close(authfd);
(void)unlink(authfn);
pvm_upkstr(authfn);
if ((authfd = open(authfn, O_WRONLY, 0)) == -1) {
pvmlogperror(authfn);
pvmlogerror("pvmbeatask() failed to open d-auth file\n");
authfn[0] = 0;
cc = PvmSysErr;
goto bail;
}
cc = write(authfd, authfn, 1);
(void)close(authfd);
if (cc != 1) {
if (cc == -1)
pvmlogperror(authfn);
pvmlogerror("pvmbeatask() can't write d-auth file\n");
authfn[0] = 0;
cc = PvmSysErr;
goto bail;
}
authfd = -1;
authfn[0] = 0;
#endif
/*
* send second connect message to pvmd
*/
pvm_initsend(PvmDataFoo);
pvm_pkint(&pvmmyupid, 1, 1);
pvm_pkint(&cookie, 1, 1);
if ( pvmmytaskname != NULL )
pvm_pkstr(pvmmytaskname);
if ((cc = msendrecv(TIDPVMD, TM_CONN2, SYSCTX_TM)) <= 0)
goto bail;
pvm_upkint(&cc, 1, 1);
if (!cc) {
pvmlogerror("pvmbeatask() pvmd refuses connection\n");
goto bail;
}
#endif /* IMA_MPP */
#ifdef IMA_MPP
pvm_mpp_beatask(&pvmmytid, &pvmmyptid, &outtid, &outctx,
&outtag, &trctid, &trcctx,&trctag, &pvmudpmtu,
&pvmschedtid, &topvmd);
#else
pvm_upkint(&pvmmytid, 1, 1);
pvm_upkint(&pvmmyptid, 1, 1);
pvm_upkint(&outtid, 1, 1);
pvm_upkint(&outctx, 1, 1);
pvm_upkint(&outtag, 1, 1);
#endif
if (!pvmtrc.outtid) {
pvmtrc.outtid = outtid;
pvmtrc.outctx = outctx;
pvmtrc.outtag = outtag;
pvmctrc.outtid = pvmtrc.outtid;
pvmctrc.outctx = pvmtrc.outctx;
pvmctrc.outtag = pvmtrc.outtag;
}
#ifndef IMA_MPP
pvm_upkint(&trctid, 1, 1);
pvm_upkint(&trcctx, 1, 1);
pvm_upkint(&trctag, 1, 1);
#endif
if (!pvmtrc.trctid) {
pvmtrc.trctid = trctid;
pvmtrc.trcctx = trcctx;
pvmtrc.trctag = trctag;
pvmctrc.trctid = pvmtrc.trctid;
pvmctrc.trcctx = pvmtrc.trcctx;
pvmctrc.trctag = pvmtrc.trctag;
new_tracer++;
}
if (p = getenv("PVMCTX"))
pvmmyctx = pvmstrtoi(p);
/*
if (pvmmyctx == 0)
pvmmyctx = pvm_newcontext();
*/
/* get trace mask from envar or zero it */
if ( (p = getenv("PVMTMASK")) ) {
if ( strlen(p) + 1 == TEV_MASK_LENGTH )
BCOPY(p, pvmtrc.tmask, TEV_MASK_LENGTH);
else
TEV_MASK_INIT(pvmtrc.tmask);
} else {
TEV_MASK_INIT(pvmtrc.tmask);
if ( new_tracer ) need_trcinfo++;
}
BCOPY(pvmtrc.tmask, pvmctrc.tmask, TEV_MASK_LENGTH);
/* get trace buffering from envar */
if ((p = getenv("PVMTRCBUF")))
pvmtrc.trcbuf = atoi( p );
else {
pvmtrc.trcbuf = 0;
if ( new_tracer ) need_trcinfo++;
}
pvmctrc.trcbuf = pvmtrc.trcbuf;
/* get trace options from envar */
if ((p = getenv("PVMTRCOPT")))
pvmtrc.trcopt = atoi( p );
else {
pvmtrc.trcopt = 0;
if ( new_tracer ) need_trcinfo++;
}
pvmctrc.trcopt = pvmtrc.trcopt;
#ifndef IMA_MPP
pvm_upkint(&pvmudpmtu, 1, 1);
#endif
pvmfrgsiz = pvmudpmtu;
#ifndef IMA_MPP
pvm_upkint(&i, 1, 1); /* XXX data signature */
pvm_upkstr(buf);
#ifdef IMA_BEOLIN
if (gethostname(namebuf, sizeof(namebuf)-1) == -1) {
pvmlogerror("pvmbeatask() can't gethostname()\n");
hex_inadport(buf, &pvmourinet);
} else {
/* got name, now get addr */
if (!(hostaddr = gethostbyname( namebuf ))) {
pvmlogprintf( "pvmbeatask() can't gethostbyname() for %s\n",
namebuf );
hex_inadport(buf, &pvmourinet);
} else {
/* got addr, now save it */
BCOPY( hostaddr->h_addr_list[0],
(char*)&pvmourinet.sin_addr,
sizeof(struct in_addr));
}
}
#else
hex_inadport(buf, &pvmourinet);
#endif
pvmourinet.sin_family = AF_INET;
pvmourinet.sin_port = 0;
pvm_upkint(&pvmschedtid, 1, 1);
#endif
wait_init(pvmmytid, TIDLOCAL);
BZERO(&minfo, sizeof(minfo));
minfo.src = -1;
minfo.ctx = SYSCTX_TC;
minfo.tag = TC_CONREQ;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_conreq);
minfo.tag = TC_CONACK;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_conack);
minfo.tag = TC_TASKEXIT;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_taskexit);
minfo.tag = TC_NOOP;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_noop);
minfo.tag = TC_SETTRACE;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrace);
minfo.tag = TC_SETTRCBUF;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrcbuf);
minfo.tag = TC_SETTRCOPT;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrcopt);
minfo.tag = TC_SETTMASK;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settmask);
minfo.tag = TC_SIBLINGS;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_siblings);
pvm_freebuf(pvm_setrbuf(rbf));
pvm_freebuf(pvm_setsbuf(sbf));
mxfersingle = 0;
if ( need_trcinfo )
{
rbf = pvm_setrbuf( 0 );
if ( pvm_recvinfo( PVMTRACERCLASS, 0, PvmMboxDefault ) > 0 )
{
pvm_upkint(&trctid, 1, 1);
pvm_upkint(&trcctx, 1, 1);
pvm_upkint(&trctag, 1, 1);
pvm_upkint(&outctx, 1, 1); /* unused here */
pvm_upkint(&outtag, 1, 1); /* unused here */
pvm_upkstr(tmask);
pvm_upkint(&tbuf, 1, 1);
pvm_upkint(&topt, 1, 1);
if ( pvmtrc.trctid == trctid && pvmtrc.trcctx == trcctx
&& pvmtrc.trctag == trctag )
{
if ( strlen(tmask) + 1 == TEV_MASK_LENGTH ) {
BCOPY(tmask, pvmtrc.tmask, TEV_MASK_LENGTH);
BCOPY(pvmtrc.tmask, pvmctrc.tmask, TEV_MASK_LENGTH);
}
pvmtrc.trcbuf = tbuf;
pvmctrc.trcbuf = pvmtrc.trcbuf;
pvmtrc.trcopt = topt;
pvmctrc.trcopt = pvmtrc.trcopt;
}
pvm_freebuf(pvm_setrbuf(rbf));
}
else
pvm_setrbuf(rbf);
}
tev_init();
if (TEV_AMEXCL) {
TEV_ENDEXCL;
}
return 0;
bail:
if (pvm_getrbuf() > 0)
pvm_freebuf(pvm_getrbuf());
if (pvm_getsbuf() > 0)
pvm_freebuf(pvm_getsbuf());
pvm_setrbuf(rbf);
pvm_setsbuf(sbf);
#ifndef IMA_MPP
if (authfd != -1)
(void)close(authfd);
if (authfn[0])
(void)unlink(authfn);
unmksocs();
#endif
bail2:
if (TEV_AMEXCL) {
TEV_ENDEXCL;
}
return cc;
}
int
pvmendtask()
{
if (pvmmytid != -1) {
#if !defined(IMA_MPP)
unmksocs();
#endif
pvmmytid = -1;
pvmtrc.trctid = 0;
}
return 0;
}
void
check_for_exit(src)
int src;
{
}
/************************
** Libpvm Functions **
** **
************************/
int
pvm_getfds(fds) /* XXX this function kinda sucks */
int **fds; /* fd list return */
{
static int *fdlist = 0;
static int fdlen = 0;
int cc;
int nfds;
struct ttpcb *pcbp;
TEV_DECLS
if (TEV_EXCLUSIVE) {
if (TEV_DO_TRACE(TEV_GETFDS,TEV_EVENT_ENTRY))
TEV_FIN;
}
if (!(cc = BEATASK)) {
nfds = 1;
for (pcbp = ttlist->tt_link; pcbp != ttlist; pcbp = pcbp->tt_link)
if (pcbp->tt_state == TTOPEN || pcbp->tt_state == TTGRNWAIT)
nfds++;
if (fdlen < nfds) {
fdlen = (nfds * 3) / 2 + 1;
if (fdlist)
fdlist = TREALLOC(fdlist, fdlen, int);
else
fdlist = TALLOC(fdlen, int, "fdlist");
}
fdlist[0] = topvmd->tt_fd;
nfds = 1;
for (pcbp = ttlist->tt_link; pcbp != ttlist; pcbp = pcbp->tt_link)
if (pcbp->tt_state == TTOPEN || pcbp->tt_state == TTGRNWAIT)
fdlist[nfds++] = pcbp->tt_fd;
*fds = fdlist;
cc = nfds;
}
if (TEV_AMEXCL) {
if (TEV_DO_TRACE(TEV_GETFDS,TEV_EVENT_EXIT)) {
TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
if (cc > 0) {
TEV_PACK_INT( TEV_DID_FDS, TEV_DATA_ARRAY,
(int *)fdlist, nfds, 1 );
}
TEV_FIN;
}
TEV_ENDEXCL;
}
return (cc < 0 ? lpvmerr("pvm_getfds", cc) : cc);
}
int
pvm_start_pvmd(argc, argv, block)
int argc; /* number of args to pass to pvmd (>= 0) */
char **argv; /* args for pvmd or null */
int block; /* if true, don't return until add hosts are started */
{
char *sfn;
struct stat sb;
int cc;
char *fn; /* file to exec */
char **av;
int pfd[2];
int n;
FILE *ff;
char buf[128];
#ifdef WIN32
char *pathending = "\\lib\\win32\\pvmd3.exe";
SECURITY_ATTRIBUTES saPipe;
PROCESS_INFORMATION pi;
STARTUPINFO si; /* for CreateProcess call */
HANDLE pvmdid;
int i = 0;
char cmd[256];
#endif
TEV_DECLS;
if (TEV_EXCLUSIVE) {
if (pvmmytid != -1
&& TEV_DO_TRACE(TEV_START_PVMD,TEV_EVENT_ENTRY)) {
TEV_PACK_INT( TEV_DID_BF, TEV_DATA_SCALAR, &block, 1, 1 );
TEV_PACK_STRING( TEV_DID_AS, TEV_DATA_ARRAY,
argv, argc, 1 );
TEV_FIN;
}
}
if (argc < 0 || !argv)
argc = 0;
#ifndef WIN32
if ((pvm_useruid = getuid()) == -1) {
pvmlogerror("can't getuid()\n");
cc = PvmSysErr;
goto bail;
}
#else
if (!username)
username = MyGetUserName();
#endif
if (!(sfn = pvmdsockfile())) {
pvmlogerror("pvm_start_pvmd() pvmdsockfile() failed\n");
cc = PvmSysErr;
goto bail;
}
if (stat(sfn, &sb) != -1) {
cc = PvmDupHost;
goto bail;
}
#ifdef WIN32
cc = PvmSysErr;
fn = malloc(128 * sizeof(char));
strcpy(fn,(char *) pvmgetroot());
strcat(fn,(char *) pathending);
av = TALLOC(argc + 2, char *, "argv");
if (argc > 0)
BCOPY((char *)&argv[0], (char *)&av[1], argc * sizeof(char*));
av[0] = fn;
if (stat(av[0],&sb) == -1) {
fprintf(stderr,"Couldn't find daemon executable !\n",
"It should be stored in %PVM_ROOT%\\lib\\win32");
goto bail;
}
av[argc + 1] = 0;
strcpy(cmd,"");
strcat(cmd, "pvmd3.exe ");
for (i=1;i <= argc ;i++) {
strcat(cmd,av[i]);
strcat(cmd," ");
}
av[argc + 1] = 0;
/* pvmdid gives us back the process handle
but it is an int anyway */
saPipe.nLength = sizeof(SECURITY_ATTRIBUTES);
saPipe.lpSecurityDescriptor = NULL;
saPipe.bInheritHandle = FALSE;
memset(&si, 0, sizeof(si));
si.cb = sizeof(si);
pvmdid = (HANDLE) CreateProcess(
av[0], /* filename */
cmd, /* full command line for child */
/* for example : hostfiles */
NULL, /* process security descriptor */
NULL, /* thread security descriptor */
FALSE, /* inherit handles? */
DETACHED_PROCESS, /* creation flags */
NULL, /* inherited environment address */
NULL, /* startup dir */
/* NULL = start in current */
&si, /* pointer to startup info (input) */
&pi); /* pointer to process info (output) */
if (!pvmdid) {
fprintf(stderr,"CreateProcess() failed \n");
fprintf(stderr,"enough memory ?\n");
goto bail;
return 1;
}
CloseHandle(pi.hThread);
CloseHandle(pi.hProcess);
PVM_FREE(av);
while (stat(sfn, &sb) == -1)
Sleep(5000); /* what a hack */
#else
#ifdef IMA_TITN
if (socketpair(AF_UNIX, SOCK_STREAM, 0, pfd) == -1)
#else
if (pipe(pfd) == -1)
#endif
{
cc = PvmSysErr;
goto bail;
}
fn = pvmgetpvmd();
av = TALLOC(argc + 2, char *, "argv");
if (argc > 0)
BCOPY((char *)&argv[0], (char *)&av[1], argc * sizeof(char*));
av[0] = fn;
av[argc + 1] = 0;
if (!fork()) {
(void)close(pfd[0]);
/* fork again so the pvmd is not the child - won't have to wait() for it */
if (!fork()) {
if (pfd[1] != 1)
dup2(pfd[1], 1);
for (n = getdtablesize(); n-- > 0; )
if (n != 1)
(void)close(n);
(void)open("/dev/null", O_RDONLY, 0); /* should be 0 */
(void)open("/dev/null", O_WRONLY, 0); /* should be 2 */
(void)signal(SIGINT, SIG_IGN);
(void)signal(SIGQUIT, SIG_IGN);
#ifndef IMA_OS2
(void)signal(SIGTSTP, SIG_IGN);
#endif
execvp(av[0], av);
}
_exit(0);
}
(void)close(pfd[1]);
(void)wait(0);
PVM_FREE(av);
if (!(ff = fdopen(pfd[0], "r"))) {
cc = PvmSysErr;
(void)close(pfd[0]);
goto bail;
}
strcpy(buf, "PVMSOCK=");
n = strlen(buf);
if (!fgets(buf + n, sizeof(buf) - n - 1, ff)) {
cc = PvmCantStart;
fclose(ff);
goto bail;
}
fclose(ff);
if (strlen(buf + n) < 2) {
cc = PvmCantStart;
goto bail;
}
n = strlen(buf);
if (buf[n - 1] == '\n')
buf[n - 1] = 0;
pvmputenv(STRALLOC(buf));
/* fprintf(stderr, "pvm_start_pvmd: %s\n", buf); */
#endif
if (cc = BEATASK)
goto bail;
if (block) {
struct pvmhostinfo *hip;
int t = 1;
pvm_config((int*)0, (int*)0, &hip);
while ((cc = pvm_addhosts(&hip[0].hi_name, 1, (int*)0)) == PvmAlready) {
pvmsleep(t);
if (t < 8)
t *= 2;
}
if (cc != PvmDupHost)
goto bail;
cc = BEATASK;
}
bail:
if (TEV_AMEXCL) {
if (TEV_DO_TRACE(TEV_START_PVMD,TEV_EVENT_EXIT)) {
TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
TEV_FIN;
}
TEV_ENDEXCL;
}
return (cc < 0 ? lpvmerr("pvm_start_pvmd", cc) : 0);
}
#ifndef PVM_SHMD
int
pvm_precv(tid, tag, cp, len, dt, rtid, rtag, rlen)
int tid;
int tag;
void *cp;
int len;
int dt;
int *rtid;
int *rtag;
int *rlen;
{
int nb, mc, src;
int rbf;
int cc = 0;
long ad;
TEV_DECLS
if (TEV_EXCLUSIVE) {
if (TEV_DO_TRACE(TEV_PRECV,TEV_EVENT_ENTRY)) {
TEV_PACK_INT( TEV_DID_RST, TEV_DATA_SCALAR, &tid, 1, 1 );
TEV_PACK_INT( TEV_DID_RMC, TEV_DATA_SCALAR, &tag, 1, 1 );
TEV_PACK_INT( TEV_DID_RCX, TEV_DATA_SCALAR,
&pvmmyctx, 1, 1 );
ad = (long)cp;
TEV_PACK_LONG( TEV_DID_PDA, TEV_DATA_SCALAR, &ad, 1, 1 );
TEV_PACK_INT( TEV_DID_PC, TEV_DATA_SCALAR, &len, 1, 1 );
TEV_PACK_INT( TEV_DID_PDT, TEV_DATA_SCALAR, &dt, 1, 1 );
TEV_FIN;
}
}
switch (dt) {
case PVM_BYTE:
len *= sizeof(char);
break;
case PVM_SHORT:
case PVM_USHORT:
len *= sizeof(short);
break;
case PVM_INT:
case PVM_UINT:
len *= sizeof(int);
break;
case PVM_LONG:
case PVM_ULONG:
len *= sizeof(long);
break;
case PVM_FLOAT:
len *= sizeof(float);
break;
case PVM_CPLX:
len *= sizeof(float) * 2;
break;
case PVM_DOUBLE:
len *= sizeof(double);
break;
case PVM_DCPLX:
len *= sizeof(double) * 2;
break;
case PVM_STR:
cc = PvmNotImpl;
break;
default:
cc = PvmBadParam;
break;
}
if (!cc) {
#if !defined(IMA_MPP)
rbf = pvm_setrbuf(0);
cc = pvm_recv(tid, tag);
if (cc > 0) {
pvm_bufinfo(cc, &nb, &mc, &src);
if (rlen)
*rlen = nb;
if (nb < len)
len = nb;
if (rtag)
*rtag = mc;
if (rtid)
*rtid = src;
pvm_upkbyte((char *)cp, len, 1);
pvm_freebuf(cc);
cc = 0;
}
pvm_setrbuf(rbf);
#else
cc = pvm_mppprecv(tid, tag, cp ,len, dt, rtid, rtag, rlen);
#endif
}
if (TEV_AMEXCL) {
if (TEV_DO_TRACE(TEV_PRECV,TEV_EVENT_EXIT)) {
TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
if ( cc < 0 )
nb = mc = src = -1;
TEV_PACK_INT( TEV_DID_MNB, TEV_DATA_SCALAR, &nb, 1, 1 );
TEV_PACK_INT( TEV_DID_MC, TEV_DATA_SCALAR, &mc, 1, 1 );
TEV_PACK_INT( TEV_DID_MCX, TEV_DATA_SCALAR,
&pvmmyctx, 1, 1 );
TEV_PACK_INT( TEV_DID_SRC, TEV_DATA_SCALAR, &src, 1, 1 );
TEV_FIN;
}
TEV_ENDEXCL;
}
if (cc < 0)
lpvmerr("pvm_precv", cc);
return cc;
}
#else /* PVM_SHMD */
/* Do the PVM_SHMD version of precv */
int
pvm_precv(tid, tag, cp, len, dt, rtid, rtag, rlen)
int tid;
int tag;
void *cp;
int len;
int dt;
int *rtid;
int *rtag;
int *rlen;
{
return (shmd_pvm_precv (tid, tag, cp, len, dt, rtid, rtag, rlen));
}
#endif /* PVM_SHMD */
#ifndef PVM_SHMD
int
pvm_psend(tid, tag, cp, len, dt)
int tid;
int tag;
void *cp;
int len;
int dt;
{
int sbf;
int cc = 0;
long ad;
TEV_DECLS
if (TEV_EXCLUSIVE) {
if (TEV_DO_TRACE(TEV_PSEND,TEV_EVENT_ENTRY)) {
TEV_PACK_INT( TEV_DID_DST, TEV_DATA_SCALAR, &tid, 1, 1 );
TEV_PACK_INT( TEV_DID_MC, TEV_DATA_SCALAR, &tag, 1, 1 );
TEV_PACK_INT( TEV_DID_MCX, TEV_DATA_SCALAR,
&pvmmyctx, 1, 1 );
ad = (long)cp;
TEV_PACK_LONG( TEV_DID_PDA, TEV_DATA_SCALAR, &ad, 1, 1 );
TEV_PACK_INT( TEV_DID_PC, TEV_DATA_SCALAR, &len, 1, 1 );
TEV_PACK_INT( TEV_DID_PDT, TEV_DATA_SCALAR, &dt, 1, 1 );
TEV_FIN;
}
}
switch (dt) {
case PVM_BYTE:
len *= sizeof(char);
break;
case PVM_SHORT:
case PVM_USHORT:
len *= sizeof(short);
break;
case PVM_INT:
case PVM_UINT:
len *= sizeof(int);
break;
case PVM_LONG:
case PVM_ULONG:
len *= sizeof(long);
break;
case PVM_FLOAT:
len *= sizeof(float);
break;
case PVM_CPLX:
len *= sizeof(float) * 2;
break;
case PVM_DOUBLE:
len *= sizeof(double);
break;
case PVM_DCPLX:
len *= sizeof(double) * 2;
break;
case PVM_STR:
cc = PvmNotImpl;
break;
default:
cc = PvmBadParam;
break;
}
#if !defined(IMA_MPP)
if (!cc) {
sbf = pvm_setsbuf(pvm_mkbuf(PvmDataInPlace));
pvm_pkbyte((char *)cp, len, 1);
if ((cc = pvm_send(tid, tag)) > 0)
cc = 0;
pvm_freebuf(pvm_setsbuf(sbf));
}
#else
cc = pvm_mpppsend((char *) cp, len, tid, tag);
#endif
if (TEV_AMEXCL) {
if (TEV_DO_TRACE(TEV_PSEND,TEV_EVENT_EXIT)) {
TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
TEV_FIN;
}
TEV_ENDEXCL;
}
if (cc < 0)
lpvmerr("pvm_psend", cc);
return cc;
}
#else
/* i.e. PVM_SHMD */
int
pvm_psend(tid, tag, cp, len, dt)
int tid;
int tag;
void *cp;
int len;
int dt;
{
/* Go do the shmd thing instead */
return (shmd_pvm_psend(tid, tag, cp, len, dt));
}
#endif /* PVM_SHMD */
/* ----- ogm_complete ------ */
int
ogm_complete( ogmlist )
struct msgid **ogmlist;
{
struct msgid *mp, *oldmp, *head;
#if defined(IMA_MPP)
if (!(mp = *ogmlist) )
return TRUE; /* there is nothing to check */
/* go through the list message ids. checking if the message has
complete. If the list is empty at the end, then return that
message has finished */
head = mp;
mp = mp->ms_link;
while (mp != head)
{
if ((*mp->mfunc->msgdone)(0, &(mp->id), mp->info))
{
mp = mp->ms_link;
oldmp = mp->ms_rlink;
LISTDELETE(oldmp,ms_link, ms_rlink)
msgid_free(oldmp);
}
else
mp = mp->ms_link;
}
/* mp now points to the head of the list */
if ((*mp->mfunc->msgdone)(0, &(mp->id), mp->info))
{
if (mp->ms_link == mp) /* the only one left */
{
msgid_free(mp);
*ogmlist = (struct msgid *) NULL;
return TRUE;
}
else
{
mp = mp->ms_link;
oldmp = mp->ms_rlink;
LISTDELETE(oldmp, ms_link, ms_rlink);
msgid_free(oldmp);
*ogmlist = mp; /* new head of the message id list */
}
}
return FALSE;
#else
return TRUE; /* for workstations, the write is sychronous */
#endif
}
syntax highlighted by Code2HTML, v. 0.9.1