#include	"BSprivate.h"

/* ******************************************************* */
/* routines for manipulating the bulletin board structures */
/* ******************************************************* */

/*+ BSinit_bb - initialize a bulletin board

    Input Parameters:
.   length - the length of the data in the BB
.   map - pointer to the mapping to use

    Returns:
    the new BB

+*/
BSbb	*BSinit_bb(int length, BSmapping *map)
{
	BSbb	*bb;

	MY_MALLOCN(bb,(BSbb *),sizeof(BSbb),1);
	MY_MALLOCN(bb->info,(int *),sizeof(int)*length,2);
	bb->length = length;
	bb->map = map;
	return(bb);
}

/*+ BSfree_bb - free a bulletin board

    Input Parameters:
.   bb - the BB to free

    Returns:
    void

+*/
void	BSfree_bb(BSbb *bb)
{
	MY_FREE(bb->info);
	MY_FREE(bb);
}

/*+ BSpost_bb - post information with addresses into the BB

    Input Parameters:
.   bb - the BB
.   length - the length of the data to be posted
.   address - the address into the BB to post to
.   info - the array of information to post

    Returns:
    void

+*/
void	BSpost_bb(BSbb *bb, int length, int *address, int *info)
{
	int	i;

	for (i=0;i<length;i++) {
		bb->info[address[i]] = info[i];
	}
}

/*+ BSpost_noaddr_bb - post information w/o addresses into the BB

    Input Parameters:
.   bb - the BB
.   length - the length of the data to be posted
.   info - the array of information to post

    Returns:
    void

+*/
void	BSpost_noaddr_bb(BSbb *bb, int length, int *info)
{
	int	i;

	for (i=0;i<length;i++) {
		bb->info[i] = info[i];
	}
}

/*+ BSquery_match_bb - Mutual BB query

    Input Parameters:
.   bb - the BB
.   length - the length of the query
.   address - the array of global addresses to query
.   procinfo - the usual processor info

    Output Parameters:
.   info - on exit, this array contains the query answers

    Returns:
    void

    Notes: 
    The queries are symmetric in nature.  If one processor queries
    another, it assumed that that other processor will also be
    querying the first processor.

+*/
void	BSquery_match_bb(BSbb *bb, int length, int *address, 
			int *info, BSprocinfo *procinfo)
{
	int	i, j;
	BSpermutation *perm;
	int	*proc_id;
	int	*p_address;
	int	*p_info;
	int	from, size;
	int	cur_len, cur_addr, cur_proc;
	int	num_msg_sent;
	int	*in_msg;
	int	*out_msg;
	BSmsg_list	*msg_list = NULL;
	MPI_Status	mpi_status;
	void *msg_buf;

	GSYNC(procinfo->procset);

	if (length == 0) return;
	/* build query list */
	/* these NEED to be sorted by processor, such that each processor */
	/* gets only ONE list */
	MY_MALLOC(proc_id,(int *),sizeof(int)*length,1);
	/* get processor numbers */
	(*bb->map->fglobal2proc)(length,address,proc_id,
		procinfo,bb->map); CHKERR(0);

	/* allocate a permutation vector and copies of other vectors */
	perm = BSalloc_permutation(length); CHKERR(0);
	MY_MALLOC(p_address,(int *),sizeof(int)*length,2);
	MY_MALLOC(p_info,(int *),sizeof(int)*length,3);
	
	/* initialize the perm. to 1 thru length and copy vectors */
	for (i=0;i<length;i++) {
		perm->perm[i] = i;
		p_address[i] = address[i];
	}

	/* sort vectors by address */
	BSheap_sort2(length,proc_id,p_address,perm->perm); CHKERR(0);

	/* send query lists */
	cur_addr = 0;
	cur_proc = proc_id[0];
	cur_len = 0;
	num_msg_sent = 0;
	for (i=0;i<length;i++) {
		if (cur_proc != proc_id[i]) {
			MY_SEND_SYNC(msg_list,BB_SQ_MSG,&(p_address[cur_addr]),
				cur_len,cur_proc,MPI_INT,procinfo);
			/* after this we don't need the information in p_address */
			/* or in proc_id.  we will use this space to store */
			/* information on where to receive the messages */
			/* proc_id will store the processor number */
			/* p_address will store the address of the message */
			p_address[num_msg_sent] = cur_addr;
			proc_id[num_msg_sent] = cur_proc;
			/* end of reuse area */
			num_msg_sent++;
			cur_len = 0;
			cur_proc = proc_id[i];
			cur_addr = i;
		}
		cur_len++;
	}
	MY_SEND_SYNC(msg_list,BB_SQ_MSG,&(p_address[cur_addr]),
		cur_len,cur_proc,MPI_INT,procinfo);
	p_address[num_msg_sent] = cur_addr;
	proc_id[num_msg_sent] = cur_proc;
	num_msg_sent++;

	/* receive queries and send answers */
	for (i=0;i<num_msg_sent;i++) {
		RECVSYNCUNSZ(BB_SQ_MSG,msg_buf,size,MPI_INT,procinfo,mpi_status); 
		CHKERR(0);
		in_msg = (int *)msg_buf;
		from = mpi_status.MPI_SOURCE;
		CHECK_SEND_LIST(msg_list);
		cur_proc = -1;
		for (j=0;j<num_msg_sent;j++) {
			if (proc_id[j] == from) {
				cur_proc = proc_id[j];
				break;
			}
		}
		if (cur_proc == -1) {
			MY_SETERRC(BB_ERROR,"Received message from unexpected source");
		}
		/* translate into local addrs */
		MY_MALLOC(out_msg,(int *),sizeof(int)*size,4+i);
		(*bb->map->fglobal2local)(size,in_msg,out_msg,procinfo,bb->map);
		CHKERR(0);
		/* free up message */
		MSGFREERECV(msg_buf);CHKERR(0);
		/* get requested info at local addresses */
		for (j=0;j<size;j++) {
			if (out_msg[j] < 0) {
				MY_SETERRC(BB_ERROR,"Processor does not own information");
			}
			out_msg[j] = bb->info[out_msg[j]];
		}
		/* send back info */
		MY_SEND_SYNC(msg_list,BB_SA_MSG,out_msg,size,from,MPI_INT,procinfo);
		/* free up message */
		MY_FREE(out_msg);
	}

	/* receive answers */
	for (i=0;i<num_msg_sent;i++) {
		RECVSYNCUNSZ(BB_SA_MSG,msg_buf,size,MPI_INT,procinfo,mpi_status); 
		CHKERR(0);
		in_msg = (int *)msg_buf;
		from = mpi_status.MPI_SOURCE;
		CHECK_SEND_LIST(msg_list);
		/* find place to receive in */
		cur_addr = -1;
		for (j=0;j<num_msg_sent;j++) {
			if (from == proc_id[j]) {
				cur_addr = p_address[j];
				break;
			}
		}
		if (cur_addr == -1) {
			MY_SETERRC(BB_ERROR,"Not expecting message from processor");
		}
		for (j=0;j<size;j++) {
			p_info[cur_addr+j] = in_msg[j];
		}
		MSGFREERECV(msg_buf);CHKERR(0);
	}

	/* now sort the information according to the permutation */
	BSperm_ivec(p_info,info,perm); CHKERR(0);

	MY_FREE(p_info);
	MY_FREE(p_address);
	MY_FREE(proc_id);
	BSfree_permutation(perm); CHKERR(0);
	FINISH_SEND_LIST(msg_list);
}

/*+ BSquery_nomatch_bb - BB query

    Input Parameters:
.   bb - the BB
.   length - the length of the query
.   address - the array of global addresses to query
.   procinfo - the usual processor info

    Output Parameters:
.   info - on exit, this array contains the query answers

    Returns:
    void

    Notes: 
    Unlike BSquery_match_bb, no assumptions are made about other
    processors queries (other than that everyone will call this routine).
    This routine is not currently used in the package and hence is
    not well-tested (use at your own risk).

+*/
void	BSquery_nomatch_bb(BSbb *bb, int length, int *address,
			int *info, BSprocinfo *procinfo)
{
	int	i, j;
	BSpermutation	*perm;
	int	*proc_id;
	int	*p_address;
	int	*p_info;
	int	from, size;
	int	cur_len, cur_addr, cur_proc;
	int	num_msg_sent;
	int	*in_msg;
	int	*out_msg;
	int parent, l_child, r_child, num_children;
	int	dummy;
	int	num_real_msg, num_term_msg, sent_parent_term, done, msg_avail;
	int	type;
	BSmsg_list	*msg_list = NULL;
	MPI_Status	mpi_status;
	void *msg_buf;

	GSYNC(procinfo->procset);

	/* build query list */
	/* these NEED to be sorted by processor, such that each processor */
	/* gets only ONE list */
	if (length > 0) {
		MY_MALLOC(proc_id,(int *),sizeof(int)*length,1);
		/* get processor numbers */
		(*bb->map->fglobal2proc)(length,address,proc_id,
			procinfo,bb->map); CHKERR(0);

		/* allocate a permutation vector and copies of other vectors */
		perm = BSalloc_permutation(length); CHKERR(0);
		MY_MALLOC(p_address,(int *),sizeof(int)*length,2);
		MY_MALLOC(p_info,(int *),sizeof(int)*length,3);
	
		/* initialize the perm. to 1 thru length and copy vectors */
		for (i=0;i<length;i++) {
			perm->perm[i] = i;
			p_address[i] = address[i];
		}

		/* sort vectors by address */
		BSheap_sort2(length,proc_id,p_address,perm->perm); CHKERR(0);

		/* send query lists */
		cur_addr = 0;
		cur_proc = proc_id[0];
		cur_len = 0;
		num_msg_sent = 0;
		for (i=0;i<length;i++) {
			if (cur_proc != proc_id[i]) {
				MY_SEND_SYNC(msg_list,BB_SQ_MSG,&(p_address[cur_addr]),
					cur_len,cur_proc,MPI_INT,procinfo);
				/* after this we don't need the information in p_address */
				/* or in proc_id.  we will use this space to store */
				/* information on where to receive the messages */
				/* proc_id will store the processor number */
				/* p_address will store the address of the message */
				p_address[num_msg_sent] = cur_addr;
				proc_id[num_msg_sent] = cur_proc;
				/* end of reuse area */
				num_msg_sent++;
				cur_len = 0;
				cur_proc = proc_id[i];
				cur_addr = i;
			}
			cur_len++;
		}
		MY_SEND_SYNC(msg_list,BB_SQ_MSG,&(p_address[cur_addr]),
			cur_len,cur_proc,MPI_INT,procinfo);
		p_address[num_msg_sent] = cur_addr;
		proc_id[num_msg_sent] = cur_proc;
		num_msg_sent++;
	} else {
		num_msg_sent = 0;
	}

	/* receive queries and send answers */
	/* do this until the terminating message has been received from parent */
	/* find my place in the termination tree */
	PSNbrTree(PS_PARENT,parent,procinfo->procset);
	PSNbrTree(PS_LCHILD,l_child,procinfo->procset);
	PSNbrTree(PS_RCHILD,r_child,procinfo->procset);
	num_children = 0;
    if (l_child >= 0) num_children++;
    if (r_child >= 0) num_children++;
	num_term_msg = 0;
	num_real_msg = 0;
	sent_parent_term = FALSE;
	done = FALSE;
	while (! done ) {
		/* if we have all our answers and we haven't done so, tell everyone */
		if ((num_real_msg == num_msg_sent) && (num_children == num_term_msg)
			&& (sent_parent_term == FALSE)) {
			if (PSISROOT(procinfo)) {
				/* tell children */
				if (l_child >= 0) {
					MY_SEND_SYNC(msg_list,BB_SQ_MSG,&dummy,0,l_child,MPI_INT,
						procinfo);
				}
				if (r_child >= 0) {
					MY_SEND_SYNC(msg_list,BB_SQ_MSG,&dummy,0,r_child,MPI_INT,
						procinfo);
				}
				done = TRUE;
				break;
			} else {
				/* tell parent */
				MY_SEND_SYNC(msg_list,BB_SQ_MSG,&dummy,0,parent,MPI_INT,
						procinfo);
			}
			sent_parent_term = TRUE;
		}

		/* get either a request or an answer */
		msg_avail = FALSE;
		while (TRUE) {
			type = BB_SQ_MSG;
			MPI_Iprobe(MPI_ANY_SOURCE,type,procinfo->procset,
				&msg_avail,&mpi_status);
			if (msg_avail) break;
			type = BB_SA_MSG;
			MPI_Iprobe(MPI_ANY_SOURCE,type,procinfo->procset,
				&msg_avail,&mpi_status);
			if (msg_avail) break;
			CHECK_SEND_LIST(msg_list);
		}
		if (type == BB_SQ_MSG) {
			/* receive a request */
			RECVSYNCUNSZ(BB_SQ_MSG,msg_buf,size,MPI_INT,procinfo,mpi_status); 
			CHKERR(0);
			in_msg = (int *)msg_buf;
			from = mpi_status.MPI_SOURCE;
			CHECK_SEND_LIST(msg_list);
			/* if empty message, then it is a termination message */
			if (size == 0) {
				if (from == l_child) {
					num_term_msg++;
				} else if (from == r_child) {
					num_term_msg++;
				} else {
					/* tell children */
					if (l_child >= 0) {
						MY_SEND_SYNC(msg_list,BB_SQ_MSG,&dummy,0,l_child,
							MPI_INT,procinfo);
					}
					if (r_child >= 0) {
						MY_SEND_SYNC(msg_list,BB_SQ_MSG,&dummy,0,r_child,
							MPI_INT,procinfo);
					}
					done = TRUE;
				}
				MSGFREERECV(msg_buf);CHKERR(0);
			} else {
				/* translate into local addrs */
				MY_MALLOC(out_msg,(int *),sizeof(int)*size,4+i);
				(*bb->map->fglobal2local)(size,in_msg,out_msg,procinfo,bb->map);
				CHKERR(0);
				/* free up message */
				MSGFREERECV(msg_buf);CHKERR(0);
				/* get requested info at local addresses */
				for (j=0;j<size;j++) {
					if (out_msg[j] < 0) {
						MY_SETERRC(BB_ERROR,
							"Processor does not own information");
					}
					out_msg[j] = bb->info[out_msg[j]];
				}
				/* send back info */
				MY_SEND_SYNC(msg_list,BB_SA_MSG,out_msg,size,from,
					MPI_INT,procinfo);
				/* free up message */
				MY_FREE(out_msg);
			}
		} else {
			/* receive an answer */
			RECVSYNCUNSZ(BB_SA_MSG,msg_buf,size,MPI_INT,procinfo,mpi_status); 
			CHKERR(0);
			in_msg = (int *)msg_buf;
			from = mpi_status.MPI_SOURCE;
			CHECK_SEND_LIST(msg_list);
			/* find place to receive in */
			for (j=0;j<num_msg_sent;j++) {
				if (from == proc_id[j]) {
					cur_addr = p_address[j];
					break;
				}
			}
			for (j=0;j<size;j++) p_info[cur_addr+j] = in_msg[j];
			MSGFREERECV(msg_buf);CHKERR(0);
			num_real_msg++;
		}
	}

	if (length > 0) {
		/* now sort the information according to the permutation */
		BSperm_ivec(p_info,info,perm); CHKERR(0);
	
		/* free up work vectors */
		MY_FREE(p_info);
		MY_FREE(p_address);
		MY_FREE(proc_id);
		BSfree_permutation(perm); CHKERR(0);
	}
	FINISH_SEND_LIST(msg_list);
	GSYNC(procinfo->procset);
	GSYNC(procinfo->procset);
}


syntax highlighted by Code2HTML, v. 0.9.1