#include "BSprivate.h"
/*@ BSoffset - Compute this processor's starting offsets in a global numbering

    Input Parameters:
.   length - number of classes of things to order on this processor
.   array - array[i] is the number of things in class i on this processor
.   procinfo - the usual processor information

    Output Parameters:
.   offsets - set to a newly allocated array with one entry per class
              (as many entries as the value returned); entry i is the first
              global number given to class i on this processor.  This
              routine does not free the array.

    Returns:
    the class count agreed on by all processors (the result of GIMAX
    applied to length)

    Notes:
    Numbers are handed out class by class; within a class the processors
    are visited in tree order: left subtree, then the node itself, then
    the right subtree.  For example, if proc 0 submits a list of 10 and 20,
    proc 1 submits a list of 5 and 11, and proc 2 submits a list of 6 and 9,
    then proc 0 gets back 5 and 32, proc 1 gets back 0 and 21, and
    proc 2 gets back 15 and 52.

    NOTE(review): CHKERRN(0) and MY_MALLOCN presumably leave the routine
    early on failure; buffers allocated before that point would then not
    be released here - confirm against BSprivate.h.
@*/
int BSoffset(int length, int *array, int **offsets, BSprocinfo *procinfo)
{
    int n_global, scratch;
    int up, kid_left, kid_right;
    int *mine;
    int *incoming;
    int *base;
    int k;
    int n_kids, pending;
    int *left_totals, *right_buf;
    int running;
    BSmsg_list *msg_list = NULL;
    MPI_Status mpi_status;

    /* every processor adopts the same class count */
    n_global = length;
    GIMAX(&n_global, 1, &scratch, procinfo->procset); CHKERRN(0);

    /* result array: starts out as this processor's own per-class counts, */
    /* zero for classes beyond the local length */
    MY_MALLOCN(mine, (int *), sizeof(int)*n_global, 1);
    *offsets = mine;
    for (k = 0; k < n_global; k++) mine[k] = 0;
    for (k = 0; k < length; k++) mine[k] = array[k];

    /* look up parent and children; a negative child id means "no child" */
    PSNbrTree(PS_PARENT, up, procinfo->procset);
    PSNbrTree(PS_LCHILD, kid_left, procinfo->procset);
    PSNbrTree(PS_RCHILD, kid_right, procinfo->procset);
    n_kids = (kid_left >= 0) + (kid_right >= 0);

    /* gather the per-class totals from each child; afterwards "mine" */
    /* holds the totals for the subtree rooted at this processor */
    left_totals = NULL;
    right_buf = NULL;
    pending = n_kids;
    while (pending > 0) {
        MY_MALLOCN(incoming, (int *), sizeof(int)*n_global, 2);
        RECVSYNCNOMEM(OFFSETU_MSG, incoming, n_global, MPI_INT, procinfo, mpi_status);
        CHKERRN(0);
        pending--;
        /* the messages may arrive in either order; the sender decides */
        /* which child slot keeps the buffer */
        if (mpi_status.MPI_SOURCE == kid_left) left_totals = incoming;
        if (mpi_status.MPI_SOURCE == kid_right) right_buf = incoming;
        for (k = 0; k < n_global; k++) mine[k] += incoming[k];
    }

    /* pass the subtree totals upward unless this is the root */
    /* NOTE(review): "mine" is overwritten below and "base"/"right_buf" */
    /* are freed before FINISH_SEND_LIST; that is only safe if */
    /* MY_SEND_SYNC no longer needs the caller's buffer once it returns */
    /* - confirm against BSprivate.h */
    if (!(PSISROOT(procinfo))) {
        MY_SEND_SYNC(msg_list, OFFSETU_MSG, mine, n_global, up, MPI_INT, procinfo);
    }

    /* base numbering for this subtree: received from the parent, or at */
    /* the root built as a running sum of the class totals */
    MY_MALLOCN(base, (int *), sizeof(int)*n_global, 3);
    if (!(PSISROOT(procinfo))) {
        RECVSYNCNOMEM(OFFSETD_MSG, base, n_global, MPI_INT, procinfo, mpi_status);
        CHKERRN(0);
    } else {
        running = 0;
        for (k = 0; k < n_global; k++) {
            base[k] = running;
            running += mine[k];
        }
    }

    /* the left subtree is numbered first, starting at the base; this */
    /* processor follows immediately after the left subtree's totals */
    if (left_totals != NULL) {
        MY_SEND_SYNC(msg_list, OFFSETD_MSG, base, n_global, kid_left, MPI_INT, procinfo);
        for (k = 0; k < n_global; k++) mine[k] = left_totals[k] + base[k];
    } else {
        for (k = 0; k < n_global; k++) mine[k] = base[k];
    }

    /* the right subtree starts after this processor's own things; its */
    /* receive buffer is reused to carry the base sent down to it */
    if (right_buf != NULL) {
        for (k = 0; k < n_global; k++) right_buf[k] = mine[k];
        for (k = 0; k < length; k++) right_buf[k] += array[k];
        MY_SEND_SYNC(msg_list, OFFSETD_MSG, right_buf, n_global, kid_right, MPI_INT, procinfo);
    }

    /* release the work buffers, then finish the sends queued on msg_list */
    MY_FREE(base);
    if (right_buf != NULL) {
        MY_FREE(right_buf);
    }
    if (left_totals != NULL) {
        MY_FREE(left_totals);
    }
    FINISH_SEND_LIST(msg_list);
    return n_global;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */