/* factorMPI.c */
#include "../spoolesMPI.h"
#include "../../timings.h"
#define MYDIAGNOSTIC 0
#define AGGREGATE_CHV 1
#define POSTPONED_NOTIFICATION 2
#define POSTPONED_CHV 3
#define AGGREGATE_CHV_TAG(K) (firsttag + (K))
#define POSTPONED_NOTIFICATION_TAG(K) (firsttag + nfront + (K))
#define POSTPONED_CHV_TAG(K) (firsttag + 2*nfront + 1 + (K))
#define ERROR_TAG (firsttag + 3*nfront + 2)
/*--------------------------------------------------------------------*/
typedef struct _FactorMsg FactorMsg ;
struct _FactorMsg {
int info[3] ;
void *base ;
Chv *chv ;
MPI_Request req ;
FactorMsg *next ;
} ;
/*--------------------------------------------------------------------*/
static FactorMsg * FactorMsg_new ( void ) ;
static void FactorMsg_setDefaultFields ( FactorMsg *msg ) ;
static void FactorMsg_clearData ( FactorMsg *msg ) ;
static void FactorMsg_free ( FactorMsg *msg ) ;
static FactorMsg * wakeupFront ( FrontMtx *frontmtx, int J, int myid,
int frontOwners[], int firsttag, ChvManager *chvmanager,
ChvList *aggList, int stats[], double cpus[], MPI_Comm comm,
int msglvl, FILE *msgFile ) ;
static FactorMsg * checkMessages ( FrontMtx *frontmtx, int J,
FactorMsg *firstmsg, ChvList *aggList, ChvList *postList,
ChvManager *chvmanager, int firsttag, int stats[], MPI_Comm comm,
int msglvl, FILE *msgFile ) ;
static FactorMsg * sendMessages ( FrontMtx *frontmtx, int J, int myid,
int nfront, int par[], int owners[], int firsttag,
ChvManager*chvmanager, ChvList *aggList, ChvList *postList,
FactorMsg *firstmsgsent, int stats[], MPI_Comm comm,
int msglvl, FILE *msgFile ) ;
static FactorMsg * checkSentMessages ( FactorMsg *firstmsgsent,
ChvManager *chvmanager, int msglvl, FILE *msgFile ) ;
static FactorMsg * sendErrorMessages ( int nfront, int J, int nproc,
int myid, int firsttag, FactorMsg *firstmsgsent, int stats[],
MPI_Comm comm, int msglvl, FILE *msgFile ) ;
static void cancelAndFreeMessages ( FactorMsg *head,
ChvManager *chvmanager, int msglvl, FILE *msgFile ) ;
/*--------------------------------------------------------------------*/
/*
-------------------------------------------------------------------
this is the method that computes the
parallel factorization of a sparse matrix A using MPI.
input ---
frontmtx -- front matrix object that holds the factors
inpmtx -- matrix object for A
tau -- tolerance used in pivoting
droptol -- tolerance used in the approximate factorization
chvmanager -- ChvManager object to handle working storage
frontOwnersIV -- map from fronts to owning processes
lookahead -- parameter that governs the ``lookahead''
behavior, use lookahead = 0 as a default
cpus -- vector to store breakdown of cpu times
cpus[ 0] -- initialize fronts
cpus[ 1] -- load original entries
cpus[ 2] -- update fronts
cpus[ 3] -- insert aggregate data
cpus[ 4] -- assemble aggregate data
cpus[ 5] -- assemble postponed data
cpus[ 6] -- factor fronts
cpus[ 7] -- extract postponed data
cpus[ 8] -- store factor entries
cpus[ 9] -- post initial receives
cpus[10] -- check for received messages
cpus[11] -- post initial sends
cpus[12] -- check for sent messages
stats -- vector to store statistics
stats[ 0] -- # of pivots
stats[ 1] -- # of pivot tests
stats[ 2] -- # of delayed rows and columns
stats[ 3] -- # of entries in D
stats[ 4] -- # of entries in L
stats[ 5] -- # of entries in U
stats[ 6] -- # of aggregate sends
stats[ 7] -- # of bytes in the aggregate sends
stats[ 8] -- # of aggregate received
stats[ 9] -- # of bytes in the aggregate received
stats[10] -- # of postponed data sends
stats[11] -- # of bytes in the postponed data sends
stats[12] -- # of postponed data received
stats[13] -- # of bytes in the postponed data received
stats[14] -- # of active Chv objects (working storage)
stats[15] -- # of active bytes in working storage
stats[16] -- # of requested bytes in working storage
msglvl -- message level
msgFile -- message file
firsttag -- first tag to use during the factorization,
reserved tags are tag, ..., tag + 4*nfront + 2
comm -- MPI communicator
return value -- if process id is zero, a pointer to the first
Chv object in a list that contains postponed rows and columns
that could not be eliminated.
created -- 98may21, cca
-------------------------------------------------------------------
*/
Chv *
FrontMtx_MPI_factorInpMtx (
FrontMtx *frontmtx,
InpMtx *inpmtx,
double tau,
double droptol,
ChvManager *chvmanager,
IV *frontOwnersIV,
int lookahead,
int *perror,
double cpus[],
int stats[],
int msglvl,
FILE *msgFile,
int firsttag,
MPI_Comm comm
) {
Chv *rootchv ;
double zero[2] = {0.0, 0.0} ;
int lasttag, tagbound ;
Pencil pencil ;
lasttag = 3*frontmtx->nfront + 2 ;
tagbound = maxTagMPI(comm) ;
if ( firsttag < 0 || lasttag > tagbound ) {
fprintf(stderr, "\n fatal error in FrontMtx_MPI_factorInpMtx()"
"\n tag range is [%d,%d], tag_bound = %d",
firsttag, lasttag, tagbound) ;
exit(-1) ;
}
Pencil_setDefaultFields(&pencil) ;
Pencil_init(&pencil, frontmtx->type, frontmtx->symmetryflag,
inpmtx, zero, NULL) ;
rootchv = FrontMtx_MPI_factorPencil(frontmtx, &pencil, tau, droptol,
chvmanager, frontOwnersIV, lookahead, perror, cpus, stats,
msglvl, msgFile, firsttag, comm) ;
return(rootchv) ; }
/*--------------------------------------------------------------------*/
/*
-------------------------------------------------------------------
this is the method that computes the
parallel factorization of A + sigma*B using MPI.
input ---
frontmtx -- front matrix object that holds the factors
pencil -- matrix pencil object, contains A + sigma*B
tau -- tolerance used in pivoting
droptol -- tolerance used in the approximate factorization
chvmanager -- ChvManager object to handle working storage
frontOwnersIV -- map from fronts to owning processes
lookahead -- parameter that governs the ``lookahead''
behavior, use lookahead = 0 as a default
cpus -- vector to store breakdown of cpu times
cpus[ 0] -- initialize fronts
cpus[ 1] -- load original entries
cpus[ 2] -- update fronts
cpus[ 3] -- insert aggregate data
cpus[ 4] -- assemble aggregate data
cpus[ 5] -- assemble postponed data
cpus[ 6] -- factor fronts
cpus[ 7] -- extract postponed data
cpus[ 8] -- store factor entries
cpus[ 9] -- post initial receives
cpus[10] -- check for received messages
cpus[11] -- post initial sends
cpus[12] -- check for sent messages
stats -- vector to store statistics
stats[ 0] -- # of pivots
stats[ 1] -- # of pivot tests
stats[ 2] -- # of delayed rows and columns
stats[ 3] -- # of entries in D
stats[ 4] -- # of entries in L
stats[ 5] -- # of entries in U
stats[ 6] -- # of aggregate sends
stats[ 7] -- # of bytes in the aggregate sends
stats[ 8] -- # of aggregate received
stats[ 9] -- # of bytes in the aggregate received
stats[10] -- # of postponed data sends
stats[11] -- # of bytes in the postponed data sends
stats[12] -- # of postponed data received
stats[13] -- # of bytes in the postponed data received
stats[14] -- # of active Chv objects (working storage)
stats[15] -- # of active bytes in working storage
stats[16] -- # of requested bytes in working storage
msglvl -- message level
msgFile -- message file
firsttag -- first tag to use during the factorization,
reserved tags are tag, ..., tag + 4*nfront + 2
comm -- MPI communicator
return value -- if process id is zero, a pointer to the first
Chv object in a list that contains postponed rows and columns
that could not be eliminated.
created -- 98may21, cca
-------------------------------------------------------------------
*/
Chv *
FrontMtx_MPI_factorPencil (
FrontMtx *frontmtx,
Pencil *pencil,
double tau,
double droptol,
ChvManager *chvmanager,
IV *frontOwnersIV,
int lookahead,
int *perror,
double cpus[],
int stats[],
int msglvl,
FILE *msgFile,
int firsttag,
MPI_Comm comm
) {
char *status, *recvsPosted ;
Chv *rootchv ;
Chv **fronts ;
ChvList *aggList, *postList ;
double t1, t2 ;
DV workDV ;
FactorMsg *firstmsgsent, *head, *msg ;
FactorMsg **p_msg ;
Ideq *dequeue ;
int flag, iproc, J, K, lasttag, myid, nfront, nproc, tagbound ;
int *nactiveChild, *owners, *par, *term_flags ;
int commstats[4] ;
IP **heads ;
IV pivotsizesIV ;
MPI_Status mpistatus ;
/*
--------------
check the data
--------------
*/
if ( frontmtx == NULL || pencil == NULL || tau < 1.0 || droptol < 0.0
|| chvmanager == NULL || frontOwnersIV == NULL || cpus == NULL
|| stats == NULL || (msglvl > 0 && msgFile == NULL) ) {
fprintf(stderr, "\n fatal error in FrontMtx_MPI_factorPencil()"
"\n frontmtx = %p, pencil = %p, tau = %f, droptol = %f"
"\n chvmanager = %p, frontOwnersIV = %p, firsttag = %d"
"\n cpus = %p, stats = %p, msglvl = %d, msgFile = %p"
"\n bad input\n",
frontmtx, pencil, tau, droptol, chvmanager, frontOwnersIV,
firsttag, cpus, stats, msglvl, msgFile) ;
exit(-1) ;
}
nfront = frontmtx->nfront ;
lasttag = 3*nfront + 2 ;
tagbound = maxTagMPI(comm) ;
if ( firsttag < 0 || lasttag > tagbound ) {
fprintf(stderr, "\n fatal error in FrontMtx_MPI_factorPencil()"
"\n tag range is [%d,%d], tag_bound = %d",
firsttag, lasttag, tagbound) ;
exit(-1) ;
}
MPI_Comm_rank(comm, &myid) ;
MPI_Comm_size(comm, &nproc) ;
par = frontmtx->tree->par ;
owners = IV_entries(frontOwnersIV) ;
if ( msglvl > 1 ) {
fprintf(msgFile, "\n chvmanager = %p, firsttag = %d",
chvmanager, firsttag) ;
fflush(msgFile) ;
}
if ( FRONTMTX_IS_PIVOTING(frontmtx) ) {
/*
--------------------------------------------
create a ChvList object to handle the
postponed data. it does not require a lock.
--------------------------------------------
*/
postList = FrontMtx_postList(frontmtx, frontOwnersIV, NO_LOCK) ;
if ( msglvl > 1 ) {
fprintf(msgFile, "\n postList = %p", postList) ;
ChvList_writeForHumanEye(postList, msgFile) ;
fflush(msgFile) ;
}
} else {
postList = NULL ;
}
/*
------------------------------------------------------------
create a ChvList object to handle aggregate fronts. no lock
is necessary. this object must be created cooperatively,
since the aggregate counts are a function of the owners map
and the symbolic factorization, and the latter is not global
------------------------------------------------------------
*/
IVzero(4, commstats) ;
aggList = FrontMtx_MPI_aggregateList(frontmtx, frontOwnersIV,
commstats, msglvl, msgFile, firsttag, comm) ;
firsttag += 2 ;
if ( msglvl > 1 ) {
fprintf(msgFile, "\n aggList = %p", aggList) ;
ChvList_writeForHumanEye(aggList, msgFile) ;
fflush(msgFile) ;
}
/*
---------------------------------------------------
initialize the heads[] vector for the owned updates
---------------------------------------------------
*/
heads = FrontMtx_factorSetup(frontmtx, frontOwnersIV, myid,
msglvl, msgFile) ;
/*
----------------------------------------------------------------
initialize the Ideq object that holds the initial fronts
of the active paths, owned fronts with no children that
are owned or updates by this thread.
status[J] == 'W' --> J belongs to an active path for this thread
----------------------------------------------------------------
*/
status = CVinit(nfront, 'F') ;
dequeue = FrontMtx_setUpDequeue(frontmtx, owners, myid, status, heads,
'W', 'F', msglvl, msgFile) ;
FrontMtx_loadActiveLeaves(frontmtx, status, 'W', dequeue) ;
if ( msglvl > 2 ) {
fprintf(msgFile, "\n\n status") ;
CVfprintf(msgFile, nfront, status) ;
fprintf(msgFile, "\n fronts in initial dequeue") ;
Ideq_writeForHumanEye(dequeue, msgFile) ;
fflush(msgFile) ;
}
/*
-----------------------------------------------
initialize the nactiveChild[] vector,
nactiveChild[J] measures the number of children
that belong to active paths of this thread
-----------------------------------------------
*/
nactiveChild = FrontMtx_nactiveChild(frontmtx, status, myid) ;
if ( msglvl > 2 ) {
fprintf(msgFile, "\n\n nactiveChild") ;
IVfprintf(msgFile, nfront, nactiveChild) ;
fflush(msgFile) ;
}
/*
------------------------------
initialize the working storage
------------------------------
*/
ALLOCATE(fronts, Chv *, nfront) ;
for ( J = 0 ; J < nfront ; J++ ) {
fronts[J] = NULL ;
}
DV_setDefaultFields(&workDV) ;
IV_setDefaultFields(&pivotsizesIV) ;
ALLOCATE(p_msg, struct _FactorMsg *, nfront+1) ;
for ( J = 0 ; J <= nfront ; J++ ) {
p_msg[J] = NULL ;
}
recvsPosted = CVinit(nfront, 'N') ;
/*
---------------------------
loop while a path is active
---------------------------
*/
*perror = -1 ;
firstmsgsent = NULL ;
while ( (J = Ideq_removeFromHead(dequeue)) != -1 ) {
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n ### checking out front %d", J) ;
fprintf(msgFile, "\n remaining fronts in dequeue") ;
Ideq_writeForHumanEye(dequeue, msgFile) ;
fflush(msgFile) ;
}
if ( recvsPosted[J] == 'N' && owners[J] == myid ) {
MARKTIME(t1) ;
/*
-----------------------------------
post any receives for owned front J
-----------------------------------
*/
p_msg[J] = wakeupFront(frontmtx, J, myid, owners, firsttag,
chvmanager, aggList, stats, cpus,
comm, msglvl, msgFile) ;
recvsPosted[J] = 'Y' ;
MARKTIME(t2) ;
cpus[10] += t2 - t1 ;
}
if ( p_msg[J] != NULL ) {
/*
---------------------------------------
check for received messages for front J
---------------------------------------
*/
MARKTIME(t1) ;
p_msg[J] = checkMessages(frontmtx, J, p_msg[J], aggList,
postList, chvmanager, firsttag,
stats, comm, msglvl, msgFile) ;
MARKTIME(t2) ;
cpus[11] += t2 - t1 ;
}
/*
----------------------------------
check for numeric work for front J
----------------------------------
*/
FrontMtx_factorVisit(frontmtx, pencil, J, myid, owners, fronts,
lookahead, tau, droptol, status, heads,
&pivotsizesIV, &workDV, par, aggList, postList,
chvmanager, stats, cpus, msglvl, msgFile) ;
if ( status[J] == 'E' ) {
/*
---------------------------------------
error detected while processing front J
---------------------------------------
*/
*perror = J ;
firstmsgsent = sendErrorMessages(nfront, J, nproc, myid, firsttag,
firstmsgsent, stats, comm, msglvl, msgFile) ;
break ;
} else if ( status[J] == 'F' ) {
#if MYDIAGNOSTIC > 0
{
double t1 ;
MARKTIME(t1) ;
fprintf(stdout,
"\n proc %d, time %8.3f : front %d is finished",
myid, t1, J) ;
fflush(stdout) ;
fprintf(msgFile,
"\n proc %d, time %8.3f : front %d is finished",
myid, t1, J) ;
fflush(msgFile) ;
}
#endif
if ( msglvl > 1 ) {
fprintf(msgFile, "\n front %d is finished", J) ;
fflush(msgFile) ;
}
if ( (K = par[J]) != -1 && --nactiveChild[K] == 0 ) {
/*
------------------------
load parent into dequeue
------------------------
*/
Ideq_insertAtHead(dequeue, K) ;
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n 2. adding front %d to dequeue", K) ;
fprintf(msgFile, "\n fronts in dequeue") ;
Ideq_writeForHumanEye(dequeue, msgFile) ;
fflush(msgFile) ;
}
}
/*
------------------------------------
send any messages to other processes
------------------------------------
*/
MARKTIME(t1) ;
firstmsgsent = sendMessages(frontmtx, J, myid, nfront, par,
owners, firsttag, chvmanager, aggList,
postList, firstmsgsent, stats,
comm, msglvl, msgFile) ;
MARKTIME(t2) ;
cpus[12] += t2 - t1 ;
} else {
/*
---------------------------------------------
front is still active, place on tail of queue
---------------------------------------------
*/
Ideq_insertAtTail(dequeue, J) ;
if ( msglvl > 1 ) {
fprintf(msgFile, "\n front %d is not finished", J) ;
fprintf(msgFile, "\n fronts in dequeue") ;
Ideq_writeForHumanEye(dequeue, msgFile) ;
fflush(msgFile) ;
}
}
if ( firstmsgsent != NULL ) {
/*
------------------------------------------------------
check the list of sent messages to recycle Chv objects
------------------------------------------------------
*/
MARKTIME(t1) ;
firstmsgsent = checkSentMessages(firstmsgsent, chvmanager,
msglvl, msgFile) ;
MARKTIME(t2) ;
cpus[13] += t2 - t1 ;
}
/*
--------------------------------------
probe for an error termination message
--------------------------------------
*/
MPI_Iprobe(MPI_ANY_SOURCE, ERROR_TAG, comm, &flag, &mpistatus) ;
if ( flag == 1 ) {
/*
---------------------------------------------------------
an error message has been received, break out of the loop
---------------------------------------------------------
*/
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n ERROR MESSAGE RECEIVED FROM %d",
mpistatus.MPI_SOURCE) ;
fflush(msgFile) ;
}
break ;
}
}
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n done with main loop") ;
fflush(msgFile) ;
}
/*
--------------------------------------------
now do an allgather on the termination flags
--------------------------------------------
*/
term_flags = IVinit(nproc, -1) ;
MPI_Allgather((void *) perror, 1, MPI_INT,
(void *) term_flags, 1, MPI_INT, comm) ;
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n allgather on error code complete") ;
IVfprintf(msgFile, nproc, term_flags) ;
fflush(msgFile) ;
}
if ( *perror < 0 ) {
for ( iproc = 0 ; iproc < nproc ; iproc++ ) {
if ( term_flags[iproc] >= 0 ) {
*perror = term_flags[iproc] ;
break ;
}
}
}
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n error code = %d", *perror) ;
fflush(msgFile) ;
}
rootchv = NULL ;
if ( *perror >= 0 ) {
/*
----------------------------------------
error termination has occured,
link all messages together into one list
cancel all pending messages
----------------------------------------
*/
head = firstmsgsent ;
for ( J = 0 ; J < nfront ; J++ ) {
if ( (msg = p_msg[J]) != NULL ) {
while ( msg->next != NULL ) {
msg = msg->next ;
}
msg->next = head ;
head = p_msg[J] ;
}
}
cancelAndFreeMessages(head, chvmanager, msglvl, msgFile) ;
} else {
/*
-------------------------------------------
normal termination, wait until all messages
have been received and free their storage
-------------------------------------------
*/
while ( firstmsgsent != NULL ) {
/*
-------------------------------
check the list of sent messages
-------------------------------
*/
MARKTIME(t1) ;
firstmsgsent = checkSentMessages(firstmsgsent, chvmanager,
msglvl, msgFile) ;
MARKTIME(t2) ;
cpus[13] += t2 - t1 ;
}
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n all sent messages have been free'd") ;
fflush(msgFile) ;
}
if ( postList != NULL ) {
/*
-----------------------------------------
set the root chevron of the factorization
-----------------------------------------
*/
rootchv = ChvList_getList(postList, nfront) ;
}
}
/*
--------------------
load some statistics
--------------------
*/
stats[3] = frontmtx->nentD ;
stats[4] = frontmtx->nentL ;
stats[5] = frontmtx->nentU ;
stats[14] = chvmanager->nactive ;
stats[15] = chvmanager->nbytesactive ;
stats[16] = chvmanager->nbytesrequested ;
/*
------------------------
free the working storage
------------------------
*/
if ( postList != NULL ) {
ChvList_free(postList) ;
}
ChvList_free(aggList) ;
CVfree(status) ;
IVfree(nactiveChild) ;
IP_free(heads[nfront+1]) ;
FREE(heads) ;
FREE(fronts) ;
Ideq_free(dequeue) ;
DV_clearData(&workDV) ;
IV_clearData(&pivotsizesIV) ;
CVfree(recvsPosted) ;
FREE(p_msg) ;
IVfree(term_flags) ;
return(rootchv) ; }
/*--------------------------------------------------------------------*/
/*
-------------------------
initialize a new instance
created -- 97nov13, cca
-------------------------
*/
static FactorMsg *
FactorMsg_new (
void
) {
FactorMsg *msg ;
ALLOCATE(msg, struct _FactorMsg, 1) ;
FactorMsg_setDefaultFields(msg) ;
return(msg) ; }
/*--------------------------------------------------------------------*/
/*
-----------------------
set the default fields
created -- 97nov13, cca
-----------------------
*/
static void
FactorMsg_setDefaultFields (
FactorMsg *msg
) {
msg->info[0] = 0 ;
msg->info[1] = 0 ;
msg->info[2] = 0 ;
msg->base = NULL ;
msg->chv = NULL ;
msg->req = NULL ;
msg->next = NULL ;
return ; }
/*--------------------------------------------------------------------*/
/*
-----------------------
clear the data
created -- 97nov13, cca
-----------------------
*/
static void
FactorMsg_clearData (
FactorMsg *msg
) {
FactorMsg_setDefaultFields(msg) ;
return ; }
/*--------------------------------------------------------------------*/
/*
-----------------------
free the object
created -- 97nov13, cca
-----------------------
*/
static void
FactorMsg_free (
FactorMsg *msg
) {
FactorMsg_clearData(msg) ;
FREE(msg) ;
return ; }
/*--------------------------------------------------------------------*/
/*
-----------------------------------
when a front is initially active,
post its necessary receive messages
created -- 97nov13, cca
-----------------------------------
*/
static FactorMsg *
wakeupFront (
FrontMtx *frontmtx,
int J,
int myid,
int frontOwners[],
int firsttag,
ChvManager *chvmanager,
ChvList *aggList,
int stats[],
double cpus[],
MPI_Comm comm,
int msglvl,
FILE *msgFile
) {
Chv *chv ;
FactorMsg *first, *msg ;
int aggcount, ii, I, nbytes, nD, nfront, nL, nU, tag ;
int *fch, *sib ;
/*
---------------
check the input
---------------
*/
if ( frontmtx == NULL || J < 0 || J > (nfront = frontmtx->nfront)
|| chvmanager == NULL || aggList == NULL
|| (msglvl > 1 && msgFile == NULL) ) {
fprintf(stderr, "\n fatal error in wakeupFront()"
"\n bad input\n") ;
exit(-1) ;
}
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n inside wakeupFront, J = %d", J) ;
fflush(msgFile) ;
}
first = NULL ;
aggcount = aggList->counts[J] ;
if ( msglvl > 1 ) {
fprintf(msgFile, "\n aggcount for %d = %d", J, aggcount) ;
fflush(msgFile) ;
}
if ( aggcount > 0 ) {
/*
-----------------------------------------------
post receives for the incoming aggregate fronts
----------------------------------------------
*/
tag = AGGREGATE_CHV_TAG(J) ;
for ( ii = 0 ; ii < aggcount ; ii++ ) {
chv = FrontMtx_setupFront(frontmtx, NULL, J, myid, frontOwners,
chvmanager, cpus, msglvl, msgFile) ;
msg = FactorMsg_new() ;
msg->info[0] = AGGREGATE_CHV ;
msg->info[1] = J ;
msg->info[2] = nbytes = Chv_nbytesInWorkspace(chv) ;
msg->chv = chv ;
msg->base = (void *) Chv_workspace(chv) ;
msg->next = first ;
first = msg ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n 1. posting Irecv, msg %p, type = %d, J = %d, tag = %d",
msg, AGGREGATE_CHV, J, tag) ;
fflush(msgFile) ;
}
MPI_Irecv(msg->base, nbytes, MPI_BYTE, MPI_ANY_SOURCE,
tag, comm, &msg->req) ;
FrontMtx_initialFrontDimensions(frontmtx, J,
&nD, &nL, &nU, &nbytes) ;
stats[8]++ ;
stats[9] += nbytes ;
if ( msglvl > 1 ) {
fprintf(msgFile, ", return") ;
fflush(msgFile) ;
}
}
}
if ( FRONTMTX_IS_PIVOTING(frontmtx) && frontOwners[J] == myid ) {
/*
---------------------------------------------
post receives for child notification messages
---------------------------------------------
*/
fch = frontmtx->tree->fch ;
if ( fch[J] != -1 ) {
sib = frontmtx->tree->sib ;
for ( I = fch[J] ; I != -1 ; I = sib[I] ) {
if ( frontOwners[I] != myid ) {
msg = FactorMsg_new() ;
msg->info[0] = POSTPONED_NOTIFICATION ;
msg->info[1] = I ;
msg->info[2] = 0 ;
msg->next = first ;
first = msg ;
stats[12]++ ;
stats[13] += 3*sizeof(int) ;
tag = POSTPONED_NOTIFICATION_TAG(I) ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n 2. posting Irecv, msg %p, type = %d, I = %d, tag = %d",
msg, POSTPONED_NOTIFICATION, I, tag) ;
fflush(msgFile) ;
}
MPI_Irecv((void *) msg->info, 3, MPI_INT, frontOwners[I],
tag, comm, &msg->req) ;
if ( msglvl > 1 ) {
fprintf(msgFile, ", return") ;
fflush(msgFile) ;
}
}
}
}
}
return(first) ; }
/*--------------------------------------------------------------------*/
/*
--------------------------------------------
check for completion of messages for front J
created -- 97nov13, cca
--------------------------------------------
*/
static FactorMsg *
checkMessages (
FrontMtx *frontmtx,
int J,
FactorMsg *firstmsg,
ChvList *aggList,
ChvList *postList,
ChvManager *chvmanager,
int firsttag,
int stats[],
MPI_Comm comm,
int msglvl,
FILE *msgFile
) {
FactorMsg *msg, *nextmsg ;
int flag, nbytes, nfront, source, tag, type ;
MPI_Status status ;
/*
----------------------------------
loop over the messages in the list
----------------------------------
*/
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n inside checkMessages, J = %d, firstmsg = %p",
J, firstmsg) ;
fflush(msgFile) ;
}
nfront = frontmtx->nfront ;
for ( msg = firstmsg, firstmsg = NULL ;
msg != NULL ;
msg = nextmsg ) {
/*
---------------------------
set next message to look at
---------------------------
*/
nextmsg = msg->next ;
msg->next = NULL ;
/*
--------------------------------------
extract the type, front id and # bytes
--------------------------------------
*/
type = msg->info[0] ;
nbytes = msg->info[2] ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n checking msg %p : type %d, front %d, nbytes %d",
msg, type, msg->info[1], nbytes) ;
fflush(msgFile) ;
}
MPI_Test(&msg->req, &flag, &status) ;
if ( msglvl > 1 ) {
fprintf(msgFile, ", flag = %d", flag) ;
fflush(msgFile) ;
}
if ( flag != 1 ) {
/*
-----------------------------------------------
message has not yet been received, keep in list
-----------------------------------------------
*/
msg->next = firstmsg ;
firstmsg = msg ;
} else {
/*
-------------------------
message has been received
-------------------------
*/
source = status.MPI_SOURCE ;
tag = status.MPI_TAG ;
nbytes = msg->info[2] ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n message received: source %d, tag %d, nbytes %d",
source, tag, nbytes) ;
fflush(msgFile) ;
}
switch ( type ) {
case AGGREGATE_CHV :
/*
-----------------
aggregate chevron
-----------------
*/
Chv_initFromBuffer(msg->chv) ;
if ( msglvl > 2 ) {
fprintf(msgFile, "\n received aggregate front %d", J) ;
Chv_writeForHumanEye(msg->chv, msgFile) ;
fflush(msgFile) ;
}
if ( msglvl > 1 ) {
fprintf(msgFile, "\n adding chv to aggregate list"
"\n before count is %d", aggList->counts[J]) ;
fflush(msgFile) ;
}
ChvList_addObjectToList(aggList, msg->chv, J) ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n count is now %d", aggList->counts[J]) ;
fflush(msgFile) ;
}
FactorMsg_free(msg) ;
break ;
case POSTPONED_NOTIFICATION :
if ( nbytes > 0 ) {
/*
--------------------------------------------------
post receive for message to contain postponed data
--------------------------------------------------
*/
msg->info[0] = POSTPONED_CHV ;
msg->chv = ChvManager_newObjectOfSizeNbytes(chvmanager,
nbytes) ;
msg->base = (void *) Chv_workspace(msg->chv) ;
msg->next = nextmsg ;
nextmsg = msg ;
tag += nfront + 1 ;
stats[12]++ ;
stats[13] += nbytes ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n 2. posting Irecv, msg %p, type = %d, I = %d, tag = %d",
msg, POSTPONED_NOTIFICATION, msg->info[1], tag) ;
fflush(msgFile) ;
}
MPI_Irecv(msg->base, nbytes, MPI_BYTE,
source, tag, comm, &msg->req) ;
if ( msglvl > 1 ) {
fprintf(msgFile, ", return") ;
fflush(msgFile) ;
}
} else {
/*
------------------------
postponed data is empty
------------------------
*/
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n adding NULL object to postponed list"
"\n before, count is %d", postList->counts[J]) ;
fflush(msgFile) ;
}
ChvList_addObjectToList(postList, NULL, J) ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n count is now %d", postList->counts[J]) ;
fflush(msgFile) ;
}
FactorMsg_free(msg) ;
}
break ;
case POSTPONED_CHV :
/*
------------------------------
put postponed data on its list
------------------------------
*/
Chv_initFromBuffer(msg->chv) ;
if ( msglvl > 1 ) {
fprintf(msgFile, "\n received postponed data %d", J) ;
Chv_writeForHumanEye(msg->chv, msgFile) ;
fflush(msgFile) ;
}
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n before, count is %d", postList->counts[J]) ;
fflush(msgFile) ;
}
ChvList_addObjectToList(postList, msg->chv, J) ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n count is now %d", postList->counts[J]) ;
fflush(msgFile) ;
}
FactorMsg_free(msg) ;
break ;
default :
break ;
}
}
}
return(firstmsg) ; }
/*--------------------------------------------------------------------*/
/*
------------------------------------------------------------------
after front J has completed, remove relevant data from the
aggregage and/or postponed lists and send to the owning processor.
created -- 97jul10, cca
------------------------------------------------------------------
*/
static FactorMsg *
sendMessages (
FrontMtx *frontmtx,
int J,
int myid,
int nfront,
int par[],
int owners[],
int firsttag,
ChvManager *chvmanager,
ChvList *aggList,
ChvList *postList,
FactorMsg *firstmsgsent,
int stats[],
MPI_Comm comm,
int msglvl,
FILE *msgFile
) {
Chv *chv ;
FactorMsg *msg ;
int destination, K, nbytes, tag ;
destination = owners[J] ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n\n inside sendMessages, J = %d, firsttag = %d, destination %d",
J, firsttag, destination) ;
fflush(msgFile) ;
}
if ( destination != myid ) {
/*
------------------------------------------
this process does not own the front, so
an aggregate front will have been created.
------------------------------------------
*/
chv = ChvList_getList(aggList, J) ;
if ( chv != NULL ) {
/*
------------------------------------
an aggregate front has been created,
send to the owning processor
------------------------------------
*/
nbytes = Chv_nbytesNeeded(chv->nD, chv->nL, chv->nU,
frontmtx->type, chv->symflag) ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n proc %d does not own front %d, nbytes = %d",
myid, J, nbytes) ;
fflush(msgFile) ;
}
msg = FactorMsg_new() ;
msg->info[0] = AGGREGATE_CHV ;
msg->info[1] = J ;
msg->info[2] = nbytes ;
msg->chv = chv ;
msg->base = (void *) Chv_workspace(chv) ;
tag = AGGREGATE_CHV_TAG(J) ;
msg->next = firstmsgsent ;
firstmsgsent = msg ;
stats[6]++ ;
stats[7] += nbytes ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n posting Isend, msg %p, type %d, J %d, nbytes %d, dest %d, tag %d",
msg, msg->info[0], msg->info[1], msg->info[2], destination, tag) ;
fprintf(msgFile,
"\n wrk %p, nD %d, nL %d, nU %d, symflag %d",
Chv_workspace(chv), chv->nD, chv->nL,
chv->nU, chv->symflag) ;
fflush(msgFile) ;
}
if ( msglvl > 2 ) {
fprintf(msgFile, "\n aggregate front %d, chv %p", J, chv) ;
if ( chv != NULL ) {
Chv_writeForHumanEye(msg->chv, msgFile) ;
}
fflush(msgFile) ;
}
MPI_Isend(msg->base, nbytes, MPI_BYTE, destination, tag,
comm, &msg->req) ;
}
} else if ( postList != NULL ) {
if ( msglvl > 1 ) {
fprintf(msgFile, "\n checking for postponed messages") ;
fflush(msgFile) ;
}
/*
----------------------------------------------
pivoting enabled, look for a postponed chevron
----------------------------------------------
*/
K = par[J] ;
if ( msglvl > 1 ) {
fprintf(msgFile, "\n J = %d, K = %d", J, K) ;
fflush(msgFile) ;
}
if ( K != -1 && (destination = owners[K]) != myid ) {
if ( msglvl > 1 ) {
fprintf(msgFile, ", destination = %d", destination) ;
fflush(msgFile) ;
}
/*
---------------------------------------
owner of the parent is not this process
---------------------------------------
*/
if ( ChvList_isListNonempty(postList, K) == 1 ) {
/*
----------------------------------------
there is a postponed chevron on the list
----------------------------------------
*/
chv = ChvList_getList(postList, K) ;
nbytes = Chv_nbytesNeeded(chv->nD, chv->nL, chv->nU,
frontmtx->type, chv->symflag) ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n nD = %d, nL = %d, nU = %d, nbytes = %d",
chv->nD, chv->nL, chv->nU, nbytes) ;
fflush(msgFile) ;
}
} else {
chv = NULL ;
nbytes = 0 ;
}
/*
----------------------------------------
owner of the parent is not this process,
send a notification message
----------------------------------------
*/
msg = FactorMsg_new() ;
msg->info[0] = POSTPONED_NOTIFICATION ;
msg->info[1] = J ;
msg->info[2] = nbytes ;
tag = POSTPONED_NOTIFICATION_TAG(J) ;
msg->next = firstmsgsent ;
firstmsgsent = msg ;
stats[10]++ ;
stats[11] += 3*sizeof(int) ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n posting Isend, msg %p, type %d, J %d, nbytes %d, dest %d, tag %d",
msg, msg->info[0], msg->info[1], msg->info[2], destination, tag) ;
fflush(msgFile) ;
}
MPI_Isend((void *) msg->info, 3, MPI_INT, destination, tag,
comm, &msg->req) ;
if ( nbytes > 0 ) {
/*
----------------------------------------------------------
there is postponed data, send the data in a second message
----------------------------------------------------------
*/
msg = FactorMsg_new() ;
msg->info[0] = POSTPONED_CHV ;
msg->info[1] = J ;
msg->info[2] = nbytes ;
msg->chv = chv ;
msg->base = (void *) Chv_workspace(chv) ;
tag = POSTPONED_CHV_TAG(J) ;
msg->next = firstmsgsent ;
firstmsgsent = msg ;
stats[10]++ ;
stats[11] += nbytes ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n posting Isend, msg %p, type %d, J %d, nbytes %d, dest %d, tag %d",
msg, msg->info[0], msg->info[1], msg->info[2], destination, tag) ;
fprintf(msgFile,
"\n wrk %p, nD %d, nL %d, nU %d, symflag %d",
Chv_workspace(chv), chv->nD, chv->nL,
chv->nU, chv->symflag) ;
fflush(msgFile) ;
}
MPI_Isend(msg->base, nbytes, MPI_BYTE, destination,
tag, comm, &msg->req) ;
}
}
}
return(firstmsgsent) ; }
/*--------------------------------------------------------------------*/
/*
-------------------------------------------------------
check the list of posted sends.
if a send has completed, free the message structure
and return the Chv object (if present) to its manager.
created -- 97nov13, cca
-------------------------------------------------------
*/
static FactorMsg *
checkSentMessages (
FactorMsg *firstmsgsent,
ChvManager *chvmanager,
int msglvl,
FILE *msgFile
) {
FactorMsg *msg, *nextmsg ;
int flag, J, nbytes, type ;
MPI_Status status ;
for ( msg = firstmsgsent, firstmsgsent = NULL ;
msg != NULL ;
msg = nextmsg ) {
nextmsg = msg->next ;
type = msg->info[0] ;
J = msg->info[1] ;
nbytes = msg->info[2] ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n checking sent msg %p : type %d, front %d, nbytes %d",
msg, type, J, nbytes) ;
fflush(msgFile) ;
}
MPI_Test(&msg->req, &flag, &status) ;
if ( msglvl > 1 ) {
fprintf(msgFile, ", flag = %d", flag) ;
fflush(msgFile) ;
}
if ( flag == 1 ) {
if ( msg->chv != NULL ) {
if ( msglvl > 1 ) {
fprintf(msgFile, "\n chv %p released", msg->chv) ;
fflush(msgFile) ;
}
ChvManager_releaseObject(chvmanager, msg->chv) ;
}
FactorMsg_free(msg) ;
} else {
msg->next = firstmsgsent ;
firstmsgsent = msg ;
}
}
return(firstmsgsent) ; }
/*--------------------------------------------------------------------*/
/*
-------------------------------------------------
purpose -- an error has been detected at front J,
send a message to all other processors
created -- 98aug29, cca
-------------------------------------------------
*/
static FactorMsg *
sendErrorMessages (
int nfront,
int J,
int nproc,
int myid,
int firsttag,
FactorMsg *firstmsgsent,
int stats[],
MPI_Comm comm,
int msglvl,
FILE *msgFile
) {
int iproc, tag ;
FactorMsg *msg ;
for ( iproc = 0 ; iproc < nproc ; iproc++ ) {
if ( iproc != myid ) {
msg = FactorMsg_new() ;
msg->info[0] = tag = ERROR_TAG ;
msg->info[1] = J ;
msg->info[2] = 0 ;
msg->chv = NULL ;
msg->base = (void *) msg->info ;
msg->next = firstmsgsent ;
firstmsgsent = msg ;
if ( msglvl > 1 ) {
fprintf(msgFile,
"\n posting error Isend, info = {%d, %d, %d}, dest %d, tag %d",
msg->info[0], msg->info[1], msg->info[2], iproc, tag) ;
fflush(msgFile) ;
}
MPI_Isend(msg->base, 2, MPI_INT, iproc, tag, comm, &msg->req) ;
}
}
return(firstmsgsent) ; }
/*--------------------------------------------------------------------*/
/*
-------------------------------------------
purpose -- when an error has been detected,
cancel all outstanding sends and
receives and free their storage
created -- 98aug29, cca
-------------------------------------------
*/
static void
cancelAndFreeMessages (
FactorMsg *head,
ChvManager *chvmanager,
int msglvl,
FILE *msgFile
) {
FactorMsg *msg, *nextmsg ;
int flag ;
MPI_Status status ;
/*
------------------------------------------------
cancel all the messages and test for termination
------------------------------------------------
*/
if ( msglvl > 1 ) {
fprintf(msgFile, "\n\n inside cancelAndFreeMessages()") ;
fflush(msgFile) ;
}
for ( msg = head, head = NULL ; msg != NULL ; msg = nextmsg ) {
if ( msglvl > 1 ) {
fprintf(msgFile, "\n msg %p, info = {%d,%d,%d}",
msg, msg->info[0], msg->info[1], msg->info[2]) ;
fflush(msgFile) ;
}
nextmsg = msg->next ;
MPI_Cancel(&msg->req) ;
if ( msglvl > 1 ) {
fprintf(msgFile, ", canceled") ;
fflush(msgFile) ;
}
MPI_Test(&msg->req, &flag, &status) ;
if ( msglvl > 1 ) {
fprintf(msgFile, ", tested, flag = %d", flag) ;
fflush(msgFile) ;
}
if ( flag == 1 ) {
/*
----------------------
message has terminated
----------------------
*/
if ( msg->chv != NULL ) {
if ( msglvl > 1 ) {
fprintf(msgFile, ", releasing chv %p", msg->chv) ;
fflush(msgFile) ;
}
ChvManager_releaseObject(chvmanager, msg->chv) ;
}
FactorMsg_free(msg) ;
} else {
msg->next = head ;
head = msg ;
}
}
/*
--------------------------------------------
loop until all messages have been terminated
--------------------------------------------
*/
while ( head != NULL ) {
for ( msg = head, head = NULL ; msg != NULL ; msg = nextmsg ) {
if ( msglvl > 1 ) {
fprintf(msgFile, "\n msg %p, info = {%d,%d,%d}",
msg, msg->info[0], msg->info[1], msg->info[2]) ;
fflush(msgFile) ;
}
nextmsg = msg->next ;
MPI_Test(&msg->req, &flag, &status) ;
if ( msglvl > 1 ) {
fprintf(msgFile, ", tested, flag = %d", flag) ;
fflush(msgFile) ;
}
if ( flag == 1 ) {
/*
----------------------
message has terminated
----------------------
*/
if ( msg->chv != NULL ) {
if ( msglvl > 1 ) {
fprintf(msgFile, ", releasing chv %p", msg->chv) ;
fflush(msgFile) ;
}
ChvManager_releaseObject(chvmanager, msg->chv) ;
}
FactorMsg_free(msg) ;
} else {
msg->next = head ;
head = msg ;
}
}
}
if ( msglvl > 1 ) {
fprintf(msgFile, "\n leaving cancelAndFreeMessages()") ;
fflush(msgFile) ;
}
return ; }
/*--------------------------------------------------------------------*/
syntax highlighted by Code2HTML, v. 0.9.1